/**
Core functionality for iopipe. Defines the base types for manipulating and
processing data.
Copyright: Copyright Steven Schveighoffer 2011-.
License:   Boost License 1.0. (See accompanying file LICENSE_1_0.txt or copy
           at http://www.boost.org/LICENSE_1_0.txt)
Authors:   Steven Schveighoffer
 */
module iopipe.bufpipe;
import iopipe.buffer;
import iopipe.traits;
import std.traits : isDynamicArray, hasMember;
import std.range.primitives;


/**
 * An example processor. This demonstrates the required items for implementing
 * an iopipe.
 *
 * SimplePipe will only extend exactly the elements requested (from what is
 * available), so it can be used for testing with a static buffer to simulate
 * data coming in at any rate.
 */
struct SimplePipe(Chain, size_t extendElementsDefault = 1) if(isIopipe!Chain)
{
    /**
     * The upstream data. This can be any iopipe. Throughout the library, the
     * upstream data is generally saved as a member called "chain" as a matter
     * of convention. This is not required or expected.
     */
    Chain chain;

    // how many elements are we looking at.
    private size_t downstreamElements;

    /**
     * Build on top of an existing chain
     */
    this(Chain c)
    {
        this.chain = c;
    }

    /**
     * Get the current window of elements for the pipe. This is the data that
     * can be used at this moment in time.
     */
    auto window() { return chain.window[0..downstreamElements]; }

    /**
     * Get more data from the pipe. The parameter indicates the desired number
     * of elements to add to the end of the window. If 0 is passed, then it is
     * up to the implementation of the pipe to determine the optimal number of
     * elements to add.
     *
     * Params: elements = Number of elements requested.
     * Returns: The number of elements added. This can be less than or more
     *          than the parameter, but will only be 0 when no more elements
     *          can be added. This signifies EOF.
     */
    size_t extend(size_t elements)
    {
        // elements already present upstream but not yet exposed downstream
        auto left = chain.window.length - downstreamElements;
        if(elements == 0)
        {
            elements = extendElementsDefault;
            // special case
            if(elements <= left)
            {
                downstreamElements += elements;
                return elements;
            }
            else
            {
                // not enough held back: expose everything we have plus
                // whatever the upstream pipe decides to add.
                elements = chain.extend(0);
                downstreamElements += left + elements;
                return left + elements;
            }
        }
        else if(elements <= left)
        {
            downstreamElements += elements;
            return elements;
        }
        else
        {
            // ask upstream only for the part we do not already hold
            elements -= left;
            left += chain.extend(elements);
            downstreamElements += left;
            return left;
        }
    }

    /**
     * Release the given number of elements from the front of the window. After
     * calling this, make sure to update any tracking indexes for the window
     * that you are maintaining.
     *
     * Params: elements = The number of elements to release.
     */
    void release(size_t elements)
    {
        assert(elements <= downstreamElements);
        downstreamElements -= elements;
        chain.release(elements);
    }

    static if(hasValve!(Chain))
    {
        /**
         * Implement the required valve function. If the pipe you are wrapping
         * has a valve, you must provide ref access to the valve.
         *
         * Note, the correct boilerplate implementation can be inserted by
         * adding the following line to your pipe structure:
         *
         * ------
         * mixin implementValve!(nameOfUpstreamPipe);
         * ------
         *
         * Returns: A valve inlet that allows you to control flow of the data
         * through this pipe.
         *
         * See Also: iopipe.valve
         */
        ref valve() { return chain.valve; }
    }
}

