/*- * Copyright (c) 2010 Juergen Lock * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * Simple threaded .xz compressor (using liblzma from XZ Utils) * Run with -h or --help for a small usage message. * * Example: * xzj -6 -j4 -s4m -g somefile somefile.xz * which is the same as: * xzj -6 --jobs 4 --chunksize 4m --debug somefile somefile.xz * * or (with bsdtar/gtar:) * tar -cvf somedir.tar.xz --use-compress-program xzj /some/dir * * Built using dmd v2.048 as: * dmd -w -g xzj.d lzma/lzma.d -L-llzma * * TODO: * * - Cleanup, turn into proper OO (classes, exceptions...) Probably * should make something like std.zlib for lzma instead of using * C bindings (lzma/lzma.d) directly. 
* * - Document/fix getopt quirks like needing -- before - or not * grokking spaces before args to single-letter options; also add * in-place file compression (output to .xz, unlink * input file on completion or output file on error.) * * - Autotune compression preset/chunksize according to available RAM, * the current defaults provide for good compression but can use * more than 250 MB resident with 4 threads... * * - Add threaded decompression for .xz files with multiple streams * like this program outputs. This will have to read the stream * headers like xz --list does, which only works on seekable input * because it has to start at EOF (or maybe one could brute-force * search for trailer/header byte sequences in non-seekable input * too?) Otherwise it will have to fall back to a single thread, * also if loading/decompressing a single stream in RAM needs too * much of it, like when it's a big file with only one stream (i.e. * not from this program), or when the decompressing box has much * less RAM than the one that did the compression. * * - Add pigz-like mode that keeps dictionary across streams to * get better compression with smaller stream (chunk) sizes? * (is this possible with liblzma?) * * - Add -v for ETA and prettier statistics than -g now does... * * - Port to Windows(?), test on non-FreeBSD/Linux.
*
 * XZ Utils home page:
 * D 2.0 home page:
 */

import std.stdio;
import std.stream;
import std.concurrency;
import std.getopt;
import std.conv;
import std.path;
import core.stdc.signal;
import core.stdc.stdlib;
import core.stdc.stdio;
import core.stdc.string;
import core.sys.posix.unistd;
import lzma.lzma;

version(FreeBSD)
{
    extern (C) int sysctlbyname(const(char) *, void *, size_t *, void *, size_t);
}

/**
 * Returns the number of online CPUs, falling back to 1 when the
 * platform query fails or the platform is not supported.
 */
int get_ncores()
{
    version(FreeBSD)
    {
        int cpus;
        size_t cpus_size = cpus.sizeof;
        if (sysctlbyname("hw.ncpu", &cpus, &cpus_size, null, 0) != -1)
            return cpus;
        return 1;
    }
    else version(linux)
    {
        // sysconf() returns -1 on failure; don't pass that on as a job count
        auto n = sysconf(_SC_NPROCESSORS_ONLN);
        return n > 0 ? cast(int) n : 1;
    }
    else version(Solaris)
    {
        auto n = sysconf(_SC_NPROCESSORS_ONLN);
        return n > 0 ? cast(int) n : 1;
    }
    else
    {
        // XXX port me!
        return 1;
    }
}

// Use malloc() and slicing for output buffers (this is to avoid the GC
// and so reduce memory use with large(ish) files).
// NOTE: D2 module-level variables are thread-local, so each worker
// thread sees its own copy initialized from this value.
bool MALLOC_SLICE = true;

/**
 * malloc() that prints an error and terminates the program when the
 * allocation fails, instead of handing a null pointer to the caller.
 * A zero-byte request may legitimately return null and is not an error.
 */
private void *xmalloc(size_t size)
{
    void *p = malloc(size);
    if (p is null && size != 0)
    {
        std.stdio.stderr.writeln("out of memory allocating ", size, " bytes");
        exit(1);
    }
    return p;
}

