Branch data Line data Source code
# 1 : : // Copyright (c) 2015-2021 The Bitcoin Core developers # 2 : : // Distributed under the MIT software license, see the accompanying # 3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php. # 4 : : # 5 : : #ifndef BITCOIN_SCHEDULER_H # 6 : : #define BITCOIN_SCHEDULER_H # 7 : : # 8 : : #include <condition_variable> # 9 : : #include <functional> # 10 : : #include <list> # 11 : : #include <map> # 12 : : #include <thread> # 13 : : # 14 : : #include <sync.h> # 15 : : # 16 : : /** # 17 : : * Simple class for background tasks that should be run # 18 : : * periodically or once "after a while" # 19 : : * # 20 : : * Usage: # 21 : : * # 22 : : * CScheduler* s = new CScheduler(); # 23 : : * s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { } # 24 : : * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3}); # 25 : : * std::thread* t = new std::thread([&] { s->serviceQueue(); }); # 26 : : * # 27 : : * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: # 28 : : * s->stop(); # 29 : : * t->join(); # 30 : : * delete t; # 31 : : * delete s; // Must be done after thread is interrupted/joined. # 32 : : */ # 33 : : class CScheduler # 34 : : { # 35 : : public: # 36 : : CScheduler(); # 37 : : ~CScheduler(); # 38 : : # 39 : : std::thread m_service_thread; # 40 : : # 41 : : typedef std::function<void()> Function; # 42 : : # 43 : : /** Call func at/after time t */ # 44 : : void schedule(Function f, std::chrono::system_clock::time_point t); # 45 : : # 46 : : /** Call f once after the delta has passed */ # 47 : : void scheduleFromNow(Function f, std::chrono::milliseconds delta) # 48 : 37100 : { # 49 : 37100 : schedule(std::move(f), std::chrono::system_clock::now() + delta); # 50 : 37100 : } # 51 : : # 52 : : /** # 53 : : * Repeat f until the scheduler is stopped. First run is after delta has passed once. # 54 : : * # 55 : : * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more # 56 : : * accurate scheduling, don't use this method. # 57 : : */ # 58 : : void scheduleEvery(Function f, std::chrono::milliseconds delta); # 59 : : # 60 : : /** # 61 : : * Mock the scheduler to fast forward in time. # 62 : : * Iterates through items on taskQueue and reschedules them # 63 : : * to be delta_seconds sooner. # 64 : : */ # 65 : : void MockForward(std::chrono::seconds delta_seconds); # 66 : : # 67 : : /** # 68 : : * Services the queue 'forever'. Should be run in a thread. # 69 : : */ # 70 : : void serviceQueue(); # 71 : : # 72 : : /** Tell any threads running serviceQueue to stop as soon as the current task is done */ # 73 : : void stop() # 74 : 1009 : { # 75 : 1009 : WITH_LOCK(newTaskMutex, stopRequested = true); # 76 : 1009 : newTaskScheduled.notify_all(); # 77 [ + + ]: 1009 : if (m_service_thread.joinable()) m_service_thread.join(); # 78 : 1009 : } # 79 : : /** Tell any threads running serviceQueue to stop when there is no work left to be done */ # 80 : : void StopWhenDrained() # 81 : 4 : { # 82 : 4 : WITH_LOCK(newTaskMutex, stopWhenEmpty = true); # 83 : 4 : newTaskScheduled.notify_all(); # 84 [ - + ]: 4 : if (m_service_thread.joinable()) m_service_thread.join(); # 85 : 4 : } # 86 : : # 87 : : /** # 88 : : * Returns number of tasks waiting to be serviced, # 89 : : * and first and last task times # 90 : : */ # 91 : : size_t getQueueInfo(std::chrono::system_clock::time_point& first, # 92 : : std::chrono::system_clock::time_point& last) const; # 93 : : # 94 : : /** Returns true if there are threads actively running in serviceQueue() */ # 95 : : bool AreThreadsServicingQueue() const; # 96 : : # 97 : : private: # 98 : : mutable Mutex newTaskMutex; # 99 : : std::condition_variable newTaskScheduled; # 100 : : std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex); # 101 : : int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0}; # 102 : : bool stopRequested GUARDED_BY(newTaskMutex){false}; # 103 : : bool stopWhenEmpty GUARDED_BY(newTaskMutex){false}; # 104 [ + + ][ + + ]: 876923 : bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } # [ + + ] # 105 : : }; # 106 : : # 107 : : /** # 108 : : * Class used by CScheduler clients which may schedule multiple jobs # 109 : : * which are required to be run serially. Jobs may not be run on the # 110 : : * same thread, but no two jobs will be executed # 111 : : * at the same time and memory will be release-acquire consistent # 112 : : * (the scheduler will internally do an acquire before invoking a callback # 113 : : * as well as a release at the end). In practice this means that a callback # 114 : : * B() will be able to observe all of the effects of callback A() which executed # 115 : : * before it. # 116 : : */ # 117 : : class SingleThreadedSchedulerClient # 118 : : { # 119 : : private: # 120 : : CScheduler* m_pscheduler; # 121 : : # 122 : : Mutex m_callbacks_mutex; # 123 : : std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex); # 124 : : bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false; # 125 : : # 126 : : void MaybeScheduleProcessQueue(); # 127 : : void ProcessQueue(); # 128 : : # 129 : : public: # 130 : 1000 : explicit SingleThreadedSchedulerClient(CScheduler* pschedulerIn) : m_pscheduler(pschedulerIn) {} # 131 : : # 132 : : /** # 133 : : * Add a callback to be executed. Callbacks are executed serially # 134 : : * and memory is release-acquire consistent between callback executions. # 135 : : * Practically, this means that callbacks can behave as if they are executed # 136 : : * in order by a single thread. # 137 : : */ # 138 : : void AddToProcessQueue(std::function<void()> func); # 139 : : # 140 : : /** # 141 : : * Processes all remaining queue members on the calling thread, blocking until queue is empty # 142 : : * Must be called after the CScheduler has no remaining processing threads! # 143 : : */ # 144 : : void EmptyQueue(); # 145 : : # 146 : : size_t CallbacksPending(); # 147 : : }; # 148 : : # 149 : : #endif // BITCOIN_SCHEDULER_H