///
@safe unittest
{
    // any array is a valid iopipe source.
    auto source = "hello, world!";

    auto pipe = SimplePipe!(string)(source);

    // SimplePipe holds back data until you extend it.
    assert(pipe.window.length == 0);

    // Note: elements of narrow strings are code units for purposes of iopipe
    // library.
    assert(pipe.extend(5) == 5);
    assert(pipe.window == "hello");

    // Release data to inform the pipe you are done with it
    pipe.release(3);
    assert(pipe.window == "lo");

    // you can request "some data" by extending with 0, letting the pipe define
    // what is the best addition of data. This is useful for optimizing OS
    // system call reads.
    assert(pipe.extend(0) == 1);
    assert(pipe.window == "lo,");

    // you aren't guaranteed to get all the data you ask for.
    assert(pipe.extend(100) == 7);
    assert(pipe.window == "lo, world!");

    pipe.release(pipe.window.length);

    // this signifies EOF.
    assert(pipe.extend(1) == 0);
}

@safe unittest
{
    import std.range : iota, array;
    import std.algorithm : equal;
    auto buf = iota(100).array;
    auto p = SimplePipe!(typeof(buf))(buf);

    assert(p.window.length == 0);
    assert(p.extend(50) == 50);
    assert(p.window.length == 50);
    assert(p.window.equal(iota(50)));
    p.release(20);
    assert(p.window.length == 30);
    assert(p.window.equal(iota(20, 50)));
    assert(p.extend(100) == 50);
    assert(p.window.length == 80);
    assert(p.window.equal(iota(20, 100)));
    p.release(80);
    assert(p.extend(1) == 0);
    assert(p.window.length == 0);
}

// Reverse the byte order of every element of `data` in place. Only element
// widths of 2 and 4 bytes are accepted (see the template constraint).
private void swapBytes(R)(R data) if(typeof(R.init[0]).sizeof == 4 || typeof(R.init[0]).sizeof == 2)
{
    enum width = typeof(R.init[0]).sizeof;
    if(data.length == 0)
        // no reason to byteswap no data
        return;
    static if(width == 2)
    {
        // TODO: this only works for arrays.
        ushort[] sdata = cast(ushort[])data;
        () @trusted {
            assert(sdata.length > 0);
            if((cast(size_t)sdata.ptr & 0x03) != 0)
            {
                // first element not 4-byte aligned, do that one by hand
                *sdata.ptr = ((*sdata.ptr << 8) & 0xff00) |
                    ((*sdata.ptr >> 8) & 0x00ff);
                sdata.popFront();
            }

            // handle misaligned last element
            if(sdata.length % 2 != 0)
            {
                const last = sdata.length - 1;
                sdata.ptr[last] = ((sdata.ptr[last] << 8) & 0xff00) |
                    ((sdata.ptr[last] >> 8) & 0x00ff);
                sdata.popBack();
            }
        }();

        // rest of the data is 4-byte multiple and aligned
        // TODO: see if this can be optimized further, or if there are
        // better options for 64-bit.
        uint[] idata = cast(uint[])sdata;
        foreach(ref t; idata)
        {
            // swaps the two bytes of both 16-bit halves in one operation
            t = ((t << 8) & 0xff00ff00) |
                ((t >> 8) & 0x00ff00ff);
        }
    }
    else
    {
        import core.bitop : bswap;
        // swap every 4 bytes
        foreach(ref t; cast(uint[])data)
        {
            t = bswap(t);
        }
    }
}

@safe unittest
{
    void doIt(T)(T[] t) @safe
    {
        import std.algorithm;
        import std.range;
        auto compareTo = t.map!(a => (a << ((T.sizeof-1) * 8))).array;
        swapBytes(t);
        assert(t == compareTo);
    }

    doIt(cast(ushort[])[1, 2, 3, 4, 5]);
    doIt(cast(uint[])[6, 7, 8, 9, 10]);
}

// should be a voldemort type, but that results in template bloat
//
// Pipe wrapper that byte-swaps each batch of newly extended elements. Data
// that is already in the window when the wrapper is created is not touched
// here; byteSwapper swaps it before constructing the wrapper.
private struct ByteSwapProcessor(Chain)
{
    Chain chain;

    auto window() { return chain.window; }

