/**
Compression/decompression with iopipes.

Copyright: Copyright Steven Schveighoffer 2017.
License:   Boost License 1.0. (See accompanying file LICENSE_1_0.txt or copy
           at http://www.boost.org/LICENSE_1_0.txt)
Authors:   Steven Schveighoffer
*/
module iopipe.zip;
import iopipe.traits;
import iopipe.buffer;
import etc.c.zlib;

// separate these out, to avoid having to deal with unnecessary out of bounds
// checks
// note, zlib uses uint, so we need to deal with issues with larger
// than uint.max window length.

// Point zlib's input at `win`. The length is clamped to uint.max because
// z_stream.avail_in is a uint; callers must measure consumption through
// avail_in, never through win.length.
private @trusted void setInput(ref z_stream str, const(ubyte)[] win) @nogc nothrow pure
{
    str.avail_in = win.length > uint.max ? uint.max : cast(uint)win.length;
    str.next_in = win.ptr;
}

// Point zlib's output at `win`, clamping the length to uint.max because
// z_stream.avail_out is a uint.
private @trusted void setOutput(ref z_stream str, ubyte[] win) @nogc nothrow pure
{
    str.avail_out = win.length > uint.max ? uint.max : cast(uint)win.length;
    str.next_out = win.ptr;
}

/**
 * Enum for specifying the desired or expected compression format.
 */
enum CompressionFormat
{
    /// GZIP format
    gzip,
    /// Deflate format. This is deflate data in zlib's default (RFC 1950)
    /// wrapper (zlib windowBits 15), not raw deflate and not a .zip archive.
    deflate,
    /// Auto-detect the format by reading the data (unzip only)
    determineFromData
}

/*
 * Input source whose `read` deflates (compresses) the data provided by the
 * upstream iopipe `chain`. Construct it through `zipSrc`.
 */
private struct ZipSrc(Chain)
{
    import iopipe.refc;
    Chain chain;
    // zstream cannot be moved once initialized, as it has internal pointers to itself.
    RefCounted!(z_stream) zstream;
    // Flush mode handed to deflate: Z_NO_FLUSH while the chain can still
    // supply data, Z_FINISH once it is exhausted.
    int flushMode;

    // convenience, because this is so long and painful!
    private @property @system z_stream *zstrptr()
    {
        return &zstream._get();
    }

    /*
     * Set up a deflate stream over `c` producing `format` output.
     * Throws: Exception if zlib fails to initialize.
     */
    this(Chain c, CompressionFormat format)
    {
        chain = c;
        zstream = z_stream().refCounted;
        zstream.setInput(chain.window);
        flushMode = Z_NO_FLUSH;
        // 15 is zlib's maximum window size; adding 16 makes zlib write a gzip
        // header and trailer instead of the zlib wrapper.
        int windowbits = 15;
        switch(format) with(CompressionFormat)
        {
            case gzip:
                windowbits += 16;
                break;
            case deflate:
            default:
                // use 15
                break;
        }

        immutable availBefore = zstream.avail_in;
        if((() @trusted => deflateInit2(zstrptr, Z_DEFAULT_COMPRESSION,
                        Z_DEFLATED, windowbits, 8, Z_DEFAULT_STRATEGY))() != Z_OK)
        {
            throw new Exception("Error initializing zip deflation");
        }

        // just in case deflateInit consumed some bytes. Measured via avail_in,
        // since setInput may have clamped it below the window length.
        chain.release(availBefore - zstream.avail_in);
    }

