1 /**
2   Core functionality for iopipe. Defines the base types for manipulating and
3   processing data.
4 Copyright: Copyright Steven Schveighoffer 2011-.
5 License:   Boost License 1.0. (See accompanying file LICENSE_1_0.txt or copy
6            at http://www.boost.org/LICENSE_1_0.txt)
7 Authors:   Steven Schveighoffer
8  */
9 module iopipe.bufpipe;
10 import iopipe.buffer;
11 import iopipe.traits;
12 import std.traits : isDynamicArray, hasMember;
13 import std.range.primitives;
14 
15 
/**
 * A minimal reference iopipe. It shows every member an iopipe is expected to
 * provide.
 *
 * SimplePipe only reveals the elements that were asked for (limited by what
 * is available upstream), so it can be layered over a fixed buffer in tests to
 * simulate data arriving at any rate.
 */
struct SimplePipe(Chain, size_t extendElementsDefault = 1) if(isIopipe!Chain)
{
    /**
     * The upstream pipe. By convention the library stores the upstream pipe in
     * a member named "chain", although nothing requires that name.
     */
    Chain chain;

    // number of upstream elements currently exposed through window()
    private size_t downstreamElements;

    /**
     * Wrap an existing chain.
     */
    this(Chain c)
    {
        this.chain = c;
    }

    /**
     * The elements currently available from this pipe: the leading part of
     * the upstream window that has been extended into view.
     */
    auto window()
    {
        auto upstream = chain.window;
        return upstream[0 .. downstreamElements];
    }

    /**
     * Add elements to the end of the window. Passing 0 lets the pipe pick the
     * amount (here: extendElementsDefault).
     *
     * Params: elements = Number of elements requested.
     * Returns: The number of elements added. This can be less than or more
     *          than the parameter, but will only be 0 when no more elements
     *          can be added. This signifies EOF.
     */
    size_t extend(size_t elements)
    {
        // upstream elements that exist but are not exposed yet
        immutable hidden = chain.window.length - downstreamElements;
        immutable wanted = elements == 0 ? extendElementsDefault : elements;

        // the request can be served without touching upstream
        if(wanted <= hidden)
        {
            downstreamElements += wanted;
            return wanted;
        }

        // upstream has to grow: a 0 request is forwarded as 0, otherwise only
        // the part that is still missing is requested.
        immutable fetched = elements == 0 ? chain.extend(0)
                                          : chain.extend(elements - hidden);
        immutable added = hidden + fetched;
        downstreamElements += added;
        return added;
    }

    /**
     * Drop elements from the front of the window. Any indexes into the window
     * kept by the caller must be adjusted afterwards.
     *
     * Params: elements = The number of elements to release.
     */
    void release(size_t elements)
    {
        assert(elements <= downstreamElements);
        downstreamElements -= elements;
        chain.release(elements);
    }