    size_t extend(size_t elements)
    {
        auto newData = chain.extend(elements);
        auto data = chain.window;
        // only the tail that was just added still needs swapping
        swapBytes(data[$-newData..$]);
        return newData;
    }

    void release(size_t elements) { chain.release(elements); }

    mixin implementValve!(chain);
}

version(LittleEndian)
    private enum IsLittleEndian = true;
else
    private enum IsLittleEndian = false;

/**
 * Swap the bytes of every element before handing to next processor. The
 * littleEndian compile-time parameter indicates what endianness the data is
 * in. If it matches the platform's endianness, then nothing is done (no byte
 * swap occurs). Otherwise, a byte swap processor is returned wrapping the io
 * pipe.
 *
 * Note, the width of the elements in the iopipe's window must be 2 or 4 bytes
 * wide, and mutable.
 *
 * Params: littleEndian = true if the data arrives in little endian mode, false
 *                        if in big endian mode.
 *         c = Source pipe chain for the byte swapper.
 * Returns: If endianness of the source matches platform, this returns c,
 *          otherwise, it returns a byte swapping iopipe wrapper that performs
 *          the byte swaps.
 */
auto byteSwapper(bool littleEndian = !IsLittleEndian, Chain)(Chain c) if(isIopipe!(Chain) && is(typeof(swapBytes(c.window))))
{
    static if(littleEndian == IsLittleEndian)
    {
        // data already in platform byte order, nothing to do
        return c;
    }
    else
    {
        // need to byteswap existing data, since we only byteswap on extend
        swapBytes(c.window);
        return ByteSwapProcessor!Chain(c);
    }
}

@safe unittest
{
    import std.algorithm;
    auto arr = [1, 2, 3, 4, 5];
    auto c = arr.dup.byteSwapper;
    assert(c.window.equal(arr.map!(a => (a << 24))));
}

// Pipe wrapper that presents an upstream array window as an array of T.
// The element sizes must be whole multiples of one another (enforced by the
// static asserts below).
private struct ArrayCastPipe(Chain, T) if(isIopipe!(Chain) && isDynamicArray!(WindowType!(Chain)))
{
    Chain chain;
    // upstream element type
    alias UE = typeof(Chain.init.window()[0]);

    static if(UE.sizeof > T.sizeof)
    {
        // needed to keep track of partially released elements.
        ubyte offset;
        enum Ratio = UE.sizeof / T.sizeof;
        static assert(UE.sizeof % T.sizeof == 0); // only support multiples
    }
    else
    {
        enum Ratio = T.sizeof / UE.sizeof;
        static assert(T.sizeof % UE.sizeof == 0);
    }

    auto window() @trusted
    {
        static if(UE.sizeof > T.sizeof)
        {
            // note, we avoid a cast of arrays because that invokes a runtime call
            auto w = chain.window;
            return (cast(T*)w.ptr)[offset .. w.length * Ratio];
        }
        else static if(UE.sizeof == T.sizeof)
            // ok to cast array here, because it's the same size (no runtime call)
            return cast(T[])chain.window;
        else
        {
            // note, we avoid a cast of arrays because that invokes a runtime call
            auto w = chain.window;
            return (cast(T*)w.ptr)[0 .. w.length / Ratio];
        }
    }

    size_t extend(size_t elements)
    {
        static if(UE.sizeof < T.sizeof)
        {
            // need a minimum number of items
            auto win = chain.window;
            immutable origLength = win.length / Ratio;
            // smallest upstream length that yields at least one more T
            auto targetLength = win.length + Ratio - win.length % Ratio;
            // translate the request to upstream elements, saturating rather
            // than wrapping around for very large requests.
            immutable size_t request =
                (elements > size_t.max / Ratio) ? size_t.max : elements * Ratio;
            while(win.length < targetLength)
            {
                if(chain.extend(request) == 0)
                {
                    return 0;
                }
                win = chain.window;
            }
            return window.length - origLength;
        }
        else
        {
            // need to round up to the next UE. Written as quotient plus
            // remainder check so that large requests cannot overflow.
            immutable size_t translatedElements =
                elements / Ratio + (elements % Ratio != 0 ? 1 : 0);
            return chain.extend(translatedElements) * Ratio;
        }
    }

