/**
Buffer handling for iopipe.

Copyright: Copyright Steven Schveighoffer 2011-.
License: Boost License 1.0. (See accompanying file LICENSE_1_0.txt or copy
         at http://www.boost.org/LICENSE_1_0.txt)
Authors: Steven Schveighoffer, Dmitry Olshansky
*/
module iopipe.buffer;
import std.experimental.allocator : IAllocator;
import std.experimental.allocator.common : platformAlignment;


/**
 * GC allocator that creates blocks of non-pointer data (unscanned). This also
 * does not support freeing data, relying on the GC to do so.
 */
struct GCNoPointerAllocator
{
    @safe:
    import std.experimental.allocator.gc_allocator : GCAllocator;
    enum alignment = GCAllocator.alignment;

    /// Allocate some data
    static void[] allocate(size_t size) @trusted
    {
        import core.memory : GC;
        auto p = GC.malloc(size, GC.BlkAttr.NO_SCAN);
        return p ? (() @trusted => p[0 .. size])() : null;
    }

    /// Determine an appropriate allocation size to hold data of the given size
    static size_t goodAllocSize(size_t size)
    {
        // mimic GCAllocator
        return GCAllocator.instance.goodAllocSize(size);
    }

    /// Expand some data
    static bool expand(ref void[] original, size_t size)
    {
        // mimic GCAllocator
        return GCAllocator.instance.expand(original, size);
    }

    /// The shared instance
    static shared GCNoPointerAllocator instance;
}

@safe unittest
{
    import std.experimental.allocator.common: stateSize;
    static assert(!stateSize!GCNoPointerAllocator);
    import core.memory: GC;
    auto arr = GCNoPointerAllocator.instance.allocate(100);
    @trusted static noScan(void *ptr)
    {
        return (GC.getAttr(ptr) & GC.BlkAttr.NO_SCAN) != 0;
    }
    assert(noScan(&arr[0]));

    // not much more reason to test it, as it's just a wrapper for GCAllocator.
}
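// A minimal sketch of expand: it attempts to grow an existing allocation in
// place by the given number of additional bytes. The GC is free to refuse
// (small blocks typically cannot grow in place), so both outcomes are valid
// and checked here.
@safe unittest
{
    auto arr = GCNoPointerAllocator.instance.allocate(64);
    assert(arr.length == 64);
    if (GCNoPointerAllocator.instance.expand(arr, 16))
        assert(arr.length == 80); // extended in place by 16 bytes
    else
        assert(arr.length == 64); // unchanged when the block cannot grow
}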
/**
 * Array based buffer manager. Uses custom allocator to get the data. Limits
 * growth to doubling.
 *
 * Params:
 *     T = The type of the elements the buffer will use
 *     Allocator = The allocator to use for adding more elements
 *     floorSize = The size that can be freely allocated before growth is
 *                 restricted to 2x.
 *
 * Based on concept by Dmitry Olshansky
 */
struct AllocatedBuffer(T, Allocator = GCNoPointerAllocator, size_t floorSize = 8192)
{
    import std.experimental.allocator.common: stateSize;
    import std.experimental.allocator: IAllocator, theAllocator;

    /**
     * Construct a buffer manager with a given allocator.
     */
    static if (stateSize!Allocator)
    {
        private Allocator _allocator;
        static if (is(Allocator == IAllocator))
        {
            private @property Allocator allocator()
            {
                if (_allocator is null) _allocator = theAllocator;
                return _allocator;
            }
        }
        else
        {
            private alias allocator = _allocator;
        }
        this(Allocator alloc)
        {
            _allocator = alloc;
        }
    }
    else // no state size
    {
        private alias allocator = Allocator.instance;
    }

    /**
     * Give bytes back to the buffer manager from the front of the buffer.
     * These bytes can be removed in this operation or further operations and
     * should no longer be used.
     *
     * Params: elements = number of elements to release.
     */
    void releaseFront(size_t elements)
    {
        assert(released + elements <= valid);
        released += elements;
    }

    /**
     * Give bytes back to the buffer manager from the back of the buffer.
     * These bytes can be removed in this operation or further operations and
     * should no longer be used.
     *
     * Params: elements = number of elements to release.
     */
    void releaseBack(size_t elements)
    {
        assert(released + elements <= valid);
        valid -= elements;
    }

    /**
     * The window of currently valid data
     */
    T[] window() @trusted
    {
        return buffer.ptr[released .. valid];
    }

    /**
     * Returns: The number of unused elements that can be extended without
     * needing to fetch more data from the allocator.
     */
    size_t avail()
    {
        return buffer.length - (valid - released);
    }

    /**
     * Returns: The total number of elements currently managed.
     */
    size_t capacity()
    {
        return buffer.length;
    }

    /**
     * Add more data to the window of currently valid data. To avoid expensive
     * reallocation, use avail to tune this call.
     *
     * Params: request = The number of additional elements to add to the valid window.
     * Returns: The number of elements that were actually added to the valid
     * window. Note that this may be less than the request if more elements
     * could not be attained from the allocator.
     */
    size_t extend(size_t request)
    {
        import std.algorithm.mutation : copy;
        import std.algorithm.comparison : max, min;
        import std.traits : hasMember;

        // check to see if we can "move" the data for free.
        auto validElems = valid - released;
        if(validElems == 0)
            valid = released = 0;

        if(buffer.length - valid >= request)
        {
            // buffer has enough free space to accommodate.
            valid += request;
            return request;
        }

        if(buffer.length - validElems >= request)
        {
            // buffer has enough space if we move the data to the front.
            copy(buffer[released .. valid], buffer[0 .. validElems]);
            released = 0;
            valid = validElems + request;
            return request;
        }

        // otherwise, we must allocate/extend a new buffer
        // limit growth to 2x.
        immutable maxBufSize = max(buffer.length * 2, INITIAL_LENGTH);
        static if(hasMember!(Allocator, "expand"))
        {
            // try expanding, no further copying required
            if(buffer.ptr)
            {
                void[] buftmp = buffer;
                auto reqSize = min(maxBufSize - buffer.length, request - (buffer.length - valid));
                if(allocator.expand(buftmp, reqSize * T.sizeof))
                {
                    auto newElems = buffer.length - valid + reqSize;
                    valid += newElems;
                    buffer = (()@trusted => cast(T[])buftmp)();
                    return newElems;
                }
            }
        }

        // copy and allocate a new buffer
        auto oldLen = buffer.length;
        // grow by at least 1.4, but not more than maxBufSize
        request = min(request, maxBufSize - validElems);
        auto newLen = max(validElems + request, oldLen * 14 / 10, INITIAL_LENGTH);
        static if(hasMember!(Allocator, "goodAllocSize"))
            newLen = allocator.goodAllocSize(newLen * T.sizeof) / T.sizeof;

        static if(hasMember!(Allocator, "reallocate"))
        {
            if(released == 0 && validElems > 0)
            {
                // try using allocator's reallocate member
                void[] buf = buffer;
                if(allocator.reallocate(buf, newLen * T.sizeof))
                {
                    buffer = (()@trusted => cast(T[])buf)();
                    valid += request;
                    return request;
                }
            }
        }
        auto newbuf = (()@trusted => cast(T[])allocator.allocate(newLen * T.sizeof))();
        if(!newbuf.ptr)
            return 0;
        if (validElems > 0)
        {
            copy(buffer[released .. valid], newbuf[0 .. validElems]);
        }
        valid = validElems + request;
        released = 0;

        // TODO: should we do this? using a GC allocator this is unsafe.
        static if(hasMember!(Allocator, "deallocate"))
            allocator.deallocate(buffer);
        buffer = newbuf;

        return request;
    }
private:
    enum size_t INITIAL_LENGTH = (128 < floorSize ? 128 : floorSize);
    T[] buffer;
    size_t valid;
    size_t released;
}
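// A minimal usage sketch: extend the valid window, fill it, then release the
// consumed elements from the front so the space can be reused by later calls.
@safe unittest
{
    AllocatedBuffer!ubyte buf;
    assert(buf.extend(10) == 10);
    buf.window[] = 42;      // the valid window is writable
    buf.releaseFront(10);   // these elements are now consumed
    assert(buf.window.length == 0);
    assert(buf.avail >= 10); // the released space is available again
}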
@safe unittest
{
    static struct OOMAllocator
    {
        void[] remaining;
        enum alignment = 1;
        void[] allocate(size_t bytes) @safe
        {
            if(remaining.length >= bytes)
            {
                scope(exit) remaining = remaining[bytes .. $];
                return remaining[0 .. bytes];
            }
            return null;
        }
    }

    auto arr = new ubyte[128 + 200];
    auto buf = AllocatedBuffer!(ubyte, OOMAllocator)(OOMAllocator(arr));
    assert(buf.extend(100) == 100);
    assert(buf.avail == 28);
    assert(buf.capacity == 128);
    assert(buf.window.ptr == arr.ptr);

    buf.releaseFront(50);
    assert(buf.avail == 78);
    assert(buf.capacity == 128);
    assert(&buf.window[0] == &arr[50]);

    assert(buf.extend(50) == 50);
    assert(buf.capacity == 128);
    assert(&buf.window[0] == &arr[0]);

    assert(buf.extend(500) == 0);
    assert(buf.capacity == 128);
    assert(&buf.window[0] == &arr[0]);

    assert(buf.extend(100) == 100);
    assert(&buf.window[0] == &arr[128]);
    assert(buf.avail == 0);
    assert(buf.capacity == 200);
}
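// A sketch of the stateful-allocator branch: when Allocator is the runtime
// IAllocator interface, the buffer stores the allocator instance passed at
// construction, and a default-constructed buffer lazily falls back to
// theAllocator on first use.
unittest
{
    import std.experimental.allocator : theAllocator, IAllocator;
    auto buf = AllocatedBuffer!(ubyte, IAllocator)(theAllocator);
    assert(buf.extend(10) == 10);
    assert(buf.window.length == 10);

    // default-constructed: the allocator property initializes from theAllocator
    AllocatedBuffer!(ubyte, IAllocator) buf2;
    assert(buf2.extend(5) == 5);
}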
// The type allocated MUST be a power of 2
import std.math : isPowerOf2;

/**
 * A RingBuffer uses the underlying memory management system to avoid any
 * copying of data (unless expanding).
 *
 * It works by using the OS's mechanisms that map memory (mmap or VirtualAlloc)
 * to map the same region to 2 consecutive addresses. This allows one to use a
 * buffer simply as an array, even when the data wraps around the end of the
 * buffer.
 *
 * Like AllocatedBuffer, the growth is limited to doubling, but this has an
 * extra restriction that the buffer must be a multiple of the page size. Note
 * that this does NOT add any memory to the GC, so do not store GC pointers in
 * this buffer.
 *
 * Unlike AllocatedBuffer, this buffer is NOT copyable, so it must be
 * refcounted if you are to pass it around. See rbufd which does this
 * automatically for you. The reason for this is that it must unmap the memory
 * on destruction.
 *
 * Note that this buffer is not @safe, since it is possible on reallocation to
 * have dangling pointers (if anything keeps a reference to the original
 * memory).
 *
 * Params:
 *     T = The type of the elements the buffer will use. Must be sized as a
 *         power of 2.
 *     floorSize = The size that can be freely allocated before growth is
 *         restricted to 2x. Note that the OS imposes a floor size of one page
 *         in addition to this.
 */
struct RingBuffer(T, size_t floorSize = 8192) if (isPowerOf2(T.sizeof))
{
    @disable this(this); // we can't copy RingBuffer because otherwise it will deallocate the memory

    /**
     * Give bytes back to the buffer from the front of the buffer.
     * These bytes can be removed in this operation or further operations and
     * should no longer be used.
     *
     * Params: elements = number of elements to release.
     */
    void releaseFront(size_t elements)
    {
        assert(released + elements <= valid);
        released += elements;
        auto half = buffer.length / 2;
        if(released >= half)
        {
            released -= half;
            valid -= half;
        }
    }

    /**
     * Give bytes back to the buffer from the back of the buffer.
     * These bytes can be removed in this operation or further operations and
     * should no longer be used.
     *
     * Params: elements = number of elements to release.
     */
    void releaseBack(size_t elements)
    {
        assert(released + elements <= valid);
        valid -= elements;
    }

    /**
     * The window of currently valid data.
     */
    T[] window() @system
    {
        assert(released <= buffer.length && valid <= buffer.length);
        return buffer.ptr[released .. valid];
    }

    /**
     * Returns: The number of unused elements that can be extended without
     * needing to reallocate the buffer.
     */
    size_t avail()
    {
        return buffer.length / 2 - (valid - released);
    }

    /**
     * Returns: The total number of elements currently managed.
     */
    size_t capacity()
    {
        return buffer.length / 2;
    }

    /**
     * Add more data to the window of currently valid data. To avoid expensive
     * reallocation, use avail to tune this call.
     *
     * Params: request = The number of additional elements to add to the valid window.
     * Returns: The number of elements that were actually added to the valid
     * window. Note that this may be less than the request if more elements
     * could not be attained from the OS.
     */
    size_t extend(size_t request) @system
    {
        import std.algorithm.mutation : copy;
        import std.algorithm.comparison : max, min;
        import core.sys.posix.unistd;
        version (Posix) import core.sys.posix.sys.mman;
        version (FreeBSD) import core.sys.freebsd.sys.mman : MAP_FIXED, MAP_SHARED, MAP_ANON;
        version (NetBSD) import core.sys.netbsd.sys.mman : MAP_FIXED, MAP_SHARED, MAP_ANON;
        version (linux) import core.sys.linux.sys.mman : MAP_FIXED, MAP_SHARED, MAP_ANON;
        version (OSX) import core.sys.darwin.sys.mman : MAP_FIXED, MAP_SHARED, MAP_ANON;
        import core.sys.posix.fcntl;


        // check to see if we can "move" the data for free.
        auto validElems = valid - released;
        if(validElems == 0)
            valid = released = 0;

        // we should never have to move data
        immutable cap = buffer.length / 2;
        assert(valid + cap - released <= buffer.length);
        if(cap - validElems >= request)
        {
            // buffer has enough free space to accommodate.
            valid += request;
            return request;
        }

        // otherwise, we must allocate/extend a new buffer
        // limit growth to 2x.
        immutable maxBufSize = max(cap * 2, floorSize);

        // copy and allocate a new buffer
        auto oldLen = buffer.length;
        // grow by at least 1.4, but not more than maxBufSize
        request = min(request, maxBufSize - validElems);
        auto fullSize = max(validElems + request, oldLen * 14 / 10, floorSize) * T.sizeof;
        // round up to PAGESIZE
        fullSize = (fullSize + PAGESIZE - 1) / PAGESIZE * PAGESIZE;

        // mmap space to reserve the address space. We won't actually wire this
        // to any memory until we open the shared memory and map it.
        auto addr = mmap(null, fullSize * 2, PROT_NONE, MAP_SHARED | MAP_ANON, -1, 0);
        if(addr == MAP_FAILED)
            return 0;

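        // The PROT_NONE reservation above commits no physical memory; it only
        // claims a contiguous range of 2 * fullSize addresses. The two
        // MAP_FIXED mappings created below replace each half of that range
        // with a view of the same shared memory object, which is what makes
        // an index past the capacity alias the start of the buffer.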
        // attempt to make a name that won't conflict with other processes.
        // This is really sucky, but is required on posix systems, even though
        // we aren't really sharing memory.
        enum basename = "/iopipe_map_";
        char[basename.length + 8 + 1] shm_name = void;
        shm_name[0 .. basename.length] = basename;
        shm_name[basename.length .. $-1] = 'A';
        // get the process id
        import std.process: thisProcessID;
        uint pid = thisProcessID();
        auto idx = basename.length;
        while(pid)
        {
            shm_name[idx++] = cast(char)('A' + (pid & 0x0f));
            pid >>= 4;
        }
        shm_name[$-1] = 0;

        import std.conv: octal;
        import std.exception;
        int shfd = -1;
        idx = 0;
        while(shfd < 0)
        {
            // try up to 5 times to make this happen; if it doesn't work, give
            // up and return 0. This helps solve any possible race conditions
            // with other threads. It's not perfect, but it should work
            // reasonably well.
            if(idx++ > 4)
            {
                munmap(addr, fullSize * 2);
                return 0;
            }
            shfd = shm_open(shm_name.ptr, O_RDWR | O_CREAT | O_EXCL, octal!"600");
            // immediately remove the name link, we don't really want to share anything here.
            shm_unlink(shm_name.ptr);
        }

        // after this function, we don't need the file descriptor.
        scope(exit) close(shfd);

        // create enough memory to hold the entire buffer.
        if(ftruncate(shfd, fullSize) < 0)
        {
            munmap(addr, fullSize * 2);
            return 0;
        }

        // map the shared memory into the reserved space twice, each half sees
        // the same memory.
        if(mmap(addr, fullSize, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, shfd, 0) == MAP_FAILED)
        {
            munmap(addr, fullSize * 2);
            return 0;
        }
        if(mmap(addr + fullSize, fullSize, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, shfd, 0) == MAP_FAILED)
        {
            munmap(addr, fullSize * 2);
            return 0;
        }
        auto newbuf = cast(T[])(addr[0 .. fullSize * 2]);
        if (validElems > 0)
        {
            copy(buffer[released .. valid], newbuf[0 .. validElems]);
        }
        valid = validElems + request;
        assert(valid <= newbuf.length / 2);
        released = 0;
        // UNSAFE -- only use this in system code
        if(buffer.length)
            munmap(buffer.ptr, buffer.length * T.sizeof); // unmap the original memory
        buffer = newbuf;

        return request;
    }

    ~this() @system
    {
        if(buffer.ptr)
        {
            version (Posix)
            {
                import core.sys.posix.sys.mman;
                munmap(buffer.ptr, buffer.length * T.sizeof);
            }
        }
    }

private:
    // Note: the buffer is 2 mmaps to the same memory page.
    T[] buffer;
    // We will only ever use 1/2 of the buffer at most.
    size_t valid;
    size_t released;
}
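// A short demonstration of the double mapping: the second half of the
// underlying allocation is a view of the same memory as the first half, so
// an index just past capacity refers to the same byte as index 0. This is
// what lets the window wrap around the end of the buffer without copying.
@system unittest
{
    RingBuffer!ubyte buf;
    assert(buf.extend(16) == 16);
    buf.window[0] = 123;
    // the same byte, seen through the second mapping:
    assert(buf.buffer[buf.capacity] == 123);
}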
@system unittest
{
    RingBuffer!(ubyte, 8192) buf;
    assert(buf.extend(4096) == 4096);
    assert(buf.avail == 8192 - 4096);
    assert(buf.capacity == 8192);
    buf.window[0] = 0;
    assert(buf.buffer.length == 8192 * 2);

    assert(buf.extend(4096) == 4096);
    assert(buf.avail == 0);
    assert(buf.capacity == 8192);

    buf.releaseFront(4096);
    assert(buf.avail == 4096);
    assert(buf.capacity == 8192);
    assert(buf.extend(4096) == 4096);
    assert(buf.avail == 0);
    assert(buf.capacity == 8192);
    import std.algorithm : copy, map, equal;
    import std.range : iota;
    iota(8192).map!(a => cast(ubyte)a).copy(buf.window);
    assert(equal(iota(8192).map!(a => cast(ubyte)a), buf.window));
    buf.releaseFront(4096);
    assert(equal(iota(4096, 8192).map!(a => cast(ubyte)a), buf.window));
    assert(buf.released == 0); // make sure we wrapped around
    assert(buf.extend(8192) == 8192);
    assert(equal(iota(4096, 8192).map!(a => cast(ubyte)a), buf.window[0 .. 4096]));
}

package static immutable size_t PAGESIZE;

// unfortunately, this is the only way to do it for now. Copied from
// core.thread
shared static this()
{
    version (Windows)
    {
        import core.sys.windows.windows;
        SYSTEM_INFO info;
        GetSystemInfo(&info);

        PAGESIZE = info.dwPageSize;
        assert(PAGESIZE < int.max);
    }
    else version (Posix)
    {
        import core.sys.posix.unistd;
        PAGESIZE = cast(size_t)sysconf(_SC_PAGESIZE);
    }
    else
    {
        static assert(0, "unimplemented");
    }
}