/**
Valve mechanism to allow manipulation of wrapped iopipe pieces.
Copyright: Copyright Steven Schveighoffer 2011-.
License:   Boost License 1.0. (See accompanying file LICENSE_1_0.txt or copy
           at http://www.boost.org/LICENSE_1_0.txt)
Authors:   Steven Schveighoffer
 */
module iopipe.valve;
import iopipe.traits;
import std.traits : TemplateOf, isType;

// Transparent pass-through iopipe. Every iopipe primitive is forwarded
// unchanged to the wrapped chain, which is exposed as the public member
// `valve` so that code downstream can reach it.
private struct SimpleValve(Chain)
{
    // the wrapped upstream chain; this IS the valve access point
    Chain valve;

    // forward: current window of the wrapped chain
    auto window()
    {
        return valve.window;
    }

    // forward: release elements on the wrapped chain
    void release(size_t elements)
    {
        valve.release(elements);
    }

    // forward: extend the wrapped chain, returning whatever it returns
    size_t extend(size_t elements)
    {
        return valve.extend(elements);
    }
}

/**
 * Create a simple valve in an iopipe chain.
 *
 * This puts a transparent layer between the given chain and the next
 * downstream iopipe to provide valve access. Calling valve on the resulting
 * iopipe gives access to the chain argument passed in.
 *
 * Params: chain = The upstream iopipe chain to provide valve access to.
 *
 * Returns: A new iopipe chain that provides a valve access point to the parameter.
 */
auto simpleValve(Chain)(Chain chain) if (isIopipe!Chain)
{
    return SimpleValve!Chain(chain);
}