    static if(hasValve!(Chain))
    {
        /**
         * Forward the valve of the wrapped pipe. A pipe wrapping a pipe that
         * has a valve must give ref access to that valve.
         *
         * The same boilerplate can be generated by adding this line to a pipe
         * structure:
         *
         * ------
         * mixin implementValve!(nameOfUpstreamPipe);
         * ------
         *
         * Returns: A valve inlet that allows you to control flow of the data
         * through this pipe.
         *
         * See Also: iopipe.valve
         */
        ref valve() { return chain.valve; }
    }
}
129 
///
@safe unittest
{
    // an array (here a string) can be used directly as an iopipe source.
    auto text = "hello, world!";

    auto sp = SimplePipe!(string)(text);

    // nothing is visible until the pipe is extended.
    assert(sp.window.length == 0);

    // Note: for narrow strings the iopipe library counts code units.
    assert(sp.extend(5) == 5);
    assert(sp.window == "hello");

    // releasing tells the pipe the leading elements are no longer needed
    sp.release(3);
    assert(sp.window == "lo");

    // extending with 0 asks for "some data" and lets the pipe decide how
    // much to add. This is useful for optimizing OS system call reads.
    assert(sp.extend(0) == 1);
    assert(sp.window == "lo,");

    // a request may be only partially satisfied.
    assert(sp.extend(100) == 7);
    assert(sp.window == "lo, world!");

    sp.release(sp.window.length);

    // a result of 0 means EOF.
    assert(sp.extend(1) == 0);
}
165 
@safe unittest
{
    import std.algorithm : equal;
    import std.range : array, iota;

    // 0 .. 99 as the backing data
    auto numbers = iota(100).array;
    auto pipe = SimplePipe!(typeof(numbers))(numbers);

    assert(pipe.window.length == 0);

    // first half
    assert(pipe.extend(50) == 50);
    assert(pipe.window.length == 50);
    assert(pipe.window.equal(iota(50)));

    // drop the first 20, the window now starts at 20
    pipe.release(20);
    assert(pipe.window.length == 30);
    assert(pipe.window.equal(iota(20, 50)));

    // ask for more than is left, only the remaining 50 arrive
    assert(pipe.extend(100) == 50);
    assert(pipe.window.length == 80);
    assert(pipe.window.equal(iota(20, 100)));

    // everything consumed: EOF
    pipe.release(80);
    assert(pipe.extend(1) == 0);
    assert(pipe.window.length == 0);
}
187 
// Reverse the byte order of every element of data, in place. Only element
// widths of 2 and 4 bytes are accepted by the constraint.
private void swapBytes(R)(R data) if(typeof(R.init[0]).sizeof == 4 || typeof(R.init[0]).sizeof == 2)
{
    enum elemSize = typeof(R.init[0]).sizeof;

    // nothing to swap in an empty window
    if(data.length == 0)
        return;

    static if(elemSize == 4)
    {
        import core.bitop : bswap;
        // each 4-byte element is reversed as one unit
        foreach(ref word; cast(uint[])data)
            word = bswap(word);
    }
    else
    {
        // TODO: this only works for arrays.
        ushort[] halves = cast(ushort[])data;
        () @trusted {
            assert(halves.length > 0);

            // a leading element that does not sit on a 4-byte boundary is
            // swapped on its own and removed from the slice
            if((cast(size_t)halves.ptr & 0x03) != 0)
            {
                immutable first = halves.ptr[0];
                halves.ptr[0] = ((first << 8) & 0xff00) |
                    ((first >> 8) & 0x00ff);
                halves.popFront();
            }

            // an odd count leaves one trailing element; swap it on its own too
            if(halves.length % 2 != 0)
            {
                immutable tail = halves.length - 1;
                immutable last = halves.ptr[tail];
                halves.ptr[tail] = ((last << 8) & 0xff00) |
                    ((last >> 8) & 0x00ff);
                halves.popBack();
            }
        }();

        // what remains is 4-byte aligned with an even element count, so two
        // 16-bit elements are swapped per 32-bit operation.
        // TODO:  see if this can be optimized further, or if there are
        // better options for 64-bit.
        foreach(ref pair; cast(uint[])halves)
        {
            pair = ((pair << 8) & 0xff00ff00) |
                ((pair >> 8) & 0x00ff00ff);
        }
    }
}
238 
@safe unittest
{
    // small values occupy only the lowest byte, so after a swap each value
    // must equal itself shifted into the highest byte.
    void check(T)(T[] values) @safe
    {
        import std.algorithm;
        import std.range;
        auto expected = values.map!(a => (a << ((T.sizeof-1) * 8))).array;
        swapBytes(values);
        assert(values == expected);
    }

    check(cast(ushort[])[1, 2, 3, 4, 5]);
    check(cast(uint[])[6, 7, 8, 9, 10]);
}
253 
// Pipe that byte-swaps every element newly extended from its upstream chain.
// should be a voldemort type, but that results in template bloat
private struct ByteSwapProcessor(Chain)
{
    Chain chain;

    // the upstream window is passed through unchanged
    auto window() { return chain.window; }

    // extend upstream, then swap only the elements that were just added
    size_t extend(size_t elements)
    {
        immutable added = chain.extend(elements);
        auto win = chain.window;
        swapBytes(win[win.length - added .. win.length]);
        return added;
    }

    // releasing is forwarded to the upstream chain
    void release(size_t elements)
    {
        chain.release(elements);
    }