    void release(size_t elements)
    {
        static if(UE.sizeof <= T.sizeof)
        {
            chain.release(elements * Ratio);
        }
        else
        {
            // may need to keep one of the upstream elements because we only
            // released part of it
            elements += offset;
            offset = elements % Ratio;
            chain.release(elements / Ratio);
        }
    }

    mixin implementValve!(chain);
}

/**
 * Given a pipe chain whose window is a straight array, create a pipe chain that
 * converts the array to another array type.
 *
 * Note: This new pipe chain handles any alignment issues when partial
 *       elements have been extended/released. Also, the size of the new
 *       element type must be a multiple of, or divide evenly into, the
 *       size of the original array's element type.
 *
 * Params: T = Element type for new pipe chain window
 *         c = Source pipe chain to use for new chain.
 *
 * Returns: New pipe chain with new array type.
 */
auto arrayCastPipe(T, Chain)(Chain c) if(isIopipe!(Chain) && isDynamicArray!(WindowType!(Chain)))
{
    static if(is(typeof(c.window[0]) == T))
        return c;
    else
        return ArrayCastPipe!(Chain, T)(c);
}

@safe unittest
{
    // test going from int to ubyte
    auto arr = [1, 2, 3, 4, 5];
    auto arr2 = cast(ubyte[])arr;
    auto p = arr.arrayCastPipe!(ubyte);
    assert(p.window == arr2);
    p.release(3);
    assert(p.window == arr2[3 .. $]);
    // we can only release when all 4 bytes of the array are gone
    assert(p.chain == arr);
    p.release(1);
    assert(p.chain == arr[1 .. $]);

    // test going from ubyte to int, but shave off one byte
    assert(arr2[0 .. $-1].arrayCastPipe!(int).window == arr[0 .. $-1]);
}

/**
 * Extend a pipe until it has a minimum number of elements. If the minimum
 * elements are already present, does nothing.
 *
 * This is useful if you need a certain number of elements in the pipe before
 * you can process any more data.
 *
 * Params: chain = The pipe to work on.
 *         elems = The number of elements to ensure are in the window. If
 *                 omitted, all elements are extended.
 * Returns: The resulting number of elements in the window. This may be less
 *          than the requested elements if the pipe ran out of data.
 */
size_t ensureElems(Chain)(ref Chain chain, size_t elems = size_t.max)
{
    while(chain.window.length < elems)
    {
        if(chain.extend(elems - chain.window.length) == 0)
            break;
    }
    return chain.window.length;
}

@safe unittest
{
    auto p = SimplePipe!(string)("hello, world");
    assert(p.ensureElems(5) == 5);
    assert(p.window == "hello");
    assert(p.ensureElems(3) == 5);
    assert(p.window == "hello");
    assert(p.ensureElems(100) == 12);
    assert(p.window == "hello, world");
}

// bug #11
@safe unittest
{
    auto x = "hello, world".iosrc!((ref c, b) {
            if(b.length > c.window.length)
                b = b[0 .. c.window.length];
            b[] = c.window[0 .. b.length];
            c.release(b.length);
            return b.length; })
        .bufd!(char);
    auto elems = x.ensureElems();
    assert(elems == 12);
}

// iopipe that fills a managed buffer by calling `dev.read`. `window`,
// `release` and buffer growth are all delegated to the buffer manager.
struct BufferedInputSource(BufferManager, Source, size_t optimalReadSize)
{
    Source dev;
    BufferManager buffer;
    auto window()
    {
        return buffer.window;
    }