    /*
     * Compress data from the chain into `target`.
     *
     * Returns: the number of compressed bytes written. 0 means `target` was
     * empty or the compressed stream is complete (and has been closed).
     * Throws: Exception on an unexpected zlib result.
     */
    size_t read(ubyte[] target)
    {
        if(target.length == 0 || zstream.zalloc == null)
            // no data requested, or stream is closed
            return 0;

        // zlib works with 32-bit lengths ONLY, so truncate here to avoid math
        // issues.
        if(target.length > uint.max)
            target = target[0 .. uint.max];
        zstream.setOutput(target);

        while(zstream.avail_out == target.length) // while we haven't written anything yet
        {
            // ensure we have some data to zip
            if(flushMode == Z_NO_FLUSH && chain.window.length == 0)
            {
                if(chain.extend(0) == 0)
                {
                    flushMode = Z_FINISH;
                }
            }
            zstream.setInput(chain.window);
            // Measure consumption via avail_in, not the window length:
            // setInput clamps avail_in to uint.max, so for larger windows
            // `window.length - avail_in` would release unconsumed data.
            immutable availBefore = zstream.avail_in;
            auto deflate_result = (() @trusted => deflate(zstrptr, flushMode))();
            chain.release(availBefore - zstream.avail_in);

            if(deflate_result == Z_OK)
            {
                if(flushMode == Z_FINISH)
                {
                    // While finishing, Z_OK means output is still pending
                    // (the output space ran out); the rest is produced on
                    // the next call.
                    break;
                }
            }
            else if(deflate_result == Z_BUF_ERROR)
            {
                // zlib could not make progress.
                if(flushMode != Z_FINISH)
                {
                    // Out of input: fetch more, or start finishing if the
                    // chain is exhausted, and try again. Breaking out here
                    // with nothing written would return 0, which callers
                    // treat as the end of the stream.
                    if(chain.extend(0) == 0)
                        flushMode = Z_FINISH;
                    continue;
                }
                // need more write space
                break;
            }
            else if(deflate_result == Z_STREAM_END)
            {
                // finished with the stream
                auto result = target.length - zstream.avail_out;
                () @trusted {deflateEnd(zstrptr);}();
                // a zeroed stream (zalloc == null) marks it as closed
                zstream = z_stream.init;
                return result;
            }
            else
            {
                import std.conv : to;
                throw new Exception("unhandled zip condition " ~ to!string(deflate_result));
            }
        }
        return target.length - zstream.avail_out;
    }

    mixin implementValve!chain;
}

/*
 * Input source whose `read` inflates (decompresses) the data provided by the
 * upstream iopipe `chain`, including concatenated compressed streams.
 * Construct it through `unzipSrc`.
 */
private struct UnzipSrc(Chain)
{
    import iopipe.refc;
    Chain chain;
    // zstream cannot be moved once initialized, as it has internal pointers to itself.
    RefCounted!(z_stream) zstream;
    private CompressionFormat openedFormat;

    // convenience, because this is so long and painful!
    private @property @system z_stream *zstrptr()
    {
        return &zstream._get();
    }

    // Extend the chain when fewer than 4096 bytes are buffered, then point
    // zlib's input at the chain's window if it was extended or if
    // `setupInput` is true.
    private void ensureMoreData(bool setupInput = false)
    {
        if(chain.window.length < 4096) // don't overallocate
        {
            cast(void)chain.extend(0);
            setupInput = true;
        }
        if(setupInput)
        {
            zstream.setInput(chain.window);
        }
    }

    // Initialize the inflate stream for `openedFormat`. Expects zlib's input
    // to already point at the chain's window.
    // Throws: Exception if zlib fails to initialize.
    private void initializeStream()
    {
        // 15 is zlib's maximum window size; +16 accepts only gzip, +32 lets
        // zlib detect zlib or gzip headers automatically.
        int windowbits = 15;
        switch(openedFormat) with(CompressionFormat)
        {
            case gzip:
                windowbits += 16;
                break;
            case determineFromData:
                windowbits += 32;
                break;
            case deflate:
            default:
                // use 15
                break;
        }
        immutable availBefore = zstream.avail_in;
        if((() @trusted => inflateInit2(zstrptr, windowbits))() != Z_OK)
        {
            throw new Exception("Error initializing zip inflation");
        }

        // just in case inflateinit consumed some bytes. Measured via avail_in,
        // since setInput may have clamped it below the window length.
        chain.release(availBefore - zstream.avail_in);
        ensureMoreData();
    }

    this(Chain c, CompressionFormat format)
    {
        chain = c;
        openedFormat = format;
        zstream = z_stream().refCounted;
        ensureMoreData(true);
        initializeStream();
    }

