diff --git a/src/RingBuffer.cpp b/src/RingBuffer.cpp index de3eb08a5..a3e0983d4 100644 --- a/src/RingBuffer.cpp +++ b/src/RingBuffer.cpp @@ -5,16 +5,19 @@ RingBuffer.cpp Dominic Mazzoni + Paul Licameli *******************************************************************//*! \class RingBuffer \brief Holds streamed audio samples. - This class is thread-safe, assuming that there is only one - thread writing, and one thread reading. If two threads both - need to read, or both need to write, they need to lock this - class from outside using their own mutex. + Assuming that there is only one thread writing, and one thread reading, + this class implements a lock-free thread-safe bounded queue of samples + with atomic variables that contain the first filled and free positions. + + If two threads both need to read, or both need to write, they need to lock + this class from outside using their own mutex. AvailForPut and AvailForGet may underestimate but will never overestimate. @@ -35,27 +38,46 @@ RingBuffer::~RingBuffer() { } -size_t RingBuffer::Len() +// Calculations of free and filled space, given snapshots taken of the start +// and end values + +size_t RingBuffer::Filled( size_t start, size_t end ) { - return (mEnd + mBufferSize - mStart) % mBufferSize; + return (end + mBufferSize - start) % mBufferSize; +} + +size_t RingBuffer::Free( size_t start, size_t end ) +{ + return std::max(mBufferSize - Filled( start, end ), 4) - 4; } // // For the writer only: +// Only writer writes the end, so it can read it again relaxed +// And it reads the start written by reader, but reader sends no other +// information needing to synchronize with the start, so relaxed memory order +// is good there too // size_t RingBuffer::AvailForPut() { - return std::max(mBufferSize - Len(), 4) - 4; + auto start = mStart.load( std::memory_order_relaxed ); + auto end = mEnd.load( std::memory_order_relaxed ); + return Free( start, end ); + + // Reader might increase the available free space after return, but will + // never decrease it, so writer can safely assume this much at least } size_t RingBuffer::Put(samplePtr buffer, sampleFormat format, size_t samplesToCopy) { - samplesToCopy = std::min( samplesToCopy, AvailForPut() ); + auto start = mStart.load( std::memory_order_relaxed ); + auto end = mEnd.load( std::memory_order_relaxed ); + samplesToCopy = std::min( samplesToCopy, Free( start, end ) ); auto src = buffer; size_t copied = 0; - auto pos = mEnd; + auto pos = end; while(samplesToCopy) { auto block = std::min( samplesToCopy, mBufferSize - pos ); @@ -70,16 +92,20 @@ size_t RingBuffer::Put(samplePtr buffer, sampleFormat format, copied += block; } - mEnd = pos; + // Atomically update the end pointer with release, so the nonatomic writes + // just done to the buffer don't get reordered after + mEnd.store(pos, std::memory_order_release); return copied; } size_t RingBuffer::Clear(sampleFormat format, size_t samplesToClear) { - samplesToClear = std::min( samplesToClear, AvailForPut() ); + auto start = mStart.load( std::memory_order_relaxed ); + auto end = mEnd.load( std::memory_order_relaxed ); + samplesToClear = std::min( samplesToClear, Free( start, end ) ); size_t cleared = 0; - auto pos = mEnd; + auto pos = end; while(samplesToClear) { auto block = std::min( samplesToClear, mBufferSize - pos ); @@ -91,48 +117,70 @@ size_t RingBuffer::Clear(sampleFormat format, size_t samplesToClear) cleared += block; } - mEnd = pos; + // Atomically update the end pointer with release, so the nonatomic writes + // just done to the buffer don't get reordered after + mEnd.store(pos, std::memory_order_release); return cleared; } // // For the reader only: +// Only reader writes the start, so it can read it again relaxed +// But it reads the end written by the writer, who also sends sample data +// with the changes of end; therefore that must be read with acquire order +// if we do more than merely query the size or throw samples away // size_t RingBuffer::AvailForGet() { - return Len(); + auto end = mEnd.load( std::memory_order_relaxed ); // get away with it here + auto start = mStart.load( std::memory_order_relaxed ); + return Filled( start, end ); + + // Writer might increase the available samples after return, but will + // never decrease them, so reader can safely assume this much at least } size_t RingBuffer::Get(samplePtr buffer, sampleFormat format, size_t samplesToCopy) { - samplesToCopy = std::min( samplesToCopy, Len() ); + // Must match the writer's release with acquire for well defined reads of + // the buffer + auto end = mEnd.load( std::memory_order_acquire ); + auto start = mStart.load( std::memory_order_relaxed ); + samplesToCopy = std::min( samplesToCopy, Filled( start, end ) ); auto dest = buffer; size_t copied = 0; while(samplesToCopy) { - auto block = std::min( samplesToCopy, mBufferSize - mStart ); + auto block = std::min( samplesToCopy, mBufferSize - start ); - CopySamples(mBuffer.ptr() + mStart * SAMPLE_SIZE(mFormat), mFormat, + CopySamples(mBuffer.ptr() + start * SAMPLE_SIZE(mFormat), mFormat, dest, format, block); dest += block * SAMPLE_SIZE(format); - mStart = (mStart + block) % mBufferSize; + start = (start + block) % mBufferSize; samplesToCopy -= block; copied += block; } + // Communicate to writer that we have consumed some data, and that's all + mStart.store( start, std::memory_order_relaxed ); + return copied; } size_t RingBuffer::Discard(size_t samplesToDiscard) { - samplesToDiscard = std::min( samplesToDiscard, Len() ); + auto end = mEnd.load( std::memory_order_relaxed ); // get away with it here + auto start = mStart.load( std::memory_order_relaxed ); + samplesToDiscard = std::min( samplesToDiscard, Filled( start, end ) ); - mStart = (mStart + samplesToDiscard) % mBufferSize; + // Communicate to writer that we have consumed some data, and that's all + mStart.store((start + samplesToDiscard) % mBufferSize, + std::memory_order_relaxed); return samplesToDiscard; } diff --git a/src/RingBuffer.h b/src/RingBuffer.h index 6fc19bd42..cdc5354b8 100644 --- a/src/RingBuffer.h +++ b/src/RingBuffer.h @@ -12,6 +12,7 @@ #define __AUDACITY_RING_BUFFER__ #include "SampleFormat.h" +#include class RingBuffer { public: @@ -35,12 +36,13 @@ class RingBuffer { size_t Discard(size_t samples); private: - size_t Len(); + size_t Filled( size_t start, size_t end ); + size_t Free( size_t start, size_t end ); sampleFormat mFormat; - size_t mStart { 0 }; - size_t mEnd { 0 }; - size_t mBufferSize; + std::atomic mStart { 0 }; + std::atomic mEnd { 0 }; + const size_t mBufferSize; SampleBuffer mBuffer; };