    void release(size_t elements)
    {
        buffer.releaseFront(elements);
    }

    size_t extend(size_t elements)
    {
        import std.algorithm.comparison : max;
        auto oldLen = buffer.window.length;

        if(elements == 0 || (elements < optimalReadSize && buffer.capacity == 0))
        {
            // optimal read, or first read. Use optimal read size
            elements = optimalReadSize;
        }
        else
        {
            // requesting a specific amount. Don't want to over-allocate the
            // buffer, limit the request to 2x current elements, or optimal
            // read size, whatever is larger.
            immutable cap = max(optimalReadSize, oldLen * 2);
            if(elements > cap)
                elements = cap;
        }

        // ensure we maximize buffer use.
        elements = max(elements, buffer.avail());

        if(buffer.extend(elements) == 0)
        {
            // could not extend;
            return 0;
        }

        auto nread = dev.read(buffer.window[oldLen .. $]);
        // give back data we did not read.
        buffer.releaseBack(buffer.window.length - oldLen - nread);
        return nread;
    }

    // need to forward valves if we are going through rebuffered data.
    static if(hasValve!Source)
        mixin implementValve!dev;
}

/**
 * Create a buffer to manage the data from the given source, and wrap into an iopipe.
 *
 * Params: T = The type of element to allocate with the allocator
 *         Allocator = The allocator to use for managing the buffer
 *         Source = The type of the input stream. This must have a function
 *                  `read` that can read into the buffer's window.
 *         dev = The input stream to use. If not specified, then a NullDev source is assumed.
 *         args = Arguments passed to the allocator (for allocators that need initialization)
 *
 * Returns: An iopipe that uses the given buffer to read data from the given device source.
 * The version which takes no parameter uses a NullDev as a source.
 */
auto bufd(T=ubyte, Allocator = GCNoPointerAllocator, size_t optimalReadSize = 8 * 1024 / T.sizeof, Source, Args...)(Source dev, Args args)
    if(hasMember!(Source, "read") && is(typeof(dev.read(T[].init)) == size_t))
{
    alias BM = AllocatedBuffer!(T, Allocator, optimalReadSize);
    static if(Args.length > 0)
        return BufferedInputSource!(BM, Source, optimalReadSize)(dev, BM(Allocator(args)));
    else
        return BufferedInputSource!(BM, Source, optimalReadSize)(dev);
}

/// ditto
auto bufd(T=ubyte, Allocator = GCNoPointerAllocator, size_t optimalReadSize = (T.sizeof > 4 ? 8 : 32 / T.sizeof), Args...)(Args args)
{
    import iopipe.stream: nullDev;
    return nullDev.bufd!(T, Allocator, optimalReadSize)(args);
}

/**
 * Create a ring buffer to manage the data from the given source, and wrap into an iopipe.
 *
 * The iopipe RingBuffer type uses virtual memory mapping to have the same
 * segment of data mapped to consecutive addresses. This allows true zero-copy
 * usage. However, it does require use of resources that may possibly be
 * limited, so you may want to justify that it's needed before using instead of
 * bufd.
 *
 * Note also that a RingBuffer is not copyable (its destructor will unmap the
 * memory), so this must use RefCounted to properly work.
 *
 * Params: T = The type of element to allocate with the allocator
 *         Source = The type of the input stream. This must have a function
 *                  `read` that can read into the buffer's window.
 *         dev = The input stream to use. If not specified, then a NullDev source is assumed.
 *
 * Returns: An iopipe that uses a RingBuffer to read data from the given device source.
 */
auto rbufd(T=ubyte, size_t optimalReadSize = 8 * 1024 / T.sizeof, Source)(Source dev)
    if(hasMember!(Source, "read") && is(typeof(dev.read(T[].init)) == size_t))
{
    // need to refcount the ring buffer, since it's not copyable
    import std.typecons : refCounted;
    auto buffer = refCounted(RingBuffer!T());
    return BufferedInputSource!(typeof(buffer), Source, optimalReadSize)(dev, buffer);
}

