1 /**
2   Compression/decompression with iopipes.
3 
4 Copyright: Copyright Steven Schveighoffer 2017.
5 License:   Boost License 1.0. (See accompanying file LICENSE_1_0.txt or copy
6            at http://www.boost.org/LICENSE_1_0.txt)
7 Authors:   Steven Schveighoffer
8  */
9 module iopipe.zip;
10 import iopipe.traits;
11 import iopipe.buffer;
12 import etc.c.zlib;
13 
// These helpers are kept separate so the slicing code avoids unnecessary
// out-of-bounds checks.
// Note: zlib uses uint lengths, so a window longer than uint.max is clamped and
// only its first uint.max bytes are exposed to zlib at a time.
/// Points zlib's input at `win`. The advertised length is clamped to
/// `uint.max` because `avail_in` is a 32-bit count.
private @trusted void setInput(ref z_stream str, const(ubyte)[] win) @nogc nothrow pure
{
    uint usable = uint.max;
    if(win.length <= uint.max)
        usable = cast(uint)win.length;
    str.next_in = win.ptr;
    str.avail_in = usable;
}
23 
/// Points zlib's output at `win`. The usable length is clamped to
/// `uint.max` because `avail_out` is a 32-bit count.
private @trusted void setOutput(ref z_stream str, ubyte[] win) @nogc nothrow pure
{
    str.next_out = win.ptr;
    str.avail_out = (win.length <= uint.max) ? cast(uint)win.length : uint.max;
}
29 
/**
 * Selects the compressed data format to produce (`zip`) or to expect
 * (`unzip`).
 */
enum CompressionFormat
{
    /// gzip format. This is the first member, so it is also `CompressionFormat.init`.
    gzip = 0,
    /// Deflate data in the zlib wrapper (zlib windowBits of 15), not raw deflate.
    deflate = 1,
    /// Let zlib detect gzip or zlib data from its header. Only meaningful
    /// when decompressing; compression treats this like `deflate`.
    determineFromData = 2
}
42 
// Input stream that compresses the bytes of `chain` on demand. Each `read`
// fills part of the caller's buffer with compressed output; a return of 0
// means the compressed stream is complete (or nothing was requested).
private struct ZipSrc(Chain)
{
    import iopipe.refc;
    // upstream iopipe supplying the uncompressed bytes
    Chain chain;
    // zstream cannot be moved once initialized, as it has internal pointers to itself.
    RefCounted!(z_stream) zstream;
    // Z_NO_FLUSH while input remains, switched to Z_FINISH once the chain is exhausted.
    int flushMode;

    // convenience, because this is so long and painful!
    private @property @system z_stream *zstrptr()
    {
        return &zstream._get();
    }

    // Params:
    //   c      = upstream iopipe
    //   format = gzip, or deflate (zlib wrapper). Any other value, including
    //            determineFromData, is treated as deflate.
    // Throws: Exception if zlib fails to initialize the compressor.
    this(Chain c, CompressionFormat format)
    {
        chain = c;
        zstream = z_stream().refCounted;
        zstream.setInput(chain.window);
        flushMode = Z_NO_FLUSH;
        int windowbits = 15;
        switch(format) with(CompressionFormat)
        {
        case gzip:
            // +16 asks zlib for a gzip wrapper instead of a zlib one
            windowbits += 16;
            break;
        case deflate:
        default:
            // use 15
            break;
        }

        if((() @trusted => deflateInit2(zstrptr, Z_DEFAULT_COMPRESSION,
                        Z_DEFLATED, windowbits, 8, Z_DEFAULT_STRATEGY))() != Z_OK)
        {
            throw new Exception("Error initializing zip deflation");
        }

        // just in case deflateInit2 consumed some bytes.
        chain.release(chain.window.length - zstream.avail_in);
    }

