Listener( MultiProducerConsumerRingBuffer<T>& buffer, uint32_t write_p) : buffer_(buffer) , read_p_(write_p) {} ~Listener() { buffer_.unregister_listener(*this); } /** * @returns the Cell at the read pointer or nullptr if the buffer is empty */ Cell* head() { auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
// If local read_pointer and write_pointer are equal => buffer is empty for this listener if (read_p_ == pointer.ptr.write_p ) { returnnullptr; }
auto cell = &buffer_.cells_[get_pointer_value(read_p_)]; return cell->ref_counter() != 0 ? cell : nullptr; } /** * Decreases the ref_counter of the head cell, * if the counter reaches 0 the cell becomes dirty * and free_cells are incremented * @return true if the cell ref_counter is 0 after pop * @throw std::exception if buffer is empty */ boolpop() { auto cell = head(); if (!cell) { throw std::runtime_error("Buffer empty"); } auto counter = cell->ref_counter_.fetch_sub(1); assert(counter > 0); // If all the listeners have read the cell if (counter == 1) { // Increase the free cells => increase the global read pointer auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed); while (!buffer_.node_->pointer_.compare_exchange_weak(pointer, { { pointer.ptr.write_p, pointer.ptr.free_cells + 1 } }, std::memory_order_release, std::memory_order_relaxed)) { } } // Increase the local read pointer read_p_ = buffer_.inc_pointer(read_p_); return (counter == 1); }
std::unique_ptr<Listener> register_listener() { // The new listener's read pointer is the current write pointer auto listener = std::unique_ptr<Listener>( newListener( *this, node_->pointer_.load(std::memory_order_relaxed).ptr.write_p));