1 /**
2     Ringbuffer
3 
4     Copyright: (c) Enalye 2017
5     License: Zlib
6     Authors: Enalye
7 */
8 
9 module atelier.core.ringbuffer;
10 
11 import core.sync.mutex;
12 import core.sync.semaphore;
13 
14 import atelier.core.singleton;
15 
16 /// Fixed size circular buffer. \
17 /// Size **must** be a power of 2.
18 /// Block when reading when empty or writing when full.
19 class RingBuffer(T, uint Size = 128u) : Singleton!(RingBuffer!(T, Size)) {
20     static assert(Size != 0 && (Size & (Size - 1)) == 0);
21     private {
22         const uint _bufferSize = Size;
23         Mutex _writerMutex, _readerMutex;
24         Semaphore _fullSemaphore, _emptySemaphore;
25         uint _posWriter, _posReader, _size;
26         T[_bufferSize] _buffer;
27     }
28 
29     @property {
30         /// Is the buffer empty ?
31         bool isEmpty() const {
32             return _size == 0u;
33         }
34         /// Is the buffer full ?
35         bool isFull() const {
36             return _size == _bufferSize;
37         }
38     }
39 
40     /// Ctor
41     this() {
42         _writerMutex = new Mutex;
43         _readerMutex = new Mutex;
44         _fullSemaphore = new Semaphore(0u);
45         _emptySemaphore = new Semaphore(Size);
46     }
47 
48     /// Append a new value.
49     void write(T value) {
50         synchronized (_writerMutex) {
51             _emptySemaphore.wait();
52             _buffer[_posWriter] = value;
53             _posWriter = (_posWriter + 1u) & (Size - 1);
54             _size++;
55             _fullSemaphore.notify();
56         }
57     }
58 
59     /// Extract a value.
60     T read() {
61         T value;
62         synchronized (_readerMutex) {
63             _fullSemaphore.wait();
64             value = _buffer[_posReader];
65             _posReader = (_posReader + 1u) & (Size - 1);
66             _size--;
67             _emptySemaphore.notify();
68         }
69         return value;
70     }
71 
72     /// Cleanup everything, don't use the buffer afterwards.
73     void close() {
74         foreach (i; 0 .. _bufferSize)
75             _emptySemaphore.notify();
76         foreach (i; 0 .. _bufferSize)
77             _fullSemaphore.notify();
78         _writerMutex.unlock();
79         _readerMutex.unlock();
80         _size = 0u;
81     }
82 
83     /// Empty the buffer.
84     void reset() {
85         synchronized (_writerMutex) {
86             synchronized (_readerMutex) {
87                 foreach (i; 0 .. _bufferSize)
88                     _emptySemaphore.notify();
89                 foreach (i; 0 .. _bufferSize) {
90                     _emptySemaphore.wait();
91                     _fullSemaphore.notify();
92                 }
93                 _size = 0u;
94                 _posReader = 0u;
95                 _posWriter = 0u;
96             }
97         }
98     }
99 }