#pragma once #include "StarList.hpp" namespace Star { // Holds a stream of values which separate observers can query and track // occurrences in the stream without pulling them from the stream. Each // addition to the stream is given an abstract step value, and queries to the // stream can reference a given step value in order to track events since the // last query. template class ObserverStream { public: ObserverStream(uint64_t historyLimit = 0); // If a history limit is set, then any entries with step values older than // the given limit will be discarded automatically. A historyLimit of 0 // means that no values will be forgotten. The step value increases by one // with each entry added, or can be increased artificially by a call to // tickStep. uint64_t historyLimit() const; void setHistoryLimit(uint64_t historyLimit = 0); // Add a value to the end of the stream and increment the step value by 1. void add(T value); // Artificially tick the step by the given delta, which can be used to clear // older values. void tick(uint64_t delta = 1); // Query values in the stream since the given step value. Will return the // values in the stream, and a new since value to pass to query on the next // call. pair, uint64_t> query(uint64_t since = 0) const; // Resets the step value to 0 and clears all values. void reset(); private: uint64_t m_historyLimit; uint64_t m_nextStep; Deque> m_values; }; template ObserverStream::ObserverStream(uint64_t historyLimit) : m_historyLimit(historyLimit), m_nextStep(0) {} template uint64_t ObserverStream::historyLimit() const { return m_historyLimit; } template void ObserverStream::setHistoryLimit(uint64_t historyLimit) { m_historyLimit = historyLimit; tick(0); } template void ObserverStream::add(T value) { m_values.append({m_nextStep, std::move(value)}); tick(1); } template void ObserverStream::tick(uint64_t delta) { m_nextStep += delta; uint64_t removeBefore = m_nextStep - min(m_nextStep, m_historyLimit); while (!m_values.empty() && m_values.first().first < removeBefore) m_values.removeFirst(); } template pair, uint64_t> ObserverStream::query(uint64_t since) const { List res; auto i = std::lower_bound(m_values.begin(), m_values.end(), since, [](pair const& p, uint64_t step) { return p.first < step; }); while (i != m_values.end()) { res.append(i->second); ++i; } return {res, m_nextStep}; } template void ObserverStream::reset() { m_nextStep = 0; m_values.clear(); } }