    mixin implementValve!(chain);
}
272 
// Compile-time flag mirroring the predefined LittleEndian version identifier.
version(LittleEndian)
{
    private enum IsLittleEndian = true;
}
else
{
    private enum IsLittleEndian = false;
}
277 
/**
 * Byte-swap every element before it reaches the next processor. The
 * littleEndian compile-time parameter states the endianness of the incoming
 * data. When that is also the platform's endianness no swapping is needed and
 * the source pipe is returned untouched; otherwise the pipe is wrapped in a
 * byte swapping processor.
 *
 * Note, the width of the elements in the iopipe's window must be 2 or 4 bytes
 * wide, and mutable.
 *
 * Params: littleEndian = true if the data arrives in little endian mode, false
 *             if in big endian mode.
 *         c = Source pipe chain for the byte swapper.
 * Returns: If endianness of the source matches platform, this returns c,
 *          otherwise, it returns a byte swapping iopipe wrapper that performs
 *          the byte swaps.
 */
auto byteSwapper(bool littleEndian = !IsLittleEndian, Chain)(Chain c) if(isIopipe!(Chain) && is(typeof(swapBytes(c.window))))
{
    static if(littleEndian == IsLittleEndian)
    {
        // data is already in platform order
        return c;
    }
    else
    {
        // the wrapper only swaps on extend, so data that is already in the
        // window has to be swapped here
        swapBytes(c.window);
        return ByteSwapProcessor!Chain(c);
    }
}
319 
@safe unittest
{
    import std.algorithm;
    // with the default parameter the data is treated as foreign-endian, so
    // each small int ends up with its low byte moved to the top.
    auto original = [1, 2, 3, 4, 5];
    auto swapped = original.dup.byteSwapper;
    assert(swapped.window.equal(original.map!(a => (a << 24))));
}
327 
// Pipe that presents the array window of an upstream chain as an array of a
// different element type T. The element sizes must be whole multiples of one
// another (enforced by the static asserts below).
private struct ArrayCastPipe(Chain, T) if(isIopipe!(Chain) && isDynamicArray!(WindowType!(Chain)))
{
    Chain chain;
    // upstream element type
    alias UE = typeof(Chain.init.window()[0]);

    static if(UE.sizeof > T.sizeof)
    {
        // number of T elements of the first upstream element that have
        // already been released (partially released upstream element).
        ubyte offset;
        // number of T elements per upstream element
        enum Ratio = UE.sizeof / T.sizeof;
        static assert(UE.sizeof % T.sizeof == 0); // only support multiples
    }
    else
    {
        // number of upstream elements per T element
        enum Ratio = T.sizeof / UE.sizeof;
        static assert(T.sizeof % UE.sizeof == 0);
    }

    // The upstream window reinterpreted as T elements.
    auto window() @trusted
    {
        static if(UE.sizeof > T.sizeof)
        {
            // note, we avoid a cast of arrays because that invokes a runtime call
            auto w = chain.window;
            return (cast(T*)w.ptr)[offset .. w.length * Ratio];
        }
        else static if(UE.sizeof == T.sizeof)
            // ok to cast array here, because it's the same size (no runtime call)
            return cast(T[])chain.window;
        else
        {
            // note, we avoid a cast of arrays because that invokes a runtime call
            // trailing upstream elements that do not form a whole T are hidden
            auto w = chain.window;
            return (cast(T*)w.ptr)[0 .. w.length / Ratio];
        }
    }

    // Extend the window by (about) the given number of T elements.
    // Returns the number of T elements added, 0 when upstream reports EOF
    // before a whole new T element became available.
    size_t extend(size_t elements)
    {
        static if(UE.sizeof < T.sizeof)
        {
            // need a minimum number of items
            auto win = chain.window;
            immutable origLength = win.length / Ratio;
            // smallest upstream length that yields one more whole T element
            auto targetLength = win.length + Ratio - win.length % Ratio;
            // translate the request to upstream elements; saturate instead of
            // wrapping around for very large requests (e.g. size_t.max)
            immutable request = elements > size_t.max / Ratio
                ? size_t.max : elements * Ratio;
            while(win.length < targetLength)
            {
                if(chain.extend(request) == 0)
                {
                    return 0;
                }
                win = chain.window;
            }
            return window.length - origLength;
        }
        else
        {
            // need to round up to the next UE. Written without an addition so
            // requests close to size_t.max cannot wrap around to a tiny value.
            immutable translatedElements =
                elements / Ratio + (elements % Ratio != 0 ? 1 : 0);
            return chain.extend(translatedElements) * Ratio;
        }
    }