    // Compress into `target`, returning the number of bytes written.
    // A return of 0 means the stream has ended (or target is empty), so this
    // keeps driving zlib until it writes at least one byte or finishes.
    // Throws: Exception on an unexpected deflate result.
    size_t read(ubyte[] target)
    {
        if(target.length == 0 || zstream.zalloc == null)
            // no data requested, or stream is closed
            return 0;

        // zlib works with 32-bit lengths ONLY, so truncate here to avoid math
        // issues.
        if(target.length > uint.max)
            target = target[0 .. uint.max];
        zstream.setOutput(target);

        while(zstream.avail_out == target.length) // while we haven't written anything yet
        {
            // ensure we have some data to zip
            if(flushMode == Z_NO_FLUSH && chain.window.length == 0)
            {
                if(chain.extend(0) == 0)
                {
                    // input exhausted: have zlib flush and finish the stream
                    flushMode = Z_FINISH;
                }
            }
            zstream.setInput(chain.window);
            auto deflate_result = (() @trusted => deflate(zstrptr, flushMode))();
            // discard the input bytes zlib consumed
            chain.release(chain.window.length - zstream.avail_in);

            if(deflate_result == Z_OK)
            {
                if(flushMode == Z_FINISH)
                {
                    // finishing, but the stream has not ended yet; return what
                    // was written and let the caller read again
                    break;
                }
            }
            else if(deflate_result == Z_BUF_ERROR)
            {
                // zlib could not make progress. Nothing has been written to
                // target yet (see the loop condition), so it needs more input.
                if(flushMode == Z_FINISH)
                    // no progress possible even when finishing; stop instead
                    // of spinning forever
                    break;
                if(chain.extend(0) == 0)
                    flushMode = Z_FINISH;
                // Do not return here: returning 0 would be read by the caller
                // as the end of the compressed stream. Loop and retry.
            }
            else if(deflate_result == Z_STREAM_END)
            {
                // finished with the stream
                auto result = target.length - zstream.avail_out;
                () @trusted {deflateEnd(zstrptr);}();
                zstream = z_stream.init;
                return result;
            }
            else
            {
                import std.conv : to;
                throw new Exception("unhandled zip condition " ~ to!string(deflate_result));
            }
        }
        return target.length - zstream.avail_out;
    }

    mixin implementValve!chain;
}
148 
// Input stream that decompresses the bytes of `chain` on demand. Each `read`
// fills part of the caller's buffer with decompressed output; a return of 0
// means there is no more data. Concatenated compressed streams are handled by
// reopening zlib when one stream ends and more input remains.
private struct UnzipSrc(Chain)
{
    import iopipe.refc;
    // upstream iopipe supplying the compressed bytes
    Chain chain;
    // zstream cannot be moved once initialized, as it has internal pointers to itself.
    RefCounted!(z_stream) zstream;
    // format requested at construction, reused when reopening for concatenated streams
    private CompressionFormat openedFormat;

    // convenience, because this is so long and painful!
    private @property @system z_stream *zstrptr()
    {
        return &zstream._get();
    }

    // Extend the chain once if fewer than 4096 bytes are buffered, and re-point
    // zlib's input at the window whenever it was extended or `setupInput` is set.
    private void ensureMoreData(bool setupInput = false)
    {
        if(chain.window.length < 4096) // don't overallocate
        {
            cast(void)chain.extend(0);
            setupInput = true;
        }
        if(setupInput)
        {
            zstream.setInput(chain.window);
        }
    }

    // Initialize zlib for `openedFormat`.
    // Throws: Exception if zlib fails to initialize.
    private void initializeStream()
    {
        int windowbits = 15;
        switch(openedFormat) with(CompressionFormat)
        {
        case gzip:
            // +16: expect a gzip wrapper
            windowbits += 16;
            break;
        case determineFromData:
            // +32: let zlib detect gzip or zlib from the header
            windowbits += 32;
            break;
        case deflate:
        default:
            // use 15
            break;
        }
        if((() @trusted => inflateInit2(zstrptr, windowbits))() != Z_OK)
        {
            throw new Exception("Error initializing zip inflation");
        }

        // just in case inflateinit consumed some bytes.
        chain.release(chain.window.length - zstream.avail_in);
        ensureMoreData();
    }

