Веб-сайт самохостера Lotigara

summaryrefslogtreecommitdiff
path: root/source/core/StarObserverStream.hpp
blob: f78f07fe9bd74f199b3bedc60b0025e6c79eb413 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#pragma once

#include "StarList.hpp"

namespace Star {

// Holds a stream of values which separate observers can query and track
// occurrences in the stream without pulling them from the stream.  Each
// addition to the stream is tagged with an abstract step value, and an
// observer passes the step it last received back into query() so that it only
// sees the values added since its previous query.
template <typename T>
class ObserverStream {
public:
  // Constructs an empty stream whose step counter starts at 0.  See
  // setHistoryLimit for the meaning of historyLimit.
  ObserverStream(uint64_t historyLimit = 0);

  // If a history limit is set, then any entries with step values older than
  // the given limit (measured back from the current step) will be discarded
  // automatically.  A historyLimit of 0 is intended to mean that no values
  // will be forgotten (the pruning itself lives in tick()).  The step value
  // increases by one with each entry added, or can be increased artificially
  // by a call to tick.  setHistoryLimit applies the new limit immediately by
  // calling tick(0).
  uint64_t historyLimit() const;
  void setHistoryLimit(uint64_t historyLimit = 0);

  // Add a value to the end of the stream, tagged with the current step, and
  // then increment the step value by 1.
  void add(T value);

  // Artificially tick the step by the given delta, which can be used to clear
  // older values.  A delta of 0 re-applies the history limit without
  // advancing the step.
  void tick(uint64_t delta = 1);

  // Query values in the stream since the given step value.  Will return
  // copies of the stored values whose step is >= since, oldest first, and a
  // new since value (the step the next added value will receive) to pass to
  // query on the next call.
  pair<List<T>, uint64_t> query(uint64_t since = 0) const;

  // Resets the step value to 0 and clears all values.  The history limit is
  // left unchanged.
  void reset();

private:
  // Maximum age, in steps, of the values that are kept.
  uint64_t m_historyLimit;
  // Step that will be assigned to the next value passed to add().
  uint64_t m_nextStep;
  // (step, value) entries in the order they were added, so steps ascend from
  // front to back.
  Deque<pair<uint64_t, T>> m_values;
};

template <typename T>
ObserverStream<T>::ObserverStream(uint64_t historyLimit)
  : m_historyLimit(historyLimit), m_nextStep(0) {}

// Returns the history limit currently in effect, in steps.
template <typename T>
uint64_t ObserverStream<T>::historyLimit() const {
  return this->m_historyLimit;
}

// Replaces the history limit and applies it to the stored values right away.
template <typename T>
void ObserverStream<T>::setHistoryLimit(uint64_t historyLimit) {
  this->m_historyLimit = historyLimit;

  // A zero-delta tick leaves the step counter alone and only re-runs the
  // pruning pass, now against the limit stored above.
  this->tick(0);
}

// Appends value to the stream, tagged with the current step, then advances
// the step counter by one (which also prunes against the history limit).
template <typename T>
void ObserverStream<T>::add(T value) {
  // Tag the value with the step it is being added at; the step is advanced
  // only afterwards, by the tick below.
  m_values.append(pair<uint64_t, T>(m_nextStep, std::move(value)));
  tick(1);
}

// Advances the step counter by delta and discards stored values that have
// fallen outside the history window.
//
// With a non-zero history limit, a value added at step s is kept only while
// s >= m_nextStep - m_historyLimit.  A history limit of 0 means "keep
// everything", so no pruning is performed in that case.
template <typename T>
void ObserverStream<T>::tick(uint64_t delta) {
  m_nextStep += delta;

  // A limit of 0 disables pruning.  Without this guard the window below would
  // have zero width and every value would be dropped as soon as it was added.
  if (m_historyLimit == 0)
    return;

  // Clamp the subtraction so it cannot wrap around while fewer than
  // m_historyLimit steps have elapsed.
  uint64_t const removeBefore = m_nextStep - min(m_nextStep, m_historyLimit);

  // Entries are stored in ascending step order, so expired ones are always at
  // the front.
  while (!m_values.empty() && m_values.first().first < removeBefore)
    m_values.removeFirst();
}

// Returns copies of every stored value whose step is >= since, oldest first,
// together with the step to pass as `since` on the next call so that only
// newer values are returned then.
template <typename T>
pair<List<T>, uint64_t> ObserverStream<T>::query(uint64_t since) const {
  // Entries are stored in ascending step order, so a binary search finds the
  // first entry the caller has not seen yet.
  auto const firstUnseen = std::lower_bound(m_values.begin(),
      m_values.end(),
      since,
      [](pair<uint64_t, T> const& entry, uint64_t step) { return entry.first < step; });

  List<T> res;
  for (auto it = firstUnseen; it != m_values.end(); ++it)
    res.append(it->second);

  // Move the list into the returned pair; passing it as an lvalue in the
  // braced return would copy every element a second time.
  return {std::move(res), m_nextStep};
}

// Returns the stream to its initial empty state: all stored values are
// dropped and the step counter restarts at 0.  The history limit is kept.
template <typename T>
void ObserverStream<T>::reset() {
  this->m_values.clear();
  this->m_nextStep = 0;
}

}