1 /**
2  Buffer handling for iopipe.
3 
4 Copyright: Copyright Steven Schveighoffer 2011-.
5 License:   Boost License 1.0. (See accompanying file LICENSE_1_0.txt or copy
6            at http://www.boost.org/LICENSE_1_0.txt)
7 Authors:   Steven Schveighoffer, Dmitry Olshansky
8  */
9 module iopipe.buffer;
10 import std.experimental.allocator : IAllocator;
11 import std.experimental.allocator.common : platformAlignment;
12 
13 
14 /**
15  * GC allocator that creates blocks of non-pointer data (unscanned). This also
16  * does not support freeing data, relying on the GC to do so.
17  */
18 struct GCNoPointerAllocator
19 {
20     @safe:
21     import std.experimental.allocator.gc_allocator : GCAllocator;
22     enum alignment = GCAllocator.alignment;
23 
24     /// Allocate some data
25     static void[] allocate(size_t size) @trusted
26     {
27         import core.memory : GC;
28         auto p = GC.malloc(size, GC.BlkAttr.NO_SCAN);
29         return p ? (() @trusted => p[0 .. size])() : null;
30     }
31 
    /// Determine an appropriate allocation size to hold the given amount of data
33     static size_t goodAllocSize(size_t size)
34     {
35         // mimic GCAllocator
36         return GCAllocator.instance.goodAllocSize(size);
37     }
38 
39     /// Expand some data
40     static bool expand(ref void[] original, size_t size)
41     {
42         // mimic GCAllocator
43         return GCAllocator.instance.expand(original, size);
44     }
45 
46     /// The shared instance
47     static shared GCNoPointerAllocator instance;
48 }
49 
50 @safe unittest
51 {
52     import std.experimental.allocator.common: stateSize;
53     static assert(!stateSize!GCNoPointerAllocator);
54     import core.memory: GC;
55     auto arr = GCNoPointerAllocator.instance.allocate(100);
56     @trusted static noScan(void *ptr)
57     {
58         return (GC.getAttr(ptr) & GC.BlkAttr.NO_SCAN) != 0;
59     }
60     assert(noScan(&arr[0]));
61 
    // not much reason to test further, as it's just a wrapper for GCAllocator.
63 }
64 
65 /**
 * Array-based buffer manager. Uses a custom allocator to obtain the data.
 * Growth is limited to doubling.
68  *
69  * Params:
70  *    T = The type of the elements the buffer will use
71  *    Allocator = The allocator to use for adding more elements
72  *    floorSize = The size that can be freely allocated before growth is restricted to 2x.
73  *
 * Based on a concept by Dmitry Olshansky
75  */
76 struct AllocatedBuffer(T, Allocator = GCNoPointerAllocator, size_t floorSize = 8192)
77 {
78     import std.experimental.allocator.common: stateSize;
79     import std.experimental.allocator: IAllocator, theAllocator;
80 
81     /**
82      * Construct a buffer manager with a given allocator.
83      */
84     static if (stateSize!Allocator)
85     {
86         private Allocator _allocator;
87         static if (is(Allocator == IAllocator))
88         {
89             private @property Allocator allocator()
90             {
91                 if (_allocator is null) _allocator = theAllocator;
92                 return _allocator;
93             }
94         }
95         else
96         {
97             private alias allocator = _allocator;
98         }
        this(Allocator alloc)
        {
            _allocator = alloc;
        }
102     }
103     else // no state size
104     {
105         private alias allocator = Allocator.instance;
106     }
107 
108     /**
     * Give elements back to the buffer manager from the front of the buffer.
     * These elements may be recycled by this or subsequent operations and
     * should no longer be used.
112      *
113      * Params: elements = number of elements to release.
114      */
115     void releaseFront(size_t elements)
116     {
117         assert(released + elements <= valid);
118         released += elements;
119     }
120 
121     /**
     * Give elements back to the buffer manager from the back of the buffer.
     * These elements may be recycled by this or subsequent operations and
     * should no longer be used.
125      *
126      * Params: elements = number of elements to release.
127      */
128     void releaseBack(size_t elements)
129     {
130         assert(released + elements <= valid);
131         valid -= elements;
132     }
133 
134     /**
135      * The window of currently valid data
136      */
137     T[] window() @trusted
138     {
139         return buffer.ptr[released .. valid];
140     }
141 
142     /**
     * Returns: The number of unused elements the window can be extended by
     * without needing to fetch more data from the allocator.
145      */
146     size_t avail()
147     {
148         return buffer.length - (valid - released);
149     }
150 
151     /**
152      * Returns: The total number of elements currently managed.
153      */
154     size_t capacity()
155     {
156         return buffer.length;
157     }
158 
159     /**
160      * Add more data to the window of currently valid data. To avoid expensive
161      * reallocation, use avail to tune this call.
162      *
163      * Params: request = The number of additional elements to add to the valid window.
164      * Returns: The number of elements that were actually added to the valid
165      * window. Note that this may be less than the request if more elements
     * could not be obtained from the allocator.
167      */
168     size_t extend(size_t request)
169     {
170         import std.algorithm.mutation : copy;
171         import std.algorithm.comparison : max, min;
172         import std.traits : hasMember;
173 
174         // check to see if we can "move" the data for free.
175         auto validElems = valid - released;
176         if(validElems == 0)
177             valid = released = 0;
178 
179         if(buffer.length - valid >= request)
180         {
            // buffer has enough free space to accommodate.
182             valid += request;
183             return request;
184         }
185 
186         if(buffer.length - validElems >= request)
187         {
188             // buffer has enough space if we move the data to the front.
189             copy(buffer[released .. valid], buffer[0 .. validElems]);
190             released = 0;
191             valid = validElems + request;
192             return request;
193         }
194 
195         // otherwise, we must allocate/extend a new buffer
196         // limit growth to 2x.
197         immutable maxBufSize = max(buffer.length * 2, INITIAL_LENGTH);
198         static if(hasMember!(Allocator, "expand"))
199         {
200             // try expanding, no further copying required
201             if(buffer.ptr)
202             {
203                 void[] buftmp = buffer;
204                 auto reqSize = min(maxBufSize - buffer.length,  request - (buffer.length - valid));
205                 if(allocator.expand(buftmp, reqSize * T.sizeof))
206                 {
207                     auto newElems = buffer.length - valid + reqSize;
208                     valid += newElems;
209                     buffer = (()@trusted => cast(T[])buftmp)();
210                     return newElems;
211                 }
212             }
213         }
214 
215         // copy and allocate a new buffer
216         auto oldLen = buffer.length;
217         // grow by at least 1.4, but not more than maxBufSize
218         request = min(request, maxBufSize - validElems);
219         auto newLen = max(validElems + request, oldLen * 14 / 10, INITIAL_LENGTH);
220         static if(hasMember!(Allocator, "goodAllocSize"))
221             newLen = allocator.goodAllocSize(newLen * T.sizeof) / T.sizeof;
222 
223         static if(hasMember!(Allocator, "reallocate"))
224         {
225             if(released == 0 && validElems > 0)
226             {
227                 // try using allocator's reallocate member
228                 void[] buf = buffer;
229                 if(allocator.reallocate(buf, newLen * T.sizeof))
230                 {
231                     buffer = (()@trusted => cast(T[])buf)();
232                     valid += request;
233                     return request;
234                 }
235             }
236         }
237         auto newbuf = (()@trusted => cast(T[])allocator.allocate(newLen * T.sizeof))();
238         if(!newbuf.ptr)
239             return 0;
        if(validElems > 0)
            copy(buffer[released .. valid], newbuf[0 .. validElems]);
243         valid = validElems + request;
244         released = 0;
245 
246         // TODO: should we do this? using a GC allocator this is unsafe.
247         static if(hasMember!(Allocator, "deallocate"))
248             allocator.deallocate(buffer);
249         buffer = newbuf;
250 
251         return request;
252     }
253 private:
254     enum size_t INITIAL_LENGTH = (128 < floorSize ? 128 : floorSize);
255     T[] buffer;
256     size_t valid;
257     size_t released;
258 }
259 
260 @safe unittest
261 {
262     static struct OOMAllocator
263     {
264         void[] remaining;
265         enum alignment = 1;
266         void[] allocate(size_t bytes)
267         {
268             if(remaining.length >= bytes)
269             {
270                 scope(exit) remaining = remaining[bytes .. $];
271                 return remaining[0 .. bytes];
272             }
273             return null;
274         }
275     }
276 
277     auto arr = new void[128 + 200];
278     auto buf = AllocatedBuffer!(ubyte, OOMAllocator)(OOMAllocator(arr));
279     assert(buf.extend(100) == 100);
280     assert(buf.avail == 28);
281     assert(buf.capacity == 128);
282     assert(buf.window.ptr == arr.ptr);
283 
284     buf.releaseFront(50);
285     assert(buf.avail == 78);
286     assert(buf.capacity == 128);
287     assert(&buf.window[0] == &arr[50]);
288 
289     assert(buf.extend(50) == 50);
290     assert(buf.capacity == 128);
291     assert(&buf.window[0] == &arr[0]);
292 
293     assert(buf.extend(500) == 0);
294     assert(buf.capacity == 128);
295     assert(&buf.window[0] == &arr[0]);
296 
297     assert(buf.extend(100) == 100);
298     assert(&buf.window[0] == &arr[128]);
299     assert(buf.avail == 0);
300     assert(buf.capacity == 200);
301 }
302 
// The size of the buffered type MUST be a power of 2
304 import std.math : isPowerOf2;
305 
306 /**
307  * A RingBuffer uses the underlying memory management system to avoid any
308  * copying of data (unless expanding).
309  *
 * It works by using the OS's memory-mapping facilities (mmap with shared
 * memory on Posix; VirtualAlloc would be the Windows analogue, though only
 * the Posix implementation exists here) to map the same physical memory at 2
 * consecutive address ranges. This allows one to use the buffer simply as an
 * array, even when the data wraps around the end of the buffer.
314  *
 * Like AllocatedBuffer, growth is limited to doubling, but with the extra
 * restriction that the buffer size must be a multiple of the page size. Note
317  * that this does NOT add any memory to the GC, so do not store GC pointers in
318  * this buffer.
319  *
320  * Unlike AllocatedBuffer, this buffer is NOT copyable, so it must be
321  * refcounted if you are to pass it around. See rbufd which does this
322  * automatically for you. The reason for this is that it must unmap the memory
323  * on destruction.
324  *
325  * Note that this buffer is not @safe, since it is possible on reallocation to
326  * have dangling pointers (if anything keeps a reference to the original
327  * memory).
328  *
329  * Params:
330  *    T = The type of the elements the buffer will use. Must be sized as a power of 2.
331  *    floorSize = The size that can be freely allocated before growth is
332  *      restricted to 2x. Note that the OS imposes a floor size of one page in
333  *      addition to this.
334  */
335 struct RingBuffer(T, size_t floorSize = 8192) if (isPowerOf2(T.sizeof))
336 {
    @disable this(this); // copying is disabled, since each copy would unmap the memory on destruction
338     /**
     * Give elements back to the buffer from the front of the buffer.
     * These elements may be recycled by this or subsequent operations and
     * should no longer be used.
342      *
343      * Params: elements = number of elements to release.
344      */
345     void releaseFront(size_t elements)
346     {
347         assert(released + elements <= valid);
348         released += elements;
349         auto half = buffer.length / 2;
350         if(released >= half)
351         {
352             released -= half;
353             valid -= half;
354         }
355     }
356 
357     /**
     * Give elements back to the buffer from the back of the buffer.
     * These elements may be recycled by this or subsequent operations and
     * should no longer be used.
361      *
362      * Params: elements = number of elements to release.
363      */
364     void releaseBack(size_t elements)
365     {
366         assert(released + elements <= valid);
367         valid -= elements;
368     }
369 
370     /**
371      * The window of currently valid data.
372      */
373     T[] window() @system
374     {
375         assert(released <= buffer.length && valid <= buffer.length);
376         return buffer.ptr[released .. valid];
377     }
378 
379     /**
     * Returns: The number of unused elements the window can be extended by
     * without needing to reallocate the buffer.
382      */
383     size_t avail()
384     {
385         return buffer.length / 2 - (valid - released);
386     }
387 
388     /**
389      * Returns: The total number of elements currently managed.
390      */
391     size_t capacity()
392     {
393         return buffer.length / 2;
394     }
395 
396     /**
397      * Add more data to the window of currently valid data. To avoid expensive
398      * reallocation, use avail to tune this call.
399      *
400      * Params: request = The number of additional elements to add to the valid window.
401      * Returns: The number of elements that were actually added to the valid
402      * window. Note that this may be less than the request if more elements
     * could not be obtained from the OS.
404      */
405     size_t extend(size_t request) @system
406     {
407         import std.algorithm.mutation : copy;
408         import std.algorithm.comparison : max, min;
409         import core.sys.posix.unistd;
410         version (Posix) import core.sys.posix.sys.mman;
411         version (FreeBSD) import core.sys.freebsd.sys.mman : MAP_FIXED, MAP_SHARED, MAP_ANON;
412         version (NetBSD) import core.sys.netbsd.sys.mman : MAP_FIXED, MAP_SHARED, MAP_ANON;
413         version (linux) import core.sys.linux.sys.mman : MAP_FIXED, MAP_SHARED, MAP_ANON;
414         version (OSX) import core.sys.darwin.sys.mman : MAP_FIXED, MAP_SHARED, MAP_ANON;
415         import core.sys.posix.fcntl;
416 
417 
418         // check to see if we can "move" the data for free.
419         auto validElems = valid - released;
420         if(validElems == 0)
421             valid = released = 0;
422 
423         // we should never have to move data
424         immutable cap = buffer.length / 2;
425         assert(valid + cap - released <= buffer.length);
426         if(cap - validElems >= request)
427         {
            // buffer has enough free space to accommodate.
429             valid += request;
430             return request;
431         }
432 
433         // otherwise, we must allocate/extend a new buffer
434         // limit growth to 2x.
435         immutable maxBufSize = max(cap * 2, floorSize);
436 
437         // copy and allocate a new buffer
438         auto oldLen = buffer.length;
439         // grow by at least 1.4, but not more than maxBufSize
440         request = min(request, maxBufSize - validElems);
441         auto fullSize = max(validElems + request, oldLen * 14 / 10, floorSize) * T.sizeof;
442         // round up to PAGESIZE
443         fullSize = (fullSize + PAGESIZE - 1) / PAGESIZE * PAGESIZE;
444 
445         // mmap space to reserve the address space. We won't actually wire this
446         // to any memory until we open the shared memory and map it.
447         auto addr = mmap(null, fullSize * 2, PROT_NONE, MAP_SHARED | MAP_ANON, -1, 0);
448         if(addr == MAP_FAILED)
449             return 0;
450 
451         // attempt to make a name that won't conflict with other processes.
452         // This is really sucky, but is required on posix systems, even though
453         // we aren't really sharing memory.
454         enum basename = "/iopipe_map_";
455         char[basename.length + 8 + 1] shm_name = void;
456         shm_name[0 .. basename.length] = basename;
457         shm_name[basename.length .. $-1] = 'A';
458         // get the process id
459         import std.process: thisProcessID;
460         uint pid = thisProcessID();
461         auto idx = basename.length;
462         while(pid)
463         {
464             shm_name[idx++] = cast(char)('A' + (pid & 0x0f));
465             pid >>= 4;
466         }
467         shm_name[$-1] = 0;
468 
469         import std.conv: octal;
470         import std.exception;
471         int shfd = -1;
472         idx = 0;
473         while(shfd < 0)
474         {
            // try a handful of times (up to 5) to make this happen; if it
            // doesn't, give up and return 0. This helps solve any possible
            // race conditions with other threads. It's not perfect, but it
            // should work reasonably well.
479             if(idx++ > 4)
480             {
481                 munmap(addr, fullSize * 2);
482                 return 0;
483             }
484             shfd = shm_open(shm_name.ptr, O_RDWR | O_CREAT | O_EXCL, octal!"600");
485             // immediately remove the name link, we don't really want to share anything here.
486             shm_unlink(shm_name.ptr);
487         }
488 
489         // after this function, we don't need the file descriptor.
490         scope(exit) close(shfd);
491 
492         // create enough memory to hold the entire buffer.
493         if(ftruncate(shfd, fullSize) < 0)
494         {
495             munmap(addr, fullSize * 2);
496             return 0;
497         }
498 
499         // map the shared memory into the reserved space twice, each half sees
500         // the same memory.
501         if(mmap(addr, fullSize, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, shfd, 0) == MAP_FAILED)
502         {
503             munmap(addr, fullSize * 2);
504             return 0;
505         }
506         if(mmap(addr + fullSize, fullSize, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, shfd, 0) == MAP_FAILED)
507         {
508             munmap(addr, fullSize * 2);
509             return 0;
510         }
511         auto newbuf = cast(T[])(addr[0 .. fullSize * 2]);
        if(validElems > 0)
            copy(buffer[released .. valid], newbuf[0 .. validElems]);
515         valid = validElems + request;
516         assert(valid <= newbuf.length / 2);
517         released = 0;
518         // UNSAFE -- only use this in system code
519         if(buffer.length)
520             munmap(buffer.ptr, buffer.length * T.sizeof); // unmap the original memory
521         buffer = newbuf;
522 
523         return request;
524     }
525 
526     ~this() @system
527     {
528         if(buffer.ptr)
529         {
530             version (Posix)
531             {
532                 import core.sys.posix.sys.mman;
533                 munmap(buffer.ptr, buffer.length * T.sizeof);
534             }
535         }
536     }
537 
538 private:
    // Note: the buffer is 2 mmaps of the same physical memory, laid end to end.
    T[] buffer;
    // We will only ever use at most 1/2 of the buffer; the second half mirrors the first.
542     size_t valid;
543     size_t released;
544 }
545 
546 @system unittest
547 {
548     RingBuffer!(ubyte, 8192) buf;
549     assert(buf.extend(4096) == 4096);
550     assert(buf.avail == 8192 - 4096);
551     assert(buf.capacity == 8192);
552     buf.window[0] = 0;
553     assert(buf.buffer.length == 8192 * 2);
554 
555     assert(buf.extend(4096) == 4096);
556     assert(buf.avail == 0);
557     assert(buf.capacity == 8192);
558 
559     buf.releaseFront(4096);
560     assert(buf.avail == 4096);
561     assert(buf.capacity == 8192);
562     assert(buf.extend(4096) == 4096);
563     assert(buf.avail == 0);
564     assert(buf.capacity == 8192);
565     import std.algorithm : copy, map, equal;
566     import std.range : iota;
567     iota(8192).map!(a => cast(ubyte)a).copy(buf.window);
568     assert(equal(iota(8192).map!(a => cast(ubyte)a), buf.window));
569     buf.releaseFront(4096);
570     assert(equal(iota(4096, 8192).map!(a => cast(ubyte)a), buf.window));
    assert(buf.released == 0); // ensure we wrapped around
572     assert(buf.extend(8192) == 8192);
573     assert(equal(iota(4096, 8192).map!(a => cast(ubyte)a), buf.window[0 .. 4096]));
574 }
575 
576 package static immutable size_t PAGESIZE;
577 
578 // unfortunately, this is the only way to do it for now. Copied from
579 // core.thread
580 shared static this()
581 {
582     version (Windows)
583     {
584         import core.sys.windows.windows;
585         SYSTEM_INFO info;
586         GetSystemInfo(&info);
587 
588         PAGESIZE = info.dwPageSize;
589         assert(PAGESIZE < int.max);
590     }
591     else version (Posix)
592     {
593         import core.sys.posix.unistd;
594         PAGESIZE = cast(size_t)sysconf(_SC_PAGESIZE);
595     }
596     else
597     {
598         static assert(0, "unimplemented");
599     }
600 }