Branch data Line data Source code
# 1 : : // Copyright (c) 2015-2021 The Bitcoin Core developers
# 2 : : // Distributed under the MIT software license, see the accompanying
# 3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
# 4 : :
# 5 : : #include <scheduler.h>
# 6 : :
# 7 : : #include <random.h>
# 8 : : #include <util/syscall_sandbox.h>
# 9 : : #include <util/time.h>
# 10 : :
# 11 : : #include <assert.h>
# 12 : : #include <functional>
# 13 : : #include <utility>
# 14 : :
# 15 : : CScheduler::CScheduler()
# 16 : 1002 : {
# 17 : 1002 : }
# 18 : :
# 19 : : CScheduler::~CScheduler()
# 20 : 1002 : {
# 21 : 1002 : assert(nThreadsServicingQueue == 0);
# 22 [ + + ]: 1002 : if (stopWhenEmpty) assert(taskQueue.empty());
# 23 : 1002 : }
# 24 : :
# 25 : :
# 26 : : void CScheduler::serviceQueue()
# 27 : 1028 : {
# 28 : 1028 : SetSyscallSandboxPolicy(SyscallSandboxPolicy::SCHEDULER);
# 29 : 1028 : WAIT_LOCK(newTaskMutex, lock);
# 30 : 1028 : ++nThreadsServicingQueue;
# 31 : :
# 32 : : // newTaskMutex is locked throughout this loop EXCEPT
# 33 : : // when the thread is waiting or when the user's function
# 34 : : // is called.
# 35 [ + + ]: 196706 : while (!shouldStop()) {
# 36 : 195678 : try {
# 37 [ + + ][ + + ]: 203843 : while (!shouldStop() && taskQueue.empty()) {
# 38 : : // Wait until there is something to do.
# 39 : 8165 : newTaskScheduled.wait(lock);
# 40 : 8165 : }
# 41 : :
# 42 : : // Wait until either there is a new task, or until
# 43 : : // the time of the first item on the queue:
# 44 : :
# 45 [ + + ][ + - ]: 280696 : while (!shouldStop() && !taskQueue.empty()) {
# 46 : 279687 : std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
# 47 [ + + ]: 279687 : if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
# 48 : 194669 : break; // Exit loop after timeout, it means we reached the time of the event
# 49 : 194669 : }
# 50 : 279687 : }
# 51 : :
# 52 : : // If there are multiple threads, the queue can empty while we're waiting (another
# 53 : : // thread may service the task we were waiting on).
# 54 [ + + ][ - + ]: 195678 : if (shouldStop() || taskQueue.empty())
# 55 : 1011 : continue;
# 56 : :
# 57 : 194667 : Function f = taskQueue.begin()->second;
# 58 : 194667 : taskQueue.erase(taskQueue.begin());
# 59 : :
# 60 : 194667 : {
# 61 : : // Unlock before calling f, so it can reschedule itself or another task
# 62 : : // without deadlocking:
# 63 : 194667 : REVERSE_LOCK(lock);
# 64 : 194667 : f();
# 65 : 194667 : }
# 66 : 194667 : } catch (...) {
# 67 : 0 : --nThreadsServicingQueue;
# 68 : 0 : throw;
# 69 : 0 : }
# 70 : 195678 : }
# 71 : 1028 : --nThreadsServicingQueue;
# 72 : 1028 : newTaskScheduled.notify_one();
# 73 : 1028 : }
# 74 : :
# 75 : : void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
# 76 : 201239 : {
# 77 : 201239 : {
# 78 : 201239 : LOCK(newTaskMutex);
# 79 : 201239 : taskQueue.insert(std::make_pair(t, f));
# 80 : 201239 : }
# 81 : 201239 : newTaskScheduled.notify_one();
# 82 : 201239 : }
# 83 : :
# 84 : : void CScheduler::MockForward(std::chrono::seconds delta_seconds)
# 85 : 11 : {
# 86 : 11 : assert(delta_seconds > 0s && delta_seconds <= 1h);
# 87 : :
# 88 : 0 : {
# 89 : 11 : LOCK(newTaskMutex);
# 90 : :
# 91 : : // use temp_queue to maintain updated schedule
# 92 : 11 : std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
# 93 : :
# 94 [ + + ]: 66 : for (const auto& element : taskQueue) {
# 95 : 66 : temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
# 96 : 66 : }
# 97 : :
# 98 : : // point taskQueue to temp_queue
# 99 : 11 : taskQueue = std::move(temp_queue);
# 100 : 11 : }
# 101 : :
# 102 : : // notify that the taskQueue needs to be processed
# 103 : 11 : newTaskScheduled.notify_one();
# 104 : 11 : }
# 105 : :
# 106 : : static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
# 107 : 31991 : {
# 108 : 31991 : f();
# 109 : 31991 : s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
# 110 : 31991 : }
# 111 : :
# 112 : : void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
# 113 : 4379 : {
# 114 : 4379 : scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta);
# 115 : 4379 : }
# 116 : :
# 117 : : size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first,
# 118 : : std::chrono::system_clock::time_point& last) const
# 119 : 8 : {
# 120 : 8 : LOCK(newTaskMutex);
# 121 : 8 : size_t result = taskQueue.size();
# 122 [ + + ]: 8 : if (!taskQueue.empty()) {
# 123 : 6 : first = taskQueue.begin()->first;
# 124 : 6 : last = taskQueue.rbegin()->first;
# 125 : 6 : }
# 126 : 8 : return result;
# 127 : 8 : }
# 128 : :
# 129 : : bool CScheduler::AreThreadsServicingQueue() const
# 130 : 996 : {
# 131 : 996 : LOCK(newTaskMutex);
# 132 : 996 : return nThreadsServicingQueue;
# 133 : 996 : }
# 134 : :
# 135 : :
# 136 : : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
# 137 : 322361 : {
# 138 : 322361 : {
# 139 : 322361 : LOCK(m_callbacks_mutex);
# 140 : : // Try to avoid scheduling too many copies here, but if we
# 141 : : // accidentally have two ProcessQueue's scheduled at once its
# 142 : : // not a big deal.
# 143 [ + + ]: 322361 : if (m_are_callbacks_running) return;
# 144 [ + + ]: 252057 : if (m_callbacks_pending.empty()) return;
# 145 : 252057 : }
# 146 : 163339 : m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
# 147 : 163339 : }
# 148 : :
# 149 : : void SingleThreadedSchedulerClient::ProcessQueue()
# 150 : 162865 : {
# 151 : 162865 : std::function<void()> callback;
# 152 : 162865 : {
# 153 : 162865 : LOCK(m_callbacks_mutex);
# 154 [ + + ]: 162865 : if (m_are_callbacks_running) return;
# 155 [ + + ]: 162859 : if (m_callbacks_pending.empty()) return;
# 156 : 160814 : m_are_callbacks_running = true;
# 157 : :
# 158 : 160814 : callback = std::move(m_callbacks_pending.front());
# 159 : 160814 : m_callbacks_pending.pop_front();
# 160 : 160814 : }
# 161 : :
# 162 : : // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
# 163 : : // to ensure both happen safely even if callback() throws.
# 164 : 0 : struct RAIICallbacksRunning {
# 165 : 160814 : SingleThreadedSchedulerClient* instance;
# 166 : 160814 : explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
# 167 : 160814 : ~RAIICallbacksRunning()
# 168 : 160814 : {
# 169 : 160814 : {
# 170 : 160814 : LOCK(instance->m_callbacks_mutex);
# 171 : 160814 : instance->m_are_callbacks_running = false;
# 172 : 160814 : }
# 173 : 160814 : instance->MaybeScheduleProcessQueue();
# 174 : 160814 : }
# 175 : 160814 : } raiicallbacksrunning(this);
# 176 : :
# 177 : 160814 : callback();
# 178 : 160814 : }
# 179 : :
# 180 : : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
# 181 : 161547 : {
# 182 : 161547 : assert(m_pscheduler);
# 183 : :
# 184 : 0 : {
# 185 : 161547 : LOCK(m_callbacks_mutex);
# 186 : 161547 : m_callbacks_pending.emplace_back(std::move(func));
# 187 : 161547 : }
# 188 : 161547 : MaybeScheduleProcessQueue();
# 189 : 161547 : }
# 190 : :
# 191 : : void SingleThreadedSchedulerClient::EmptyQueue()
# 192 : 996 : {
# 193 : 996 : assert(!m_pscheduler->AreThreadsServicingQueue());
# 194 : 0 : bool should_continue = true;
# 195 [ + + ]: 1994 : while (should_continue) {
# 196 : 998 : ProcessQueue();
# 197 : 998 : LOCK(m_callbacks_mutex);
# 198 : 998 : should_continue = !m_callbacks_pending.empty();
# 199 : 998 : }
# 200 : 996 : }
# 201 : :
# 202 : : size_t SingleThreadedSchedulerClient::CallbacksPending()
# 203 : 83109 : {
# 204 : 83109 : LOCK(m_callbacks_mutex);
# 205 : 83109 : return m_callbacks_pending.size();
# 206 : 83109 : }
|