#pragma once #include "StarThread.hpp" namespace Star { STAR_EXCEPTION(WorkerPoolException, StarException); STAR_CLASS(WorkerPool); // Shareable handle for a WorkerPool computation that does not produce any // value. class WorkerPoolHandle { public: // Returns true if the work is completed (either due to error or actual // completion, will not re-throw) bool done() const; // Waits up to given millis for the computation to finish. Returns true if // the computation finished within the allotted time, false otherwise. If // the computation is finished but it threw an exception, it will be // re-thrown here. bool wait(unsigned millis) const; // synonym for wait(0) bool poll() const; // Wait until the computation finishes completely. If the computation threw // an exception it will be re-thrown by this method. void finish() const; private: friend WorkerPool; struct Impl { Impl(); Mutex mutex; ConditionVariable condition; atomic done; std::exception_ptr exception; }; WorkerPoolHandle(shared_ptr impl); shared_ptr m_impl; }; // Shareable handle for a WorkerPool computation that produces a value. template class WorkerPoolPromise { public: // Returns true if the work is completed (either due to error or actual // completion, will not re-throw) bool done() const; // Waits for the given amount of time for the work to be completed. If the // work is completed, returns true. If the producer function throws for any // reason, this method will re-throw the exception. If millis is zero, does // not wait at all simply polls to see if the computation is finished. bool wait(unsigned millis) const; // synonym for wait(0) bool poll() const; // Blocks until the work is done, and returns the result. May be called // multiple times to access the result. If the computation threw // an exception it will be re-thrown by this method. ResultType& get(); ResultType const& get() const; private: friend WorkerPool; struct Impl { Mutex mutex; ConditionVariable condition; Maybe result; std::exception_ptr exception; }; WorkerPoolPromise(shared_ptr impl); shared_ptr m_impl; }; class WorkerPool { public: // Creates a stopped pool WorkerPool(String name); // Creates a started pool WorkerPool(String name, unsigned threadCount); ~WorkerPool(); WorkerPool(WorkerPool&&); WorkerPool& operator=(WorkerPool&&); // Start the thread pool with the given thread count range, or if it is // already started, reconfigure the thread counts. void start(unsigned threadCount); // Stop the thread pool, not necessarily finishing any pending jobs (may // leave pending jobs on the queue). void stop(); // Try to finish any remaining jobs, then stop the thread pool. This method // must not be called if the worker pool will continuously receive new work, // as it may not ever complete if that is the case. The work queue must // eventually become empty for this to properly return. void finish(); // Add the given work to the pool and return a handle for the work. It not // required that the caller of this method hold on to the worker handle, the // work will be managed and completed regardless of the WorkerPoolHandle // lifetime. WorkerPoolHandle addWork(function work); // Like addWork, but the worker is expected to produce some result. The // returned promise can be used to get this return value once the producer is // complete. template WorkerPoolPromise addProducer(function producer); private: class WorkerThread : public Thread { public: // Starts automatically WorkerThread(WorkerPool* parent); ~WorkerThread(); void run() override; WorkerPool* parent; atomic shouldStop; atomic waiting; }; void queueWork(function work); String m_name; Mutex m_threadMutex; List> m_workerThreads; Mutex m_workMutex; ConditionVariable m_workCondition; Deque> m_pendingWork; }; template bool WorkerPoolPromise::done() const { MutexLocker locker(m_impl->mutex); return m_impl->result || m_impl->exception; } template bool WorkerPoolPromise::wait(unsigned millis) const { MutexLocker locker(m_impl->mutex); if (!m_impl->result && !m_impl->exception && millis != 0) m_impl->condition.wait(m_impl->mutex, millis); if (m_impl->exception) std::rethrow_exception(m_impl->exception); if (m_impl->result) return true; return false; } template bool WorkerPoolPromise::poll() const { return wait(0); } template ResultType& WorkerPoolPromise::get() { MutexLocker locker(m_impl->mutex); if (!m_impl->result && !m_impl->exception) m_impl->condition.wait(m_impl->mutex); if (m_impl->exception) std::rethrow_exception(m_impl->exception); return *m_impl->result; } template ResultType const& WorkerPoolPromise::get() const { return const_cast(this)->get(); } template WorkerPoolPromise::WorkerPoolPromise(shared_ptr impl) : m_impl(std::move(impl)) {} template WorkerPoolPromise WorkerPool::addProducer(function producer) { // Construct a worker pool promise and wrap the producer to signal the // promise when finished. auto workerPoolPromiseImpl = make_shared::Impl>(); queueWork([workerPoolPromiseImpl, producer]() { try { auto result = producer(); MutexLocker promiseLocker(workerPoolPromiseImpl->mutex); workerPoolPromiseImpl->result = std::move(result); workerPoolPromiseImpl->condition.broadcast(); } catch (...) { MutexLocker promiseLocker(workerPoolPromiseImpl->mutex); workerPoolPromiseImpl->exception = std::current_exception(); workerPoolPromiseImpl->condition.broadcast(); } }); return workerPoolPromiseImpl; } }