blob: f61868268e5ea6fd4817fbeb781dbd1977047860 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
#include "StarWorkerPool.hpp"
#include "StarIterator.hpp"
#include "StarMathCommon.hpp"
namespace Star {
bool WorkerPoolHandle::done() const {
MutexLocker locker(m_impl->mutex);
return m_impl->done;
}
bool WorkerPoolHandle::wait(unsigned millis) const {
MutexLocker locker(m_impl->mutex);
if (!m_impl->done && millis != 0)
m_impl->condition.wait(m_impl->mutex, millis);
if (m_impl->exception)
std::rethrow_exception(m_impl->exception);
return m_impl->done;
}
bool WorkerPoolHandle::poll() const {
return wait(0);
}
void WorkerPoolHandle::finish() const {
MutexLocker locker(m_impl->mutex);
if (!m_impl->done)
m_impl->condition.wait(m_impl->mutex);
if (m_impl->exception)
std::rethrow_exception(m_impl->exception);
return;
}
WorkerPoolHandle::Impl::Impl() : done(false) {}
WorkerPoolHandle::WorkerPoolHandle(shared_ptr<Impl> impl) : m_impl(std::move(impl)) {}
WorkerPool::WorkerPool(String name) : m_name(std::move(name)) {}
WorkerPool::WorkerPool(String name, unsigned threadCount) : WorkerPool(std::move(name)) {
start(threadCount);
}
WorkerPool::~WorkerPool() {
stop();
}
WorkerPool::WorkerPool(WorkerPool&&) = default;
WorkerPool& WorkerPool::operator=(WorkerPool&&) = default;
void WorkerPool::start(unsigned threadCount) {
MutexLocker threadLock(m_threadMutex);
for (auto const& workerThread : m_workerThreads)
workerThread->shouldStop = true;
m_workCondition.broadcast();
m_workerThreads.clear();
for (size_t i = m_workerThreads.size(); i < threadCount; ++i)
m_workerThreads.append(make_unique<WorkerThread>(this));
}
void WorkerPool::stop() {
MutexLocker threadLock(m_threadMutex);
for (auto const& workerThread : m_workerThreads)
workerThread->shouldStop = true;
{
// Must hold the work lock while broadcasting to ensure that any worker
// threads that might wait without stopping actually get the signal.
MutexLocker workLock(m_workMutex);
m_workCondition.broadcast();
}
m_workerThreads.clear();
}
void WorkerPool::finish() {
// This is kind of a weird way to "wait" until all the pending work is
// finished. In order for the currently active worker threads to
// cooperatively complete the remaining work, the work lock must not be held
// the entire time (then just this thread would be the one finishing the
// work). Instead, the calling thread joins in on the action and tries to
// finish work while yielding to the other threads after each completed job.
MutexLocker workMutex(m_workMutex);
while (!m_pendingWork.empty()) {
auto firstWork = m_pendingWork.takeFirst();
workMutex.unlock();
firstWork();
Thread::yield();
workMutex.lock();
}
workMutex.unlock();
stop();
}
WorkerPoolHandle WorkerPool::addWork(function<void()> work) {
// Construct a worker pool handle and wrap the work to signal the handle when
// finished. Set the result to empty string if successful and to the content
// of the exception if an exception is thrown.
auto workerPoolHandleImpl = make_shared<WorkerPoolHandle::Impl>();
queueWork([workerPoolHandleImpl, work]() {
try {
work();
MutexLocker handleLocker(workerPoolHandleImpl->mutex);
workerPoolHandleImpl->done = true;
workerPoolHandleImpl->condition.broadcast();
} catch (...) {
MutexLocker handleLocker(workerPoolHandleImpl->mutex);
workerPoolHandleImpl->done = true;
workerPoolHandleImpl->exception = std::current_exception();
workerPoolHandleImpl->condition.broadcast();
}
});
return workerPoolHandleImpl;
}
WorkerPool::WorkerThread::WorkerThread(WorkerPool* parent)
: Thread(strf("WorkerThread for WorkerPool '{}'", parent->m_name)),
parent(parent),
shouldStop(false),
waiting(false) {
start();
}
WorkerPool::WorkerThread::~WorkerThread() {
join();
}
void WorkerPool::WorkerThread::run() {
MutexLocker workLock(parent->m_workMutex);
while (true) {
if (shouldStop)
break;
if (parent->m_pendingWork.empty()) {
waiting = true;
parent->m_workCondition.wait(parent->m_workMutex);
waiting = false;
}
if (!parent->m_pendingWork.empty()) {
auto work = parent->m_pendingWork.takeFirst();
workLock.unlock();
work();
workLock.lock();
}
}
}
void WorkerPool::queueWork(function<void()> work) {
MutexLocker workLock(m_workMutex);
m_pendingWork.append(std::move(work));
m_workCondition.signal();
}
}
|