    // Release the given number of T elements from the front of the window.
    void release(size_t elements)
    {
        static if(UE.sizeof <= T.sizeof)
        {
            chain.release(elements * Ratio);
        }
        else
        {
            // may need to keep one of the upstream elements because we only
            // released part of it
            elements += offset;
            offset = elements % Ratio;
            chain.release(elements / Ratio);
        }
    }

    mixin implementValve!(chain);
}
410 
/**
 * Given a pipe chain whose window is a plain array, produce a pipe chain whose
 * window is an array of another element type.
 *
 * Note: The new pipe chain deals with alignment when elements have only been
 *       partially extended or released. The size of the new element type must
 *       be a multiple of, or divide evenly into, the size of the original
 *       element type.
 *
 * Params: T = Element type for new pipe chain window
 *         c = Source pipe chain to use for new chain.
 *
 * Returns: New pipe chain with new array type. If the window already has
 *          element type T, c itself is returned.
 */
auto arrayCastPipe(T, Chain)(Chain c) if(isIopipe!(Chain) && isDynamicArray!(WindowType!(Chain)))
{
    alias Elem = typeof(c.window[0]);
    static if(!is(Elem == T))
        return ArrayCastPipe!(Chain, T)(c);
    else
        // nothing to convert
        return c;
}
432 
@safe unittest
{
    // view an int array as ubytes
    auto ints = [1, 2, 3, 4, 5];
    auto bytes = cast(ubyte[])ints;
    auto pipe = ints.arrayCastPipe!(ubyte);
    assert(pipe.window == bytes);

    // releasing 3 bytes leaves the first int partially consumed, so the
    // upstream array must still be complete
    pipe.release(3);
    assert(pipe.window == bytes[3 .. $]);
    assert(pipe.chain == ints);

    // the 4th byte completes the first int, which is now released upstream
    pipe.release(1);
    assert(pipe.chain == ints[1 .. $]);

    // view ubytes as ints with one byte missing: the incomplete last int is
    // not part of the window
    assert(bytes[0 .. $-1].arrayCastPipe!(int).window == ints[0 .. $-1]);
}
450 
/**
 * Keep extending a pipe until its window holds at least a given number of
 * elements. Nothing happens when the window is already large enough.
 *
 * Use this when processing cannot continue before a certain amount of data is
 * available.
 *
 * Params: chain = The pipe to work on.
 *         elems = The number of elements to ensure are in the window. If
 *         omitted, all elements are extended.
 * Returns: The resulting number of elements in the window. This may be less
 *          than the requested elements if the pipe ran out of data.
 */
size_t ensureElems(Chain)(ref Chain chain, size_t elems = size_t.max)
{
    for(;;)
    {
        immutable have = chain.window.length;
        // done when enough is present, or when the pipe reports EOF
        if(have >= elems || chain.extend(elems - have) == 0)
            break;
    }
    return chain.window.length;
}
473 
@safe unittest
{
    auto pipe = SimplePipe!(string)("hello, world");

    // grows to exactly the requested size
    assert(pipe.ensureElems(5) == 5);
    assert(pipe.window == "hello");

    // asking for less than is present changes nothing
    assert(pipe.ensureElems(3) == 5);
    assert(pipe.window == "hello");

    // asking for more than exists stops at EOF
    assert(pipe.ensureElems(100) == 12);
    assert(pipe.window == "hello, world");
}
484 
// bug #11
@safe unittest
{
    // a source that copies from a string into whatever buffer it is given
    auto pipe = "hello, world".iosrc!((ref c, b) {
        // never copy more than the source still holds
        auto count = b.length;
        if(count > c.window.length)
            count = c.window.length;
        b[0 .. count] = c.window[0 .. count];
        c.release(count);
        return count;
    }).bufd!(char);

    // ensureElems without a count must pull in everything
    immutable total = pipe.ensureElems();
    assert(total == 12);
}
498 
/**
 * An iopipe that fills a managed buffer with data read from a source device.
 *
 * Params: BufferManager = Type of the buffer. The members used here are
 *             window, extend, releaseFront, releaseBack, avail and capacity.
 *         Source = Type of the device; its read member is called with the
 *             newly added part of the buffer window.
 *         optimalReadSize = Number of elements requested when the caller
 *             does not ask for a specific amount.
 */
struct BufferedInputSource(BufferManager, Source, size_t optimalReadSize)
{
    Source dev;
    BufferManager buffer;

