1 /**
2  Valve mechanism to allow manipulation of wrapped iopipe pieces.
3 Copyright: Copyright Steven Schveighoffer 2011-.
4 License:   Boost License 1.0. (See accompanying file LICENSE_1_0.txt or copy
5            at http://www.boost.org/LICENSE_1_0.txt)
6 Authors:   Steven Schveighoffer
7  */
8 module iopipe.valve;
9 import iopipe.traits;
10 import std.traits : TemplateOf, isType;
11 
// Transparent iopipe wrapper. Every iopipe primitive is forwarded verbatim to
// the wrapped chain; the only thing this layer adds is that the wrapped chain
// is reachable from outside through the public `valve` member.
private struct SimpleValve(Chain)
{
    // The wrapped chain. Named `valve` so it doubles as the valve access point.
    Chain valve;

    // Expose the wrapped chain's window unchanged.
    auto window() { return valve.window; }

    // Hand the release request straight to the wrapped chain.
    void release(size_t elements) { valve.release(elements); }

    // Ask the wrapped chain for more data and report what it returned.
    size_t extend(size_t elements)
    {
        const added = valve.extend(elements);
        return added;
    }
}
31 
32 /**
33  * Create a simple valve in an iopipe chain.
34  *
35  * This puts a transparent layer between the given chain and the next downstream iopipe to provide valve access. Calling valve on the resulting iopipe gives access to the chain argument passed in.
36  *
37  * Params: chain = The upstream iopipe chain to provide valve access to.
38  *
39  * Returns: A new iopipe chain that provides a valve access point to the parameter.
40  */
auto simpleValve(Chain)(Chain chain)
if (isIopipe!Chain)
{
    // Wrap the chain in the forwarding struct; the chain becomes its `valve` member.
    alias Wrapper = SimpleValve!Chain;
    return Wrapper(chain);
}
45 
@safe unittest
{
    import iopipe.bufpipe;

    // Toy pipe: every element newly brought in by extend gets `addend` added to it.
    // Field order matters, the struct is built positionally below.
    static struct MyPipe(Chain)
    {
        int addend;
        Chain chain;

        auto window() { return chain.window; }

        void release(size_t elements) { chain.release(elements); }

        size_t extend(size_t elements)
        {
            const added = chain.extend(elements);
            // only touch the tail that was just added to the window
            auto win = chain.window;
            win[$ - added .. $] += addend;
            return added;
        }
    }

    // Local helper so SimplePipe can be built with a deduced chain type.
    static auto simplePipe(size_t extendElementsDefault = 1, Chain)(Chain c)
    {
        return SimplePipe!(Chain, extendElementsDefault)(c);
    }

    // two pipes strung together, with valve access to the MyPipe instance in between
    auto source = simplePipe([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    alias Adder = MyPipe!(typeof(source));
    auto pipeline = simplePipe(Adder(1, source).simpleValve);

    // nothing has been extended yet
    assert(pipeline.window.length == 0);
    assert(pipeline.extend(5) == 5);
    assert(pipeline.window == [2, 3, 4, 5, 6]);

    // addend is not visible on the outer pipe directly, only through the valve
    assert(!__traits(compiles, pipeline.addend = 2));
    pipeline.valve.addend = 2;

    // only the newly extended elements see the updated addend
    assert(pipeline.extend(10) == 5);
    assert(pipeline.window == [2, 3, 4, 5, 6, 8, 9, 10, 11, 12]);
}
83 
// Inlet half of a holding valve. It presents the part of the upstream window
// that has not yet been released, and "releasing" only advances a marker
// instead of releasing anything on the upstream chain.
private struct HoldingValveInlet(Chain)
{
    // upstream chain that owns the actual buffer
    private Chain chain;
    // count of leading elements of chain.window already released by the inlet
    private size_t ready;
    // count of leading elements of chain.window currently exposed by the outlet
    private size_t downstream;

    // Everything in the upstream window past the released prefix.
    auto window()
    {
        auto whole = chain.window;
        return whole[ready .. $];
    }

    // release data to the outlet
    void release(size_t elements)
    {
        const newReady = ready + elements;
        assert(newReady <= chain.window.length);
        ready = newReady;
    }

    // get more data for the inlet; forwarded unchanged to the upstream chain
    size_t extend(size_t elements)
    {
        const added = chain.extend(elements);
        return added;
    }

    mixin implementValve!(chain);
}
113 
114 /**
115  * Create a valve that uses a holding location to pass data from the inlet to the outlet.
116  *
117  * A holding valve allows one to manually control when data is released downstream. The holding valve consists of 3 parts:
118  *  - An input buffer, controlled by an iopipe called the inlet. This gives access to the input parameter chain.
119  *  - A holding area for data that has been released by the inlet to the outlet. This is basically a FIFO queue.
120  *  - An output buffer, controlled by an iopipe called the outlet. This is the tail end of the holding valve, and provides data downstream.
121  *
122  * The inlet is a bit different than the normal iopipe, because it doesn't release data upstream, but rather downstream into the holding area.
123  *
124  * The outlet, when releasing data goes upstream with the release call, giving the data back to the buffered source.
125  *
 * One major purpose of the holding valve is to use a holdingLoop to automate the downstream section, and let the user code interact directly with the inlet.
127  *
128  * For example, this creates effectively an output stream:
129  *
130  * ---------
131  * import std.format;
132  *
 * auto stream = bufferedSource!(char).holdingValve.outputFile("out.txt").holdingLoop;
134  * stream.extend(100); // create a buffer of 100 characters (at least)
135  *
136  * void outputRange(const(char)[] str)
137  * {
138  *    if(stream.window.length < str.length)
139  *       stream.extend(0); // extend as needed
140  *    stream.window[0 .. str.length] = str;
141  *    stream.release(str.length);
142  * }
143  * foreach(i; 0 .. 100)
144  * {
145  *    outputRange.formatValue(i);
146  * }
147  *
148  * --------
149  *
150  * Params: chain = The upstream iopipe to which the valve controls access.
151  * Returns: A valve assembly that gives access to the outlet via the return iopipe, and access to the inlet via the valve member.
152  */
template holdingValve(Chain)
if (isIopipe!Chain)
{
    // Outlet half of the holding valve. It owns the inlet (exposed as `valve`)
    // and presents the leading part of the upstream window that has been
    // pushed downstream.
    struct Outlet
    {
        HoldingValveInlet!Chain valve;

        // The first `downstream` elements of the upstream window.
        auto window()
        {
            auto whole = valve.chain.window;
            return whole[0 .. valve.downstream];
        }

        // Release on the upstream chain, then shift both markers back by the
        // same amount since the upstream window has lost that many elements
        // from its front.
        void release(size_t elements)
        {
            assert(elements <= valve.downstream);
            valve.chain.release(elements);
            valve.ready -= elements;
            valve.downstream -= elements;
        }

        // The parameter is ignored: only elements already released by the
        // inlet can be pushed. Returns how many were newly exposed.
        size_t extend(size_t elements)
        {
            const pushed = valve.ready - valve.downstream;
            valve.downstream = valve.ready;
            return pushed;
        }
    }

    auto holdingValve(Chain chain)
    {
        alias Inlet = HoldingValveInlet!Chain;
        return Outlet(Inlet(chain));
    }
}
187 
188 /**
189  * Create an auto-flushing valve loop. This is for use with a chain where the next
190  * valve is a holding valve. What this does is automatically run the outlet of
191  * the holding valve so it seamlessly flushes all data when required.
192  *
193  * Note that this will ONLY work if the first valve in the chain is a holdingValve.
194  *
195  * The valve loop provides the flush function which allows you to flush any
196  * released data through the loop without extending. This function returns the
197  * number of elements flushed.
198  *
199  * See holdingValve for a better explanation.
200  */
template holdingLoop(Chain)
if (hasValve!(Chain) && __traits(isSame, TemplateOf!(PropertyType!(Chain.init.valve)), HoldingValveInlet))
{
    // Wraps the outlet side of a holding-valve chain and presents the inlet as
    // the iopipe interface. Extending first drains whatever is waiting on the
    // outlet.
    struct AutoValve
    {
        Chain outlet;

        // needed for implementValve
        private ref inlet() { return outlet.valve; }

        // window and release operate on the inlet, not on the outlet
        auto window() { return inlet.window; }

        void release(size_t elements) { inlet.release(elements); }

        size_t extend(size_t elements)
        {
            // release any outstanding data that is on the outlet, this allows
            // the source buffer to reuse the data.
            flush();
            const added = inlet.extend(elements);
            return added;
        }

        mixin implementValve!(inlet);

        // Flush the data waiting in the outlet: pull it into the outlet's
        // window, release all of it, and return how many elements that was.
        size_t flush()
        {
            outlet.extend(0);
            const pending = outlet.window.length;
            outlet.release(pending);
            return pending;
        }
    }

    auto holdingLoop(Chain chain)
    {
        return AutoValve(chain);
    }
}
245 
unittest
{
    // Everything the writer pipe sees on extend is appended here.
    char[] destBuf;
    destBuf.reserve(100);

    // Pass-through pipe that copies newly extended data into destBuf.
    struct writer(Chain)
    {
        Chain upstream;

        auto window() { return upstream.window; }

        void release(size_t elements) { upstream.release(elements); }

        size_t extend(size_t elements)
        {
            const added = upstream.extend(elements);
            destBuf ~= upstream.window[$ - added .. $];
            return added;
        }

        mixin implementValve!(upstream);
    }

    auto makeWriter(Chain)(Chain c)
    {
        return writer!(Chain)(c);
    }

    char[] sourceBuf = new char[100];
    auto pipeline = sourceBuf.simpleValve.push!(c => makeWriter(c));

    // copy s to the front of the pipeline's window and release it
    void write(string s)
    {
        const len = s.length;
        pipeline.window[0 .. len] = s;
        pipeline.release(len);
    }

    assert(pipeline.window.length == 100);
    assert(destBuf.length == 0);

    // write some data; it is held until flushed, so nothing reaches destBuf yet
    write("hello");
    assert(pipeline.window.length == 95);
    assert(pipeline.valve.window.length == 100);
    assert(destBuf.length == 0);

    // an explicit flush pushes the held data through the writer
    assert(pipeline.flush == 5);
    assert(destBuf == "hello");
    assert(pipeline.valve.window.length == 95);

    write(", world!");
    assert(pipeline.window.length == 87);
    assert(pipeline.valve.window.length == 95);
    assert(destBuf == "hello");

    // extend flushes first
    assert(pipeline.extend(100) == 0); // cannot extend normal array automatically
    assert(pipeline.valve.window.length == 87);
    assert(destBuf == "hello, world!");
}
302 
/**
 * Wrap a chain that has a `flush` member so that `flush` is called from the
 * wrapper's destructor.
 *
 * The wrapper forwards everything else to the chain via `alias this`, and is
 * returned inside a `RefCounted` from `iopipe.refc`.
 *
 * Params: c = The chain to wrap. Its type must have a member named `flush`.
 * Returns: A `RefCounted` wrapper around the flushing struct.
 */
template autoFlush(Chain)
if (__traits(hasMember, Chain, "flush"))
{
    struct AutoFlusher
    {
        Chain c;
        alias c this;

        // Give the destructor the strongest safety attribute that the chain's
        // flush call supports.
        static if (is(typeof((Chain pipe) @safe { pipe.flush(); })))
        {
            ~this() @safe
            {
                c.flush();
            }
        }
        else
        {
            ~this() @system
            {
                c.flush();
            }
        }
    }

    auto autoFlush(Chain c)
    {
        import iopipe.refc;
        alias Counted = RefCounted!AutoFlusher;
        return Counted(c);
    }
}
327 
328 /**
329  * Convenience mechanism to wrap a specified output pipeline with a holding
330  * loop. It avoids having to explicitly specify the loop begin and end.
331  *
332  * Params:
333  *    pipeline = a lambda template used to generate the pipeline that will
334  *    be set up as a push chain.
335  *    autoFlush = true (default) if you wish to auto-flush the push pipeline
336  *    when all references to it are gone. This moves the whole chain into a
337  *    RefCounted struct which automatically flushes any remaining data that
338  *    hasn't been flushed.
339  *    c = An ioPipe to be used as the source for the data being pushed.
340  * Returns: A wrapped chain that will push any data that is released as needed
341  * (i.e. as the buffer fills up).
342  *
343  * Note: If autoFlush is false, you will need to manually call flush on the
344  * pipeline after all processing is done.
345  */
auto push(alias pipeline, bool autoFlush = true, Chain)(Chain c) if (isIopipe!(typeof(pipeline(c.holdingValve))))
{
    // The chain is always: c -> holdingValve -> user pipeline -> holdingLoop.
    static if(autoFlush)
    {
        // The leading dot selects the module-level autoFlush template; inside
        // this function the bare name refers to the bool template parameter.
        return .autoFlush(pipeline(c.holdingValve).holdingLoop);
    }
    else
    {
        // No wrapper: the caller must call flush manually when done.
        return pipeline(c.holdingValve).holdingLoop;
    }
}
356 
357 // TODO: need good example to show how to use this.
358 
359 
360 /**
361  * Go down the chain of valves until you find a valve of the given type. This
362  * is useful if you know there is a pipe you are looking for in the chain of valves.
363  *
364  * Params:
365  *     T = type or template of valve you are looking for
366  *     pipe = iopipe you are searching
367  *
368  * Returns:
369  *     a valve of the specified type or template. If such a valve doesn't
370  *     exist, a static error occurs.
371  */
auto valveOf(T, Chain)(ref Chain pipe)
if (isType!T && isIopipe!Chain && hasValve!Chain)
{
    alias ValveType = PropertyType!(pipe.valve);

    static if (is(ValveType == T))
    {
        // the immediate valve is the one being looked for
        return pipe.valve;
    }
    else
    {
        // otherwise see whether the search can continue from the immediate valve
        static if (is(typeof(.valveOf!T(pipe.valve))))
        {
            return pipe.valve.valveOf!T;
        }
        else
        {
            static assert(0, "Pipe type " ~ Chain.stringof ~ " does not have valve of type " ~ T.stringof);
        }
    }
}
382 
383 /// ditto
auto valveOf(alias X, Chain)(ref Chain pipe) if (!isType!X && isIopipe!Chain && hasValve!Chain)
{
    alias V = PropertyType!(pipe.valve);
    static if(__traits(isSame, TemplateOf!V, X))
        // the immediate valve is an instance of the requested template
        return pipe.valve;
    else static if(is(typeof(pipe.valve.valveOf!X)))
        // keep searching from the immediate valve, still by template X
        // (this overload has no T; using it here would not compile)
        return pipe.valve.valveOf!X;
    else
        static assert(0, "Pipe type " ~ Chain.stringof ~ " does not have valve based on template " ~ __traits(identifier, X));
}
394 
@safe unittest
{
    // a string wrapped in two simple valves
    string basepipe = "hello world";
    auto nested = simpleValve(simpleValve(basepipe));

    // lookup by type finds the innermost string
    assert(nested.valveOf!string is basepipe);

    // lookup by template yields the first valve that is a SimpleValve instance
    alias Found = typeof(nested.valveOf!SimpleValve);
    static assert(is(Found == SimpleValve!string));
}