diff options
author | Kae <80987908+Novaenia@users.noreply.github.com> | 2023-06-20 14:33:09 +1000 |
---|---|---|
committer | Kae <80987908+Novaenia@users.noreply.github.com> | 2023-06-20 14:33:09 +1000 |
commit | 6352e8e3196f78388b6c771073f9e03eaa612673 (patch) | |
tree | e23772f79a7fbc41bc9108951e9e136857484bf4 /source/core/StarObserverStream.hpp | |
parent | 6741a057e5639280d85d0f88ba26f000baa58f61 (diff) |
everything everywhere
all at once
Diffstat (limited to 'source/core/StarObserverStream.hpp')
-rw-r--r-- | source/core/StarObserverStream.hpp | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/source/core/StarObserverStream.hpp b/source/core/StarObserverStream.hpp new file mode 100644 index 0000000..c97376c --- /dev/null +++ b/source/core/StarObserverStream.hpp @@ -0,0 +1,98 @@ +#ifndef STAR_OBSERVER_STREAM_HPP +#define STAR_OBSERVER_STREAM_HPP + +#include "StarList.hpp" + +namespace Star { + +// Holds a stream of values which separate observers can query and track +// occurrences in the stream without pulling them from the stream. Each +// addition to the stream is given an abstract step value, and queries to the +// stream can reference a given step value in order to track events since the +// last query. +template <typename T> +class ObserverStream { +public: + ObserverStream(uint64_t historyLimit = 0); + + // If a history limit is set, then any entries with step values older than + // the given limit will be discarded automatically. A historyLimit of 0 + // means that no values will be forgotten. The step value increases by one + // with each entry added, or can be increased artificially by a call to + // tickStep. + uint64_t historyLimit() const; + void setHistoryLimit(uint64_t historyLimit = 0); + + // Add a value to the end of the stream and increment the step value by 1. + void add(T value); + + // Artificially tick the step by the given delta, which can be used to clear + // older values. + void tick(uint64_t delta = 1); + + // Query values in the stream since the given step value. Will return the + // values in the stream, and a new since value to pass to query on the next + // call. + pair<List<T>, uint64_t> query(uint64_t since = 0) const; + + // Resets the step value to 0 and clears all values. + void reset(); + +private: + uint64_t m_historyLimit; + uint64_t m_nextStep; + Deque<pair<uint64_t, T>> m_values; +}; + +template <typename T> +ObserverStream<T>::ObserverStream(uint64_t historyLimit) + : m_historyLimit(historyLimit), m_nextStep(0) {} + +template <typename T> +uint64_t ObserverStream<T>::historyLimit() const { + return m_historyLimit; +} + +template <typename T> +void ObserverStream<T>::setHistoryLimit(uint64_t historyLimit) { + m_historyLimit = historyLimit; + tick(0); +} + +template <typename T> +void ObserverStream<T>::add(T value) { + m_values.append({m_nextStep, move(value)}); + tick(1); +} + +template <typename T> +void ObserverStream<T>::tick(uint64_t delta) { + m_nextStep += delta; + uint64_t removeBefore = m_nextStep - min(m_nextStep, m_historyLimit); + while (!m_values.empty() && m_values.first().first < removeBefore) + m_values.removeFirst(); +} + +template <typename T> +pair<List<T>, uint64_t> ObserverStream<T>::query(uint64_t since) const { + List<T> res; + auto i = std::lower_bound(m_values.begin(), + m_values.end(), + since, + [](pair<uint64_t, T> const& p, uint64_t step) { return p.first < step; }); + while (i != m_values.end()) { + res.append(i->second); + ++i; + } + return {res, m_nextStep}; +} + +template <typename T> +void ObserverStream<T>::reset() { + m_nextStep = 0; + m_values.clear(); +} + +} + +#endif |