    /// The buffered data.
    auto window()
    {
        return buffer.window;
    }

    /// Release elements from the front of the buffer.
    void release(size_t elements)
    {
        buffer.releaseFront(elements);
    }

    /**
     * Grow the buffer and read from the device into the new space.
     *
     * Params: elements = Number of elements requested, 0 for the optimal
     *             read size.
     * Returns: The number of elements read, or 0 if the buffer could not be
     *          extended or the device returned no data.
     */
    size_t extend(size_t elements)
    {
        import std.algorithm.comparison : max, min;
        immutable before = buffer.window.length;

        // no specific amount requested, or a small first read
        immutable useOptimal = elements == 0 ||
            (elements < optimalReadSize && buffer.capacity == 0);

        if(useOptimal)
            elements = optimalReadSize;
        else
            // a specific amount was requested. To avoid over-allocating the
            // buffer the request is capped at the larger of the optimal read
            // size and twice the current window length.
            elements = min(elements, max(optimalReadSize, before * 2));

        // ensure we maximize buffer use.
        elements = max(elements, buffer.avail());

        // could not extend
        if(buffer.extend(elements) == 0)
            return 0;

        immutable nread = dev.read(buffer.window[before .. $]);
        // hand back the part of the new space the device did not fill.
        buffer.releaseBack(buffer.window.length - before - nread);
        return nread;
    }

    // need to forward valves if we are going through rebuffered data.
    static if(hasValve!Source)
        mixin implementValve!dev;
}
552 
/**
 * Create a buffer that manages the data of the given source and wrap both into
 * an iopipe.
 *
 * Params: T = The type of element to allocate with the allocator
 *         Allocator = The allocator to use for managing the buffer
 *         optimalReadSize = Number of elements to request from the source
 *         when no specific amount is asked for.
 *         Source = The type of the input stream. This must have a function
 *         `read` that can read into the buffer's window.
 *         dev = The input stream to use. If not specified, then a NullDev source is assumed.
 *         args = Arguments passed to the allocator (for allocators that need initialization)
 *
 * Returns: An iopipe that uses the given buffer to read data from the given device source.
 * The version which takes no parameter uses a NullDev as a source.
 */
auto bufd(T=ubyte, Allocator = GCNoPointerAllocator, size_t optimalReadSize = 8 * 1024 / T.sizeof, Source, Args...)(Source dev, Args args)
    if(hasMember!(Source, "read") && is(typeof(dev.read(T[].init)) == size_t))
{
    alias BM = AllocatedBuffer!(T, Allocator, optimalReadSize);
    alias Pipe = BufferedInputSource!(BM, Source, optimalReadSize);
    static if(Args.length == 0)
        // buffer manager is left default-initialized
        return Pipe(dev);
    else
        // the allocator is built from the extra arguments
        return Pipe(dev, BM(Allocator(args)));
}
575 
/// ditto
auto bufd(T=ubyte, Allocator = GCNoPointerAllocator, size_t optimalReadSize = (T.sizeof > 4 ?  8 : 32 / T.sizeof), Args...)(Args args)
{
    import iopipe.stream: nullDev;
    // no device given: buffer over the null device
    return bufd!(T, Allocator, optimalReadSize)(nullDev, args);
}
582 
/**
 * Create a ring buffer that manages the data of the given source and wrap both
 * into an iopipe.
 *
 * The iopipe RingBuffer type uses virtual memory mapping to have the same
 * segment of data mapped to consecutive addresses. This allows true zero-copy
 * usage. However, it does require use of resources that may possibly be
 * limited, so you may want to justify that it's needed before using instead of
 * bufd.
 *
 * Note also that a RingBuffer is not copyable (its destructor will unmap the
 * memory), so this must use RefCounted to properly work.
 *
 * Params: T = The type of element to allocate with the allocator
 *         optimalReadSize = Number of elements to request from the source
 *         when no specific amount is asked for.
 *         Source = The type of the input stream. This must have a function
 *         `read` that can read into the buffer's window.
 *         dev = The input stream to use. If not specified, then a NullDev source is assumed.
 *
 * Returns: An iopipe that uses a RingBuffer to read data from the given device source.
 */
