LCOV - code coverage report
Current view: top level - src - scheduler.cpp (source / functions) Hit Total Coverage
Test: coverage.lcov Lines: 122 129 94.6 %
Date: 2022-04-21 14:51:19 Functions: 18 18 100.0 %
Legend: Modified by patch:
Lines: hit not hit | Branches: + taken - not taken # not executed

Not modified by patch:
Lines: hit not hit | Branches: + taken - not taken # not executed
Branches: 30 32 93.8 %

           Branch data     Line data    Source code
#       1                 :            : // Copyright (c) 2015-2021 The Bitcoin Core developers
#       2                 :            : // Distributed under the MIT software license, see the accompanying
#       3                 :            : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
#       4                 :            : 
#       5                 :            : #include <scheduler.h>
#       6                 :            : 
#       7                 :            : #include <random.h>
#       8                 :            : #include <util/syscall_sandbox.h>
#       9                 :            : #include <util/time.h>
#      10                 :            : 
#      11                 :            : #include <assert.h>
#      12                 :            : #include <functional>
#      13                 :            : #include <utility>
#      14                 :            : 
#      15                 :            : CScheduler::CScheduler()
#      16                 :       1002 : {
#      17                 :       1002 : }
#      18                 :            : 
#      19                 :            : CScheduler::~CScheduler()
#      20                 :       1002 : {
#      21                 :       1002 :     assert(nThreadsServicingQueue == 0);
#      22         [ +  + ]:       1002 :     if (stopWhenEmpty) assert(taskQueue.empty());
#      23                 :       1002 : }
#      24                 :            : 
#      25                 :            : 
#      26                 :            : void CScheduler::serviceQueue()
#      27                 :       1028 : {
#      28                 :       1028 :     SetSyscallSandboxPolicy(SyscallSandboxPolicy::SCHEDULER);
#      29                 :       1028 :     WAIT_LOCK(newTaskMutex, lock);
#      30                 :       1028 :     ++nThreadsServicingQueue;
#      31                 :            : 
#      32                 :            :     // newTaskMutex is locked throughout this loop EXCEPT
#      33                 :            :     // when the thread is waiting or when the user's function
#      34                 :            :     // is called.
#      35         [ +  + ]:     196706 :     while (!shouldStop()) {
#      36                 :     195678 :         try {
#      37 [ +  + ][ +  + ]:     203843 :             while (!shouldStop() && taskQueue.empty()) {
#      38                 :            :                 // Wait until there is something to do.
#      39                 :       8165 :                 newTaskScheduled.wait(lock);
#      40                 :       8165 :             }
#      41                 :            : 
#      42                 :            :             // Wait until either there is a new task, or until
#      43                 :            :             // the time of the first item on the queue:
#      44                 :            : 
#      45 [ +  + ][ +  - ]:     280696 :             while (!shouldStop() && !taskQueue.empty()) {
#      46                 :     279687 :                 std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
#      47         [ +  + ]:     279687 :                 if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
#      48                 :     194669 :                     break; // Exit loop after timeout, it means we reached the time of the event
#      49                 :     194669 :                 }
#      50                 :     279687 :             }
#      51                 :            : 
#      52                 :            :             // If there are multiple threads, the queue can empty while we're waiting (another
#      53                 :            :             // thread may service the task we were waiting on).
#      54 [ +  + ][ -  + ]:     195678 :             if (shouldStop() || taskQueue.empty())
#      55                 :       1011 :                 continue;
#      56                 :            : 
#      57                 :     194667 :             Function f = taskQueue.begin()->second;
#      58                 :     194667 :             taskQueue.erase(taskQueue.begin());
#      59                 :            : 
#      60                 :     194667 :             {
#      61                 :            :                 // Unlock before calling f, so it can reschedule itself or another task
#      62                 :            :                 // without deadlocking:
#      63                 :     194667 :                 REVERSE_LOCK(lock);
#      64                 :     194667 :                 f();
#      65                 :     194667 :             }
#      66                 :     194667 :         } catch (...) {
#      67                 :          0 :             --nThreadsServicingQueue;
#      68                 :          0 :             throw;
#      69                 :          0 :         }
#      70                 :     195678 :     }
#      71                 :       1028 :     --nThreadsServicingQueue;
#      72                 :       1028 :     newTaskScheduled.notify_one();
#      73                 :       1028 : }
#      74                 :            : 
#      75                 :            : void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
#      76                 :     201239 : {
#      77                 :     201239 :     {
#      78                 :     201239 :         LOCK(newTaskMutex);
#      79                 :     201239 :         taskQueue.insert(std::make_pair(t, f));
#      80                 :     201239 :     }
#      81                 :     201239 :     newTaskScheduled.notify_one();
#      82                 :     201239 : }
#      83                 :            : 
#      84                 :            : void CScheduler::MockForward(std::chrono::seconds delta_seconds)
#      85                 :         11 : {
#      86                 :         11 :     assert(delta_seconds > 0s && delta_seconds <= 1h);
#      87                 :            : 
#      88                 :          0 :     {
#      89                 :         11 :         LOCK(newTaskMutex);
#      90                 :            : 
#      91                 :            :         // use temp_queue to maintain updated schedule
#      92                 :         11 :         std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
#      93                 :            : 
#      94         [ +  + ]:         66 :         for (const auto& element : taskQueue) {
#      95                 :         66 :             temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
#      96                 :         66 :         }
#      97                 :            : 
#      98                 :            :         // point taskQueue to temp_queue
#      99                 :         11 :         taskQueue = std::move(temp_queue);
#     100                 :         11 :     }
#     101                 :            : 
#     102                 :            :     // notify that the taskQueue needs to be processed
#     103                 :         11 :     newTaskScheduled.notify_one();
#     104                 :         11 : }
#     105                 :            : 
#     106                 :            : static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
#     107                 :      31991 : {
#     108                 :      31991 :     f();
#     109                 :      31991 :     s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
#     110                 :      31991 : }
#     111                 :            : 
#     112                 :            : void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
#     113                 :       4379 : {
#     114                 :       4379 :     scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta);
#     115                 :       4379 : }
#     116                 :            : 
#     117                 :            : size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first,
#     118                 :            :                                 std::chrono::system_clock::time_point& last) const
#     119                 :          8 : {
#     120                 :          8 :     LOCK(newTaskMutex);
#     121                 :          8 :     size_t result = taskQueue.size();
#     122         [ +  + ]:          8 :     if (!taskQueue.empty()) {
#     123                 :          6 :         first = taskQueue.begin()->first;
#     124                 :          6 :         last = taskQueue.rbegin()->first;
#     125                 :          6 :     }
#     126                 :          8 :     return result;
#     127                 :          8 : }
#     128                 :            : 
#     129                 :            : bool CScheduler::AreThreadsServicingQueue() const
#     130                 :        996 : {
#     131                 :        996 :     LOCK(newTaskMutex);
#     132                 :        996 :     return nThreadsServicingQueue;
#     133                 :        996 : }
#     134                 :            : 
#     135                 :            : 
#     136                 :            : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
#     137                 :     322361 : {
#     138                 :     322361 :     {
#     139                 :     322361 :         LOCK(m_callbacks_mutex);
#     140                 :            :         // Try to avoid scheduling too many copies here, but if we
#     141                 :            :         // accidentally have two ProcessQueue's scheduled at once its
#     142                 :            :         // not a big deal.
#     143         [ +  + ]:     322361 :         if (m_are_callbacks_running) return;
#     144         [ +  + ]:     252057 :         if (m_callbacks_pending.empty()) return;
#     145                 :     252057 :     }
#     146                 :     163339 :     m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
#     147                 :     163339 : }
#     148                 :            : 
#     149                 :            : void SingleThreadedSchedulerClient::ProcessQueue()
#     150                 :     162865 : {
#     151                 :     162865 :     std::function<void()> callback;
#     152                 :     162865 :     {
#     153                 :     162865 :         LOCK(m_callbacks_mutex);
#     154         [ +  + ]:     162865 :         if (m_are_callbacks_running) return;
#     155         [ +  + ]:     162859 :         if (m_callbacks_pending.empty()) return;
#     156                 :     160814 :         m_are_callbacks_running = true;
#     157                 :            : 
#     158                 :     160814 :         callback = std::move(m_callbacks_pending.front());
#     159                 :     160814 :         m_callbacks_pending.pop_front();
#     160                 :     160814 :     }
#     161                 :            : 
#     162                 :            :     // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
#     163                 :            :     // to ensure both happen safely even if callback() throws.
#     164                 :          0 :     struct RAIICallbacksRunning {
#     165                 :     160814 :         SingleThreadedSchedulerClient* instance;
#     166                 :     160814 :         explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
#     167                 :     160814 :         ~RAIICallbacksRunning()
#     168                 :     160814 :         {
#     169                 :     160814 :             {
#     170                 :     160814 :                 LOCK(instance->m_callbacks_mutex);
#     171                 :     160814 :                 instance->m_are_callbacks_running = false;
#     172                 :     160814 :             }
#     173                 :     160814 :             instance->MaybeScheduleProcessQueue();
#     174                 :     160814 :         }
#     175                 :     160814 :     } raiicallbacksrunning(this);
#     176                 :            : 
#     177                 :     160814 :     callback();
#     178                 :     160814 : }
#     179                 :            : 
#     180                 :            : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
#     181                 :     161547 : {
#     182                 :     161547 :     assert(m_pscheduler);
#     183                 :            : 
#     184                 :          0 :     {
#     185                 :     161547 :         LOCK(m_callbacks_mutex);
#     186                 :     161547 :         m_callbacks_pending.emplace_back(std::move(func));
#     187                 :     161547 :     }
#     188                 :     161547 :     MaybeScheduleProcessQueue();
#     189                 :     161547 : }
#     190                 :            : 
#     191                 :            : void SingleThreadedSchedulerClient::EmptyQueue()
#     192                 :        996 : {
#     193                 :        996 :     assert(!m_pscheduler->AreThreadsServicingQueue());
#     194                 :          0 :     bool should_continue = true;
#     195         [ +  + ]:       1994 :     while (should_continue) {
#     196                 :        998 :         ProcessQueue();
#     197                 :        998 :         LOCK(m_callbacks_mutex);
#     198                 :        998 :         should_continue = !m_callbacks_pending.empty();
#     199                 :        998 :     }
#     200                 :        996 : }
#     201                 :            : 
#     202                 :            : size_t SingleThreadedSchedulerClient::CallbacksPending()
#     203                 :      83109 : {
#     204                 :      83109 :     LOCK(m_callbacks_mutex);
#     205                 :      83109 :     return m_callbacks_pending.size();
#     206                 :      83109 : }

Generated by: LCOV version 0-eol-96201-ge66f56f4af6a