LCOV - code coverage report
Current view: top level - src - checkqueue.h (source / functions) Hit Total Coverage
Test: coverage.lcov Lines: 118 119 99.2 %
Date: 2022-04-21 14:51:19 Functions: 79 79 100.0 %
Legend: Modified by patch:
Lines: hit not hit | Branches: + taken - not taken # not executed

Not modified by patch:
Lines: hit not hit | Branches: + taken - not taken # not executed
Branches: 242 296 81.8 %

           Branch data     Line data    Source code
#       1                 :            : // Copyright (c) 2012-2021 The Bitcoin Core developers
#       2                 :            : // Distributed under the MIT software license, see the accompanying
#       3                 :            : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
#       4                 :            : 
#       5                 :            : #ifndef BITCOIN_CHECKQUEUE_H
#       6                 :            : #define BITCOIN_CHECKQUEUE_H
#       7                 :            : 
#       8                 :            : #include <sync.h>
#       9                 :            : #include <tinyformat.h>
#      10                 :            : #include <util/syscall_sandbox.h>
#      11                 :            : #include <util/threadnames.h>
#      12                 :            : 
#      13                 :            : #include <algorithm>
#      14                 :            : #include <vector>
#      15                 :            : 
#      16                 :            : template <typename T>
#      17                 :            : class CCheckQueueControl;
#      18                 :            : 
#      19                 :            : /**
#      20                 :            :  * Queue for verifications that have to be performed.
#      21                 :            :   * The verifications are represented by a type T, which must provide an
#      22                 :            :   * operator(), returning a bool.
#      23                 :            :   *
#      24                 :            :   * One thread (the master) is assumed to push batches of verifications
#      25                 :            :   * onto the queue, where they are processed by N-1 worker threads. When
#      26                 :            :   * the master is done adding work, it temporarily joins the worker pool
#      27                 :            :   * as an N'th worker, until all jobs are done.
#      28                 :            :   */
#      29                 :            : template <typename T>
#      30                 :            : class CCheckQueue
#      31                 :            : {
#      32                 :            : private:
#      33                 :            :     //! Mutex to protect the inner state
#      34                 :            :     Mutex m_mutex;
#      35                 :            : 
#      36                 :            :     //! Worker threads block on this when out of work
#      37                 :            :     std::condition_variable m_worker_cv;
#      38                 :            : 
#      39                 :            :     //! Master thread blocks on this when out of work
#      40                 :            :     std::condition_variable m_master_cv;
#      41                 :            : 
#      42                 :            :     //! The queue of elements to be processed.
#      43                 :            :     //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
#      44                 :            :     std::vector<T> queue GUARDED_BY(m_mutex);
#      45                 :            : 
#      46                 :            :     //! The number of workers (including the master) that are idle.
#      47                 :            :     int nIdle GUARDED_BY(m_mutex){0};
#      48                 :            : 
#      49                 :            :     //! The total number of workers (including the master).
#      50                 :            :     int nTotal GUARDED_BY(m_mutex){0};
#      51                 :            : 
#      52                 :            :     //! The temporary evaluation result.
#      53                 :            :     bool fAllOk GUARDED_BY(m_mutex){true};
#      54                 :            : 
#      55                 :            :     /**
#      56                 :            :      * Number of verifications that haven't completed yet.
#      57                 :            :      * This includes elements that are no longer queued, but still in the
#      58                 :            :      * worker's own batches.
#      59                 :            :      */
#      60                 :            :     unsigned int nTodo GUARDED_BY(m_mutex){0};
#      61                 :            : 
#      62                 :            :     //! The maximum number of elements to be processed in one batch
#      63                 :            :     const unsigned int nBatchSize;
#      64                 :            : 
#      65                 :            :     std::vector<std::thread> m_worker_threads;
#      66                 :            :     bool m_request_stop GUARDED_BY(m_mutex){false};
#      67                 :            : 
#      68                 :            :     /** Internal function that does bulk of the verification work. */
#      69                 :            :     bool Loop(bool fMaster)
#      70                 :      95441 :     {
#      71 [ +  + ][ +  + ]:      95441 :         std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
#         [ +  + ][ +  + ]
#         [ +  - ][ +  + ]
#                 [ +  + ]
#      72                 :      95441 :         std::vector<T> vChecks;
#      73                 :      95441 :         vChecks.reserve(nBatchSize);
#      74                 :      95441 :         unsigned int nNow = 0;
#      75                 :      95441 :         bool fOk = true;
#      76                 :    2383166 :         do {
#      77                 :    2383166 :             {
#      78                 :    2383166 :                 WAIT_LOCK(m_mutex, lock);
#      79                 :            :                 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
#      80 [ +  + ][ +  + ]:    2383166 :                 if (nNow) {
#         [ +  + ][ +  + ]
#         [ +  + ][ -  + ]
#                 [ +  + ]
#      81                 :    2287903 :                     fAllOk &= fOk;
#      82                 :    2287903 :                     nTodo -= nNow;
#      83 [ +  - ][ +  + ]:    2287903 :                     if (nTodo == 0 && !fMaster)
#         [ +  + ][ +  + ]
#         [ +  + ][ #  # ]
#         [ +  + ][ +  - ]
#         [ +  - ][ +  + ]
#         [ #  # ][ +  + ]
#         [ +  + ][ +  + ]
#      84                 :            :                         // We processed the last element; inform the master it can exit and return the result
#      85                 :      23231 :                         m_master_cv.notify_one();
#      86                 :    2287903 :                 } else {
#      87                 :            :                     // first iteration
#      88                 :      95263 :                     nTotal++;
#      89                 :      95263 :                 }
#      90                 :            :                 // logically, the do loop starts here
#      91 [ +  + ][ +  + ]:    2457307 :                 while (queue.empty() && !m_request_stop) {
#         [ +  + ][ +  - ]
#         [ +  + ][ +  + ]
#         [ +  + ][ +  + ]
#         [ +  + ][ +  + ]
#         [ +  + ][ +  - ]
#         [ +  + ][ +  + ]
#      92 [ +  + ][ +  + ]:     163594 :                     if (fMaster && nTodo == 0) {
#         [ +  + ][ +  + ]
#         [ +  + ][ +  - ]
#         [ +  + ][ +  + ]
#         [ +  + ][ +  + ]
#         [ +  + ][ +  + ]
#         [ +  + ][ +  - ]
#      93                 :      89453 :                         nTotal--;
#      94                 :      89453 :                         bool fRet = fAllOk;
#      95                 :            :                         // reset the status for new work later
#      96                 :      89453 :                         fAllOk = true;
#      97                 :            :                         // return the current status
#      98                 :      89453 :                         return fRet;
#      99                 :      89453 :                     }
#     100                 :      74141 :                     nIdle++;
#     101                 :      74141 :                     cond.wait(lock); // wait
#     102                 :      74141 :                     nIdle--;
#     103                 :      74141 :                 }
#     104 [ #  # ][ +  + ]:    2293713 :                 if (m_request_stop) {
#         [ +  + ][ +  + ]
#         [ +  + ][ +  + ]
#                 [ +  + ]
#     105                 :       5990 :                     return false;
#     106                 :       5990 :                 }
#     107                 :            : 
#     108                 :            :                 // Decide how many work units to process now.
#     109                 :            :                 // * Do not try to do everything at once, but aim for increasingly smaller batches so
#     110                 :            :                 //   all workers finish approximately simultaneously.
#     111                 :            :                 // * Try to account for idle jobs which will instantly start helping.
#     112                 :            :                 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
#     113                 :    2287723 :                 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
#     114                 :    2287723 :                 vChecks.resize(nNow);
#     115 [ +  + ][ +  + ]:   25509771 :                 for (unsigned int i = 0; i < nNow; i++) {
#         [ +  + ][ +  + ]
#         [ +  + ][ #  # ]
#                 [ +  + ]
#     116                 :            :                     // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
#     117                 :            :                     // queue to the local batch vector instead of copying.
#     118                 :   23222048 :                     vChecks[i].swap(queue.back());
#     119                 :   23222048 :                     queue.pop_back();
#     120                 :   23222048 :                 }
#     121                 :            :                 // Check whether we need to do work at all
#     122                 :    2287723 :                 fOk = fAllOk;
#     123                 :    2287723 :             }
#     124                 :            :             // execute work
#     125 [ +  + ][ +  + ]:          0 :             for (T& check : vChecks)
#         [ +  + ][ +  + ]
#         [ #  # ][ +  + ]
#                 [ +  + ]
#     126 [ +  + ][ +  - ]:   23221925 :                 if (fOk)
#         [ #  # ][ +  + ]
#         [ +  + ][ +  - ]
#                 [ +  + ]
#     127                 :   23217868 :                     fOk = check();
#     128                 :    2287723 :             vChecks.clear();
#     129                 :    2287723 :         } while (true);
#     130                 :      95441 :     }
#     131                 :            : 
#     132                 :            : public:
#     133                 :            :     //! Mutex to ensure only one concurrent CCheckQueueControl
#     134                 :            :     Mutex m_control_mutex;
#     135                 :            : 
#     136                 :            :     //! Create a new check queue
#     137                 :            :     explicit CCheckQueue(unsigned int nBatchSizeIn)
#     138                 :            :         : nBatchSize(nBatchSizeIn)
#     139                 :        850 :     {
#     140                 :        850 :     }
#     141                 :            : 
#     142                 :            :     //! Create a pool of new worker threads.
#     143                 :            :     void StartWorkerThreads(const int threads_num)
#     144                 :       1008 :     {
#     145                 :       1008 :         {
#     146                 :       1008 :             LOCK(m_mutex);
#     147                 :       1008 :             nIdle = 0;
#     148                 :       1008 :             nTotal = 0;
#     149                 :       1008 :             fAllOk = true;
#     150                 :       1008 :         }
#     151                 :       1008 :         assert(m_worker_threads.empty());
#     152 [ +  + ][ +  + ]:       6998 :         for (int n = 0; n < threads_num; ++n) {
#         [ +  + ][ +  + ]
#         [ +  + ][ +  + ]
#     153                 :       5990 :             m_worker_threads.emplace_back([this, n]() {
#     154                 :       5990 :                 util::ThreadRename(strprintf("scriptch.%i", n));
#     155                 :       5990 :                 SetSyscallSandboxPolicy(SyscallSandboxPolicy::VALIDATION_SCRIPT_CHECK);
#     156                 :       5990 :                 Loop(false /* worker thread */);
#     157                 :       5990 :             });
#     158                 :       5990 :         }
#     159                 :       1008 :     }
#     160                 :            : 
#     161                 :            :     //! Wait until execution finishes, and return whether all evaluations were successful.
#     162                 :            :     bool Wait()
#     163                 :      89453 :     {
#     164                 :      89453 :         return Loop(true /* master thread */);
#     165                 :      89453 :     }
#     166                 :            : 
#     167                 :            :     //! Add a batch of checks to the queue
#     168                 :            :     void Add(std::vector<T>& vChecks)
#     169                 :    5212243 :     {
#     170 [ +  + ][ +  + ]:    5212243 :         if (vChecks.empty()) {
#         [ +  + ][ +  + ]
#         [ -  + ][ +  + ]
#     171                 :     538563 :             return;
#     172                 :     538563 :         }
#     173                 :            : 
#     174                 :    4673680 :         {
#     175                 :    4673680 :             LOCK(m_mutex);
#     176 [ +  + ][ +  + ]:   23222048 :             for (T& check : vChecks) {
#         [ +  + ][ +  + ]
#         [ +  + ][ +  + ]
#     177                 :   23222048 :                 queue.emplace_back();
#     178                 :   23222048 :                 check.swap(queue.back());
#     179                 :   23222048 :             }
#     180                 :    4673680 :             nTodo += vChecks.size();
#     181                 :    4673680 :         }
#     182                 :            : 
#     183 [ +  + ][ +  + ]:    4673680 :         if (vChecks.size() == 1) {
#         [ +  + ][ +  + ]
#         [ +  - ][ +  + ]
#     184                 :     544615 :             m_worker_cv.notify_one();
#     185                 :    4129065 :         } else {
#     186                 :    4129065 :             m_worker_cv.notify_all();
#     187                 :    4129065 :         }
#     188                 :    4673680 :     }
#     189                 :            : 
#     190                 :            :     //! Stop all of the worker threads.
#     191                 :            :     void StopWorkerThreads()
#     192                 :       1018 :     {
#     193                 :       1018 :         WITH_LOCK(m_mutex, m_request_stop = true);
#     194                 :       1018 :         m_worker_cv.notify_all();
#     195 [ +  + ][ +  + ]:       5990 :         for (std::thread& t : m_worker_threads) {
#         [ +  + ][ +  + ]
#         [ +  + ][ +  + ]
#     196                 :       5990 :             t.join();
#     197                 :       5990 :         }
#     198                 :       1018 :         m_worker_threads.clear();
#     199                 :       1018 :         WITH_LOCK(m_mutex, m_request_stop = false);
#     200                 :       1018 :     }
#     201                 :            : 
#     202                 :            :     ~CCheckQueue()
#     203                 :         22 :     {
#     204                 :         22 :         assert(m_worker_threads.empty());
#     205                 :         22 :     }
#     206                 :            : 
#     207                 :            : };
#     208                 :            : 
#     209                 :            : /**
#     210                 :            :  * RAII-style controller object for a CCheckQueue that guarantees the passed
#     211                 :            :  * queue is finished before continuing.
#     212                 :            :  */
#     213                 :            : template <typename T>
#     214                 :            : class CCheckQueueControl
#     215                 :            : {
#     216                 :            : private:
#     217                 :            :     CCheckQueue<T> * const pqueue;
#     218                 :            :     bool fDone;
#     219                 :            : 
#     220                 :            : public:
#     221                 :            :     CCheckQueueControl() = delete;
#     222                 :            :     CCheckQueueControl(const CCheckQueueControl&) = delete;
#     223                 :            :     CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
#     224                 :            :     explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
#     225                 :      98451 :     {
#     226                 :            :         // passed queue is supposed to be unused, or nullptr
#     227 [ +  - ][ +  - ]:      98451 :         if (pqueue != nullptr) {
#         [ +  - ][ +  + ]
#         [ +  - ][ +  - ]
#                 [ +  - ]
#     228                 :      89453 :             ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
#     229                 :      89453 :         }
#     230                 :      98451 :     }
#     231                 :            : 
#     232                 :            :     bool Wait()
#     233                 :     103502 :     {
#     234 [ +  + ][ -  + ]:     103502 :         if (pqueue == nullptr)
#         [ -  + ][ -  + ]
#         [ -  + ][ -  + ]
#                 [ -  + ]
#     235                 :      14049 :             return true;
#     236                 :      89453 :         bool fRet = pqueue->Wait();
#     237                 :      89453 :         fDone = true;
#     238                 :      89453 :         return fRet;
#     239                 :     103502 :     }
#     240                 :            : 
#     241                 :            :     void Add(std::vector<T>& vChecks)
#     242                 :    5216123 :     {
#     243 [ +  - ][ +  - ]:    5216123 :         if (pqueue != nullptr)
#         [ +  + ][ +  - ]
#         [ +  - ][ +  - ]
#     244                 :    5212243 :             pqueue->Add(vChecks);
#     245                 :    5216123 :     }
#     246                 :            : 
#     247                 :            :     ~CCheckQueueControl()
#     248                 :      98451 :     {
#     249 [ +  - ][ -  + ]:      98451 :         if (!fDone)
#         [ -  + ][ +  - ]
#         [ +  + ][ -  + ]
#                 [ +  - ]
#     250                 :      11044 :             Wait();
#     251 [ +  - ][ +  - ]:      98451 :         if (pqueue != nullptr) {
#         [ +  + ][ +  - ]
#         [ +  - ][ +  - ]
#                 [ +  - ]
#     252                 :      89453 :             LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
#     253                 :      89453 :         }
#     254                 :      98451 :     }
#     255                 :            : };
#     256                 :            : 
#     257                 :            : #endif // BITCOIN_CHECKQUEUE_H

Generated by: LCOV version 0-eol-96201-ge66f56f4af6a