    /*
     * Decompress data from the chain into `target`.
     *
     * Returns: the number of decompressed bytes written. 0 means `target` was
     * empty or no compressed data remains.
     * Throws: Exception on a zlib error, e.g. corrupt or truncated input.
     */
    size_t read(ubyte[] target)
    {
        if(target.length == 0)
            // no data requested
            return 0;

        if(zstream.zalloc == null)
        {
            // stream not opened. Try opening it if there is data available.
            // This happens for concatenated streams.
            ensureMoreData(true);
            if(chain.window.length == 0)
                // no more data left
                return 0;
            initializeStream();
        }

        // zlib works with 32-bit lengths ONLY, so truncate here to avoid math
        // issues.
        if(target.length > uint.max)
            target = target[0 .. uint.max];
        zstream.setOutput(target);

        // Unzip the data into the buffer. Normally stop after 2 rounds of
        // (extend input, inflate), but keep going while nothing has been
        // produced yet: returning 0 would tell the caller the stream ended.
        // This terminates because zlib returns Z_BUF_ERROR (thrown below)
        // when no progress is possible, e.g. on truncated input.
        for(size_t round = 0; ; ++round)
        {
            // Always re-point the input: this re-exposes data beyond uint.max
            // that setInput had to clamp off.
            ensureMoreData(true);
            immutable availBefore = zstream.avail_in;
            auto inflate_result = (() @trusted => inflate(zstrptr, Z_NO_FLUSH))();
            chain.release(availBefore - zstream.avail_in);
            if(inflate_result == Z_STREAM_END)
            {
                // all done?
                size_t result = target.length - zstream.avail_out;
                () @trusted {inflateEnd(zstrptr);}();
                // a zeroed stream (zalloc == null) marks it as closed
                zstream = z_stream.init;
                if(result == 0)
                {
                    // for some reason we had an open stream, but Z_STREAM_END
                    // happened without any more data coming out. In this case,
                    // returning 0 would indicate the end of the stream, but
                    // there may be more data if we try again (for concatenated
                    // streams).
                    return read(target);
                }
                return result;
            }
            else if(inflate_result == Z_OK)
            {
                // no more space available
                if(zstream.avail_out == 0)
                    break;
                // usual number of rounds done, and there is output to return
                if(round >= 1 && zstream.avail_out != target.length)
                    break;
            }
            else
            {
                // error or unsupported condition
                import std.conv : to;
                throw new Exception("unhandled unzip condition " ~ to!string(inflate_result));
            }
        }

        // return the number of bytes that were inflated
        return target.length - zstream.avail_out;
    }

    mixin implementValve!chain;
}

/**
 * Get a stream source that unzips an iopipe of ubytes. The source stream
 * should be compressed in the appropriate format.
 *
 * This is the source that `unzip` uses to decompress.
 *
 * Params:
 *    c = The input iopipe that provides the compressed data. The window type
 *        MUST be implicitly convertible to an array of const ubytes.
 *    format = The specified format of the data, leave the default to autodetect.
 * Returns:
 *    An input stream whose `read` method decompresses the input iopipe into
 *    the given buffer.
 */
auto unzipSrc(Chain)(Chain c, CompressionFormat format = CompressionFormat.determineFromData)
    if(isIopipe!(Chain) && is(WindowType!Chain : const(ubyte)[]))
{
    if(c.window.length == 0)
        cast(void)c.extend(0);
    return UnzipSrc!(Chain)(c, format);
}

/**
 * Get a stream source that compresses an iopipe of ubytes with the given format.
 *
 * This is the source that `zip` uses to compress data.
 *
 * Params:
 *    c = The input iopipe that provides the data to compress. The window type
 *        MUST be implicitly convertible to an array of const ubytes.
 *    format = The specified format of the compressed data. The default is gzip.
 * Returns:
 *    An input stream whose `read` method compresses the input iopipe data into
 *    the given buffer.
 */
// Safety attributes are inferred from Chain (as for unzipSrc), so chains
// with @system primitives can be compressed too.
auto zipSrc(Chain)(Chain c, CompressionFormat format = CompressionFormat.gzip)
    if(isIopipe!(Chain) && is(WindowType!Chain : const(ubyte)[]))
{
    if(c.window.length == 0)
        cast(void)c.extend(0);
    return ZipSrc!(Chain)(c, format);
}