/// Ditto
auto rbufd(T=ubyte, size_t optimalReadSize = 8 * 1024 / T.sizeof)()
{
    import iopipe.stream: nullDev;
    return nullDev.rbufd!(T, optimalReadSize)();
}

// allocate using a region buffer (convenience)
//
// Wraps `dev` in a bufd pipe whose allocator is a Region constructed from the
// caller-supplied `buf`.
auto lbufd(T=ubyte, size_t optimalReadSize = 128, Source)(Source dev, ubyte[] buf)
{
    import std.experimental.allocator.building_blocks.region;
    // for now, we only support buffers at least large enough to hold the
    // initial read.
    // NOTE(review): buf.length is a byte count while optimalReadSize appears
    // to be an element count of T; for T wider than one byte this check is
    // weaker than the comment above suggests -- confirm intended units.
    assert(buf.length >= optimalReadSize);
    return bufd!(T, Region!(), optimalReadSize)(dev, buf);
}

// allocate using a region buffer (convenience)
auto lbufd(T=ubyte, size_t optimalReadSize = 128)(ubyte[] buf)
{
    import iopipe.stream : nullDev;
    return lbufd!(T, optimalReadSize)(nullDev, buf);
}

@safe unittest
{
    // simple struct that "reads" data from a pre-defined string array into a char buffer.
    static struct ArrayReader
    {
        string _src;
        size_t read(char[] data)
        {
            auto ntoread = data.length;
            if(ntoread > _src.length)
                ntoread = _src.length;
            data[0 .. ntoread] = _src[0 .. ntoread];
            _src = _src[ntoread .. $];
            return ntoread;
        }
    }

    void test(P)(P p)
    {
        assert(p.window.length == 0);
        assert(p.extend(0) == 13);
        assert(p.window == "hello, world!");
        assert(p.extend(0) == 0);
    }
    test(ArrayReader("hello, world!").bufd!char);
    // ring buffers are @system
    () @trusted { test(ArrayReader("hello, world!").rbufd!char); }();
}

// Pipe wrapper that writes every newly extended element to `dev` before
// reporting it to the caller.
private struct OutputPipe(Chain, Sink)
{
    Sink dev;
    Chain chain;

    auto window()
    {
        return chain.window();
    }

    size_t extend(size_t elements)
    {
        // get new elements, and then write them to the file
        auto newData = chain.extend(elements);
        ensureWritten(newData);
        return newData;
    }

    void release(size_t elements)
    {
        // just upstream this
        chain.release(elements);
    }

    size_t flush(size_t elements)
    {
        // extend and then release all data
        extend(elements);
        auto result = window.length;
        release(window.length);
        return result;
    }

    // Write the last `dataToWrite` elements of the window to the sink,
    // retrying until the sink has accepted all of them.
    // NOTE(review): if dev.write returns 0 this loop never terminates; it
    // relies on the sink either making progress or throwing.
    private void ensureWritten(size_t dataToWrite)
    {
        while(dataToWrite)
        {
            auto nwritten = dev.write(chain.window[$-dataToWrite .. $]);
            dataToWrite -= nwritten;
        }
    }

    mixin implementValve!(chain);
}

/**
 * An output pipe writes all its data to a given sink stream. Any data in the
 * output pipe's window has been written to the stream.
 *
 * The returned iopipe has a function "flush" that will extend a chunk of data
 * and then release it immediately.
 *
 * Params: c = The input data to write to the stream.
 *         dev = The output stream to write data to. This must have a function
 *               `write` that can write a c.window.
 *
 * Returns: An iopipe that gives a view of the written data. Note that you
 *          don't have to do anything with the data.
 *
 */
