diff options
author | Kae <80987908+Novaenia@users.noreply.github.com> | 2023-06-20 14:33:09 +1000 |
---|---|---|
committer | Kae <80987908+Novaenia@users.noreply.github.com> | 2023-06-20 14:33:09 +1000 |
commit | 6352e8e3196f78388b6c771073f9e03eaa612673 (patch) | |
tree | e23772f79a7fbc41bc9108951e9e136857484bf4 /source/core/StarWorkerPool.cpp | |
parent | 6741a057e5639280d85d0f88ba26f000baa58f61 (diff) |
everything everywhere
all at once
Diffstat (limited to 'source/core/StarWorkerPool.cpp')
-rw-r--r-- | source/core/StarWorkerPool.cpp | 166 |
1 files changed, 166 insertions, 0 deletions
diff --git a/source/core/StarWorkerPool.cpp b/source/core/StarWorkerPool.cpp new file mode 100644 index 0000000..fa37a57 --- /dev/null +++ b/source/core/StarWorkerPool.cpp @@ -0,0 +1,166 @@ +#include "StarWorkerPool.hpp" +#include "StarIterator.hpp" +#include "StarMathCommon.hpp" + +namespace Star { + +bool WorkerPoolHandle::done() const { + MutexLocker locker(m_impl->mutex); + return m_impl->done; +} + +bool WorkerPoolHandle::wait(unsigned millis) const { + MutexLocker locker(m_impl->mutex); + + if (!m_impl->done && millis != 0) + m_impl->condition.wait(m_impl->mutex, millis); + + if (m_impl->exception) + std::rethrow_exception(m_impl->exception); + + return m_impl->done; +} + +bool WorkerPoolHandle::poll() const { + return wait(0); +} + +void WorkerPoolHandle::finish() const { + MutexLocker locker(m_impl->mutex); + + if (!m_impl->done) + m_impl->condition.wait(m_impl->mutex); + + if (m_impl->exception) + std::rethrow_exception(m_impl->exception); + + return; +} + +WorkerPoolHandle::Impl::Impl() : done(false) {} + +WorkerPoolHandle::WorkerPoolHandle(shared_ptr<Impl> impl) : m_impl(move(impl)) {} + +WorkerPool::WorkerPool(String name) : m_name(move(name)) {} + +WorkerPool::WorkerPool(String name, unsigned threadCount) : WorkerPool(move(name)) { + start(threadCount); +} + +WorkerPool::~WorkerPool() { + stop(); +} + +WorkerPool::WorkerPool(WorkerPool&&) = default; +WorkerPool& WorkerPool::operator=(WorkerPool&&) = default; + +void WorkerPool::start(unsigned threadCount) { + MutexLocker threadLock(m_threadMutex); + + for (auto const& workerThread : m_workerThreads) + workerThread->shouldStop = true; + + m_workCondition.broadcast(); + m_workerThreads.clear(); + + for (size_t i = m_workerThreads.size(); i < threadCount; ++i) + m_workerThreads.append(make_unique<WorkerThread>(this)); +} + +void WorkerPool::stop() { + MutexLocker threadLock(m_threadMutex); + for (auto const& workerThread : m_workerThreads) + workerThread->shouldStop = true; + + { + // Must hold the work lock while broadcasting to ensure that any worker + // threads that might wait without stopping actually get the signal. + MutexLocker workLock(m_workMutex); + m_workCondition.broadcast(); + } + + m_workerThreads.clear(); +} + +void WorkerPool::finish() { + // This is kind of a weird way to "wait" until all the pending work is + // finished. In order for the currently active worker threads to + // cooperatively complete the remaining work, the work lock must not be held + // the entire time (then just this thread would be the one finishing the + // work). Instead, the calling thread joins in on the action and tries to + // finish work while yielding to the other threads after each completed job. + MutexLocker workMutex(m_workMutex); + while (!m_pendingWork.empty()) { + auto firstWork = m_pendingWork.takeFirst(); + workMutex.unlock(); + firstWork(); + Thread::yield(); + workMutex.lock(); + } + workMutex.unlock(); + + stop(); +} + +WorkerPoolHandle WorkerPool::addWork(function<void()> work) { + // Construct a worker pool handle and wrap the work to signal the handle when + // finished. Set the result to empty string if successful and to the content + // of the exception if an exception is thrown. + auto workerPoolHandleImpl = make_shared<WorkerPoolHandle::Impl>(); + queueWork([workerPoolHandleImpl, work]() { + try { + work(); + MutexLocker handleLocker(workerPoolHandleImpl->mutex); + workerPoolHandleImpl->done = true; + workerPoolHandleImpl->condition.broadcast(); + } catch (...) { + MutexLocker handleLocker(workerPoolHandleImpl->mutex); + workerPoolHandleImpl->done = true; + workerPoolHandleImpl->exception = std::current_exception(); + workerPoolHandleImpl->condition.broadcast(); + } + }); + + return workerPoolHandleImpl; +} + +WorkerPool::WorkerThread::WorkerThread(WorkerPool* parent) + : Thread(strf("WorkerThread for WorkerPool '%s'", parent->m_name)), + parent(parent), + shouldStop(false), + waiting(false) { + start(); +} + +WorkerPool::WorkerThread::~WorkerThread() { + join(); +} + +void WorkerPool::WorkerThread::run() { + MutexLocker workLock(parent->m_workMutex); + while (true) { + if (shouldStop) + break; + + if (parent->m_pendingWork.empty()) { + waiting = true; + parent->m_workCondition.wait(parent->m_workMutex); + waiting = false; + } + + if (!parent->m_pendingWork.empty()) { + auto work = parent->m_pendingWork.takeFirst(); + workLock.unlock(); + work(); + workLock.lock(); + } + } +} + +void WorkerPool::queueWork(function<void()> work) { + MutexLocker workLock(m_workMutex); + m_pendingWork.append(move(work)); + m_workCondition.signal(); +} + +} |