/**
 * Worker thread body: receives uncompressed chunks from the owner,
 * compresses each one into a separate .xz stream and sends the result
 * back, until the owner thread terminates.
 *
 * Params:
 *   owner     = thread to send compressed chunks to.
 *   idxthread = this worker's index; echoed back in every reply so the
 *               owner knows which worker is idle again.
 *   preset    = liblzma preset (level, optionally | LZMA_PRESET_EXTREME).
 *   chunksize = maximum size of one uncompressed input chunk.
 *
 * Request message: (immutable(ubyte)* data, size_t size, ulong seq).
 * The data buffer was malloc()ed by the owner and is free()d here.
 *
 * Reply message: (thisTid, idxthread, seq, lzma_ret, compressed data,
 * uncompressed size). With MALLOC_SLICE the compressed data is a
 * malloc()ed slice that the receiver must free(). Any lzma_ret other
 * than LZMA_STREAM_END means the chunk failed. That includes a failure
 * to set up the encoder, which is reported rather than thrown so the
 * owner is never left waiting for a reply that will not come.
 */
void worker(Tid owner, int idxthread, int preset, size_t chunksize)
{
    // Worst-case size of one compressed chunk. Incompressible input is
    // stored as uncompressed LZMA2 chunks of at most 64 KiB with a 3 byte
    // header each. The 0x1000 covers stream/block headers, check, index
    // and footer. MALLOC_SLICE relies on output never exceeding this.
    immutable size_t out_buf_size =
        chunksize + (chunksize / 0x10000 + 1) * 3 + 0x1000;
    ubyte *out_buf_p = cast(ubyte *) xmalloc(out_buf_size);

    lzma_stream strm;
    scope(exit)
    {
        // Release the encoder state (a no-op if it was never set up)
        // and the output buffer however this thread ends.
        lzma_end(&strm);
        free(out_buf_p);
    }

    lzma_options_lzma opt_lzma;
    lzma_filter[LZMA_FILTERS_MAX + 1] filters;

    lzma_lzma_preset(&opt_lzma, preset);
    filters[0].id = LZMA_FILTER_LZMA2;
    filters[0].options = &opt_lzma;
    // Terminate the filter options array.
    filters[1].id = LZMA_VLI_UNKNOWN;

    lzma_check check = lzma_check.LZMA_CHECK_CRC64;
    if (!lzma_check_is_supported(check))
        check = lzma_check.LZMA_CHECK_CRC32;

    if (false)
    {
        // XXX use this for autotuning
        writeln("lzma_raw_encoder_memusage = ", lzma_raw_encoder_memusage(filters.ptr));
    }

    try
    {
        while (true)
        {
            // Receive a chunk to compress
            auto msg = receiveOnly!(immutable(ubyte) *, size_t, ulong)();
            auto msg_in_buf_p = msg.field[0];
            auto msg_in_buf_size = msg.field[1];
            auto msg_seq = msg.field[2];
            ubyte[] send_buf;

            strm.next_out = out_buf_p;
            strm.avail_out = out_buf_size;
            strm.next_in = msg_in_buf_p;
            strm.avail_in = msg_in_buf_size;

            auto ret = lzma_stream_encoder(&strm, filters.ptr, check);
            if (ret != lzma_ret.LZMA_OK)
            {
                if (false)
                {
                    // XXX to get a real backtrace:
                    raise(SIGABRT);
                }
                // Fall through and send `ret` back with no data. The owner
                // raises it as an error. Throwing here would end this
                // (unlinked) thread without a reply, and the owner would
                // block forever.
            }
            else
            {
                bool src_eof;
                lzma_action action = lzma_action.LZMA_RUN;

                // Loop to compress one chunk, partly copied from xz(1)
                // source (including some comments)
                while (true)
                {
                    if (strm.avail_in == 0 && !src_eof)
                    {
                        src_eof = true;
                        action = lzma_action.LZMA_FINISH;
                    }

                    // Let liblzma do the actual work.
                    ret = lzma_code(&strm, action);

                    if (strm.avail_out == 0)
                    {
                        // XXX MALLOC_SLICE assumes compressed chunks
                        // always fit in out_buf_size
                        if (MALLOC_SLICE)
                            assert(0);
                        if (send_buf != null)
                            send_buf ~= out_buf_p[0 .. out_buf_size];
                        else
                            send_buf = out_buf_p[0 .. out_buf_size].dup;
                        strm.next_out = out_buf_p;
                        strm.avail_out = out_buf_size;
                    }

                    if (ret != lzma_ret.LZMA_OK)
                    {
                        // Determine if the return value indicates that we
                        // won't continue coding.
                        const bool stop = ret != lzma_ret.LZMA_NO_CHECK
                            && ret != lzma_ret.LZMA_UNSUPPORTED_CHECK;

                        if (stop)
                        {
                            // Write the remaining bytes even if something
                            // went wrong, because that way the user gets
                            // as much data as possible, which can be good
                            // when trying to get at least some useful
                            // data out of damaged files.
                            size_t send_size = out_buf_size - strm.avail_out;
                            if (send_buf != null)
                            {
                                // XXX assume compressed chunks are never
                                // larger than out_buf_size
                                if (MALLOC_SLICE)
                                    assert(0);
                                send_buf ~= out_buf_p[0 .. send_size];
                            }
                            else if (MALLOC_SLICE)
                            {
                                // Ownership of this copy passes to the
                                // receiver, which free()s it.
                                ubyte *send_buf_p = cast(ubyte *) xmalloc(send_size);
                                memcpy(send_buf_p, out_buf_p, send_size);
                                send_buf = send_buf_p[0 .. send_size];
                            }
                            else
                            {
                                send_buf = out_buf_p[0 .. send_size].dup;
                            }
                        }

                        if (ret == lzma_ret.LZMA_STREAM_END)
                        {
                            if (strm.avail_in == 0)
                            {
                                assert(src_eof);
                                break;
                            }
                            // We hadn't reached the end of the file.
                            ret = lzma_ret.LZMA_DATA_ERROR;
                            assert(stop);
                        }

                        if (stop)
                            break;
                    }
                }
            }

            free(cast(void *) msg_in_buf_p);

            // Send back compressed chunk
            owner.send(thisTid, idxthread, msg_seq, ret,
                cast(immutable(ubyte)[]) send_buf, msg_in_buf_size);
        }
    }
    catch (OwnerTerminated)
    {
        // We're done
    }
}