auto rbufd(T=ubyte, size_t optimalReadSize = 8 * 1024 / T.sizeof, Source)(Source dev)
    if(hasMember!(Source, "read") && is(typeof(dev.read(T[].init)) == size_t))
{
    import std.typecons : refCounted;
    // the ring buffer is not copyable, so it is held through a ref count
    auto ring = refCounted(RingBuffer!T());
    alias Pipe = BufferedInputSource!(typeof(ring), Source, optimalReadSize);
    return Pipe(dev, ring);
}
610 
/// Ditto
auto rbufd(T=ubyte, size_t optimalReadSize = 8 * 1024 / T.sizeof)()
{
    import iopipe.stream: nullDev;
    // no device given: ring buffer over the null device
    return rbufd!(T, optimalReadSize)(nullDev);
}
617 
// Convenience wrapper: buffer the given source using a Region allocator that
// works inside the caller-supplied storage buf.
//
// Params: T = element type of the buffer
//         optimalReadSize = number of T elements requested per optimal read
//         dev = the input stream
//         buf = storage handed to the Region allocator
auto lbufd(T=ubyte, size_t optimalReadSize = 128, Source)(Source dev, ubyte[] buf)
{
    import std.experimental.allocator.building_blocks.region;
    // for now, we only support buffers at least large enough to hold the
    // initial read. buf is measured in bytes while optimalReadSize counts T
    // elements, so compare in elements (division cannot overflow).
    assert(buf.length / T.sizeof >= optimalReadSize);
    return bufd!(T, Region!(), optimalReadSize)(dev, buf);
}
627 
// Convenience wrapper: region-allocated buffer over the null device, using
// the caller-supplied storage buf.
auto lbufd(T=ubyte, size_t optimalReadSize = 128)(ubyte[] buf)
{
    import iopipe.stream : nullDev;
    return nullDev.lbufd!(T, optimalReadSize)(buf);
}
634 
@safe unittest
{
    // minimal device: hands out the contents of a string, front to back.
    static struct ArrayReader
    {
        string remaining;
        size_t read(char[] data)
        {
            // copy as much as fits, limited by what is left
            size_t count = data.length > remaining.length ? remaining.length
                                                          : data.length;
            data[0 .. count] = remaining[0 .. count];
            remaining = remaining[count .. $];
            return count;
        }
    }

    // one optimal read must deliver the whole text, the next one EOF
    void verify(P)(P pipe)
    {
        assert(pipe.window.length == 0);
        assert(pipe.extend(0) == 13);
        assert(pipe.window == "hello, world!");
        assert(pipe.extend(0) == 0);
    }

    verify(ArrayReader("hello, world!").bufd!char);
    // ring buffers are @system
    () @trusted { verify(ArrayReader("hello, world!").rbufd!char); }();
}
663 
// Pipe that writes every element extended from its upstream chain to a sink.
private struct OutputPipe(Chain, Sink)
{
    Sink dev;
    Chain chain;

    // the upstream window, passed through unchanged
    auto window()
    {
        return chain.window();
    }

    // extend upstream and write the newly added elements to the sink
    size_t extend(size_t elements)
    {
        immutable added = chain.extend(elements);
        ensureWritten(added);
        return added;
    }

    // releasing is forwarded to the upstream chain
    void release(size_t elements)
    {
        chain.release(elements);
    }

    // extend by the given amount, then release the complete window.
    // Returns the number of elements released.
    size_t flush(size_t elements)
    {
        extend(elements);
        immutable flushed = window.length;
        release(flushed);
        return flushed;
    }

    // write the last dataToWrite elements of the window to the sink, calling
    // write repeatedly until all of them have been accepted.
    // NOTE(review): a sink whose write keeps returning 0 would make this loop
    // spin forever - confirm sinks never do that.
    private void ensureWritten(size_t dataToWrite)
    {
        for(auto pending = dataToWrite; pending != 0; )
        {
            auto win = chain.window;
            pending -= dev.write(win[win.length - pending .. win.length]);
        }
    }

    mixin implementValve!(chain);
}
708 
/**
 * Create a pipe that writes all of its data to the given sink stream. Data
 * that is visible in the output pipe's window has already been written.
 *
 * The returned iopipe also offers a function "flush", which extends a chunk
 * of data and releases it right away.
 *
 * Params: c = The input data to write to the stream.
 *         dev = The output stream to write data to. This must have a function
 *               `write` that can write a c.window.
 *
 * Returns: An iopipe that gives a view of the written data. Note that you
 *          don't have to do anything with the data.
 *
 */