/**
 * Wrap an iopipe that contains compressed data into an iopipe containing the
 * decompressed data. Data is not decompressed in place, so an extra buffer is
 * created to hold it.
 *
 * Params:
 *    Allocator = The allocator to use for buffering the data.
 *    c = The input iopipe that provides the compressed data. The window type
 *        MUST be implicitly convertible to an array of const ubytes.
 *    format = The format of the input iopipe compressed data. Leave as
 *        default to detect from the data itself.
 * Returns:
 *    An iopipe whose data is the decompressed ubyte version of the input stream.
 */
auto unzip(Allocator = GCNoPointerAllocator, Chain)(Chain c, CompressionFormat format = CompressionFormat.determineFromData)
    if(isIopipe!(Chain) && is(WindowType!Chain : const(ubyte)[]))
{
    import iopipe.bufpipe: bufd;
    return unzipSrc(c, format).bufd!(ubyte, Allocator);
}

/**
 * Wrap an iopipe of ubytes into an iopipe containing the compressed data from
 * that input. Data is not compressed in place, so an extra buffer is created
 * to hold it.
 *
 * Params:
 *    Allocator = The allocator to use for buffering the data.
 *    c = The input iopipe that provides the input data. The window type
 *        MUST be implicitly convertible to an array of const ubytes.
 *    format = The desired format of the compressed data. The default is gzip.
 * Returns:
 *    An iopipe whose data is the compressed ubyte version of the input stream.
 */
auto zip(Allocator = GCNoPointerAllocator, Chain)(Chain c, CompressionFormat format = CompressionFormat.gzip)
    if(isIopipe!(Chain) && is(WindowType!Chain : const(ubyte)[]))
{
    import iopipe.bufpipe: bufd;
    return zipSrc(c, format).bufd!(ubyte, Allocator);
}

// I won't pretend to know what the zip format should look like, so just verify that
// we can do some kind of compression and return to the original.
@safe unittest
{
    import std.range: cycle, take;
    import std.array: array;
    import std.string: representation;
    import iopipe.bufpipe;

    auto realData = "hello, world!".representation.cycle.take(100_000).array;
    // sanity check
    assert(realData.length == 100_000);

    // output device that appends everything written to it to an array
    static struct ByteWriter
    {
        ubyte[] *result;
        this(ref ubyte[] target) @trusted
        {
            result = &target;
        }
        size_t write(ubyte[] data)
        {
            (*result) ~= data;
            return data.length;
        }
    }

    // zip the data
    ubyte[] zipped;
    realData.zip.outputPipe(ByteWriter(zipped)).process();

    // zipped contains the zipped data, make sure it's less (it should be,
    // plenty of opportunity to compress!)
    assert(zipped.length < realData.length);

    ubyte[] unzipped;
    zipped.unzip.outputPipe(ByteWriter(unzipped)).process();

    assert(unzipped == realData);

    // concatenated gzip streams decompress to the concatenated data
    ubyte[] unzippedTwice;
    (zipped ~ zipped).unzip.outputPipe(ByteWriter(unzippedTwice)).process();
    assert(unzippedTwice == realData ~ realData);

    // the deflate format round-trips, and is auto-detected when unzipping
    ubyte[] deflated;
    realData.zip(CompressionFormat.deflate).outputPipe(ByteWriter(deflated)).process();
    ubyte[] inflated;
    deflated.unzip.outputPipe(ByteWriter(inflated)).process();
    assert(inflated == realData);

    // compressed input that arrives only a few bytes at a time must not be
    // mistaken for the end of the stream
    static struct TrickleSource
    {
        const(ubyte)[] data;
        size_t read(ubyte[] buf) @safe
        {
            import std.algorithm.comparison : min;
            immutable n = min(buf.length, data.length, size_t(3));
            buf[0 .. n] = data[0 .. n];
            data = data[n .. $];
            return n;
        }
    }
    ubyte[] trickled;
    TrickleSource(zipped).bufd.unzip.outputPipe(ByteWriter(trickled)).process();
    assert(trickled == realData);
}