Branch data Line data Source code
# 1 : : // Copyright (c) 2012-2021 The Bitcoin Core developers
# 2 : : // Distributed under the MIT software license, see the accompanying
# 3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
# 4 : :
# 5 : : #ifndef BITCOIN_CHECKQUEUE_H
# 6 : : #define BITCOIN_CHECKQUEUE_H
# 7 : :
# 8 : : #include <sync.h>
# 9 : : #include <tinyformat.h>
# 10 : : #include <util/syscall_sandbox.h>
# 11 : : #include <util/threadnames.h>
# 12 : :
# 13 : : #include <algorithm>
# 14 : : #include <vector>
# 15 : :
# 16 : : template <typename T>
# 17 : : class CCheckQueueControl;
# 18 : :
# 19 : : /**
# 20 : : * Queue for verifications that have to be performed.
# 21 : : * The verifications are represented by a type T, which must provide an
# 22 : : * operator(), returning a bool.
# 23 : : *
# 24 : : * One thread (the master) is assumed to push batches of verifications
# 25 : : * onto the queue, where they are processed by N-1 worker threads. When
# 26 : : * the master is done adding work, it temporarily joins the worker pool
# 27 : : * as an N'th worker, until all jobs are done.
# 28 : : */
# 29 : : template <typename T>
# 30 : : class CCheckQueue
# 31 : : {
# 32 : : private:
# 33 : : //! Mutex to protect the inner state
# 34 : : Mutex m_mutex;
# 35 : :
# 36 : : //! Worker threads block on this when out of work
# 37 : : std::condition_variable m_worker_cv;
# 38 : :
# 39 : : //! Master thread blocks on this when out of work
# 40 : : std::condition_variable m_master_cv;
# 41 : :
# 42 : : //! The queue of elements to be processed.
# 43 : : //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
# 44 : : std::vector<T> queue GUARDED_BY(m_mutex);
# 45 : :
# 46 : : //! The number of workers (including the master) that are idle.
# 47 : : int nIdle GUARDED_BY(m_mutex){0};
# 48 : :
# 49 : : //! The total number of workers (including the master).
# 50 : : int nTotal GUARDED_BY(m_mutex){0};
# 51 : :
# 52 : : //! The temporary evaluation result.
# 53 : : bool fAllOk GUARDED_BY(m_mutex){true};
# 54 : :
# 55 : : /**
# 56 : : * Number of verifications that haven't completed yet.
# 57 : : * This includes elements that are no longer queued, but still in the
# 58 : : * worker's own batches.
# 59 : : */
# 60 : : unsigned int nTodo GUARDED_BY(m_mutex){0};
# 61 : :
# 62 : : //! The maximum number of elements to be processed in one batch
# 63 : : const unsigned int nBatchSize;
# 64 : :
# 65 : : std::vector<std::thread> m_worker_threads;
# 66 : : bool m_request_stop GUARDED_BY(m_mutex){false};
# 67 : :
# 68 : : /** Internal function that does bulk of the verification work. */
# 69 : : bool Loop(bool fMaster)
# 70 : 95441 : {
# 71 [ + + ][ + + ]: 95441 : std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
# [ + + ][ + + ]
# [ + - ][ + + ]
# [ + + ]
# 72 : 95441 : std::vector<T> vChecks;
# 73 : 95441 : vChecks.reserve(nBatchSize);
# 74 : 95441 : unsigned int nNow = 0;
# 75 : 95441 : bool fOk = true;
# 76 : 2383166 : do {
# 77 : 2383166 : {
# 78 : 2383166 : WAIT_LOCK(m_mutex, lock);
# 79 : : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
# 80 [ + + ][ + + ]: 2383166 : if (nNow) {
# [ + + ][ + + ]
# [ + + ][ - + ]
# [ + + ]
# 81 : 2287903 : fAllOk &= fOk;
# 82 : 2287903 : nTodo -= nNow;
# 83 [ + - ][ + + ]: 2287903 : if (nTodo == 0 && !fMaster)
# [ + + ][ + + ]
# [ + + ][ # # ]
# [ + + ][ + - ]
# [ + - ][ + + ]
# [ # # ][ + + ]
# [ + + ][ + + ]
# 84 : : // We processed the last element; inform the master it can exit and return the result
# 85 : 23231 : m_master_cv.notify_one();
# 86 : 2287903 : } else {
# 87 : : // first iteration
# 88 : 95263 : nTotal++;
# 89 : 95263 : }
# 90 : : // logically, the do loop starts here
# 91 [ + + ][ + + ]: 2457307 : while (queue.empty() && !m_request_stop) {
# [ + + ][ + - ]
# [ + + ][ + + ]
# [ + + ][ + + ]
# [ + + ][ + + ]
# [ + + ][ + - ]
# [ + + ][ + + ]
# 92 [ + + ][ + + ]: 163594 : if (fMaster && nTodo == 0) {
# [ + + ][ + + ]
# [ + + ][ + - ]
# [ + + ][ + + ]
# [ + + ][ + + ]
# [ + + ][ + + ]
# [ + + ][ + - ]
# 93 : 89453 : nTotal--;
# 94 : 89453 : bool fRet = fAllOk;
# 95 : : // reset the status for new work later
# 96 : 89453 : fAllOk = true;
# 97 : : // return the current status
# 98 : 89453 : return fRet;
# 99 : 89453 : }
# 100 : 74141 : nIdle++;
# 101 : 74141 : cond.wait(lock); // wait
# 102 : 74141 : nIdle--;
# 103 : 74141 : }
# 104 [ # # ][ + + ]: 2293713 : if (m_request_stop) {
# [ + + ][ + + ]
# [ + + ][ + + ]
# [ + + ]
# 105 : 5990 : return false;
# 106 : 5990 : }
# 107 : :
# 108 : : // Decide how many work units to process now.
# 109 : : // * Do not try to do everything at once, but aim for increasingly smaller batches so
# 110 : : // all workers finish approximately simultaneously.
# 111 : : // * Try to account for idle jobs which will instantly start helping.
# 112 : : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
# 113 : 2287723 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
# 114 : 2287723 : vChecks.resize(nNow);
# 115 [ + + ][ + + ]: 25509771 : for (unsigned int i = 0; i < nNow; i++) {
# [ + + ][ + + ]
# [ + + ][ # # ]
# [ + + ]
# 116 : : // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
# 117 : : // queue to the local batch vector instead of copying.
# 118 : 23222048 : vChecks[i].swap(queue.back());
# 119 : 23222048 : queue.pop_back();
# 120 : 23222048 : }
# 121 : : // Check whether we need to do work at all
# 122 : 2287723 : fOk = fAllOk;
# 123 : 2287723 : }
# 124 : : // execute work
# 125 [ + + ][ + + ]: 0 : for (T& check : vChecks)
# [ + + ][ + + ]
# [ # # ][ + + ]
# [ + + ]
# 126 [ + + ][ + - ]: 23221925 : if (fOk)
# [ # # ][ + + ]
# [ + + ][ + - ]
# [ + + ]
# 127 : 23217868 : fOk = check();
# 128 : 2287723 : vChecks.clear();
# 129 : 2287723 : } while (true);
# 130 : 95441 : }
# 131 : :
# 132 : : public:
# 133 : : //! Mutex to ensure only one concurrent CCheckQueueControl
# 134 : : Mutex m_control_mutex;
# 135 : :
# 136 : : //! Create a new check queue
# 137 : : explicit CCheckQueue(unsigned int nBatchSizeIn)
# 138 : : : nBatchSize(nBatchSizeIn)
# 139 : 850 : {
# 140 : 850 : }
# 141 : :
# 142 : : //! Create a pool of new worker threads.
# 143 : : void StartWorkerThreads(const int threads_num)
# 144 : 1008 : {
# 145 : 1008 : {
# 146 : 1008 : LOCK(m_mutex);
# 147 : 1008 : nIdle = 0;
# 148 : 1008 : nTotal = 0;
# 149 : 1008 : fAllOk = true;
# 150 : 1008 : }
# 151 : 1008 : assert(m_worker_threads.empty());
# 152 [ + + ][ + + ]: 6998 : for (int n = 0; n < threads_num; ++n) {
# [ + + ][ + + ]
# [ + + ][ + + ]
# 153 : 5990 : m_worker_threads.emplace_back([this, n]() {
# 154 : 5990 : util::ThreadRename(strprintf("scriptch.%i", n));
# 155 : 5990 : SetSyscallSandboxPolicy(SyscallSandboxPolicy::VALIDATION_SCRIPT_CHECK);
# 156 : 5990 : Loop(false /* worker thread */);
# 157 : 5990 : });
# 158 : 5990 : }
# 159 : 1008 : }
# 160 : :
# 161 : : //! Wait until execution finishes, and return whether all evaluations were successful.
# 162 : : bool Wait()
# 163 : 89453 : {
# 164 : 89453 : return Loop(true /* master thread */);
# 165 : 89453 : }
# 166 : :
# 167 : : //! Add a batch of checks to the queue
# 168 : : void Add(std::vector<T>& vChecks)
# 169 : 5212243 : {
# 170 [ + + ][ + + ]: 5212243 : if (vChecks.empty()) {
# [ + + ][ + + ]
# [ - + ][ + + ]
# 171 : 538563 : return;
# 172 : 538563 : }
# 173 : :
# 174 : 4673680 : {
# 175 : 4673680 : LOCK(m_mutex);
# 176 [ + + ][ + + ]: 23222048 : for (T& check : vChecks) {
# [ + + ][ + + ]
# [ + + ][ + + ]
# 177 : 23222048 : queue.emplace_back();
# 178 : 23222048 : check.swap(queue.back());
# 179 : 23222048 : }
# 180 : 4673680 : nTodo += vChecks.size();
# 181 : 4673680 : }
# 182 : :
# 183 [ + + ][ + + ]: 4673680 : if (vChecks.size() == 1) {
# [ + + ][ + + ]
# [ + - ][ + + ]
# 184 : 544615 : m_worker_cv.notify_one();
# 185 : 4129065 : } else {
# 186 : 4129065 : m_worker_cv.notify_all();
# 187 : 4129065 : }
# 188 : 4673680 : }
# 189 : :
# 190 : : //! Stop all of the worker threads.
# 191 : : void StopWorkerThreads()
# 192 : 1018 : {
# 193 : 1018 : WITH_LOCK(m_mutex, m_request_stop = true);
# 194 : 1018 : m_worker_cv.notify_all();
# 195 [ + + ][ + + ]: 5990 : for (std::thread& t : m_worker_threads) {
# [ + + ][ + + ]
# [ + + ][ + + ]
# 196 : 5990 : t.join();
# 197 : 5990 : }
# 198 : 1018 : m_worker_threads.clear();
# 199 : 1018 : WITH_LOCK(m_mutex, m_request_stop = false);
# 200 : 1018 : }
# 201 : :
# 202 : : ~CCheckQueue()
# 203 : 22 : {
# 204 : 22 : assert(m_worker_threads.empty());
# 205 : 22 : }
# 206 : :
# 207 : : };
# 208 : :
# 209 : : /**
# 210 : : * RAII-style controller object for a CCheckQueue that guarantees the passed
# 211 : : * queue is finished before continuing.
# 212 : : */
# 213 : : template <typename T>
# 214 : : class CCheckQueueControl
# 215 : : {
# 216 : : private:
# 217 : : CCheckQueue<T> * const pqueue;
# 218 : : bool fDone;
# 219 : :
# 220 : : public:
# 221 : : CCheckQueueControl() = delete;
# 222 : : CCheckQueueControl(const CCheckQueueControl&) = delete;
# 223 : : CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
# 224 : : explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
# 225 : 98451 : {
# 226 : : // passed queue is supposed to be unused, or nullptr
# 227 [ + - ][ + - ]: 98451 : if (pqueue != nullptr) {
# [ + - ][ + + ]
# [ + - ][ + - ]
# [ + - ]
# 228 : 89453 : ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
# 229 : 89453 : }
# 230 : 98451 : }
# 231 : :
# 232 : : bool Wait()
# 233 : 103502 : {
# 234 [ + + ][ - + ]: 103502 : if (pqueue == nullptr)
# [ - + ][ - + ]
# [ - + ][ - + ]
# [ - + ]
# 235 : 14049 : return true;
# 236 : 89453 : bool fRet = pqueue->Wait();
# 237 : 89453 : fDone = true;
# 238 : 89453 : return fRet;
# 239 : 103502 : }
# 240 : :
# 241 : : void Add(std::vector<T>& vChecks)
# 242 : 5216123 : {
# 243 [ + - ][ + - ]: 5216123 : if (pqueue != nullptr)
# [ + + ][ + - ]
# [ + - ][ + - ]
# 244 : 5212243 : pqueue->Add(vChecks);
# 245 : 5216123 : }
# 246 : :
# 247 : : ~CCheckQueueControl()
# 248 : 98451 : {
# 249 [ + - ][ - + ]: 98451 : if (!fDone)
# [ - + ][ + - ]
# [ + + ][ - + ]
# [ + - ]
# 250 : 11044 : Wait();
# 251 [ + - ][ + - ]: 98451 : if (pqueue != nullptr) {
# [ + + ][ + - ]
# [ + - ][ + - ]
# [ + - ]
# 252 : 89453 : LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
# 253 : 89453 : }
# 254 : 98451 : }
# 255 : : };
# 256 : :
# 257 : : #endif // BITCOIN_CHECKQUEUE_H
|