LCOV - code coverage report
Current view: top level - src/leveldb/db - db_impl.cc (source / functions) Hit Total Coverage
Test: coverage.lcov Lines: 744 1139 65.3 %
Date: 2022-04-21 14:51:19 Functions: 39 53 73.6 %
Legend: Modified by patch:
Lines: hit not hit | Branches: + taken - not taken # not executed

Not modified by patch:
Lines: hit not hit | Branches: + taken - not taken # not executed
Branches: 234 466 50.2 %

           Branch data     Line data    Source code
#       1                 :            : // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
#       2                 :            : // Use of this source code is governed by a BSD-style license that can be
#       3                 :            : // found in the LICENSE file. See the AUTHORS file for names of contributors.
#       4                 :            : 
#       5                 :            : #include "db/db_impl.h"
#       6                 :            : 
#       7                 :            : #include <stdint.h>
#       8                 :            : #include <stdio.h>
#       9                 :            : 
#      10                 :            : #include <algorithm>
#      11                 :            : #include <atomic>
#      12                 :            : #include <set>
#      13                 :            : #include <string>
#      14                 :            : #include <vector>
#      15                 :            : 
#      16                 :            : #include "db/builder.h"
#      17                 :            : #include "db/db_iter.h"
#      18                 :            : #include "db/dbformat.h"
#      19                 :            : #include "db/filename.h"
#      20                 :            : #include "db/log_reader.h"
#      21                 :            : #include "db/log_writer.h"
#      22                 :            : #include "db/memtable.h"
#      23                 :            : #include "db/table_cache.h"
#      24                 :            : #include "db/version_set.h"
#      25                 :            : #include "db/write_batch_internal.h"
#      26                 :            : #include "leveldb/db.h"
#      27                 :            : #include "leveldb/env.h"
#      28                 :            : #include "leveldb/status.h"
#      29                 :            : #include "leveldb/table.h"
#      30                 :            : #include "leveldb/table_builder.h"
#      31                 :            : #include "port/port.h"
#      32                 :            : #include "table/block.h"
#      33                 :            : #include "table/merger.h"
#      34                 :            : #include "table/two_level_iterator.h"
#      35                 :            : #include "util/coding.h"
#      36                 :            : #include "util/logging.h"
#      37                 :            : #include "util/mutexlock.h"
#      38                 :            : 
#      39                 :            : namespace leveldb {
#      40                 :            : 
#      41                 :            : const int kNumNonTableCacheFiles = 10;
#      42                 :            : 
#      43                 :            : // Information kept for every waiting writer
#      44                 :            : struct DBImpl::Writer {
#      45                 :            :   explicit Writer(port::Mutex* mu)
#      46                 :      13678 :       : batch(nullptr), sync(false), done(false), cv(mu) {}
#      47                 :            : 
#      48                 :            :   Status status;
#      49                 :            :   WriteBatch* batch;
#      50                 :            :   bool sync;
#      51                 :            :   bool done;
#      52                 :            :   port::CondVar cv;
#      53                 :            : };
#      54                 :            : 
#      55                 :            : struct DBImpl::CompactionState {
#      56                 :            :   // Files produced by compaction
#      57                 :            :   struct Output {
#      58                 :            :     uint64_t number;
#      59                 :            :     uint64_t file_size;
#      60                 :            :     InternalKey smallest, largest;
#      61                 :            :   };
#      62                 :            : 
#      63                 :      32503 :   Output* current_output() { return &outputs[outputs.size() - 1]; }
#      64                 :            : 
#      65                 :            :   explicit CompactionState(Compaction* c)
#      66                 :            :       : compaction(c),
#      67                 :            :         smallest_snapshot(0),
#      68                 :            :         outfile(nullptr),
#      69                 :            :         builder(nullptr),
#      70                 :        108 :         total_bytes(0) {}
#      71                 :            : 
#      72                 :            :   Compaction* const compaction;
#      73                 :            : 
#      74                 :            :   // Sequence numbers < smallest_snapshot are not significant since we
#      75                 :            :   // will never have to service a snapshot below smallest_snapshot.
#      76                 :            :   // Therefore if we have seen a sequence number S <= smallest_snapshot,
#      77                 :            :   // we can drop all entries for the same key with sequence numbers < S.
#      78                 :            :   SequenceNumber smallest_snapshot;
#      79                 :            : 
#      80                 :            :   std::vector<Output> outputs;
#      81                 :            : 
#      82                 :            :   // State kept for output being generated
#      83                 :            :   WritableFile* outfile;
#      84                 :            :   TableBuilder* builder;
#      85                 :            : 
#      86                 :            :   uint64_t total_bytes;
#      87                 :            : };
#      88                 :            : 
#      89                 :            : // Fix user-supplied options to be reasonable
#      90                 :            : template <class T, class V>
#      91                 :       8772 : static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
#      92 [ -  + ][ -  + ]:       8772 :   if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
#      93 [ +  + ][ -  + ]:       8772 :   if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
#      94                 :       8772 : }
#      95                 :            : Options SanitizeOptions(const std::string& dbname,
#      96                 :            :                         const InternalKeyComparator* icmp,
#      97                 :            :                         const InternalFilterPolicy* ipolicy,
#      98                 :       2193 :                         const Options& src) {
#      99                 :       2193 :   Options result = src;
#     100                 :       2193 :   result.comparator = icmp;
#     101         [ +  - ]:       2193 :   result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
#     102                 :       2193 :   ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
#     103                 :       2193 :   ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
#     104                 :       2193 :   ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
#     105                 :       2193 :   ClipToRange(&result.block_size, 1 << 10, 4 << 20);
#     106         [ -  + ]:       2193 :   if (result.info_log == nullptr) {
#     107                 :            :     // Open a log file in the same directory as the db
#     108                 :          0 :     src.env->CreateDir(dbname);  // In case it does not exist
#     109                 :          0 :     src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
#     110                 :          0 :     Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
#     111         [ #  # ]:          0 :     if (!s.ok()) {
#     112                 :            :       // No place suitable for logging
#     113                 :          0 :       result.info_log = nullptr;
#     114                 :          0 :     }
#     115                 :          0 :   }
#     116         [ -  + ]:       2193 :   if (result.block_cache == nullptr) {
#     117                 :          0 :     result.block_cache = NewLRUCache(8 << 20);
#     118                 :          0 :   }
#     119                 :       2193 :   return result;
#     120                 :       2193 : }
#     121                 :            : 
#     122                 :       2193 : static int TableCacheSize(const Options& sanitized_options) {
#     123                 :            :   // Reserve ten files or so for other uses and give the rest to TableCache.
#     124                 :       2193 :   return sanitized_options.max_open_files - kNumNonTableCacheFiles;
#     125                 :       2193 : }
#     126                 :            : 
#     127                 :            : DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
#     128                 :            :     : env_(raw_options.env),
#     129                 :            :       internal_comparator_(raw_options.comparator),
#     130                 :            :       internal_filter_policy_(raw_options.filter_policy),
#     131                 :            :       options_(SanitizeOptions(dbname, &internal_comparator_,
#     132                 :            :                                &internal_filter_policy_, raw_options)),
#     133                 :            :       owns_info_log_(options_.info_log != raw_options.info_log),
#     134                 :            :       owns_cache_(options_.block_cache != raw_options.block_cache),
#     135                 :            :       dbname_(dbname),
#     136                 :            :       table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
#     137                 :            :       db_lock_(nullptr),
#     138                 :            :       shutting_down_(false),
#     139                 :            :       background_work_finished_signal_(&mutex_),
#     140                 :            :       mem_(nullptr),
#     141                 :            :       imm_(nullptr),
#     142                 :            :       has_imm_(false),
#     143                 :            :       logfile_(nullptr),
#     144                 :            :       logfile_number_(0),
#     145                 :            :       log_(nullptr),
#     146                 :            :       seed_(0),
#     147                 :            :       tmp_batch_(new WriteBatch),
#     148                 :            :       background_compaction_scheduled_(false),
#     149                 :            :       manual_compaction_(nullptr),
#     150                 :            :       versions_(new VersionSet(dbname_, &options_, table_cache_,
#     151                 :       2193 :                                &internal_comparator_)) {}
#     152                 :            : 
#     153                 :       2193 : DBImpl::~DBImpl() {
#     154                 :            :   // Wait for background work to finish.
#     155                 :       2193 :   mutex_.Lock();
#     156                 :       2193 :   shutting_down_.store(true, std::memory_order_release);
#     157         [ -  + ]:       2193 :   while (background_compaction_scheduled_) {
#     158                 :          0 :     background_work_finished_signal_.Wait();
#     159                 :          0 :   }
#     160                 :       2193 :   mutex_.Unlock();
#     161                 :            : 
#     162         [ +  - ]:       2193 :   if (db_lock_ != nullptr) {
#     163                 :       2193 :     env_->UnlockFile(db_lock_);
#     164                 :       2193 :   }
#     165                 :            : 
#     166                 :       2193 :   delete versions_;
#     167         [ +  + ]:       2193 :   if (mem_ != nullptr) mem_->Unref();
#     168         [ -  + ]:       2193 :   if (imm_ != nullptr) imm_->Unref();
#     169                 :       2193 :   delete tmp_batch_;
#     170                 :       2193 :   delete log_;
#     171                 :       2193 :   delete logfile_;
#     172                 :       2193 :   delete table_cache_;
#     173                 :            : 
#     174         [ -  + ]:       2193 :   if (owns_info_log_) {
#     175                 :          0 :     delete options_.info_log;
#     176                 :          0 :   }
#     177         [ -  + ]:       2193 :   if (owns_cache_) {
#     178                 :          0 :     delete options_.block_cache;
#     179                 :          0 :   }
#     180                 :       2193 : }
#     181                 :            : 
#     182                 :       1220 : Status DBImpl::NewDB() {
#     183                 :       1220 :   VersionEdit new_db;
#     184                 :       1220 :   new_db.SetComparatorName(user_comparator()->Name());
#     185                 :       1220 :   new_db.SetLogNumber(0);
#     186                 :       1220 :   new_db.SetNextFile(2);
#     187                 :       1220 :   new_db.SetLastSequence(0);
#     188                 :            : 
#     189                 :       1220 :   const std::string manifest = DescriptorFileName(dbname_, 1);
#     190                 :       1220 :   WritableFile* file;
#     191                 :       1220 :   Status s = env_->NewWritableFile(manifest, &file);
#     192         [ -  + ]:       1220 :   if (!s.ok()) {
#     193                 :          0 :     return s;
#     194                 :          0 :   }
#     195                 :       1220 :   {
#     196                 :       1220 :     log::Writer log(file);
#     197                 :       1220 :     std::string record;
#     198                 :       1220 :     new_db.EncodeTo(&record);
#     199                 :       1220 :     s = log.AddRecord(record);
#     200         [ +  - ]:       1220 :     if (s.ok()) {
#     201                 :       1220 :       s = file->Close();
#     202                 :       1220 :     }
#     203                 :       1220 :   }
#     204                 :       1220 :   delete file;
#     205         [ +  - ]:       1220 :   if (s.ok()) {
#     206                 :            :     // Make "CURRENT" file that points to the new manifest file.
#     207                 :       1220 :     s = SetCurrentFile(env_, dbname_, 1);
#     208                 :       1220 :   } else {
#     209                 :          0 :     env_->DeleteFile(manifest);
#     210                 :          0 :   }
#     211                 :       1220 :   return s;
#     212                 :       1220 : }
#     213                 :            : 
#     214                 :       4468 : void DBImpl::MaybeIgnoreError(Status* s) const {
#     215 [ +  - ][ #  # ]:       4468 :   if (s->ok() || options_.paranoid_checks) {
#     216                 :            :     // No change needed
#     217                 :       4468 :   } else {
#     218                 :          0 :     Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
#     219                 :          0 :     *s = Status::OK();
#     220                 :          0 :   }
#     221                 :       4468 : }
#     222                 :            : 
#     223                 :       2302 : void DBImpl::DeleteObsoleteFiles() {
#     224                 :       2302 :   mutex_.AssertHeld();
#     225                 :            : 
#     226         [ -  + ]:       2302 :   if (!bg_error_.ok()) {
#     227                 :            :     // After a background error, we don't know whether a new version may
#     228                 :            :     // or may not have been committed, so we cannot safely garbage collect.
#     229                 :          0 :     return;
#     230                 :          0 :   }
#     231                 :            : 
#     232                 :            :   // Make a set of all of the live files
#     233                 :       2302 :   std::set<uint64_t> live = pending_outputs_;
#     234                 :       2302 :   versions_->AddLiveFiles(&live);
#     235                 :            : 
#     236                 :       2302 :   std::vector<std::string> filenames;
#     237                 :       2302 :   env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
#     238                 :       2302 :   uint64_t number;
#     239                 :       2302 :   FileType type;
#     240                 :       2302 :   std::vector<std::string> files_to_delete;
#     241         [ +  + ]:      17393 :   for (std::string& filename : filenames) {
#     242         [ +  + ]:      17393 :     if (ParseFileName(filename, &number, &type)) {
#     243                 :      14095 :       bool keep = true;
#     244         [ -  + ]:      14095 :       switch (type) {
#     245         [ +  + ]:       3276 :         case kLogFile:
#     246         [ +  + ]:       3276 :           keep = ((number >= versions_->LogNumber()) ||
#     247         [ -  + ]:       3276 :                   (number == versions_->PrevLogNumber()));
#     248                 :       3276 :           break;
#     249         [ +  + ]:       4493 :         case kDescriptorFile:
#     250                 :            :           // Keep my manifest file, and any newer incarnations'
#     251                 :            :           // (in case there is a race that allows other incarnations)
#     252                 :       4493 :           keep = (number >= versions_->ManifestFileNumber());
#     253                 :       4493 :           break;
#     254         [ +  + ]:       2375 :         case kTableFile:
#     255                 :       2375 :           keep = (live.find(number) != live.end());
#     256                 :       2375 :           break;
#     257         [ -  + ]:          0 :         case kTempFile:
#     258                 :            :           // Any temp files that are currently being written to must
#     259                 :            :           // be recorded in pending_outputs_, which is inserted into "live"
#     260                 :          0 :           keep = (live.find(number) != live.end());
#     261                 :          0 :           break;
#     262         [ +  + ]:       2302 :         case kCurrentFile:
#     263         [ +  + ]:       3951 :         case kDBLockFile:
#     264         [ -  + ]:       3951 :         case kInfoLogFile:
#     265                 :       3951 :           keep = true;
#     266                 :       3951 :           break;
#     267                 :      14095 :       }
#     268                 :            : 
#     269         [ +  + ]:      14095 :       if (!keep) {
#     270                 :       3568 :         files_to_delete.push_back(std::move(filename));
#     271         [ +  + ]:       3568 :         if (type == kTableFile) {
#     272                 :        403 :           table_cache_->Evict(number);
#     273                 :        403 :         }
#     274                 :       3568 :         Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
#     275                 :       3568 :             static_cast<unsigned long long>(number));
#     276                 :       3568 :       }
#     277                 :      14095 :     }
#     278                 :      17393 :   }
#     279                 :            : 
#     280                 :            :   // While deleting all files unblock other threads. All files being deleted
#     281                 :            :   // have unique names which will not collide with newly created files and
#     282                 :            :   // are therefore safe to delete while allowing other threads to proceed.
#     283                 :       2302 :   mutex_.Unlock();
#     284         [ +  + ]:       3568 :   for (const std::string& filename : files_to_delete) {
#     285                 :       3568 :     env_->DeleteFile(dbname_ + "/" + filename);
#     286                 :       3568 :   }
#     287                 :       2302 :   mutex_.Lock();
#     288                 :       2302 : }
#     289                 :            : 
#     290                 :       2193 : Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
#     291                 :       2193 :   mutex_.AssertHeld();
#     292                 :            : 
#     293                 :            :   // Ignore error from CreateDir since the creation of the DB is
#     294                 :            :   // committed only when the descriptor is created, and this directory
#     295                 :            :   // may already exist from a previous failed creation attempt.
#     296                 :       2193 :   env_->CreateDir(dbname_);
#     297                 :       2193 :   assert(db_lock_ == nullptr);
#     298                 :          0 :   Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
#     299         [ -  + ]:       2193 :   if (!s.ok()) {
#     300                 :          0 :     return s;
#     301                 :          0 :   }
#     302                 :            : 
#     303         [ +  + ]:       2193 :   if (!env_->FileExists(CurrentFileName(dbname_))) {
#     304         [ +  - ]:       1220 :     if (options_.create_if_missing) {
#     305                 :       1220 :       s = NewDB();
#     306         [ -  + ]:       1220 :       if (!s.ok()) {
#     307                 :          0 :         return s;
#     308                 :          0 :       }
#     309                 :       1220 :     } else {
#     310                 :          0 :       return Status::InvalidArgument(
#     311                 :          0 :           dbname_, "does not exist (create_if_missing is false)");
#     312                 :          0 :     }
#     313                 :       1220 :   } else {
#     314         [ -  + ]:        973 :     if (options_.error_if_exists) {
#     315                 :          0 :       return Status::InvalidArgument(dbname_,
#     316                 :          0 :                                      "exists (error_if_exists is true)");
#     317                 :          0 :     }
#     318                 :        973 :   }
#     319                 :            : 
#     320                 :       2193 :   s = versions_->Recover(save_manifest);
#     321         [ -  + ]:       2193 :   if (!s.ok()) {
#     322                 :          0 :     return s;
#     323                 :          0 :   }
#     324                 :       2193 :   SequenceNumber max_sequence(0);
#     325                 :            : 
#     326                 :            :   // Recover from all newer log files than the ones named in the
#     327                 :            :   // descriptor (new log files may have been added by the previous
#     328                 :            :   // incarnation without registering them in the descriptor).
#     329                 :            :   //
#     330                 :            :   // Note that PrevLogNumber() is no longer used, but we pay
#     331                 :            :   // attention to it in case we are recovering a database
#     332                 :            :   // produced by an older version of leveldb.
#     333                 :       2193 :   const uint64_t min_log = versions_->LogNumber();
#     334                 :       2193 :   const uint64_t prev_log = versions_->PrevLogNumber();
#     335                 :       2193 :   std::vector<std::string> filenames;
#     336                 :       2193 :   s = env_->GetChildren(dbname_, &filenames);
#     337         [ -  + ]:       2193 :   if (!s.ok()) {
#     338                 :          0 :     return s;
#     339                 :          0 :   }
#     340                 :       2193 :   std::set<uint64_t> expected;
#     341                 :       2193 :   versions_->AddLiveFiles(&expected);
#     342                 :       2193 :   uint64_t number;
#     343                 :       2193 :   FileType type;
#     344                 :       2193 :   std::vector<uint64_t> logs;
#     345         [ +  + ]:      13062 :   for (size_t i = 0; i < filenames.size(); i++) {
#     346         [ +  + ]:      10869 :     if (ParseFileName(filenames[i], &number, &type)) {
#     347                 :       7778 :       expected.erase(number);
#     348 [ +  + ][ +  - ]:       7778 :       if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
#                 [ #  # ]
#     349                 :        973 :         logs.push_back(number);
#     350                 :       7778 :     }
#     351                 :      10869 :   }
#     352         [ +  + ]:       2193 :   if (!expected.empty()) {
#     353                 :          2 :     char buf[50];
#     354                 :          2 :     snprintf(buf, sizeof(buf), "%d missing files; e.g.",
#     355                 :          2 :              static_cast<int>(expected.size()));
#     356                 :          2 :     return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
#     357                 :          2 :   }
#     358                 :            : 
#     359                 :            :   // Recover in the order in which the logs were generated
#     360                 :       2191 :   std::sort(logs.begin(), logs.end());
#     361         [ +  + ]:       3162 :   for (size_t i = 0; i < logs.size(); i++) {
#     362                 :        971 :     s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
#     363                 :        971 :                        &max_sequence);
#     364         [ -  + ]:        971 :     if (!s.ok()) {
#     365                 :          0 :       return s;
#     366                 :          0 :     }
#     367                 :            : 
#     368                 :            :     // The previous incarnation may not have written any MANIFEST
#     369                 :            :     // records after allocating this log number.  So we manually
#     370                 :            :     // update the file number allocation counter in VersionSet.
#     371                 :        971 :     versions_->MarkFileNumberUsed(logs[i]);
#     372                 :        971 :   }
#     373                 :            : 
#     374         [ +  + ]:       2191 :   if (versions_->LastSequence() < max_sequence) {
#     375                 :        966 :     versions_->SetLastSequence(max_sequence);
#     376                 :        966 :   }
#     377                 :            : 
#     378                 :       2191 :   return Status::OK();
#     379                 :       2191 : }
#     380                 :            : 
#     381                 :            : Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
#     382                 :            :                               bool* save_manifest, VersionEdit* edit,
#     383                 :        971 :                               SequenceNumber* max_sequence) {
#     384                 :        971 :   struct LogReporter : public log::Reader::Reporter {
#     385                 :        971 :     Env* env;
#     386                 :        971 :     Logger* info_log;
#     387                 :        971 :     const char* fname;
#     388                 :        971 :     Status* status;  // null if options_.paranoid_checks==false
#     389                 :        971 :     void Corruption(size_t bytes, const Status& s) override {
#     390                 :          0 :       Log(info_log, "%s%s: dropping %d bytes; %s",
#     391         [ #  # ]:          0 :           (this->status == nullptr ? "(ignoring error) " : ""), fname,
#     392                 :          0 :           static_cast<int>(bytes), s.ToString().c_str());
#     393 [ #  # ][ #  # ]:          0 :       if (this->status != nullptr && this->status->ok()) *this->status = s;
#     394                 :          0 :     }
#     395                 :        971 :   };
#     396                 :            : 
#     397                 :        971 :   mutex_.AssertHeld();
#     398                 :            : 
#     399                 :            :   // Open the log file
#     400                 :        971 :   std::string fname = LogFileName(dbname_, log_number);
#     401                 :        971 :   SequentialFile* file;
#     402                 :        971 :   Status status = env_->NewSequentialFile(fname, &file);
#     403         [ -  + ]:        971 :   if (!status.ok()) {
#     404                 :          0 :     MaybeIgnoreError(&status);
#     405                 :          0 :     return status;
#     406                 :          0 :   }
#     407                 :            : 
#     408                 :            :   // Create the log reader.
#     409                 :        971 :   LogReporter reporter;
#     410                 :        971 :   reporter.env = env_;
#     411                 :        971 :   reporter.info_log = options_.info_log;
#     412                 :        971 :   reporter.fname = fname.c_str();
#     413         [ +  - ]:        971 :   reporter.status = (options_.paranoid_checks ? &status : nullptr);
#     414                 :            :   // We intentionally make log::Reader do checksumming even if
#     415                 :            :   // paranoid_checks==false so that corruptions cause entire commits
#     416                 :            :   // to be skipped instead of propagating bad information (like overly
#     417                 :            :   // large sequence numbers).
#     418                 :        971 :   log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
#     419                 :        971 :   Log(options_.info_log, "Recovering log #%llu",
#     420                 :        971 :       (unsigned long long)log_number);
#     421                 :            : 
#     422                 :            :   // Read all the records and add to a memtable
#     423                 :        971 :   std::string scratch;
#     424                 :        971 :   Slice record;
#     425                 :        971 :   WriteBatch batch;
#     426                 :        971 :   int compactions = 0;
#     427                 :        971 :   MemTable* mem = nullptr;
#     428 [ +  + ][ +  - ]:       5439 :   while (reader.ReadRecord(&record, &scratch) && status.ok()) {
#     429         [ -  + ]:       4468 :     if (record.size() < 12) {
#     430                 :          0 :       reporter.Corruption(record.size(),
#     431                 :          0 :                           Status::Corruption("log record too small", fname));
#     432                 :          0 :       continue;
#     433                 :          0 :     }
#     434                 :       4468 :     WriteBatchInternal::SetContents(&batch, record);
#     435                 :            : 
#     436         [ +  + ]:       4468 :     if (mem == nullptr) {
#     437                 :        966 :       mem = new MemTable(internal_comparator_);
#     438                 :        966 :       mem->Ref();
#     439                 :        966 :     }
#     440                 :       4468 :     status = WriteBatchInternal::InsertInto(&batch, mem);
#     441                 :       4468 :     MaybeIgnoreError(&status);
#     442         [ -  + ]:       4468 :     if (!status.ok()) {
#     443                 :          0 :       break;
#     444                 :          0 :     }
#     445                 :       4468 :     const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
#     446                 :       4468 :                                     WriteBatchInternal::Count(&batch) - 1;
#     447         [ +  - ]:       4468 :     if (last_seq > *max_sequence) {
#     448                 :       4468 :       *max_sequence = last_seq;
#     449                 :       4468 :     }
#     450                 :            : 
#     451         [ -  + ]:       4468 :     if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
#     452                 :          0 :       compactions++;
#     453                 :          0 :       *save_manifest = true;
#     454                 :          0 :       status = WriteLevel0Table(mem, edit, nullptr);
#     455                 :          0 :       mem->Unref();
#     456                 :          0 :       mem = nullptr;
#     457         [ #  # ]:          0 :       if (!status.ok()) {
#     458                 :            :         // Reflect errors immediately so that conditions like full
#     459                 :            :         // file-systems cause the DB::Open() to fail.
#     460                 :          0 :         break;
#     461                 :          0 :       }
#     462                 :          0 :     }
#     463                 :       4468 :   }
#     464                 :            : 
#     465                 :        971 :   delete file;
#     466                 :            : 
#     467                 :            :   // See if we should keep reusing the last log file.
#     468 [ +  - ][ -  + ]:        971 :   if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
#         [ #  # ][ #  # ]
#     469                 :          0 :     assert(logfile_ == nullptr);
#     470                 :          0 :     assert(log_ == nullptr);
#     471                 :          0 :     assert(mem_ == nullptr);
#     472                 :          0 :     uint64_t lfile_size;
#     473 [ #  # ][ #  # ]:          0 :     if (env_->GetFileSize(fname, &lfile_size).ok() &&
#     474         [ #  # ]:          0 :         env_->NewAppendableFile(fname, &logfile_).ok()) {
#     475                 :          0 :       Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
#     476                 :          0 :       log_ = new log::Writer(logfile_, lfile_size);
#     477                 :          0 :       logfile_number_ = log_number;
#     478         [ #  # ]:          0 :       if (mem != nullptr) {
#     479                 :          0 :         mem_ = mem;
#     480                 :          0 :         mem = nullptr;
#     481                 :          0 :       } else {
#     482                 :            :         // mem can be nullptr if lognum exists but was empty.
#     483                 :          0 :         mem_ = new MemTable(internal_comparator_);
#     484                 :          0 :         mem_->Ref();
#     485                 :          0 :       }
#     486                 :          0 :     }
#     487                 :          0 :   }
#     488                 :            : 
#     489         [ +  + ]:        971 :   if (mem != nullptr) {
#     490                 :            :     // mem did not get reused; compact it.
#     491         [ +  - ]:        966 :     if (status.ok()) {
#     492                 :        966 :       *save_manifest = true;
#     493                 :        966 :       status = WriteLevel0Table(mem, edit, nullptr);
#     494                 :        966 :     }
#     495                 :        966 :     mem->Unref();
#     496                 :        966 :   }
#     497                 :            : 
#     498                 :        971 :   return status;
#     499                 :        971 : }
#     500                 :            : 
#     501                 :            : Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
#     502                 :        969 :                                 Version* base) {
#     503                 :        969 :   mutex_.AssertHeld();
#     504                 :        969 :   const uint64_t start_micros = env_->NowMicros();
#     505                 :        969 :   FileMetaData meta;
#     506                 :        969 :   meta.number = versions_->NewFileNumber();
#     507                 :        969 :   pending_outputs_.insert(meta.number);
#     508                 :        969 :   Iterator* iter = mem->NewIterator();
#     509                 :        969 :   Log(options_.info_log, "Level-0 table #%llu: started",
#     510                 :        969 :       (unsigned long long)meta.number);
#     511                 :            : 
#     512                 :        969 :   Status s;
#     513                 :        969 :   {
#     514                 :        969 :     mutex_.Unlock();
#     515                 :        969 :     s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
#     516                 :        969 :     mutex_.Lock();
#     517                 :        969 :   }
#     518                 :            : 
#     519                 :        969 :   Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
#     520                 :        969 :       (unsigned long long)meta.number, (unsigned long long)meta.file_size,
#     521                 :        969 :       s.ToString().c_str());
#     522                 :        969 :   delete iter;
#     523                 :        969 :   pending_outputs_.erase(meta.number);
#     524                 :            : 
#     525                 :            :   // Note that if file_size is zero, the file has been deleted and
#     526                 :            :   // should not be added to the manifest.
#     527                 :        969 :   int level = 0;
#     528 [ +  - ][ +  - ]:        969 :   if (s.ok() && meta.file_size > 0) {
#     529                 :        969 :     const Slice min_user_key = meta.smallest.user_key();
#     530                 :        969 :     const Slice max_user_key = meta.largest.user_key();
#     531         [ +  + ]:        969 :     if (base != nullptr) {
#     532                 :          3 :       level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
#     533                 :          3 :     }
#     534                 :        969 :     edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
#     535                 :        969 :                   meta.largest);
#     536                 :        969 :   }
#     537                 :            : 
#     538                 :        969 :   CompactionStats stats;
#     539                 :        969 :   stats.micros = env_->NowMicros() - start_micros;
#     540                 :        969 :   stats.bytes_written = meta.file_size;
#     541                 :        969 :   stats_[level].Add(stats);
#     542                 :        969 :   return s;
#     543                 :        969 : }
#     544                 :            : 
#     545                 :          3 : void DBImpl::CompactMemTable() {
#     546                 :          3 :   mutex_.AssertHeld();
#     547                 :          3 :   assert(imm_ != nullptr);
#     548                 :            : 
#     549                 :            :   // Save the contents of the memtable as a new Table
#     550                 :          0 :   VersionEdit edit;
#     551                 :          3 :   Version* base = versions_->current();
#     552                 :          3 :   base->Ref();
#     553                 :          3 :   Status s = WriteLevel0Table(imm_, &edit, base);
#     554                 :          3 :   base->Unref();
#     555                 :            : 
#     556 [ +  - ][ -  + ]:          3 :   if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
#     557                 :          0 :     s = Status::IOError("Deleting DB during memtable compaction");
#     558                 :          0 :   }
#     559                 :            : 
#     560                 :            :   // Replace immutable memtable with the generated Table
#     561         [ +  - ]:          3 :   if (s.ok()) {
#     562                 :          3 :     edit.SetPrevLogNumber(0);
#     563                 :          3 :     edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
#     564                 :          3 :     s = versions_->LogAndApply(&edit, &mutex_);
#     565                 :          3 :   }
#     566                 :            : 
#     567         [ +  - ]:          3 :   if (s.ok()) {
#     568                 :            :     // Commit to the new state
#     569                 :          3 :     imm_->Unref();
#     570                 :          3 :     imm_ = nullptr;
#     571                 :          3 :     has_imm_.store(false, std::memory_order_release);
#     572                 :          3 :     DeleteObsoleteFiles();
#     573                 :          3 :   } else {
#     574                 :          0 :     RecordBackgroundError(s);
#     575                 :          0 :   }
#     576                 :          3 : }
#     577                 :            : 
#     578                 :          0 : void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
#     579                 :          0 :   int max_level_with_files = 1;
#     580                 :          0 :   {
#     581                 :          0 :     MutexLock l(&mutex_);
#     582                 :          0 :     Version* base = versions_->current();
#     583         [ #  # ]:          0 :     for (int level = 1; level < config::kNumLevels; level++) {
#     584         [ #  # ]:          0 :       if (base->OverlapInLevel(level, begin, end)) {
#     585                 :          0 :         max_level_with_files = level;
#     586                 :          0 :       }
#     587                 :          0 :     }
#     588                 :          0 :   }
#     589                 :          0 :   TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
#     590         [ #  # ]:          0 :   for (int level = 0; level < max_level_with_files; level++) {
#     591                 :          0 :     TEST_CompactRange(level, begin, end);
#     592                 :          0 :   }
#     593                 :          0 : }
#     594                 :            : 
#     595                 :            : void DBImpl::TEST_CompactRange(int level, const Slice* begin,
#     596                 :          0 :                                const Slice* end) {
#     597                 :          0 :   assert(level >= 0);
#     598                 :          0 :   assert(level + 1 < config::kNumLevels);
#     599                 :            : 
#     600                 :          0 :   InternalKey begin_storage, end_storage;
#     601                 :            : 
#     602                 :          0 :   ManualCompaction manual;
#     603                 :          0 :   manual.level = level;
#     604                 :          0 :   manual.done = false;
#     605         [ #  # ]:          0 :   if (begin == nullptr) {
#     606                 :          0 :     manual.begin = nullptr;
#     607                 :          0 :   } else {
#     608                 :          0 :     begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
#     609                 :          0 :     manual.begin = &begin_storage;
#     610                 :          0 :   }
#     611         [ #  # ]:          0 :   if (end == nullptr) {
#     612                 :          0 :     manual.end = nullptr;
#     613                 :          0 :   } else {
#     614                 :          0 :     end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
#     615                 :          0 :     manual.end = &end_storage;
#     616                 :          0 :   }
#     617                 :            : 
#     618                 :          0 :   MutexLock l(&mutex_);
#     619 [ #  # ][ #  # ]:          0 :   while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
#     620         [ #  # ]:          0 :          bg_error_.ok()) {
#     621         [ #  # ]:          0 :     if (manual_compaction_ == nullptr) {  // Idle
#     622                 :          0 :       manual_compaction_ = &manual;
#     623                 :          0 :       MaybeScheduleCompaction();
#     624                 :          0 :     } else {  // Running either my compaction or another compaction.
#     625                 :          0 :       background_work_finished_signal_.Wait();
#     626                 :          0 :     }
#     627                 :          0 :   }
#     628         [ #  # ]:          0 :   if (manual_compaction_ == &manual) {
#     629                 :            :     // Cancel my manual compaction since we aborted early for some reason.
#     630                 :          0 :     manual_compaction_ = nullptr;
#     631                 :          0 :   }
#     632                 :          0 : }
#     633                 :            : 
#     634                 :          0 : Status DBImpl::TEST_CompactMemTable() {
#     635                 :            :   // nullptr batch means just wait for earlier writes to be done
#     636                 :          0 :   Status s = Write(WriteOptions(), nullptr);
#     637         [ #  # ]:          0 :   if (s.ok()) {
#     638                 :            :     // Wait until the compaction completes
#     639                 :          0 :     MutexLock l(&mutex_);
#     640 [ #  # ][ #  # ]:          0 :     while (imm_ != nullptr && bg_error_.ok()) {
#     641                 :          0 :       background_work_finished_signal_.Wait();
#     642                 :          0 :     }
#     643         [ #  # ]:          0 :     if (imm_ != nullptr) {
#     644                 :          0 :       s = bg_error_;
#     645                 :          0 :     }
#     646                 :          0 :   }
#     647                 :          0 :   return s;
#     648                 :          0 : }
#     649                 :            : 
#     650                 :          0 : void DBImpl::RecordBackgroundError(const Status& s) {
#     651                 :          0 :   mutex_.AssertHeld();
#     652         [ #  # ]:          0 :   if (bg_error_.ok()) {
#     653                 :          0 :     bg_error_ = s;
#     654                 :          0 :     background_work_finished_signal_.SignalAll();
#     655                 :          0 :   }
#     656                 :          0 : }
#     657                 :            : 
#     658                 :       2329 : void DBImpl::MaybeScheduleCompaction() {
#     659                 :       2329 :   mutex_.AssertHeld();
#     660         [ -  + ]:       2329 :   if (background_compaction_scheduled_) {
#     661                 :            :     // Already scheduled
#     662         [ -  + ]:       2329 :   } else if (shutting_down_.load(std::memory_order_acquire)) {
#     663                 :            :     // DB is being deleted; no more background compactions
#     664         [ -  + ]:       2329 :   } else if (!bg_error_.ok()) {
#     665                 :            :     // Already got an error; no more changes
#     666 [ +  + ][ +  - ]:       2329 :   } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
#     667         [ +  + ]:       2329 :              !versions_->NeedsCompaction()) {
#     668                 :            :     // No work to be done
#     669                 :       2218 :   } else {
#     670                 :        111 :     background_compaction_scheduled_ = true;
#     671                 :        111 :     env_->Schedule(&DBImpl::BGWork, this);
#     672                 :        111 :   }
#     673                 :       2329 : }
#     674                 :            : 
#     675                 :        111 : void DBImpl::BGWork(void* db) {
#     676                 :        111 :   reinterpret_cast<DBImpl*>(db)->BackgroundCall();
#     677                 :        111 : }
#     678                 :            : 
#     679                 :        111 : void DBImpl::BackgroundCall() {
#     680                 :        111 :   MutexLock l(&mutex_);
#     681                 :        111 :   assert(background_compaction_scheduled_);
#     682         [ -  + ]:        111 :   if (shutting_down_.load(std::memory_order_acquire)) {
#     683                 :            :     // No more background work when shutting down.
#     684         [ -  + ]:        111 :   } else if (!bg_error_.ok()) {
#     685                 :            :     // No more background work after a background error.
#     686                 :        111 :   } else {
#     687                 :        111 :     BackgroundCompaction();
#     688                 :        111 :   }
#     689                 :            : 
#     690                 :        111 :   background_compaction_scheduled_ = false;
#     691                 :            : 
#     692                 :            :   // Previous compaction may have produced too many files in a level,
#     693                 :            :   // so reschedule another compaction if needed.
#     694                 :        111 :   MaybeScheduleCompaction();
#     695                 :        111 :   background_work_finished_signal_.SignalAll();
#     696                 :        111 : }
#     697                 :            : 
#     698                 :        111 : void DBImpl::BackgroundCompaction() {
#     699                 :        111 :   mutex_.AssertHeld();
#     700                 :            : 
#     701         [ +  + ]:        111 :   if (imm_ != nullptr) {
#     702                 :          3 :     CompactMemTable();
#     703                 :          3 :     return;
#     704                 :          3 :   }
#     705                 :            : 
#     706                 :        108 :   Compaction* c;
#     707                 :        108 :   bool is_manual = (manual_compaction_ != nullptr);
#     708                 :        108 :   InternalKey manual_end;
#     709         [ -  + ]:        108 :   if (is_manual) {
#     710                 :          0 :     ManualCompaction* m = manual_compaction_;
#     711                 :          0 :     c = versions_->CompactRange(m->level, m->begin, m->end);
#     712                 :          0 :     m->done = (c == nullptr);
#     713         [ #  # ]:          0 :     if (c != nullptr) {
#     714                 :          0 :       manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
#     715                 :          0 :     }
#     716                 :          0 :     Log(options_.info_log,
#     717                 :          0 :         "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
#     718         [ #  # ]:          0 :         m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
#     719         [ #  # ]:          0 :         (m->end ? m->end->DebugString().c_str() : "(end)"),
#     720         [ #  # ]:          0 :         (m->done ? "(end)" : manual_end.DebugString().c_str()));
#     721                 :        108 :   } else {
#     722                 :        108 :     c = versions_->PickCompaction();
#     723                 :        108 :   }
#     724                 :            : 
#     725                 :        108 :   Status status;
#     726         [ -  + ]:        108 :   if (c == nullptr) {
#     727                 :            :     // Nothing to do
#     728 [ +  - ][ -  + ]:        108 :   } else if (!is_manual && c->IsTrivialMove()) {
#     729                 :            :     // Move file to next level
#     730                 :          0 :     assert(c->num_input_files(0) == 1);
#     731                 :          0 :     FileMetaData* f = c->input(0, 0);
#     732                 :          0 :     c->edit()->DeleteFile(c->level(), f->number);
#     733                 :          0 :     c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
#     734                 :          0 :                        f->largest);
#     735                 :          0 :     status = versions_->LogAndApply(c->edit(), &mutex_);
#     736         [ #  # ]:          0 :     if (!status.ok()) {
#     737                 :          0 :       RecordBackgroundError(status);
#     738                 :          0 :     }
#     739                 :          0 :     VersionSet::LevelSummaryStorage tmp;
#     740                 :          0 :     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
#     741                 :          0 :         static_cast<unsigned long long>(f->number), c->level() + 1,
#     742                 :          0 :         static_cast<unsigned long long>(f->file_size),
#     743                 :          0 :         status.ToString().c_str(), versions_->LevelSummary(&tmp));
#     744                 :        108 :   } else {
#     745                 :        108 :     CompactionState* compact = new CompactionState(c);
#     746                 :        108 :     status = DoCompactionWork(compact);
#     747         [ -  + ]:        108 :     if (!status.ok()) {
#     748                 :          0 :       RecordBackgroundError(status);
#     749                 :          0 :     }
#     750                 :        108 :     CleanupCompaction(compact);
#     751                 :        108 :     c->ReleaseInputs();
#     752                 :        108 :     DeleteObsoleteFiles();
#     753                 :        108 :   }
#     754                 :          0 :   delete c;
#     755                 :            : 
#     756         [ +  - ]:        108 :   if (status.ok()) {
#     757                 :            :     // Done
#     758         [ #  # ]:        108 :   } else if (shutting_down_.load(std::memory_order_acquire)) {
#     759                 :            :     // Ignore compaction errors found during shutting down
#     760                 :          0 :   } else {
#     761                 :          0 :     Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
#     762                 :          0 :   }
#     763                 :            : 
#     764         [ -  + ]:        108 :   if (is_manual) {
#     765                 :          0 :     ManualCompaction* m = manual_compaction_;
#     766         [ #  # ]:          0 :     if (!status.ok()) {
#     767                 :          0 :       m->done = true;
#     768                 :          0 :     }
#     769         [ #  # ]:          0 :     if (!m->done) {
#     770                 :            :       // We only compacted part of the requested range.  Update *m
#     771                 :            :       // to the range that is left to be compacted.
#     772                 :          0 :       m->tmp_storage = manual_end;
#     773                 :          0 :       m->begin = &m->tmp_storage;
#     774                 :          0 :     }
#     775                 :          0 :     manual_compaction_ = nullptr;
#     776                 :          0 :   }
#     777                 :        108 : }
#     778                 :            : 
#     779                 :        108 : void DBImpl::CleanupCompaction(CompactionState* compact) {
#     780                 :        108 :   mutex_.AssertHeld();
#     781         [ -  + ]:        108 :   if (compact->builder != nullptr) {
#     782                 :            :     // May happen if we get a shutdown call in the middle of compaction
#     783                 :          0 :     compact->builder->Abandon();
#     784                 :          0 :     delete compact->builder;
#     785                 :        108 :   } else {
#     786                 :        108 :     assert(compact->outfile == nullptr);
#     787                 :        108 :   }
#     788                 :          0 :   delete compact->outfile;
#     789         [ +  + ]:        216 :   for (size_t i = 0; i < compact->outputs.size(); i++) {
#     790                 :        108 :     const CompactionState::Output& out = compact->outputs[i];
#     791                 :        108 :     pending_outputs_.erase(out.number);
#     792                 :        108 :   }
#     793                 :        108 :   delete compact;
#     794                 :        108 : }
#     795                 :            : 
#     796                 :        108 : Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
#     797                 :        108 :   assert(compact != nullptr);
#     798                 :          0 :   assert(compact->builder == nullptr);
#     799                 :          0 :   uint64_t file_number;
#     800                 :        108 :   {
#     801                 :        108 :     mutex_.Lock();
#     802                 :        108 :     file_number = versions_->NewFileNumber();
#     803                 :        108 :     pending_outputs_.insert(file_number);
#     804                 :        108 :     CompactionState::Output out;
#     805                 :        108 :     out.number = file_number;
#     806                 :        108 :     out.smallest.Clear();
#     807                 :        108 :     out.largest.Clear();
#     808                 :        108 :     compact->outputs.push_back(out);
#     809                 :        108 :     mutex_.Unlock();
#     810                 :        108 :   }
#     811                 :            : 
#     812                 :            :   // Make the output file
#     813                 :        108 :   std::string fname = TableFileName(dbname_, file_number);
#     814                 :        108 :   Status s = env_->NewWritableFile(fname, &compact->outfile);
#     815         [ +  - ]:        108 :   if (s.ok()) {
#     816                 :        108 :     compact->builder = new TableBuilder(options_, compact->outfile);
#     817                 :        108 :   }
#     818                 :        108 :   return s;
#     819                 :        108 : }
#     820                 :            : 
#     821                 :            : Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
#     822                 :        108 :                                           Iterator* input) {
#     823                 :        108 :   assert(compact != nullptr);
#     824                 :          0 :   assert(compact->outfile != nullptr);
#     825                 :          0 :   assert(compact->builder != nullptr);
#     826                 :            : 
#     827                 :          0 :   const uint64_t output_number = compact->current_output()->number;
#     828                 :        108 :   assert(output_number != 0);
#     829                 :            : 
#     830                 :            :   // Check for iterator errors
#     831                 :          0 :   Status s = input->status();
#     832                 :        108 :   const uint64_t current_entries = compact->builder->NumEntries();
#     833         [ +  - ]:        108 :   if (s.ok()) {
#     834                 :        108 :     s = compact->builder->Finish();
#     835                 :        108 :   } else {
#     836                 :          0 :     compact->builder->Abandon();
#     837                 :          0 :   }
#     838                 :        108 :   const uint64_t current_bytes = compact->builder->FileSize();
#     839                 :        108 :   compact->current_output()->file_size = current_bytes;
#     840                 :        108 :   compact->total_bytes += current_bytes;
#     841                 :        108 :   delete compact->builder;
#     842                 :        108 :   compact->builder = nullptr;
#     843                 :            : 
#     844                 :            :   // Finish and check for file errors
#     845         [ +  - ]:        108 :   if (s.ok()) {
#     846                 :        108 :     s = compact->outfile->Sync();
#     847                 :        108 :   }
#     848         [ +  - ]:        108 :   if (s.ok()) {
#     849                 :        108 :     s = compact->outfile->Close();
#     850                 :        108 :   }
#     851                 :        108 :   delete compact->outfile;
#     852                 :        108 :   compact->outfile = nullptr;
#     853                 :            : 
#     854 [ +  - ][ +  - ]:        108 :   if (s.ok() && current_entries > 0) {
#     855                 :            :     // Verify that the table is usable
#     856                 :        108 :     Iterator* iter =
#     857                 :        108 :         table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
#     858                 :        108 :     s = iter->status();
#     859                 :        108 :     delete iter;
#     860         [ +  - ]:        108 :     if (s.ok()) {
#     861                 :        108 :       Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
#     862                 :        108 :           (unsigned long long)output_number, compact->compaction->level(),
#     863                 :        108 :           (unsigned long long)current_entries,
#     864                 :        108 :           (unsigned long long)current_bytes);
#     865                 :        108 :     }
#     866                 :        108 :   }
#     867                 :        108 :   return s;
#     868                 :        108 : }
#     869                 :            : 
#     870                 :        108 : Status DBImpl::InstallCompactionResults(CompactionState* compact) {
#     871                 :        108 :   mutex_.AssertHeld();
#     872                 :        108 :   Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
#     873                 :        108 :       compact->compaction->num_input_files(0), compact->compaction->level(),
#     874                 :        108 :       compact->compaction->num_input_files(1), compact->compaction->level() + 1,
#     875                 :        108 :       static_cast<long long>(compact->total_bytes));
#     876                 :            : 
#     877                 :            :   // Add compaction outputs
#     878                 :        108 :   compact->compaction->AddInputDeletions(compact->compaction->edit());
#     879                 :        108 :   const int level = compact->compaction->level();
#     880         [ +  + ]:        216 :   for (size_t i = 0; i < compact->outputs.size(); i++) {
#     881                 :        108 :     const CompactionState::Output& out = compact->outputs[i];
#     882                 :        108 :     compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
#     883                 :        108 :                                          out.smallest, out.largest);
#     884                 :        108 :   }
#     885                 :        108 :   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
#     886                 :        108 : }
#     887                 :            : 
#     888                 :        108 : Status DBImpl::DoCompactionWork(CompactionState* compact) {
#     889                 :        108 :   const uint64_t start_micros = env_->NowMicros();
#     890                 :        108 :   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
#     891                 :            : 
#     892                 :        108 :   Log(options_.info_log, "Compacting %d@%d + %d@%d files",
#     893                 :        108 :       compact->compaction->num_input_files(0), compact->compaction->level(),
#     894                 :        108 :       compact->compaction->num_input_files(1),
#     895                 :        108 :       compact->compaction->level() + 1);
#     896                 :            : 
#     897                 :        108 :   assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
#     898                 :          0 :   assert(compact->builder == nullptr);
#     899                 :          0 :   assert(compact->outfile == nullptr);
#     900         [ +  - ]:        108 :   if (snapshots_.empty()) {
#     901                 :        108 :     compact->smallest_snapshot = versions_->LastSequence();
#     902                 :        108 :   } else {
#     903                 :          0 :     compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
#     904                 :          0 :   }
#     905                 :            : 
#     906                 :        108 :   Iterator* input = versions_->MakeInputIterator(compact->compaction);
#     907                 :            : 
#     908                 :            :   // Release mutex while we're actually doing the compaction work
#     909                 :        108 :   mutex_.Unlock();
#     910                 :            : 
#     911                 :        108 :   input->SeekToFirst();
#     912                 :        108 :   Status status;
#     913                 :        108 :   ParsedInternalKey ikey;
#     914                 :        108 :   std::string current_user_key;
#     915                 :        108 :   bool has_current_user_key = false;
#     916                 :        108 :   SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
#     917 [ +  + ][ +  - ]:      35480 :   while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
#     918                 :            :     // Prioritize immutable compaction work
#     919         [ -  + ]:      35372 :     if (has_imm_.load(std::memory_order_relaxed)) {
#     920                 :          0 :       const uint64_t imm_start = env_->NowMicros();
#     921                 :          0 :       mutex_.Lock();
#     922         [ #  # ]:          0 :       if (imm_ != nullptr) {
#     923                 :          0 :         CompactMemTable();
#     924                 :            :         // Wake up MakeRoomForWrite() if necessary.
#     925                 :          0 :         background_work_finished_signal_.SignalAll();
#     926                 :          0 :       }
#     927                 :          0 :       mutex_.Unlock();
#     928                 :          0 :       imm_micros += (env_->NowMicros() - imm_start);
#     929                 :          0 :     }
#     930                 :            : 
#     931                 :      35372 :     Slice key = input->key();
#     932         [ -  + ]:      35372 :     if (compact->compaction->ShouldStopBefore(key) &&
#     933         [ #  # ]:      35372 :         compact->builder != nullptr) {
#     934                 :          0 :       status = FinishCompactionOutputFile(compact, input);
#     935         [ #  # ]:          0 :       if (!status.ok()) {
#     936                 :          0 :         break;
#     937                 :          0 :       }
#     938                 :          0 :     }
#     939                 :            : 
#     940                 :            :     // Handle key/value, add to state, etc.
#     941                 :      35372 :     bool drop = false;
#     942         [ -  + ]:      35372 :     if (!ParseInternalKey(key, &ikey)) {
#     943                 :            :       // Do not hide error keys
#     944                 :          0 :       current_user_key.clear();
#     945                 :          0 :       has_current_user_key = false;
#     946                 :          0 :       last_sequence_for_key = kMaxSequenceNumber;
#     947                 :      35372 :     } else {
#     948 [ +  + ][ +  + ]:      35372 :       if (!has_current_user_key ||
#     949         [ +  + ]:      35372 :           user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
#     950                 :      35264 :               0) {
#     951                 :            :         // First occurrence of this user key
#     952                 :      32359 :         current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
#     953                 :      32359 :         has_current_user_key = true;
#     954                 :      32359 :         last_sequence_for_key = kMaxSequenceNumber;
#     955                 :      32359 :       }
#     956                 :            : 
#     957         [ +  + ]:      35372 :       if (last_sequence_for_key <= compact->smallest_snapshot) {
#     958                 :            :         // Hidden by an newer entry for same user key
#     959                 :       3013 :         drop = true;  // (A)
#     960         [ +  + ]:      32359 :       } else if (ikey.type == kTypeDeletion &&
#     961         [ +  - ]:      32359 :                  ikey.sequence <= compact->smallest_snapshot &&
#     962         [ +  - ]:      32359 :                  compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
#     963                 :            :         // For this user key:
#     964                 :            :         // (1) there is no data in higher levels
#     965                 :            :         // (2) data in lower levels will have larger sequence numbers
#     966                 :            :         // (3) data in layers that are being compacted here and have
#     967                 :            :         //     smaller sequence numbers will be dropped in the next
#     968                 :            :         //     few iterations of this loop (by rule (A) above).
#     969                 :            :         // Therefore this deletion marker is obsolete and can be dropped.
#     970                 :        180 :         drop = true;
#     971                 :        180 :       }
#     972                 :            : 
#     973                 :      35372 :       last_sequence_for_key = ikey.sequence;
#     974                 :      35372 :     }
#     975                 :            : #if 0
#     976                 :            :     Log(options_.info_log,
#     977                 :            :         "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
#     978                 :            :         "%d smallest_snapshot: %d",
#     979                 :            :         ikey.user_key.ToString().c_str(),
#     980                 :            :         (int)ikey.sequence, ikey.type, kTypeValue, drop,
#     981                 :            :         compact->compaction->IsBaseLevelForKey(ikey.user_key),
#     982                 :            :         (int)last_sequence_for_key, (int)compact->smallest_snapshot);
#     983                 :            : #endif
#     984                 :            : 
#     985         [ +  + ]:      35372 :     if (!drop) {
#     986                 :            :       // Open output file if necessary
#     987         [ +  + ]:      32179 :       if (compact->builder == nullptr) {
#     988                 :        108 :         status = OpenCompactionOutputFile(compact);
#     989         [ -  + ]:        108 :         if (!status.ok()) {
#     990                 :          0 :           break;
#     991                 :          0 :         }
#     992                 :        108 :       }
#     993         [ +  + ]:      32179 :       if (compact->builder->NumEntries() == 0) {
#     994                 :        108 :         compact->current_output()->smallest.DecodeFrom(key);
#     995                 :        108 :       }
#     996                 :      32179 :       compact->current_output()->largest.DecodeFrom(key);
#     997                 :      32179 :       compact->builder->Add(key, input->value());
#     998                 :            : 
#     999                 :            :       // Close output file if it is big enough
#    1000         [ -  + ]:      32179 :       if (compact->builder->FileSize() >=
#    1001                 :      32179 :           compact->compaction->MaxOutputFileSize()) {
#    1002                 :          0 :         status = FinishCompactionOutputFile(compact, input);
#    1003         [ #  # ]:          0 :         if (!status.ok()) {
#    1004                 :          0 :           break;
#    1005                 :          0 :         }
#    1006                 :          0 :       }
#    1007                 :      32179 :     }
#    1008                 :            : 
#    1009                 :      35372 :     input->Next();
#    1010                 :      35372 :   }
#    1011                 :            : 
#    1012 [ +  - ][ -  + ]:        108 :   if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
#    1013                 :          0 :     status = Status::IOError("Deleting DB during compaction");
#    1014                 :          0 :   }
#    1015 [ +  - ][ +  - ]:        108 :   if (status.ok() && compact->builder != nullptr) {
#    1016                 :        108 :     status = FinishCompactionOutputFile(compact, input);
#    1017                 :        108 :   }
#    1018         [ +  - ]:        108 :   if (status.ok()) {
#    1019                 :        108 :     status = input->status();
#    1020                 :        108 :   }
#    1021                 :        108 :   delete input;
#    1022                 :        108 :   input = nullptr;
#    1023                 :            : 
#    1024                 :        108 :   CompactionStats stats;
#    1025                 :        108 :   stats.micros = env_->NowMicros() - start_micros - imm_micros;
#    1026         [ +  + ]:        324 :   for (int which = 0; which < 2; which++) {
#    1027         [ +  + ]:        637 :     for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
#    1028                 :        421 :       stats.bytes_read += compact->compaction->input(which, i)->file_size;
#    1029                 :        421 :     }
#    1030                 :        216 :   }
#    1031         [ +  + ]:        216 :   for (size_t i = 0; i < compact->outputs.size(); i++) {
#    1032                 :        108 :     stats.bytes_written += compact->outputs[i].file_size;
#    1033                 :        108 :   }
#    1034                 :            : 
#    1035                 :        108 :   mutex_.Lock();
#    1036                 :        108 :   stats_[compact->compaction->level() + 1].Add(stats);
#    1037                 :            : 
#    1038         [ +  - ]:        108 :   if (status.ok()) {
#    1039                 :        108 :     status = InstallCompactionResults(compact);
#    1040                 :        108 :   }
#    1041         [ -  + ]:        108 :   if (!status.ok()) {
#    1042                 :          0 :     RecordBackgroundError(status);
#    1043                 :          0 :   }
#    1044                 :        108 :   VersionSet::LevelSummaryStorage tmp;
#    1045                 :        108 :   Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
#    1046                 :        108 :   return status;
#    1047                 :        108 : }
#    1048                 :            : 
#    1049                 :            : namespace {
#    1050                 :            : 
#    1051                 :            : struct IterState {
#    1052                 :            :   port::Mutex* const mu;
#    1053                 :            :   Version* const version GUARDED_BY(mu);
#    1054                 :            :   MemTable* const mem GUARDED_BY(mu);
#    1055                 :            :   MemTable* const imm GUARDED_BY(mu);
#    1056                 :            : 
#    1057                 :            :   IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
#    1058                 :       3379 :       : mu(mutex), version(version), mem(mem), imm(imm) {}
#    1059                 :            : };
#    1060                 :            : 
#    1061                 :       3379 : static void CleanupIteratorState(void* arg1, void* arg2) {
#    1062                 :       3379 :   IterState* state = reinterpret_cast<IterState*>(arg1);
#    1063                 :       3379 :   state->mu->Lock();
#    1064                 :       3379 :   state->mem->Unref();
#    1065         [ -  + ]:       3379 :   if (state->imm != nullptr) state->imm->Unref();
#    1066                 :       3379 :   state->version->Unref();
#    1067                 :       3379 :   state->mu->Unlock();
#    1068                 :       3379 :   delete state;
#    1069                 :       3379 : }
#    1070                 :            : 
#    1071                 :            : }  // anonymous namespace
#    1072                 :            : 
#    1073                 :            : Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
#    1074                 :            :                                       SequenceNumber* latest_snapshot,
#    1075                 :       3379 :                                       uint32_t* seed) {
#    1076                 :       3379 :   mutex_.Lock();
#    1077                 :       3379 :   *latest_snapshot = versions_->LastSequence();
#    1078                 :            : 
#    1079                 :            :   // Collect together all needed child iterators
#    1080                 :       3379 :   std::vector<Iterator*> list;
#    1081                 :       3379 :   list.push_back(mem_->NewIterator());
#    1082                 :       3379 :   mem_->Ref();
#    1083         [ -  + ]:       3379 :   if (imm_ != nullptr) {
#    1084                 :          0 :     list.push_back(imm_->NewIterator());
#    1085                 :          0 :     imm_->Ref();
#    1086                 :          0 :   }
#    1087                 :       3379 :   versions_->current()->AddIterators(options, &list);
#    1088                 :       3379 :   Iterator* internal_iter =
#    1089                 :       3379 :       NewMergingIterator(&internal_comparator_, &list[0], list.size());
#    1090                 :       3379 :   versions_->current()->Ref();
#    1091                 :            : 
#    1092                 :       3379 :   IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
#    1093                 :       3379 :   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
#    1094                 :            : 
#    1095                 :       3379 :   *seed = ++seed_;
#    1096                 :       3379 :   mutex_.Unlock();
#    1097                 :       3379 :   return internal_iter;
#    1098                 :       3379 : }
#    1099                 :            : 
#    1100                 :          0 : Iterator* DBImpl::TEST_NewInternalIterator() {
#    1101                 :          0 :   SequenceNumber ignored;
#    1102                 :          0 :   uint32_t ignored_seed;
#    1103                 :          0 :   return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
#    1104                 :          0 : }
#    1105                 :            : 
#    1106                 :          0 : int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
#    1107                 :          0 :   MutexLock l(&mutex_);
#    1108                 :          0 :   return versions_->MaxNextLevelOverlappingBytes();
#    1109                 :          0 : }
#    1110                 :            : 
#    1111                 :            : Status DBImpl::Get(const ReadOptions& options, const Slice& key,
#    1112                 :   10083933 :                    std::string* value) {
#    1113                 :   10083933 :   Status s;
#    1114                 :   10083933 :   MutexLock l(&mutex_);
#    1115                 :   10083933 :   SequenceNumber snapshot;
#    1116         [ -  + ]:   10083933 :   if (options.snapshot != nullptr) {
#    1117                 :          0 :     snapshot =
#    1118                 :          0 :         static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
#    1119                 :   10083933 :   } else {
#    1120                 :   10083933 :     snapshot = versions_->LastSequence();
#    1121                 :   10083933 :   }
#    1122                 :            : 
#    1123                 :   10083933 :   MemTable* mem = mem_;
#    1124                 :   10083933 :   MemTable* imm = imm_;
#    1125                 :   10083933 :   Version* current = versions_->current();
#    1126                 :   10083933 :   mem->Ref();
#    1127         [ +  + ]:   10083933 :   if (imm != nullptr) imm->Ref();
#    1128                 :   10083933 :   current->Ref();
#    1129                 :            : 
#    1130                 :   10083933 :   bool have_stat_update = false;
#    1131                 :   10083933 :   Version::GetStats stats;
#    1132                 :            : 
#    1133                 :            :   // Unlock while reading from files and memtables
#    1134                 :   10083933 :   {
#    1135                 :   10083933 :     mutex_.Unlock();
#    1136                 :            :     // First look in the memtable, then in the immutable memtable (if any).
#    1137                 :   10083933 :     LookupKey lkey(key, snapshot);
#    1138         [ +  + ]:   10083933 :     if (mem->Get(lkey, value, &s)) {
#    1139                 :            :       // Done
#    1140 [ +  + ][ +  + ]:    9723626 :     } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
#    1141                 :            :       // Done
#    1142                 :    9723156 :     } else {
#    1143                 :    9723156 :       s = current->Get(options, lkey, value, &stats);
#    1144                 :    9723156 :       have_stat_update = true;
#    1145                 :    9723156 :     }
#    1146                 :   10083933 :     mutex_.Lock();
#    1147                 :   10083933 :   }
#    1148                 :            : 
#    1149 [ +  + ][ +  + ]:   10083933 :   if (have_stat_update && current->UpdateStats(stats)) {
#    1150                 :         24 :     MaybeScheduleCompaction();
#    1151                 :         24 :   }
#    1152                 :   10083933 :   mem->Unref();
#    1153         [ +  + ]:   10083933 :   if (imm != nullptr) imm->Unref();
#    1154                 :   10083933 :   current->Unref();
#    1155                 :   10083933 :   return s;
#    1156                 :   10083933 : }
#    1157                 :            : 
#    1158                 :       3379 : Iterator* DBImpl::NewIterator(const ReadOptions& options) {
#    1159                 :       3379 :   SequenceNumber latest_snapshot;
#    1160                 :       3379 :   uint32_t seed;
#    1161                 :       3379 :   Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
#    1162                 :       3379 :   return NewDBIterator(this, user_comparator(), iter,
#    1163         [ -  + ]:       3379 :                        (options.snapshot != nullptr
#    1164                 :       3379 :                             ? static_cast<const SnapshotImpl*>(options.snapshot)
#    1165                 :          0 :                                   ->sequence_number()
#    1166                 :       3379 :                             : latest_snapshot),
#    1167                 :       3379 :                        seed);
#    1168                 :       3379 : }
#    1169                 :            : 
#    1170                 :        333 : void DBImpl::RecordReadSample(Slice key) {
#    1171                 :        333 :   MutexLock l(&mutex_);
#    1172         [ -  + ]:        333 :   if (versions_->current()->RecordReadSample(key)) {
#    1173                 :          0 :     MaybeScheduleCompaction();
#    1174                 :          0 :   }
#    1175                 :        333 : }
#    1176                 :            : 
#    1177                 :          0 : const Snapshot* DBImpl::GetSnapshot() {
#    1178                 :          0 :   MutexLock l(&mutex_);
#    1179                 :          0 :   return snapshots_.New(versions_->LastSequence());
#    1180                 :          0 : }
#    1181                 :            : 
#    1182                 :          0 : void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
#    1183                 :          0 :   MutexLock l(&mutex_);
#    1184                 :          0 :   snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
#    1185                 :          0 : }
#    1186                 :            : 
#    1187                 :            : // Convenience methods
#    1188                 :          0 : Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
#    1189                 :          0 :   return DB::Put(o, key, val);
#    1190                 :          0 : }
#    1191                 :            : 
#    1192                 :          0 : Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
#    1193                 :          0 :   return DB::Delete(options, key);
#    1194                 :          0 : }
#    1195                 :            : 
#    1196                 :      13678 : Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
#    1197                 :      13678 :   Writer w(&mutex_);
#    1198                 :      13678 :   w.batch = updates;
#    1199                 :      13678 :   w.sync = options.sync;
#    1200                 :      13678 :   w.done = false;
#    1201                 :            : 
#    1202                 :      13678 :   MutexLock l(&mutex_);
#    1203                 :      13678 :   writers_.push_back(&w);
#    1204 [ +  - ][ -  + ]:      13678 :   while (!w.done && &w != writers_.front()) {
#    1205                 :          0 :     w.cv.Wait();
#    1206                 :          0 :   }
#    1207         [ -  + ]:      13678 :   if (w.done) {
#    1208                 :          0 :     return w.status;
#    1209                 :          0 :   }
#    1210                 :            : 
#    1211                 :            :   // May temporarily unlock and wait.
#    1212                 :      13678 :   Status status = MakeRoomForWrite(updates == nullptr);
#    1213                 :      13678 :   uint64_t last_sequence = versions_->LastSequence();
#    1214                 :      13678 :   Writer* last_writer = &w;
#    1215 [ +  - ][ +  - ]:      13678 :   if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
#    1216                 :      13678 :     WriteBatch* write_batch = BuildBatchGroup(&last_writer);
#    1217                 :      13678 :     WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
#    1218                 :      13678 :     last_sequence += WriteBatchInternal::Count(write_batch);
#    1219                 :            : 
#    1220                 :            :     // Add to log and apply to memtable.  We can release the lock
#    1221                 :            :     // during this phase since &w is currently responsible for logging
#    1222                 :            :     // and protects against concurrent loggers and concurrent writes
#    1223                 :            :     // into mem_.
#    1224                 :      13678 :     {
#    1225                 :      13678 :       mutex_.Unlock();
#    1226                 :      13678 :       status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
#    1227                 :      13678 :       bool sync_error = false;
#    1228 [ +  - ][ +  + ]:      13678 :       if (status.ok() && options.sync) {
#    1229                 :       1623 :         status = logfile_->Sync();
#    1230         [ -  + ]:       1623 :         if (!status.ok()) {
#    1231                 :          0 :           sync_error = true;
#    1232                 :          0 :         }
#    1233                 :       1623 :       }
#    1234         [ +  - ]:      13678 :       if (status.ok()) {
#    1235                 :      13678 :         status = WriteBatchInternal::InsertInto(write_batch, mem_);
#    1236                 :      13678 :       }
#    1237                 :      13678 :       mutex_.Lock();
#    1238         [ -  + ]:      13678 :       if (sync_error) {
#    1239                 :            :         // The state of the log file is indeterminate: the log record we
#    1240                 :            :         // just added may or may not show up when the DB is re-opened.
#    1241                 :            :         // So we force the DB into a mode where all future writes fail.
#    1242                 :          0 :         RecordBackgroundError(status);
#    1243                 :          0 :       }
#    1244                 :      13678 :     }
#    1245         [ -  + ]:      13678 :     if (write_batch == tmp_batch_) tmp_batch_->Clear();
#    1246                 :            : 
#    1247                 :      13678 :     versions_->SetLastSequence(last_sequence);
#    1248                 :      13678 :   }
#    1249                 :            : 
#    1250                 :      13678 :   while (true) {
#    1251                 :      13678 :     Writer* ready = writers_.front();
#    1252                 :      13678 :     writers_.pop_front();
#    1253         [ -  + ]:      13678 :     if (ready != &w) {
#    1254                 :          0 :       ready->status = status;
#    1255                 :          0 :       ready->done = true;
#    1256                 :          0 :       ready->cv.Signal();
#    1257                 :          0 :     }
#    1258         [ +  - ]:      13678 :     if (ready == last_writer) break;
#    1259                 :      13678 :   }
#    1260                 :            : 
#    1261                 :            :   // Notify new head of write queue
#    1262         [ -  + ]:      13678 :   if (!writers_.empty()) {
#    1263                 :          0 :     writers_.front()->cv.Signal();
#    1264                 :          0 :   }
#    1265                 :            : 
#    1266                 :      13678 :   return status;
#    1267                 :      13678 : }
#    1268                 :            : 
#    1269                 :            : // REQUIRES: Writer list must be non-empty
#    1270                 :            : // REQUIRES: First writer must have a non-null batch
#    1271                 :      13678 : WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
#    1272                 :      13678 :   mutex_.AssertHeld();
#    1273                 :      13678 :   assert(!writers_.empty());
#    1274                 :          0 :   Writer* first = writers_.front();
#    1275                 :      13678 :   WriteBatch* result = first->batch;
#    1276                 :      13678 :   assert(result != nullptr);
#    1277                 :            : 
#    1278                 :          0 :   size_t size = WriteBatchInternal::ByteSize(first->batch);
#    1279                 :            : 
#    1280                 :            :   // Allow the group to grow up to a maximum size, but if the
#    1281                 :            :   // original write is small, limit the growth so we do not slow
#    1282                 :            :   // down the small write too much.
#    1283                 :      13678 :   size_t max_size = 1 << 20;
#    1284         [ +  + ]:      13678 :   if (size <= (128 << 10)) {
#    1285                 :      13651 :     max_size = size + (128 << 10);
#    1286                 :      13651 :   }
#    1287                 :            : 
#    1288                 :      13678 :   *last_writer = first;
#    1289                 :      13678 :   std::deque<Writer*>::iterator iter = writers_.begin();
#    1290                 :      13678 :   ++iter;  // Advance past "first"
#    1291         [ -  + ]:      13678 :   for (; iter != writers_.end(); ++iter) {
#    1292                 :          0 :     Writer* w = *iter;
#    1293 [ #  # ][ #  # ]:          0 :     if (w->sync && !first->sync) {
#    1294                 :            :       // Do not include a sync write into a batch handled by a non-sync write.
#    1295                 :          0 :       break;
#    1296                 :          0 :     }
#    1297                 :            : 
#    1298         [ #  # ]:          0 :     if (w->batch != nullptr) {
#    1299                 :          0 :       size += WriteBatchInternal::ByteSize(w->batch);
#    1300         [ #  # ]:          0 :       if (size > max_size) {
#    1301                 :            :         // Do not make batch too big
#    1302                 :          0 :         break;
#    1303                 :          0 :       }
#    1304                 :            : 
#    1305                 :            :       // Append to *result
#    1306         [ #  # ]:          0 :       if (result == first->batch) {
#    1307                 :            :         // Switch to temporary batch instead of disturbing caller's batch
#    1308                 :          0 :         result = tmp_batch_;
#    1309                 :          0 :         assert(WriteBatchInternal::Count(result) == 0);
#    1310                 :          0 :         WriteBatchInternal::Append(result, first->batch);
#    1311                 :          0 :       }
#    1312                 :          0 :       WriteBatchInternal::Append(result, w->batch);
#    1313                 :          0 :     }
#    1314                 :          0 :     *last_writer = w;
#    1315                 :          0 :   }
#    1316                 :      13678 :   return result;
#    1317                 :      13678 : }
#    1318                 :            : 
#    1319                 :            : // REQUIRES: mutex_ is held
#    1320                 :            : // REQUIRES: this thread is currently at the front of the writer queue
#    1321                 :      13678 : Status DBImpl::MakeRoomForWrite(bool force) {
#    1322                 :      13678 :   mutex_.AssertHeld();
#    1323                 :      13678 :   assert(!writers_.empty());
#    1324                 :          0 :   bool allow_delay = !force;
#    1325                 :      13678 :   Status s;
#    1326                 :      13681 :   while (true) {
#    1327         [ -  + ]:      13681 :     if (!bg_error_.ok()) {
#    1328                 :            :       // Yield previous error
#    1329                 :          0 :       s = bg_error_;
#    1330                 :          0 :       break;
#    1331 [ +  - ][ -  + ]:      13681 :     } else if (allow_delay && versions_->NumLevelFiles(0) >=
#    1332                 :      13681 :                                   config::kL0_SlowdownWritesTrigger) {
#    1333                 :            :       // We are getting close to hitting a hard limit on the number of
#    1334                 :            :       // L0 files.  Rather than delaying a single write by several
#    1335                 :            :       // seconds when we hit the hard limit, start delaying each
#    1336                 :            :       // individual write by 1ms to reduce latency variance.  Also,
#    1337                 :            :       // this delay hands over some CPU to the compaction thread in
#    1338                 :            :       // case it is sharing the same core as the writer.
#    1339                 :          0 :       mutex_.Unlock();
#    1340                 :          0 :       env_->SleepForMicroseconds(1000);
#    1341                 :          0 :       allow_delay = false;  // Do not delay a single write more than once
#    1342                 :          0 :       mutex_.Lock();
#    1343         [ +  - ]:      13681 :     } else if (!force &&
#    1344         [ +  + ]:      13681 :                (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
#    1345                 :            :       // There is room in current memtable
#    1346                 :      13678 :       break;
#    1347         [ -  + ]:      13678 :     } else if (imm_ != nullptr) {
#    1348                 :            :       // We have filled up the current memtable, but the previous
#    1349                 :            :       // one is still being compacted, so we wait.
#    1350                 :          0 :       Log(options_.info_log, "Current memtable full; waiting...\n");
#    1351                 :          0 :       background_work_finished_signal_.Wait();
#    1352         [ -  + ]:          3 :     } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
#    1353                 :            :       // There are too many level-0 files.
#    1354                 :          0 :       Log(options_.info_log, "Too many L0 files; waiting...\n");
#    1355                 :          0 :       background_work_finished_signal_.Wait();
#    1356                 :          3 :     } else {
#    1357                 :            :       // Attempt to switch to a new memtable and trigger compaction of old
#    1358                 :          3 :       assert(versions_->PrevLogNumber() == 0);
#    1359                 :          0 :       uint64_t new_log_number = versions_->NewFileNumber();
#    1360                 :          3 :       WritableFile* lfile = nullptr;
#    1361                 :          3 :       s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
#    1362         [ -  + ]:          3 :       if (!s.ok()) {
#    1363                 :            :         // Avoid chewing through file number space in a tight loop.
#    1364                 :          0 :         versions_->ReuseFileNumber(new_log_number);
#    1365                 :          0 :         break;
#    1366                 :          0 :       }
#    1367                 :          3 :       delete log_;
#    1368                 :          3 :       delete logfile_;
#    1369                 :          3 :       logfile_ = lfile;
#    1370                 :          3 :       logfile_number_ = new_log_number;
#    1371                 :          3 :       log_ = new log::Writer(lfile);
#    1372                 :          3 :       imm_ = mem_;
#    1373                 :          3 :       has_imm_.store(true, std::memory_order_release);
#    1374                 :          3 :       mem_ = new MemTable(internal_comparator_);
#    1375                 :          3 :       mem_->Ref();
#    1376                 :          3 :       force = false;  // Do not force another compaction if have room
#    1377                 :          3 :       MaybeScheduleCompaction();
#    1378                 :          3 :     }
#    1379                 :      13681 :   }
#    1380                 :      13678 :   return s;
#    1381                 :      13678 : }
#    1382                 :            : 
#    1383                 :          0 : bool DBImpl::GetProperty(const Slice& property, std::string* value) {
#    1384                 :          0 :   value->clear();
#    1385                 :            : 
#    1386                 :          0 :   MutexLock l(&mutex_);
#    1387                 :          0 :   Slice in = property;
#    1388                 :          0 :   Slice prefix("leveldb.");
#    1389         [ #  # ]:          0 :   if (!in.starts_with(prefix)) return false;
#    1390                 :          0 :   in.remove_prefix(prefix.size());
#    1391                 :            : 
#    1392         [ #  # ]:          0 :   if (in.starts_with("num-files-at-level")) {
#    1393                 :          0 :     in.remove_prefix(strlen("num-files-at-level"));
#    1394                 :          0 :     uint64_t level;
#    1395 [ #  # ][ #  # ]:          0 :     bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
#    1396 [ #  # ][ #  # ]:          0 :     if (!ok || level >= config::kNumLevels) {
#    1397                 :          0 :       return false;
#    1398                 :          0 :     } else {
#    1399                 :          0 :       char buf[100];
#    1400                 :          0 :       snprintf(buf, sizeof(buf), "%d",
#    1401                 :          0 :                versions_->NumLevelFiles(static_cast<int>(level)));
#    1402                 :          0 :       *value = buf;
#    1403                 :          0 :       return true;
#    1404                 :          0 :     }
#    1405         [ #  # ]:          0 :   } else if (in == "stats") {
#    1406                 :          0 :     char buf[200];
#    1407                 :          0 :     snprintf(buf, sizeof(buf),
#    1408                 :          0 :              "                               Compactions\n"
#    1409                 :          0 :              "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
#    1410                 :          0 :              "--------------------------------------------------\n");
#    1411                 :          0 :     value->append(buf);
#    1412         [ #  # ]:          0 :     for (int level = 0; level < config::kNumLevels; level++) {
#    1413                 :          0 :       int files = versions_->NumLevelFiles(level);
#    1414 [ #  # ][ #  # ]:          0 :       if (stats_[level].micros > 0 || files > 0) {
#    1415                 :          0 :         snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
#    1416                 :          0 :                  files, versions_->NumLevelBytes(level) / 1048576.0,
#    1417                 :          0 :                  stats_[level].micros / 1e6,
#    1418                 :          0 :                  stats_[level].bytes_read / 1048576.0,
#    1419                 :          0 :                  stats_[level].bytes_written / 1048576.0);
#    1420                 :          0 :         value->append(buf);
#    1421                 :          0 :       }
#    1422                 :          0 :     }
#    1423                 :          0 :     return true;
#    1424         [ #  # ]:          0 :   } else if (in == "sstables") {
#    1425                 :          0 :     *value = versions_->current()->DebugString();
#    1426                 :          0 :     return true;
#    1427         [ #  # ]:          0 :   } else if (in == "approximate-memory-usage") {
#    1428                 :          0 :     size_t total_usage = options_.block_cache->TotalCharge();
#    1429         [ #  # ]:          0 :     if (mem_) {
#    1430                 :          0 :       total_usage += mem_->ApproximateMemoryUsage();
#    1431                 :          0 :     }
#    1432         [ #  # ]:          0 :     if (imm_) {
#    1433                 :          0 :       total_usage += imm_->ApproximateMemoryUsage();
#    1434                 :          0 :     }
#    1435                 :          0 :     char buf[50];
#    1436                 :          0 :     snprintf(buf, sizeof(buf), "%llu",
#    1437                 :          0 :              static_cast<unsigned long long>(total_usage));
#    1438                 :          0 :     value->append(buf);
#    1439                 :          0 :     return true;
#    1440                 :          0 :   }
#    1441                 :            : 
#    1442                 :          0 :   return false;
#    1443                 :          0 : }
#    1444                 :            : 
#    1445                 :         27 : void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
#    1446                 :            :   // TODO(opt): better implementation
#    1447                 :         27 :   MutexLock l(&mutex_);
#    1448                 :         27 :   Version* v = versions_->current();
#    1449                 :         27 :   v->Ref();
#    1450                 :            : 
#    1451         [ +  + ]:         54 :   for (int i = 0; i < n; i++) {
#    1452                 :            :     // Convert user_key into a corresponding internal key.
#    1453                 :         27 :     InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
#    1454                 :         27 :     InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
#    1455                 :         27 :     uint64_t start = versions_->ApproximateOffsetOf(v, k1);
#    1456                 :         27 :     uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
#    1457         [ +  - ]:         27 :     sizes[i] = (limit >= start ? limit - start : 0);
#    1458                 :         27 :   }
#    1459                 :            : 
#    1460                 :         27 :   v->Unref();
#    1461                 :         27 : }
#    1462                 :            : 
#    1463                 :            : // Default implementations of convenience methods that subclasses of DB
#    1464                 :            : // can call if they wish
#    1465                 :          0 : Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
#    1466                 :          0 :   WriteBatch batch;
#    1467                 :          0 :   batch.Put(key, value);
#    1468                 :          0 :   return Write(opt, &batch);
#    1469                 :          0 : }
#    1470                 :            : 
#    1471                 :          0 : Status DB::Delete(const WriteOptions& opt, const Slice& key) {
#    1472                 :          0 :   WriteBatch batch;
#    1473                 :          0 :   batch.Delete(key);
#    1474                 :          0 :   return Write(opt, &batch);
#    1475                 :          0 : }
#    1476                 :            : 
#    1477                 :       2193 : DB::~DB() = default;
#    1478                 :            : 
#    1479                 :       2193 : Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
#    1480                 :       2193 :   *dbptr = nullptr;
#    1481                 :            : 
#    1482                 :       2193 :   DBImpl* impl = new DBImpl(options, dbname);
#    1483                 :       2193 :   impl->mutex_.Lock();
#    1484                 :       2193 :   VersionEdit edit;
#    1485                 :            :   // Recover handles create_if_missing, error_if_exists
#    1486                 :       2193 :   bool save_manifest = false;
#    1487                 :       2193 :   Status s = impl->Recover(&edit, &save_manifest);
#    1488 [ +  + ][ +  - ]:       2193 :   if (s.ok() && impl->mem_ == nullptr) {
#    1489                 :            :     // Create new log and a corresponding memtable.
#    1490                 :       2191 :     uint64_t new_log_number = impl->versions_->NewFileNumber();
#    1491                 :       2191 :     WritableFile* lfile;
#    1492                 :       2191 :     s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
#    1493                 :       2191 :                                      &lfile);
#    1494         [ +  - ]:       2191 :     if (s.ok()) {
#    1495                 :       2191 :       edit.SetLogNumber(new_log_number);
#    1496                 :       2191 :       impl->logfile_ = lfile;
#    1497                 :       2191 :       impl->logfile_number_ = new_log_number;
#    1498                 :       2191 :       impl->log_ = new log::Writer(lfile);
#    1499                 :       2191 :       impl->mem_ = new MemTable(impl->internal_comparator_);
#    1500                 :       2191 :       impl->mem_->Ref();
#    1501                 :       2191 :     }
#    1502                 :       2191 :   }
#    1503 [ +  + ][ +  - ]:       2193 :   if (s.ok() && save_manifest) {
#    1504                 :       2191 :     edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
#    1505                 :       2191 :     edit.SetLogNumber(impl->logfile_number_);
#    1506                 :       2191 :     s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
#    1507                 :       2191 :   }
#    1508         [ +  + ]:       2193 :   if (s.ok()) {
#    1509                 :       2191 :     impl->DeleteObsoleteFiles();
#    1510                 :       2191 :     impl->MaybeScheduleCompaction();
#    1511                 :       2191 :   }
#    1512                 :       2193 :   impl->mutex_.Unlock();
#    1513         [ +  + ]:       2193 :   if (s.ok()) {
#    1514                 :       2191 :     assert(impl->mem_ != nullptr);
#    1515                 :          0 :     *dbptr = impl;
#    1516                 :       2191 :   } else {
#    1517                 :          2 :     delete impl;
#    1518                 :          2 :   }
#    1519                 :          0 :   return s;
#    1520                 :       2193 : }
#    1521                 :            : 
#    1522                 :       2193 : Snapshot::~Snapshot() = default;
#    1523                 :            : 
#    1524                 :         29 : Status DestroyDB(const std::string& dbname, const Options& options) {
#    1525                 :         29 :   Env* env = options.env;
#    1526                 :         29 :   std::vector<std::string> filenames;
#    1527                 :         29 :   Status result = env->GetChildren(dbname, &filenames);
#    1528         [ +  + ]:         29 :   if (!result.ok()) {
#    1529                 :            :     // Ignore error in case directory does not exist
#    1530                 :          4 :     return Status::OK();
#    1531                 :          4 :   }
#    1532                 :            : 
#    1533                 :         25 :   FileLock* lock;
#    1534                 :         25 :   const std::string lockname = LockFileName(dbname);
#    1535                 :         25 :   result = env->LockFile(lockname, &lock);
#    1536         [ +  - ]:         25 :   if (result.ok()) {
#    1537                 :         25 :     uint64_t number;
#    1538                 :         25 :     FileType type;
#    1539         [ +  + ]:        210 :     for (size_t i = 0; i < filenames.size(); i++) {
#    1540         [ +  + ]:        185 :       if (ParseFileName(filenames[i], &number, &type) &&
#    1541         [ +  + ]:        185 :           type != kDBLockFile) {  // Lock file will be deleted at end
#    1542                 :        110 :         Status del = env->DeleteFile(dbname + "/" + filenames[i]);
#    1543 [ +  - ][ -  + ]:        110 :         if (result.ok() && !del.ok()) {
#    1544                 :          0 :           result = del;
#    1545                 :          0 :         }
#    1546                 :        110 :       }
#    1547                 :        185 :     }
#    1548                 :         25 :     env->UnlockFile(lock);  // Ignore error since state is already gone
#    1549                 :         25 :     env->DeleteFile(lockname);
#    1550                 :         25 :     env->DeleteDir(dbname);  // Ignore error in case dir contains other files
#    1551                 :         25 :   }
#    1552                 :         25 :   return result;
#    1553                 :         29 : }
#    1554                 :            : 
#    1555                 :            : }  // namespace leveldb

Generated by: LCOV version 0-eol-96201-ge66f56f4af6a