Branch data Line data Source code
# 1 : : // Copyright (c) 2012-2020 The Bitcoin Core developers
# 2 : : // Distributed under the MIT software license, see the accompanying
# 3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
# 4 : :
# 5 : : #ifndef BITCOIN_CHECKQUEUE_H
# 6 : : #define BITCOIN_CHECKQUEUE_H
# 7 : :
# 8 : : #include <sync.h>
# 9 : : #include <tinyformat.h>
# 10 : : #include <util/threadnames.h>
# 11 : :
# 12 : : #include <algorithm>
# 13 : : #include <vector>
# 14 : :
# 15 : : template <typename T>
# 16 : : class CCheckQueueControl; // forward declaration; the class is defined after CCheckQueue in this file
# 17 : :
# 18 : : /**
# 19 : : * Queue for verifications that have to be performed.
# 20 : : * The verifications are represented by a type T, which must provide an
# 21 : : * operator(), returning a bool. T must also be default-constructible and provide swap(T&), as used by Add() and Loop().
# 22 : : *
# 23 : : * One thread (the master) is assumed to push batches of verifications
# 24 : : * onto the queue, where they are processed by N-1 worker threads. When
# 25 : : * the master is done adding work, it temporarily joins the worker pool
# 26 : : * as an N'th worker, until all jobs are done. Worker threads are created by StartWorkerThreads() and must be stopped with StopWorkerThreads() before destruction (the destructor asserts this).
# 27 : : */
# 28 : : template <typename T>
# 29 : : class CCheckQueue
# 30 : : {
# 31 : : private:
# 32 : : //! Mutex to protect the inner state
# 33 : : Mutex m_mutex;
# 34 : :
# 35 : : //! Worker threads block on this when out of work
# 36 : : std::condition_variable m_worker_cv;
# 37 : :
# 38 : : //! Master thread blocks on this when out of work
# 39 : : std::condition_variable m_master_cv;
# 40 : :
# 41 : : //! The queue of elements to be processed.
# 42 : : //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
# 43 : : std::vector<T> queue GUARDED_BY(m_mutex);
# 44 : :
# 45 : : //! The number of workers (including the master) that are idle.
# 46 : : int nIdle GUARDED_BY(m_mutex){0};
# 47 : :
# 48 : : //! The total number of workers (including the master).
# 49 : : int nTotal GUARDED_BY(m_mutex){0};
# 50 : :
# 51 : : //! The temporary evaluation result.
# 52 : : bool fAllOk GUARDED_BY(m_mutex){true};
# 53 : :
# 54 : : /**
# 55 : : * Number of verifications that haven't completed yet.
# 56 : : * This includes elements that are no longer queued, but still in the
# 57 : : * worker's own batches.
# 58 : : */
# 59 : : unsigned int nTodo GUARDED_BY(m_mutex){0};
# 60 : :
# 61 : : //! The maximum number of elements to be processed in one batch
# 62 : : const unsigned int nBatchSize;
# 63 : :
# 64 : : std::vector<std::thread> m_worker_threads; //!< Threads spawned by StartWorkerThreads() and joined by StopWorkerThreads()
# 65 : : bool m_request_stop GUARDED_BY(m_mutex){false}; //!< Set by StopWorkerThreads() while it shuts the pool down; makes Loop() return false
# 66 : :
# 67 : : /** Internal function that does the bulk of the verification work. fMaster selects the master (true) or worker (false) role. Returns the accumulated result (and resets it) when the master finds the queue empty with nTodo == 0, or false when a stop has been requested. */
# 68 : : bool Loop(bool fMaster)
# 69 : 97169 : {
# 70 [ + + ][ + + ]: 97169 : std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; // the master and the workers sleep on separate condition variables
# [ + + ][ + + ]
# [ + - ][ + + ]
# [ + + ]
# 71 : 97169 : std::vector<T> vChecks;
# 72 : 97169 : vChecks.reserve(nBatchSize);
# 73 : 97169 : unsigned int nNow = 0; // size of the batch taken in the previous iteration; 0 marks the first pass
# 74 : 97169 : bool fOk = true; // this thread's result for the batch it is currently executing
# 75 : 3486455 : do {
# 76 : 3486455 : {
# 77 : 3486455 : WAIT_LOCK(m_mutex, lock);
# 78 : : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
# 79 [ + + ][ + + ]: 3486455 : if (nNow) {
# [ + + ][ + + ]
# [ + + ][ - + ]
# [ + + ]
# 80 : 3389575 : fAllOk &= fOk; // fold this thread's batch result into the shared result
# 81 : 3389575 : nTodo -= nNow; // the previous batch is now complete
# 82 [ + - ][ + + ]: 3389575 : if (nTodo == 0 && !fMaster)
# [ + + ][ + + ]
# [ + + ][ # # ]
# [ + + ][ + - ]
# [ + - ][ + + ]
# [ # # ][ + + ]
# [ + + ][ + + ]
# 83 : : // We processed the last element; inform the master it can exit and return the result
# 84 : 29866 : m_master_cv.notify_one();
# 85 : 3389575 : } else {
# 86 : : // first iteration
# 87 : 96880 : nTotal++; // count this thread as a participant
# 88 : 96880 : }
# 89 : : // logically, the do loop starts here
# 90 [ + + ][ + + ]: 3596660 : while (queue.empty() && !m_request_stop) {
# [ + + ][ + - ]
# [ + + ][ + + ]
# [ + + ][ + + ]
# [ + + ][ + + ]
# [ + + ][ + - ]
# [ + + ][ + + ]
# 91 [ + + ][ + + ]: 202377 : if (fMaster && nTodo == 0) {
# [ + + ][ + + ]
# [ + + ][ + - ]
# [ + + ][ + + ]
# [ + + ][ + + ]
# [ + + ][ + + ]
# [ + + ][ + - ]
# 92 : 92172 : nTotal--; // the master leaves the pool again
# 93 : 92172 : bool fRet = fAllOk;
# 94 : : // reset the status for new work later
# 95 : 92172 : fAllOk = true;
# 96 : : // return the current status
# 97 : 92172 : return fRet;
# 98 : 92172 : }
# 99 : 110205 : nIdle++;
# 100 : 110205 : cond.wait(lock); // wait; the lock is released while blocked and held again on return
# 101 : 110205 : nIdle--;
# 102 : 110205 : }
# 103 [ # # ][ + + ]: 3486455 : if (m_request_stop) {
# [ + + ][ + + ]
# [ + + ][ + + ]
# [ + + ]
# 104 : 5000 : return false; // stop requested via StopWorkerThreads(); this is not a verification result
# 105 : 5000 : }
# 106 : :
# 107 : : // Decide how many work units to process now.
# 108 : : // * Do not try to do everything at once, but aim for increasingly smaller batches so
# 109 : : // all workers finish approximately simultaneously.
# 110 : : // * Try to account for idle jobs which will instantly start helping.
# 111 : : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
# 112 : 3389283 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
# 113 : 3389283 : vChecks.resize(nNow);
# 114 [ + + ][ + + ]: 27359564 : for (unsigned int i = 0; i < nNow; i++) {
# [ + + ][ + + ]
# [ + + ][ # # ]
# [ + + ]
# 115 : : // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
# 116 : : // queue to the local batch vector instead of copying.
# 117 : 23970281 : vChecks[i].swap(queue.back());
# 118 : 23970281 : queue.pop_back();
# 119 : 23970281 : }
# 120 : : // Check whether we need to do work at all
# 121 : 3389283 : fOk = fAllOk;
# 122 : 3389283 : }
# 123 : : // execute work
# 124 [ + + ][ + + ]: 3389283 : for (T& check : vChecks)
# [ + + ][ + + ]
# [ # # ][ + + ]
# [ + + ]
# 125 [ + + ][ + - ]: 23970016 : if (fOk) // after the first failure the remaining checks of the batch are skipped
# [ # # ][ + + ]
# [ + + ][ + - ]
# [ + + ]
# 126 : 23966674 : fOk = check();
# 127 : 3389283 : vChecks.clear(); // nNow is kept so the clean-up at the top of the next iteration can account for this batch
# 128 : 3389283 : } while (true);
# 129 : 97169 : }
# 130 : :
# 131 : : public:
# 132 : : //! Mutex to ensure only one concurrent CCheckQueueControl (entered in the controller's constructor, left in its destructor)
# 133 : : Mutex m_control_mutex;
# 134 : :
# 135 : : //! Create a new check queue; nBatchSizeIn is the maximum number of checks a thread takes in one batch
# 136 : : explicit CCheckQueue(unsigned int nBatchSizeIn)
# 137 : : : nBatchSize(nBatchSizeIn)
# 138 : 712 : {
# 139 : 712 : }
# 140 : :
# 141 : : //! Create a pool of new worker threads. Resets nIdle, nTotal and fAllOk, asserts that no worker threads exist yet, then spawns threads_num threads that each run Loop(false).
# 142 : : void StartWorkerThreads(const int threads_num)
# 143 : 833 : {
# 144 : 833 : {
# 145 : 833 : LOCK(m_mutex);
# 146 : 833 : nIdle = 0;
# 147 : 833 : nTotal = 0;
# 148 : 833 : fAllOk = true;
# 149 : 833 : }
# 150 : 833 : assert(m_worker_threads.empty());
# 151 [ + + ][ + + ]: 5833 : for (int n = 0; n < threads_num; ++n) {
# [ + + ][ + + ]
# [ + + ][ + + ]
# 152 : 5000 : m_worker_threads.emplace_back([this, n]() {
# 153 : 5000 : util::ThreadRename(strprintf("scriptch.%i", n));
# 154 : 5000 : Loop(false /* worker thread */);
# 155 : 5000 : });
# 156 : 5000 : }
# 157 : 833 : }
# 158 : :
# 159 : : //! Wait until execution finishes, and return whether all evaluations were successful. The calling thread runs Loop(true), i.e. it takes batches from the queue itself.
# 160 : : bool Wait()
# 161 : 92172 : {
# 162 : 92172 : return Loop(true /* master thread */);
# 163 : 92172 : }
# 164 : :
# 165 : : //! Add a batch of checks to the queue. The checks are swapped out of vChecks, leaving default-constructed elements behind; one worker is woken for a single check, all workers for more.
# 166 : : void Add(std::vector<T>& vChecks)
# 167 : 5386411 : {
# 168 : 5386411 : LOCK(m_mutex);
# 169 [ + + ][ + + ]: 23970281 : for (T& check : vChecks) {
# [ + + ][ + + ]
# [ + + ][ + + ]
# 170 : 23970281 : queue.push_back(T());
# 171 : 23970281 : check.swap(queue.back());
# 172 : 23970281 : }
# 173 : 5386411 : nTodo += vChecks.size();
# 174 [ + + ][ + + ]: 5386411 : if (vChecks.size() == 1)
# [ + + ][ + + ]
# [ + - ][ + + ]
# 175 : 568137 : m_worker_cv.notify_one();
# 176 [ + + ][ + + ]: 4818274 : else if (vChecks.size() > 1)
# [ + + ][ + + ]
# [ # # ][ + + ]
# 177 : 4260319 : m_worker_cv.notify_all();
# 178 : 5386411 : }
# 179 : :
# 180 : : //! Stop all of the worker threads. Sets m_request_stop, wakes every worker, joins and discards the threads, then clears the flag again.
# 181 : : void StopWorkerThreads()
# 182 : 840 : {
# 183 : 840 : WITH_LOCK(m_mutex, m_request_stop = true);
# 184 : 840 : m_worker_cv.notify_all();
# 185 [ + + ][ + + ]: 5000 : for (std::thread& t : m_worker_threads) {
# [ + + ][ + + ]
# [ + + ][ + + ]
# 186 : 5000 : t.join();
# 187 : 5000 : }
# 188 : 840 : m_worker_threads.clear();
# 189 : 840 : WITH_LOCK(m_mutex, m_request_stop = false);
# 190 : 840 : }
# 191 : :
# 192 : : ~CCheckQueue()
# 193 : 22 : {
# 194 : 22 : assert(m_worker_threads.empty()); // the workers must already have been joined by StopWorkerThreads()
# 195 : 22 : }
# 196 : :
# 197 : : };
# 198 : :
# 199 : : /**
# 200 : : * RAII-style controller object for a CCheckQueue that guarantees the passed
# 201 : : * queue is finished before continuing. A nullptr queue is accepted: Add() then does nothing and Wait() returns true.
# 202 : : */
# 203 : : template <typename T>
# 204 : : class CCheckQueueControl
# 205 : : {
# 206 : : private:
# 207 : : CCheckQueue<T> * const pqueue;
# 208 : : bool fDone; //!< true once Wait() has been run against a non-null queue
# 209 : :
# 210 : : public:
# 211 : : CCheckQueueControl() = delete;
# 212 : : CCheckQueueControl(const CCheckQueueControl&) = delete;
# 213 : : CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
# 214 : : explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
# 215 : 105171 : {
# 216 : : // passed queue is supposed to be unused, or nullptr
# 217 [ + - ][ + - ]: 105171 : if (pqueue != nullptr) {
# [ + - ][ + + ]
# [ + - ][ + - ]
# [ + - ]
# 218 : 92172 : ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); // paired with LEAVE_CRITICAL_SECTION in the destructor
# 219 : 92172 : }
# 220 : 105171 : }
# 221 : :
# 222 : : bool Wait() // returns the queue's Wait() result and marks this controller done; returns true without a queue
# 223 : 115486 : {
# 224 [ + + ][ - + ]: 115486 : if (pqueue == nullptr)
# [ - + ][ - + ]
# [ - + ][ - + ]
# [ - + ]
# 225 : 23314 : return true;
# 226 : 92172 : bool fRet = pqueue->Wait();
# 227 : 92172 : fDone = true;
# 228 : 92172 : return fRet;
# 229 : 92172 : }
# 230 : :
# 231 : : void Add(std::vector<T>& vChecks) // forwards to the queue's Add(); does nothing without a queue
# 232 : 5391026 : {
# 233 [ + - ][ + - ]: 5391026 : if (pqueue != nullptr)
# [ + + ][ + - ]
# [ + - ][ + - ]
# 234 : 5386411 : pqueue->Add(vChecks);
# 235 : 5391026 : }
# 236 : :
# 237 : : ~CCheckQueueControl()
# 238 : 105171 : {
# 239 [ + - ][ - + ]: 105171 : if (!fDone) // the caller never completed a Wait(): finish outstanding checks before leaving the critical section
# [ - + ][ + - ]
# [ + + ][ - + ]
# [ + - ]
# 240 : 15051 : Wait();
# 241 [ + - ][ + - ]: 105171 : if (pqueue != nullptr) {
# [ + + ][ + - ]
# [ + - ][ + - ]
# [ + - ]
# 242 : 92172 : LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
# 243 : 92172 : }
# 244 : 105171 : }
# 245 : : };
# 246 : :
# 247 : : #endif // BITCOIN_CHECKQUEUE_H
|