auto outputPipe(Chain, Sink)(Chain c, Sink dev) if(isIopipe!Chain && is(typeof(dev.write(c.window)) == size_t))
{
    alias Pipe = OutputPipe!(Chain, Sink);
    auto pipe = Pipe(dev, c);
    // whatever is in the window already has to reach the sink as well
    immutable pending = pipe.window.length;
    pipe.ensureWritten(pending);
    return pipe;
}
730 
@safe unittest
{
    // sink that checks each write against the text it still expects
    static struct CheckingSink
    {
        string expected;
        size_t write(const(char)[] data)
        {
            assert(data.length <= expected.length && data == expected[0 .. data.length], expected ~ " != " ~ data);
            expected = expected[data.length .. $];
            return data.length;
        }
    }

    auto pipe = SimplePipe!(string, 5)("hello, world!").outputPipe(CheckingSink("hello, world!"));

    // drain the pipe in chunks until EOF
    for(;;)
    {
        pipe.release(pipe.window.length);
        if(pipe.extend(0) == 0)
            break;
    }
}
751 
/**
 * Run an iopipe chain to EOF: release everything in the window, extend, and
 * repeat until extend returns 0.
 *
 * Params: c = The iopipe to process
 * Returns: The number of elements processed.
 */
size_t process(Chain)(auto ref Chain c)
{
    size_t total = 0;
    for(;;)
    {
        // count and discard whatever is in the window right now
        immutable pending = c.window.length;
        total += pending;
        c.release(pending);

        // EOF
        if(c.extend(0) == 0)
            break;
    }
    return total;
}
771 
@safe unittest
{
    // 13 characters, delivered 5 at a time
    auto pipe = SimplePipe!(string, 5)("hello, world!");
    assert(pipe.process() == 13);
}
776 
// Input range over an iopipe where each element is a whole window.
private struct IoPipeRange(Chain)
{
    Chain chain;
    // value handed to chain.extend on every popFront
    private size_t extendRequestSize;

    // the range ends when the window is empty
    bool empty()
    {
        return chain.window.length == 0;
    }

    // the current window
    auto front()
    {
        return chain.window;
    }

    // discard the current window and fetch the next one
    void popFront()
    {
        chain.release(chain.window.length);
        chain.extend(extendRequestSize);
    }
}
789 
/**
 * Turn an io pipe into a range whose elements are windows: every popFront
 * releases all current data and extends by a specified amount.
 *
 * Note that the function may call extend once before returning, depending on
 * whether there is any data present or not.
 *
 * Params: extendRequestSize = The value to pass to c.extend when calling popFront
 *         c = The chain to use as backing for this range.
 */
auto asInputRange(size_t extendRequestSize = 0, Chain)(Chain c) if (isIopipe!Chain)
{
    // an empty window would make the range look empty right away, so try to
    // get the first chunk up front
    immutable needsPriming = c.window.length == 0;
    if(needsPriming)
        c.extend(extendRequestSize);
    return IoPipeRange!Chain(c, extendRequestSize);
}
807 
@safe unittest
{
    // every chunk must match the front of what has not been seen yet
    auto remaining = "abcdefghijklmnopqrstuvwxyz";
    foreach(chunk; SimplePipe!(string, 5)(remaining).asInputRange)
    {
        assert(chunk == remaining[0 .. chunk.length]);
        remaining = remaining[chunk.length .. $];
    }
    // and the chunks together must cover the whole text
    assert(remaining.length == 0);
}
818 
// Input range over the individual elements of an iopipe.
private struct IoPipeElemRange(Chain)
{
    Chain chain;
    // value handed to chain.extend when the window runs dry
    private size_t extendRequestSize;

    // the range ends when the window is empty
    bool empty()
    {
        return chain.window.length == 0;
    }

    // first element of the current window
    auto front()
    {
        return chain.window[0];
    }

    // drop one element; refill only once the window is used up
    void popFront()
    {
        chain.release(1);
        immutable drained = chain.window.length == 0;
        if(drained)
            chain.extend(extendRequestSize);
    }
}
832 
/**
 * Turn an io pipe into a range over the pipe's elements, i.e. an iopipe whose
 * window is a range of T becomes a range of T. Auto-decoding still does NOT
 * happen, so a string yields a range of char. The pipe is extended whenever
 * the window has been used up.
 *
 * Note that the function may call extend once before returning, depending on
 * whether there is any data present or not.
 *
 * Params: extendRequestSize = The value to pass to c.extend when calling in
 *             popFront
 *         c = The chain to use as backing for this range.
 */