auto outputPipe(Chain, Sink)(Chain c, Sink dev) if(isIopipe!Chain && is(typeof(dev.write(c.window)) == size_t))
{
    auto result = OutputPipe!(Chain, Sink)(dev, c);
    // anything already in the window must be written too, so that the
    // "window has been written" invariant holds from the start.
    result.ensureWritten(result.window.length);
    return result;
}

@safe unittest
{
    // shim that simply verifies the data is correct
    static struct OutputStream
    {
        string verifyAgainst;
        size_t write(const(char)[] data)
        {
            assert(data.length <= verifyAgainst.length && data == verifyAgainst[0 .. data.length], verifyAgainst ~ " != " ~ data);
            verifyAgainst = verifyAgainst[data.length .. $];
            return data.length;
        }
    }

    auto pipe = "hello, world!".SimplePipe!(string, 5).outputPipe(OutputStream("hello, world!"));
    do
    {
        pipe.release(pipe.window.length);
    } while(pipe.extend(0) != 0);
}

/**
 * Process a given iopipe chain until it has reached EOF. This is accomplished
 * by extending and releasing continuously until extend returns 0.
 *
 * Params: c = The iopipe to process
 * Returns: The number of elements processed.
 */
size_t process(Chain)(auto ref Chain c)
{
    size_t result = 0;
    do
    {
        auto elementsInChain = c.window.length;
        result += elementsInChain;
        c.release(elementsInChain);
    } while(c.extend(0) != 0);

    return result;
}

@safe unittest
{
    assert("hello, world!".SimplePipe!(string, 5).process() == 13);
}

// Input range whose front is the pipe's entire current window; popFront
// releases that window and extends by extendRequestSize.
private struct IoPipeRange(Chain)
{
    Chain chain;
    private size_t extendRequestSize;
    bool empty() { return chain.window.length == 0; }
    auto front() { return chain.window; }
    void popFront()
    {
        chain.release(chain.window.length);
        chain.extend(extendRequestSize);
    }
}

/**
 * Convert an io pipe into a range, with each popFront releasing all the
 * current data and extending a specified amount.
 *
 * Note that the function may call extend once before returning, depending on
 * whether there is any data present or not.
 *
 * Params: extendRequestSize = The value to pass to c.extend when calling popFront
 *         c = The chain to use as backing for this range.
 */
auto asInputRange(size_t extendRequestSize = 0, Chain)(Chain c) if (isIopipe!Chain)
{
    if(c.window.length == 0)
        // attempt to prime the range, since empty will be true right away!
        c.extend(extendRequestSize);
    return IoPipeRange!Chain(c, extendRequestSize);
}

@safe unittest
{
    auto str = "abcdefghijklmnopqrstuvwxyz";
    foreach(elem; str.SimplePipe!(string, 5).asInputRange)
    {
        assert(elem == str[0 .. elem.length]);
        str = str[elem.length .. $];
    }
    assert(str.length == 0);
}

// Input range over individual elements of the pipe; popFront releases one
// element and extends only once the window has been emptied.
private struct IoPipeElemRange(Chain)
{
    Chain chain;
    private size_t extendRequestSize;
    bool empty() { return chain.window.length == 0; }
    auto front() { return chain.window[0]; }
    void popFront()
    {
        chain.release(1);
        if(chain.window.length == 0)
            chain.extend(extendRequestSize);
    }
}

/**
 * Convert an io pipe into a range of elements of the pipe. This effectively
 * converts an iopipe range of T into a range of T. Note that auto-decoding
 * does NOT happen still, so converting a string into an input range produces a
 * range of char. The range is extended when no more data is in the window.
 *
 * Note that the function may call extend once before returning, depending on
 * whether there is any data present or not.
 *
 * Params: extendRequestSize = The value to pass to c.extend when calling in
 *              popFront
 *         c = The chain to use as backing for this range.
 */