@safe unittest
{
    import iopipe.bufpipe;

    static struct MyPipe(Chain)
    {
        int addend;
        Chain chain;
        size_t extend(size_t elements)
        {
            auto newElems = chain.extend(elements);
            chain.window[$-newElems .. $] += addend;
            return newElems;
        }

        auto window() { return chain.window; }

        void release(size_t elements) { chain.release(elements); }
    }

    static auto simplePipe(size_t extendElementsDefault = 1, Chain)(Chain c)
    {
        return SimplePipe!(Chain, extendElementsDefault)(c);
    }

    // create two pipes strung together with valve access to the mypipe instance
    auto initialPipe = simplePipe([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    auto pipeline = simplePipe(MyPipe!(typeof(initialPipe))(1, initialPipe).simpleValve);

    assert(pipeline.window.length == 0);
    assert(pipeline.extend(5) == 5);
    assert(pipeline.window == [2, 3, 4, 5, 6]);
    assert(!__traits(compiles, pipeline.addend = 2));
    pipeline.valve.addend = 2;
    assert(pipeline.extend(10) == 5);
    assert(pipeline.window == [2, 3, 4, 5, 6, 8, 9, 10, 11, 12]);
}

// Inlet half of a holding valve. It shares the upstream chain's window with
// the outlet (see holdingValve), partitioned by two indices:
//   [0 .. downstream]      elements already handed to the outlet's window
//   [downstream .. ready]  elements released by the inlet, not yet picked up
//   [ready .. $]           the inlet's own window
private struct HoldingValveInlet(Chain)
{
    private
    {
        Chain chain;
        size_t ready;       // count of elements the inlet has released
        size_t downstream;  // count of elements visible in the outlet's window
    }

    // the part of the upstream window that has not been released yet
    auto window()
    {
        return chain.window[ready .. $];
    }

    // release data to the outlet
    void release(size_t elements)
    {
        assert(ready + elements <= chain.window.length);
        ready += elements;
    }

    size_t extend(size_t elements)
    {
        // get more data for inlet
        return chain.extend(elements);
    }

    mixin implementValve!(chain);
}

/**
 * Create a valve that uses a holding location to pass data from the inlet to
 * the outlet.
 *
 * A holding valve allows one to manually control when data is released
 * downstream. The holding valve consists of 3 parts:
 *  - An input buffer, controlled by an iopipe called the inlet. This gives
 *    access to the input parameter chain.
 *  - A holding area for data that has been released by the inlet to the
 *    outlet. This is basically a FIFO queue.
 *  - An output buffer, controlled by an iopipe called the outlet. This is the
 *    tail end of the holding valve, and provides data downstream.
 *
 * The inlet is a bit different than the normal iopipe, because it doesn't
 * release data upstream, but rather downstream into the holding area.
 *
 * The outlet, when releasing data goes upstream with the release call, giving
 * the data back to the buffered source.
 *
 * One major purpose of the holding valve is to use a holdingLoop to automate
 * the downstream section, and let the user code interact directly with the
 * inlet.
 *
 * For example, this creates effectively an output stream:
 *
 * ---------
 * import std.format;
 *
 * auto stream = bufferedSource!(char).holdingValve.outputFile("out.txt").holdingLoop;
 * stream.extend(100); // create a buffer of 100 characters (at least)
 *
 * void outputRange(const(char)[] str)
 * {
 *    if(stream.window.length < str.length)
 *       stream.extend(0); // extend as needed
 *    stream.window[0 .. str.length] = str;
 *    stream.release(str.length);
 * }
 * foreach(i; 0 .. 100)
 * {
 *    outputRange.formatValue(i);
 * }
 *
 * --------
 *
 * Params: chain = The upstream iopipe to which the valve controls access.
 * Returns: A valve assembly that gives access to the outlet via the return
 * iopipe, and access to the inlet via the valve member.
 */
template holdingValve(Chain) if (isIopipe!Chain)
{
    // Outlet half of the holding valve; owns the inlet as its `valve` member.
    struct Outlet
    {
        HoldingValveInlet!Chain valve;

        // elements that have been moved from the holding area to the outlet
        auto window()
        {
            return valve.chain.window[0 .. valve.downstream];
        }

        // Releasing from the outlet releases on the upstream chain. Both
        // indices are shifted by the same amount so they stay relative to
        // the start of the upstream window.
        void release(size_t elements)
        {
            assert(elements <= valve.downstream);
            valve.chain.release(elements);
            valve.ready -= elements;
            valve.downstream -= elements;
        }

        size_t extend(size_t elements)
        {
            // ignore parameter, we can only push elements that have been released
            auto result = valve.ready - valve.downstream;
            valve.downstream = valve.ready;

            return result;
        }
    }

    auto holdingValve(Chain chain)
    {
        return Outlet(HoldingValveInlet!Chain(chain));
    }
}

/**
 * Create an auto-flushing valve loop. This is for use with a chain where the next
 * valve is a holding valve. What this does is automatically run the outlet of
 * the holding valve so it seamlessly flushes all data when required.
 *
 * Note that this will ONLY work if the first valve in the chain is a holdingValve.
 *
 * The valve loop provides the flush function which allows you to flush any
 * released data through the loop without extending. This function returns the
 * number of elements flushed.
 *
 * See holdingValve for a better explanation.
 *
 * Params: chain = The outlet side of the pipeline. Its valve must be the inlet
 * of a holdingValve.
 * Returns: An iopipe whose window, release and extend operate on the holding
 * valve's inlet. Each extend first flushes the outlet.
 */
template holdingLoop(Chain) if(hasValve!(Chain) && __traits(isSame, TemplateOf!(PropertyType!(Chain.init.valve)), HoldingValveInlet))
{
    struct AutoValve
    {
        Chain outlet;

        // needed for implementValve
        private ref inlet() { return outlet.valve; }

        auto window()
        {
            return inlet.window;
        }

        void release(size_t elements)
        {
            inlet.release(elements);
        }

        size_t extend(size_t elements)
        {
            // release any outstanding data that is on the outlet, this allows
            // the source buffer to reuse the data.
            flush();
            return inlet.extend(elements);
        }

        mixin implementValve!(inlet);

        // flush the data waiting in the outlet
        size_t flush()
        {
            // pull everything the inlet has released into the outlet window,
            // then release all of it
            outlet.extend(0);
            auto result = outlet.window.length;
            outlet.release(result);
            return result;
        }
    }

    auto holdingLoop(Chain chain)
    {
        return AutoValve(chain);
    }
}

unittest
{
    char[] destBuf;
    destBuf.reserve(100);


    struct writer(Chain)
    {
        Chain upstream;
        auto window() { return upstream.window; }
        size_t extend(size_t elements)
        {
            auto newElems = upstream.extend(elements);
            destBuf ~= upstream.window[$-newElems .. $];
            return newElems;
        }

        void release(size_t elements) { upstream.release(elements); }

        mixin implementValve!(upstream);
    }

    auto makeWriter(Chain)(Chain c)
    {
        return writer!(Chain)(c);
    }


    char[] sourceBuf = new char[100];
    auto pipeline = sourceBuf.simpleValve.push!(c => makeWriter(c));

    void write(string s)
    {
        pipeline.window[0 .. s.length] = s;
        pipeline.release(s.length);
    }

    assert(pipeline.window.length == 100);
    assert(destBuf.length == 0);
    // write some data
    write("hello");
    assert(pipeline.window.length == 95);
    assert(pipeline.valve.window.length == 100);
    assert(destBuf.length == 0);
    assert(pipeline.flush == 5);
    assert(destBuf == "hello");
    assert(pipeline.valve.window.length == 95);
    write(", world!");
    assert(pipeline.window.length == 87);
    assert(pipeline.valve.window.length == 95);
    assert(destBuf == "hello");
    assert(pipeline.extend(100) == 0); // cannot extend normal array automatically
    assert(pipeline.valve.window.length == 87);
    assert(destBuf == "hello, world!");

}

/**
 * Wrap a chain that has a flush member so that flush is called when the
 * wrapper is destroyed.
 *
 * The chain is stored in a struct that forwards to it via alias this, and
 * whose destructor calls flush. That struct is then placed in a
 * RefCounted (from iopipe.refc).
 *
 * The destructor is marked @safe when flush can be called from @safe code,
 * and @system otherwise.
 *
 * Params: c = The chain to wrap. It must have a member named flush.
 * Returns: A RefCounted wrapper around the auto-flushing struct.
 */
template autoFlush(Chain) if (__traits(hasMember, Chain, "flush"))
{
    struct AutoFlusher
    {
        Chain c;
        alias c this;
        // pick the destructor attribute based on whether flush is @safe-callable
        static if(is(typeof((Chain c) @safe { c.flush(); })))
            ~this() @safe
            {
                c.flush();
            }
        else
            ~this() @system
            {
                c.flush();
            }
    }

    auto autoFlush(Chain c)
    {
        import iopipe.refc;
        return RefCounted!AutoFlusher(c);
    }
}

/**
 * Convenience mechanism to wrap a specified output pipeline with a holding
 * loop. It avoids having to explicitly specify the loop begin and end.
 *
 * Params:
 *    pipeline = a lambda template used to generate the pipeline that will
 *    be set up as a push chain.
 *    autoFlush = true (default) if you wish to auto-flush the push pipeline
 *    when all references to it are gone. This moves the whole chain into a
 *    RefCounted struct which automatically flushes any remaining data that
 *    hasn't been flushed.
 *    c = An ioPipe to be used as the source for the data being pushed.
 * Returns: A wrapped chain that will push any data that is released as needed
 * (i.e. as the buffer fills up).
 *
 * Note: If autoFlush is false, you will need to manually call flush on the
 * pipeline after all processing is done.
 */
auto push(alias pipeline, bool autoFlush = true, Chain)(Chain c) if (isIopipe!(typeof(pipeline(c.holdingValve))))
{
    static if(autoFlush)
    {
        // the leading dot selects the module-level autoFlush template; the
        // unqualified name is shadowed by the bool template parameter
        return .autoFlush(pipeline(c.holdingValve).holdingLoop);
    }
    else
        return pipeline(c.holdingValve).holdingLoop;
}

// TODO: need good example to show how to use this.


/**
 * Go down the chain of valves until you find a valve of the given type. This
 * is useful if you know there is a pipe you are looking for in the chain of valves.
 *
 * Params:
 *    T = type or template of valve you are looking for
 *    pipe = iopipe you are searching
 *
 * Returns:
 *    a valve of the specified type or template. If such a valve doesn't
 *    exist, a static error occurs.
 */
auto valveOf(T, Chain)(ref Chain pipe) if (isType!T && isIopipe!Chain && hasValve!Chain)
{
    alias V = PropertyType!(pipe.valve);
    static if(is(V == T))
        return pipe.valve;
    else static if(is(typeof(.valveOf!T(pipe.valve))))
        // not at this level; keep searching through the next valve
        return pipe.valve.valveOf!T;
    else
        static assert(0, "Pipe type " ~ Chain.stringof ~ " does not have valve of type " ~ T.stringof);
}

/// ditto
auto valveOf(alias X, Chain)(ref Chain pipe) if (!isType!X && isIopipe!Chain && hasValve!Chain)
{
    alias V = PropertyType!(pipe.valve);
    static if(__traits(isSame, TemplateOf!V, X))
        return pipe.valve;
    else static if(is(typeof(.valveOf!X(pipe.valve))))
        // not at this level; keep searching through the next valve
        return pipe.valve.valveOf!X;
    else
        static assert(0, "Pipe type " ~ Chain.stringof ~ " does not have valve based on template " ~ X.stringof);
}

@safe unittest
{
    string basepipe = "hello world";
    auto p = basepipe.simpleValve.simpleValve;
    assert(p.valveOf!string is basepipe);
    alias T = typeof(p.valveOf!SimpleValve);
    static assert(is(T == SimpleValve!string));
}

unittest
{
    // Search by template where the match is NOT the first valve: the first
    // valve is the holding valve's Outlet, and the inlet is one level below.
    string basepipe = "hello world";
    auto p = basepipe.holdingValve.simpleValve;
    alias T = typeof(p.valveOf!HoldingValveInlet);
    static assert(is(T == HoldingValveInlet!string));
}