/**
 * Entry point.
 *
 * Parses the command line, then:
 *   - reads the input in chunks of --chunksize bytes;
 *   - hands each chunk to an idle worker thread, spawning up to --jobs
 *     workers;
 *   - writes the compressed streams to the output in input order.
 *
 * Options:
 *   -0 .. -9        compression preset level
 *   --jobs|-j       number of worker threads (default: number of CPUs)
 *   --chunksize|-s  input chunk size. The value is in KiB, or in MiB
 *                   with an m/M suffix. It must be a power of two and
 *                   at least 64 KiB.
 *   --extreme|-e    OR LZMA_PRESET_EXTREME into the preset
 *   --debug|-g      log every compressed chunk on stderr
 *   --help|-h       print usage and exit
 *
 * Positional arguments: [input [output]]. A missing name or "-" means
 * stdin/stdout. Invalid options print an error and exit(1). An lzma
 * error reported by a worker is thrown out of main() as an Exception.
 */
void main(string[] args)
{
    // --jobs
    int njobs = get_ncores();
    // --extreme
    bool extreme;
    // --debug
    bool debuglogging;
    // -[0-9]
    int preset = 6;
    // --chunksize
    size_t chunksize = 0x400000;
    // my name
    auto argv0 = basename(args[0]);

    // getopt handlers

    // -0 .. -9: the option name itself is the preset level
    void presethandler(string option)
    {
        preset = to!(int)(option);
    }

    // --chunksize N[k|K|m|M]: N is KiB unless suffixed with m/M (MiB)
    void chunksizehandler(string option, string value)
    {
        size_t mult = 1024;
        if (value.length)
        {
            switch (value[$ - 1])
            {
                case 'm', 'M':
                    mult = 1024 * 1024;
                    goto case; // strip the suffix like 'k' does
                case 'k', 'K':
                    value = value[0 .. $ - 1];
                    break;
                default:
                    break;
            }
        }
        chunksize = mult * to!(size_t)(value);
    }

    // Prints the usage message (with the current defaults) and exits
    void usage(string option)
    {
        std.stdio.stderr.writeln(argv0, ": Simple threaded .xz compressor (using liblzma from XZ Utils)\n\n",
            "Usage: ", argv0, " -[0-9] --jobs|j --chunksize|s --extreme|e --debug|g\n",
            "\t-[0-9] = compression preset (", preset & LZMA_PRESET_LEVEL_MASK, ") - higher = slower and uses more RAM,\n",
            "\t\tbut better compression\n",
            "\t-j = number of threads (", njobs, ")\n",
            "\t-s = size of uncompressed input chunks (", chunksize / 1024, "K), lower also reduces\n",
            "\t\tcompression (and RAM usage)\n",
            "\t-e = enable LZMA_PRESET_EXTREME (possibly even better compression at\n",
            "\t\tthe cost of more cpu)\n",
            "\t-g = log compressed chunks on stderr\n",
            "\t-h or --help = this message\n\n",
            "Can take input and output filenames in that order, or - for stdin/stdout\n",
            "for use in pipes (which is also default without filenames.)\n\n",
            "Decompression is not supported yet, for now use xz(1) -d or equivalent.\n\n",
            "Written by - this is my first (useful) hack trying out\n",
            "the D Programming language (dmd v2.048 at the time of writing) - I like it! :)\n\n",
            "XZ Utils home page: \n",
            "D 2.0 home page: ");
        exit(0);
    }

    getopt(args,
        "0", &presethandler,
        "1", &presethandler,
        "2", &presethandler,
        "3", &presethandler,
        "4", &presethandler,
        "5", &presethandler,
        "6", &presethandler,
        "7", &presethandler,
        "8", &presethandler,
        "9", &presethandler,
        "jobs|j", &njobs,
        "chunksize|s", &chunksizehandler,
        "extreme|e", &extreme,
        "debug|g", &debuglogging,
        "help|h", &usage);

    if (extreme)
        preset |= LZMA_PRESET_EXTREME;

    if (njobs <= 0)
    {
        std.stdio.stderr.writeln(argv0, ": Invalid number of --jobs: ", njobs, ".");
        exit(1);
    }

    if (chunksize < 0x10000 || (chunksize & (chunksize - 1)))
    {
        std.stdio.stderr.writeln(argv0, ": --chunksize ", chunksize / 1024, "K too small or not a power of two.");
        exit(1);
    }

    int argn = 1;
    // XXX shouldn't getopt handle this?
    if (args.length > argn && args[argn] == "--")
        ++argn;

    bool fromstdin = args.length <= argn || args[argn] == "-";
    bool tostdout = args.length <= argn + 1 || args[argn + 1] == "-";

    if (tostdout && isatty(STDOUT_FILENO))
    {
        // XXX Don't throw to avoid printing backtrace on Linux
        std.stdio.stderr.writeln(argv0, ": Will not write compressed data to a terminal -\n\trun with -h or --help for a small usage message.");
        exit(1);
    }

    // Use std.stream.File for input to avoid buffering (input is
    // already aligned...). Index with argn, like the checks above, so a
    // leading "--" is skipped here too.
    auto f_in = fromstdin ? new std.stream.File(STDIN_FILENO, FileMode.In)
                          : new std.stream.File(args[argn], FileMode.In);

    // Use std.stdio for output, with buffering ("wb": the output is binary)
    auto f_out = tostdout ? &std.stdio.stdout
                          : new std.stdio.File(args[argn + 1], "wb");

    // sequence # of input chunk (stream) (counts from 0 while
    // xz --list counts from 1)
    ulong seq;

    // next sequence # of output chunk to be written
    ulong seqnextw;

    // Output queue: outq[i] holds the chunk with sequence # seqnextw + i,
    // or null if that chunk has not arrived yet
    immutable(ubyte)[][] outq;

    // total bytes in/out (for statistics)
    ulong tot_in, tot_out;

    // Write one compressed chunk to the output and, with MALLOC_SLICE,
    // free() its malloc()ed memory.
    void writeout(immutable(ubyte)[] buf)
    {
        void *buf_p = cast(void *) buf.ptr;
        f_out.rawWrite(buf);
        if (MALLOC_SLICE)
            free(buf_p);
    }

    // Receive one compressed chunk from a worker thread and write it (and
    // any queued chunks now in order behind it) to the output in input
    // order. Returns the index of the sending worker, which is now idle.
    // Throws if the worker reported an lzma error.
    int rcvbuf()
    {
        auto msg = receiveOnly!(Tid, int, ulong, lzma_ret, immutable(ubyte)[], size_t)();
        auto msg_idxthread = msg.field[1];
        auto msg_seq = msg.field[2];
        auto msg_lzmaret = msg.field[3];
        auto msg_rcvd_buf = msg.field[4];
        auto msg_in_size = msg.field[5];
        if (MALLOC_SLICE)
            // buffer will eventually be free()d
            msg.field[4] = null;

        if (debuglogging)
        {
            // Print things like sequence numbers, buffer
            // sizes and (rounded) compression ratios
            tot_in += msg_in_size;
            tot_out += msg_rcvd_buf.length;
            std.stdio.stderr.writeln("thread ", msg_idxthread, " sent ",
                msg_rcvd_buf.length, " bytes for ", msg_in_size, " bytes (",
                (msg_rcvd_buf.length * 200 / msg_in_size + 1) / 2,
                "%, total ", (tot_out * 200 / tot_in + 1) / 2,
                "%) for stream #", msg_seq + 1);
        }

        if (msg_seq == seqnextw)
        {
            // This is the next chunk to be written
            writeout(msg_rcvd_buf);
            msg_rcvd_buf = null;
            assert(!outq.length || outq[0] == null);
            if (outq.length)
                outq = outq[1 .. $];
            seqnextw++;
        }
        else
        {
            // Another chunk before this one is still missing,
            // queue this one up
            size_t relidx = cast(size_t)(msg_seq - seqnextw);
            while (relidx >= outq.length)
                outq ~= null;
            assert(outq[relidx] == null);
            outq[relidx] = msg_rcvd_buf;
            if (MALLOC_SLICE)
                msg_rcvd_buf = null;
        }

        // Check for error only here so there's a chance that
        // parts of the output are still written
        // XXX would need to queue this up too to be really useful,
        // worry about that later when we also support decompression
        // where input file corruption may actually be an issue
        if (msg_lzmaret != lzma_ret.LZMA_STREAM_END)
            throw new Exception("lzma error " ~ to!(string)(msg_lzmaret));

        // See if more output is ready to be written
        while (outq.length && outq[0] != null)
        {
            writeout(outq[0]);
            outq[0] = null;
            outq = outq[1 .. $];
            seqnextw++;
        }

        return msg_idxthread;
    }

    auto workerthreads = new Tid[njobs];
    int nchild;

    // Main loop, spawns worker threads, sends/receives chunks
    while (true)
    {
        // XXX again use malloc to avoid GC
        ubyte *one_buf_p = cast(ubyte *) xmalloc(chunksize);

        // Keep reading until the chunk is full or the input ends, so
        // partial reads (e.g. from a pipe) don't produce short streams.
        size_t nbytes = 0;
        while (nbytes < chunksize)
        {
            auto partialbytes = f_in.readBlock(one_buf_p + nbytes, chunksize - nbytes);
            if (partialbytes <= 0) // EOF (usually)
                break;
            nbytes += partialbytes;
        }

        if (nbytes == 0)
        {
            // EOF (usually): nothing left, don't leak the unused buffer
            free(one_buf_p);
            break;
        }

        int idxidlethread;
        if (nchild >= njobs)
        {
            // All cores busy, wait for one to finish a chunk
            idxidlethread = rcvbuf();
        }
        else
        {
            // Spawn another worker thread
            idxidlethread = nchild++;
            workerthreads[idxidlethread] = spawn(&worker, thisTid, idxidlethread, preset, chunksize);
        }

        // Send chunk to next idle worker (which takes ownership of it)
        workerthreads[idxidlethread].send(cast(immutable(ubyte) *) one_buf_p, nbytes, seq++);
    }

    // All chunks sent, receive the ones still in flight
    while (nchild > 0)
    {
        --nchild;
        rcvbuf();
    }

    // Cleanup
    if (!fromstdin)
        f_in.close();
    if (!tostdout)
        f_out.close();
    else
        f_out.flush();
}