auto asElemRange(size_t extendRequestSize = 0, Chain)(Chain c) if (isIopipe!Chain)
{
    if(c.window.length == 0)
        c.extend(extendRequestSize);
    return IoPipeElemRange!Chain(c, extendRequestSize);
}

@safe unittest {
    auto str = "abcdefghijklmnopqrstuvwxyz";
    import std.algorithm : equal;
    import std.utf : byCodeUnit;
    assert(equal(str.byCodeUnit, SimplePipe!(string, 5)(str).asElemRange));
}

/**
 * Create an input source from a given Chain, and a given translation function/template.
 *
 * It is advisable to use a template or lambda that does not require a closure,
 * and is not a delegate from a struct that might move.
 *
 * The result is also alias-this'd to the chain, so it can be used as an iopipe also.
 *
 * Params:
 *    fun = Function that accepts as its first parameter the input chain (of
 *    type Chain), and as its second parameter, the buffer to read into. Only
 *    buffer types that are supported are used.
 *    c = The chain to read from
 */
template iosrc(alias fun, Chain)
{
    struct IOSource
    {
        Chain chain;
        size_t read(T)(T buf) if (is(typeof(fun(chain, buf)) == size_t))
        {
            return fun(chain, buf);
        }

        // We are just wrapping the chain, so allow usage of it completely
        alias chain this;
    }

    auto iosrc(Chain c)
    {
        return IOSource(c);
    }
}

// TODO: need to deal with general ranges.
import std.typecons;
alias ReleaseOnWrite = Flag!"releaseOnWrite";
/**
 * Write data from a random access range or character array into the given
 * iopipe. If relOnWrite is set to true (ReleaseOnWrite.yes), then all data
 * before the provided offset, and any new data written to the pipe is always
 * released. This is mainly useful for output buffers where you do not wish to
 * allocate extra space in the buffer, and wish to flush the buffer when it's
 * full.
 *
 * If relOnWrite is false, then the pipe data is not released, and you should
 * consider the "written" part to be the offset + the return value.
 *
 * Params: c = The iopipe chain to write to.
 *         data = The range to write to the chain.
 *         offset = The starting point to write the data.
 *         relOnWrite = If true, data is released as it is written, otherwise,
 *                      it's not released.
 *
 * Returns: The number of elements written. This should match the elements of
 * the range, but could potentially be less if there wasn't a way to extend
 * more space and more space was needed.
 */
size_t writeBuf(ReleaseOnWrite relOnWrite = ReleaseOnWrite.yes, Chain, Range)(ref Chain c, Range data, size_t offset = 0)
    if (isIopipe!Chain && __traits(compiles, (c.window[0 .. 0] = data[0 .. 0])))
{
    assert(offset <= c.window.length);
    static if(relOnWrite)
    {
        // always release the offset bytes
        if(offset)
            c.release(offset);
        enum offsetVal = 0;
    }
    else
    {
        // define an alias to help write the common code.
        alias offsetVal = offset;
    }
    // trivial case
    if(data.length == 0)
        return 0;

    size_t result = data.length;

    // no writable space past the offset yet, try to get some
    if(c.window.length == offsetVal)
        c.extend(0);

    while(true)
    {
        const dlen = data.length;
        const wlen = c.window.length - offsetVal;
        if(wlen == 0)
            // could not get any more space; report what was written so far
            return result - dlen;
        if(wlen >= dlen)
        {
            // remaining data fits in the available window
            c.window[offsetVal .. offsetVal + dlen] = data[];
            static if(relOnWrite)
            {
                c.release(dlen);
            }
            return result;
        }
        else
        {
            // fill what is available and continue with the remainder
            c.window[offsetVal .. $] = data[0 .. wlen];
            data = data[wlen .. $];
            static if(relOnWrite)
            {
                c.release(wlen);
            }
            else
            {
                offsetVal += wlen;
            }
            // no more available buffer to write. Need to fetch more.
            c.extend(0);
        }
    }
}

// TODO: need unittests for writeBuf