    this(Chain c, CompressionFormat format)
    {
        chain = c;
        openedFormat = format;
        zstream = z_stream().refCounted;
        ensureMoreData(true);
        initializeStream();
    }

    // Decompress into `target`, returning the number of bytes written.
    // Returns 0 only when target is empty or all input has been decompressed.
    // Throws: Exception on an unexpected inflate result (including truncated
    // or corrupt input).
    size_t read(ubyte[] target)
    {
        if(target.length == 0)
            // no data requested
            return 0;

        if(zstream.zalloc == null)
        {
            // stream not opened. Try opening it if there is data available.
            // This happens for concatenated streams.
            ensureMoreData(true);
            if(chain.window.length == 0)
                // no more data left
                return 0;
            initializeStream();
        }

        // zlib works with 32-bit lengths ONLY, so truncate here to avoid math
        // issues.
        if(target.length > uint.max)
            target = target[0 .. uint.max];
        zstream.setOutput(target);

        // Unzip into the buffer. Normally stop after 2 rounds of fetching
        // input so that too much compressed data is not buffered. Returning 0
        // would be read as end of stream, though, so keep going until at least
        // one byte has been produced. This loop still terminates: per zlib's
        // documented contract, inflate returns Z_OK only after making progress,
        // and Z_BUF_ERROR (which throws below) when it cannot.
        for(size_t rounds = 1; ; ++rounds)
        {
            ensureMoreData();
            auto inflate_result = (() @trusted => inflate(zstrptr, Z_NO_FLUSH))();
            // discard the input bytes zlib consumed
            chain.release(chain.window.length - zstream.avail_in);
            if(inflate_result == Z_STREAM_END)
            {
                // all done?
                size_t result = target.length - zstream.avail_out;
                () @trusted {inflateEnd(zstrptr);}();
                zstream = z_stream.init;
                if(result == 0)
                {
                    // for some reason we had an open stream, but Z_STREAM_END
                    // happened without any more data coming out. In this case,
                    // returning 0 would indicate the end of the stream, but
                    // there may be more data if we try again (for concatenated
                    // streams).
                    return read(target);
                }
                return result;
            }
            else if(inflate_result == Z_OK)
            {
                // no more space available
                if(zstream.avail_out == 0)
                    break;
                // enough rounds, and something to hand back to the caller
                if(rounds >= 2 && zstream.avail_out != target.length)
                    break;
            }
            else
            {
                // error or unsupported condition
                import std.conv;
                throw new Exception("unhandled unzip condition " ~ to!string(inflate_result));
            }
        }

        // return the number of bytes that were inflated
        return target.length - zstream.avail_out;
    }

    mixin implementValve!chain;
}
279 
/**
 * Create an input stream that decompresses an iopipe of compressed ubytes.
 *
 * `unzip` uses this source internally for decompression.
 *
 * Params:
 *     c = The iopipe that supplies the compressed data. Its window type must
 *         be implicitly convertible to `const(ubyte)[]`.
 *     format = The format of the compressed data. The default,
 *         `CompressionFormat.determineFromData`, lets zlib detect a gzip or
 *         zlib header.
 * Returns:
 *     An input stream whose `read` method writes decompressed bytes into the
 *     buffer it is given.
 */