auto asElemRange(size_t extendRequestSize = 0, Chain)(Chain c) if (isIopipe!Chain)
{
    // try to have data in the window before the range is first queried
    immutable needsPriming = c.window.length == 0;
    if(needsPriming)
        c.extend(extendRequestSize);
    return IoPipeElemRange!Chain(c, extendRequestSize);
}
852 
@safe unittest
{
    import std.algorithm : equal;
    import std.utf : byCodeUnit;

    // the element range must yield exactly the code units of the string
    auto alphabet = "abcdefghijklmnopqrstuvwxyz";
    auto elems = SimplePipe!(string, 5)(alphabet).asElemRange;
    assert(equal(alphabet.byCodeUnit, elems));
}
859 
/**
 * Build an input source out of a Chain and a translation function/template.
 *
 * Prefer a template or lambda that needs no closure and is not a delegate of
 * a struct that might move.
 *
 * The result is alias-this'd to the chain, so it can also be used as an
 * iopipe.
 *
 * Params:
 *    fun = Function that accepts as its first parameter the input chain (of
 *    type Chain), and as its second parameter, the buffer to read into. Only
 *    buffer types that are supported are used.
 *    c = The chain to read from
 */
template iosrc(alias fun, Chain)
{
    struct IOSource
    {
        Chain chain;

        // read is only available for buffer types that fun accepts and for
        // which it returns a size_t
        size_t read(Buf)(Buf buf) if (is(typeof(fun(chain, buf)) == size_t))
        {
            return fun(chain, buf);
        }

        // the source is a thin wrapper, so expose the whole chain
        alias chain this;
    }

    auto iosrc(Chain c)
    {
        auto source = IOSource(c);
        return source;
    }
}
893 
894 // TODO: need to deal with general ranges.
895 import std.typecons;
alias ReleaseOnWrite = Flag!"releaseOnWrite";
/**
 * Copy data from a random access range or character array into the given
 * iopipe. With relOnWrite set to true (ReleaseOnWrite.yes), everything before
 * the provided offset and all data newly written to the pipe is released.
 * This mostly suits output buffers where no extra buffer space should be
 * allocated and the buffer should be flushed once it is full.
 *
 * With relOnWrite set to false nothing is released, and the "written" part of
 * the pipe is the offset + the return value.
 *
 * Params: c = The iopipe chain to write to.
 *         data = The range to write to the chain.
 *         offset = The starting point to write the data.
 *         relOnWrite = If true, data is released as it is written, otherwise,
 *            it's not released.
 *
 * Returns: The number of elements written. This should match the elements of
 * the range, but could potentially be less if there wasn't a way to extend
 * more space and more space was needed.
 */
size_t writeBuf(ReleaseOnWrite relOnWrite = ReleaseOnWrite.yes, Chain, Range)(ref Chain c, Range data, size_t offset = 0)
    if (isIopipe!Chain && __traits(compiles, (c.window[0 .. 0] = data[0 .. 0])))
{
    assert(offset <= c.window.length);
    static if(relOnWrite)
    {
        // everything before the offset is released, after which writing
        // always happens at the front of the window
        if(offset != 0)
            c.release(offset);
        offset = 0;
    }

    // trivial case
    immutable total = data.length;
    if(total == 0)
        return 0;

    // no room at the write position yet: ask for some
    if(c.window.length == offset)
        c.extend(0);

    while(true)
    {
        immutable remaining = data.length;
        immutable room = c.window.length - offset;

        // the pipe could not provide more space; report what was written
        if(room == 0)
            return total - remaining;

        if(remaining <= room)
        {
            // the rest of the data fits
            c.window[offset .. offset + remaining] = data[];
            static if(relOnWrite)
                c.release(remaining);
            return total;
        }

        // fill the space that is there...
        c.window[offset .. $] = data[0 .. room];
        data = data[room .. $];
        static if(relOnWrite)
            c.release(room);
        else
            offset += room;

        // ...and fetch more for the next round
        c.extend(0);
    }
}
975 
976 // TODO: need unittests for writeBuf