Branch data Line data Source code
# 1 : : // Copyright (c) 2015-2020 The Bitcoin Core developers
# 2 : : // Distributed under the MIT software license, see the accompanying
# 3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
# 4 : :
# 5 : : #include <scheduler.h>
# 6 : :
# 7 : : #include <random.h>
# 8 : : #include <util/time.h>
# 9 : :
# 10 : : #include <assert.h>
# 11 : : #include <functional>
# 12 : : #include <utility>
# 13 : :
# 14 : : CScheduler::CScheduler()
# 15 : 824 : {
# 16 : 824 : }
# 17 : :
# 18 : : CScheduler::~CScheduler()
# 19 : 824 : {
# 20 : 824 : assert(nThreadsServicingQueue == 0);
# 21 [ + + ]: 824 : if (stopWhenEmpty) assert(taskQueue.empty());
# 22 : 824 : }
# 23 : :
# 24 : :
# 25 : : void CScheduler::serviceQueue()
# 26 : 850 : {
# 27 : 850 : WAIT_LOCK(newTaskMutex, lock);
# 28 : 850 : ++nThreadsServicingQueue;
# 29 : :
# 30 : : // newTaskMutex is locked throughout this loop EXCEPT
# 31 : : // when the thread is waiting or when the user's function
# 32 : : // is called.
# 33 [ + + ]: 210473 : while (!shouldStop()) {
# 34 : 209623 : try {
# 35 [ + + ][ + + ]: 210655 : while (!shouldStop() && taskQueue.empty()) {
# 36 : : // Wait until there is something to do.
# 37 : 1032 : newTaskScheduled.wait(lock);
# 38 : 1032 : }
# 39 : :
# 40 : : // Wait until either there is a new task, or until
# 41 : : // the time of the first item on the queue:
# 42 : :
# 43 [ + + ][ + - ]: 299545 : while (!shouldStop() && !taskQueue.empty()) {
# 44 : 298717 : std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
# 45 [ + + ]: 298717 : if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
# 46 : 208795 : break; // Exit loop after timeout, it means we reached the time of the event
# 47 : 208795 : }
# 48 : 298717 : }
# 49 : :
# 50 : : // If there are multiple threads, the queue can empty while we're waiting (another
# 51 : : // thread may service the task we were waiting on).
# 52 [ + + ][ - + ]: 209623 : if (shouldStop() || taskQueue.empty())
# 53 : 832 : continue;
# 54 : :
# 55 : 208791 : Function f = taskQueue.begin()->second;
# 56 : 208791 : taskQueue.erase(taskQueue.begin());
# 57 : :
# 58 : 208791 : {
# 59 : : // Unlock before calling f, so it can reschedule itself or another task
# 60 : : // without deadlocking:
# 61 : 208791 : REVERSE_LOCK(lock);
# 62 : 208791 : f();
# 63 : 208791 : }
# 64 : 208791 : } catch (...) {
# 65 : 0 : --nThreadsServicingQueue;
# 66 : 0 : throw;
# 67 : 0 : }
# 68 : 209623 : }
# 69 : 850 : --nThreadsServicingQueue;
# 70 : 850 : newTaskScheduled.notify_one();
# 71 : 850 : }
# 72 : :
# 73 : : void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
# 74 : 214764 : {
# 75 : 214764 : {
# 76 : 214764 : LOCK(newTaskMutex);
# 77 : 214764 : taskQueue.insert(std::make_pair(t, f));
# 78 : 214764 : }
# 79 : 214764 : newTaskScheduled.notify_one();
# 80 : 214764 : }
# 81 : :
# 82 : : void CScheduler::MockForward(std::chrono::seconds delta_seconds)
# 83 : 11 : {
# 84 : 11 : assert(delta_seconds > 0s && delta_seconds <= 1h);
# 85 : :
# 86 : 11 : {
# 87 : 11 : LOCK(newTaskMutex);
# 88 : :
# 89 : : // use temp_queue to maintain updated schedule
# 90 : 11 : std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
# 91 : :
# 92 [ + + ]: 67 : for (const auto& element : taskQueue) {
# 93 : 67 : temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
# 94 : 67 : }
# 95 : :
# 96 : : // point taskQueue to temp_queue
# 97 : 11 : taskQueue = std::move(temp_queue);
# 98 : 11 : }
# 99 : :
# 100 : : // notify that the taskQueue needs to be processed
# 101 : 11 : newTaskScheduled.notify_one();
# 102 : 11 : }
# 103 : :
# 104 : : static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
# 105 : 38915 : {
# 106 : 38915 : f();
# 107 : 38915 : s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
# 108 : 38915 : }
# 109 : :
# 110 : : void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
# 111 : 3919 : {
# 112 : 3919 : scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
# 113 : 3919 : }
# 114 : :
# 115 : : size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first,
# 116 : : std::chrono::system_clock::time_point& last) const
# 117 : 8 : {
# 118 : 8 : LOCK(newTaskMutex);
# 119 : 8 : size_t result = taskQueue.size();
# 120 [ + + ]: 8 : if (!taskQueue.empty()) {
# 121 : 6 : first = taskQueue.begin()->first;
# 122 : 6 : last = taskQueue.rbegin()->first;
# 123 : 6 : }
# 124 : 8 : return result;
# 125 : 8 : }
# 126 : :
# 127 : : bool CScheduler::AreThreadsServicingQueue() const
# 128 : 818 : {
# 129 : 818 : LOCK(newTaskMutex);
# 130 : 818 : return nThreadsServicingQueue;
# 131 : 818 : }
# 132 : :
# 133 : :
# 134 : : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
# 135 : 338124 : {
# 136 : 338124 : {
# 137 : 338124 : LOCK(m_cs_callbacks_pending);
# 138 : : // Try to avoid scheduling too many copies here, but if we
# 139 : : // accidentally have two ProcessQueue's scheduled at once its
# 140 : : // not a big deal.
# 141 [ + + ]: 338124 : if (m_are_callbacks_running) return;
# 142 [ + + ]: 257207 : if (m_callbacks_pending.empty()) return;
# 143 : 170315 : }
# 144 : 170315 : m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
# 145 : 170315 : }
# 146 : :
# 147 : : void SingleThreadedSchedulerClient::ProcessQueue()
# 148 : 169879 : {
# 149 : 169879 : std::function<void()> callback;
# 150 : 169879 : {
# 151 : 169879 : LOCK(m_cs_callbacks_pending);
# 152 [ + + ]: 169879 : if (m_are_callbacks_running) return;
# 153 [ + + ]: 169871 : if (m_callbacks_pending.empty()) return;
# 154 : 168749 : m_are_callbacks_running = true;
# 155 : :
# 156 : 168749 : callback = std::move(m_callbacks_pending.front());
# 157 : 168749 : m_callbacks_pending.pop_front();
# 158 : 168749 : }
# 159 : :
# 160 : : // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
# 161 : : // to ensure both happen safely even if callback() throws.
# 162 : 168749 : struct RAIICallbacksRunning {
# 163 : 168749 : SingleThreadedSchedulerClient* instance;
# 164 : 168749 : explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
# 165 : 168749 : ~RAIICallbacksRunning()
# 166 : 168749 : {
# 167 : 168749 : {
# 168 : 168749 : LOCK(instance->m_cs_callbacks_pending);
# 169 : 168749 : instance->m_are_callbacks_running = false;
# 170 : 168749 : }
# 171 : 168749 : instance->MaybeScheduleProcessQueue();
# 172 : 168749 : }
# 173 : 168749 : } raiicallbacksrunning(this);
# 174 : :
# 175 : 168749 : callback();
# 176 : 168749 : }
# 177 : :
# 178 : : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
# 179 : 169375 : {
# 180 : 169375 : assert(m_pscheduler);
# 181 : :
# 182 : 169375 : {
# 183 : 169375 : LOCK(m_cs_callbacks_pending);
# 184 : 169375 : m_callbacks_pending.emplace_back(std::move(func));
# 185 : 169375 : }
# 186 : 169375 : MaybeScheduleProcessQueue();
# 187 : 169375 : }
# 188 : :
# 189 : : void SingleThreadedSchedulerClient::EmptyQueue()
# 190 : 818 : {
# 191 : 818 : assert(!m_pscheduler->AreThreadsServicingQueue());
# 192 : 818 : bool should_continue = true;
# 193 [ + + ]: 1636 : while (should_continue) {
# 194 : 818 : ProcessQueue();
# 195 : 818 : LOCK(m_cs_callbacks_pending);
# 196 : 818 : should_continue = !m_callbacks_pending.empty();
# 197 : 818 : }
# 198 : 818 : }
# 199 : :
# 200 : : size_t SingleThreadedSchedulerClient::CallbacksPending()
# 201 : 85370 : {
# 202 : 85370 : LOCK(m_cs_callbacks_pending);
# 203 : 85370 : return m_callbacks_pending.size();
# 204 : 85370 : }
|