auto unzipSrc(Chain)(Chain c, CompressionFormat format = CompressionFormat.determineFromData)
    if(isIopipe!(Chain) && is(WindowType!Chain : const(ubyte)[]))
{
    // Buffer some compressed input before zlib is initialized.
    if(!c.window.length)
        cast(void)c.extend(0);
    return UnzipSrc!Chain(c, format);
}
301 
/**
 * Get a stream source that compresses an iopipe of ubytes with the given format.
 *
 * This is the source that `zip` uses to compress data.
 *
 * Function attributes (such as `@safe`) are inferred from `Chain` instead of
 * being forced, matching `unzipSrc`. A `@safe` chain still yields a `@safe`
 * source, and chains with `@system` primitives are also accepted.
 *
 * Params:
 *    c = The input iopipe that provides the data to compress. The window type
 *        MUST be implicitly convertible to an array of const ubytes.
 *    format = The format of the compressed data: `gzip` (the default) or
 *        `deflate` (zlib wrapper). `determineFromData` is treated as `deflate`.
 * Returns:
 *    An input stream whose `read` method compresses the input iopipe data into
 *    the given buffer.
 */
auto zipSrc(Chain)(Chain c, CompressionFormat format = CompressionFormat.gzip)
    if(isIopipe!(Chain) && is(WindowType!Chain : const(ubyte)[]))
{
    // buffer some input before the compressor is set up
    if(c.window.length == 0)
        cast(void)c.extend(0);
    return ZipSrc!(Chain)(c, format);
}
322 
/**
 * Wrap an iopipe of compressed data so that its window holds the decompressed
 * bytes. Decompression does not happen in place, so a separate buffer is
 * allocated for the output.
 *
 * Params:
 *     Allocator = Allocator used for the output buffer.
 *     c = The iopipe that supplies the compressed data. Its window type must
 *         be implicitly convertible to `const(ubyte)[]`.
 *     format = The format of the compressed input. Keep the default to let
 *         the format be detected from the data.
 * Returns:
 *     An iopipe of ubytes containing the decompressed stream.
 */
auto unzip(Allocator = GCNoPointerAllocator, Chain)(Chain c, CompressionFormat format = CompressionFormat.determineFromData)
    if(isIopipe!(Chain) && is(WindowType!Chain : const(ubyte)[]))
{
    import iopipe.bufpipe : bufd;
    return bufd!(ubyte, Allocator)(unzipSrc(c, format));
}
343 
/**
 * Wrap an iopipe of ubytes so that its window holds the compressed form of
 * that input. Compression does not happen in place, so a separate buffer is
 * allocated for the output.
 *
 * Params:
 *     Allocator = Allocator used for the output buffer.
 *     c = The iopipe that supplies the uncompressed data. Its window type must
 *         be implicitly convertible to `const(ubyte)[]`.
 *     format = The desired compressed format; gzip unless specified.
 * Returns:
 *     An iopipe of ubytes containing the compressed stream.
 */
auto zip(Allocator = GCNoPointerAllocator, Chain)(Chain c, CompressionFormat format = CompressionFormat.gzip)
    if(isIopipe!(Chain) && is(WindowType!Chain : const(ubyte)[]))
{
    import iopipe.bufpipe : bufd;
    return bufd!(ubyte, Allocator)(zipSrc(c, format));
}
363 
// The exact compressed bytes are not checked. Instead, verify that compressing
// and then decompressing reproduces the original input.
@safe unittest
{
    import std.range: cycle, take;
    import std.array: array;
    import std.string: representation;
    import iopipe.bufpipe;

    enum totalLength = 100_000;
    auto original = "hello, world!".representation.cycle.take(totalLength).array;
    // sanity check on the generated input
    assert(original.length == totalLength);

    // output sink that appends everything written to a caller-owned array
    static struct AppendSink
    {
        ubyte[]* dest;
        this(ref ubyte[] target) @trusted
        {
            dest = &target;
        }
        size_t write(ubyte[] data)
        {
            *dest ~= data;
            return data.length;
        }
    }

    ubyte[] compressed;
    original.zip.outputPipe(AppendSink(compressed)).process();

    // highly repetitive input has to shrink
    assert(compressed.length < original.length);

    ubyte[] roundTripped;
    compressed.unzip.outputPipe(AppendSink(roundTripped)).process();

    assert(roundTripped == original);
}