Branch data Line data Source code
# 1 : : // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
# 2 : : // Use of this source code is governed by a BSD-style license that can be
# 3 : : // found in the LICENSE file. See the AUTHORS file for names of contributors.
# 4 : :
# 5 : : #include "db/db_impl.h"
# 6 : :
# 7 : : #include <stdint.h>
# 8 : : #include <stdio.h>
# 9 : :
# 10 : : #include <algorithm>
# 11 : : #include <atomic>
# 12 : : #include <set>
# 13 : : #include <string>
# 14 : : #include <vector>
# 15 : :
# 16 : : #include "db/builder.h"
# 17 : : #include "db/db_iter.h"
# 18 : : #include "db/dbformat.h"
# 19 : : #include "db/filename.h"
# 20 : : #include "db/log_reader.h"
# 21 : : #include "db/log_writer.h"
# 22 : : #include "db/memtable.h"
# 23 : : #include "db/table_cache.h"
# 24 : : #include "db/version_set.h"
# 25 : : #include "db/write_batch_internal.h"
# 26 : : #include "leveldb/db.h"
# 27 : : #include "leveldb/env.h"
# 28 : : #include "leveldb/status.h"
# 29 : : #include "leveldb/table.h"
# 30 : : #include "leveldb/table_builder.h"
# 31 : : #include "port/port.h"
# 32 : : #include "table/block.h"
# 33 : : #include "table/merger.h"
# 34 : : #include "table/two_level_iterator.h"
# 35 : : #include "util/coding.h"
# 36 : : #include "util/logging.h"
# 37 : : #include "util/mutexlock.h"
# 38 : :
# 39 : : namespace leveldb {
# 40 : :
# 41 : : const int kNumNonTableCacheFiles = 10;
# 42 : :
# 43 : : // Information kept for every waiting writer
# 44 : : struct DBImpl::Writer {
# 45 : : explicit Writer(port::Mutex* mu)
# 46 : 12725 : : batch(nullptr), sync(false), done(false), cv(mu) {}
# 47 : :
# 48 : : Status status;
# 49 : : WriteBatch* batch;
# 50 : : bool sync;
# 51 : : bool done;
# 52 : : port::CondVar cv;
# 53 : : };
# 54 : :
# 55 : : struct DBImpl::CompactionState {
# 56 : : // Files produced by compaction
# 57 : : struct Output {
# 58 : : uint64_t number;
# 59 : : uint64_t file_size;
# 60 : : InternalKey smallest, largest;
# 61 : : };
# 62 : :
# 63 : 15546 : Output* current_output() { return &outputs[outputs.size() - 1]; }
# 64 : :
# 65 : : explicit CompactionState(Compaction* c)
# 66 : : : compaction(c),
# 67 : : smallest_snapshot(0),
# 68 : : outfile(nullptr),
# 69 : : builder(nullptr),
# 70 : 71 : total_bytes(0) {}
# 71 : :
# 72 : : Compaction* const compaction;
# 73 : :
# 74 : : // Sequence numbers < smallest_snapshot are not significant since we
# 75 : : // will never have to service a snapshot below smallest_snapshot.
# 76 : : // Therefore if we have seen a sequence number S <= smallest_snapshot,
# 77 : : // we can drop all entries for the same key with sequence numbers < S.
# 78 : : SequenceNumber smallest_snapshot;
# 79 : :
# 80 : : std::vector<Output> outputs;
# 81 : :
# 82 : : // State kept for output being generated
# 83 : : WritableFile* outfile;
# 84 : : TableBuilder* builder;
# 85 : :
# 86 : : uint64_t total_bytes;
# 87 : : };
# 88 : :
# 89 : : // Fix user-supplied options to be reasonable
# 90 : : template <class T, class V>
# 91 : 6564 : static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
# 92 [ - + ][ - + ]: 6564 : if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
# 93 [ + + ][ - + ]: 6564 : if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
# 94 : 6564 : }
# 95 : : Options SanitizeOptions(const std::string& dbname,
# 96 : : const InternalKeyComparator* icmp,
# 97 : : const InternalFilterPolicy* ipolicy,
# 98 : 1641 : const Options& src) {
# 99 : 1641 : Options result = src;
# 100 : 1641 : result.comparator = icmp;
# 101 [ + - ]: 1641 : result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
# 102 : 1641 : ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
# 103 : 1641 : ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
# 104 : 1641 : ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
# 105 : 1641 : ClipToRange(&result.block_size, 1 << 10, 4 << 20);
# 106 [ - + ]: 1641 : if (result.info_log == nullptr) {
# 107 : : // Open a log file in the same directory as the db
# 108 : 0 : src.env->CreateDir(dbname); // In case it does not exist
# 109 : 0 : src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
# 110 : 0 : Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
# 111 [ # # ]: 0 : if (!s.ok()) {
# 112 : : // No place suitable for logging
# 113 : 0 : result.info_log = nullptr;
# 114 : 0 : }
# 115 : 0 : }
# 116 [ - + ]: 1641 : if (result.block_cache == nullptr) {
# 117 : 0 : result.block_cache = NewLRUCache(8 << 20);
# 118 : 0 : }
# 119 : 1641 : return result;
# 120 : 1641 : }
# 121 : :
# 122 : 1641 : static int TableCacheSize(const Options& sanitized_options) {
# 123 : : // Reserve ten files or so for other uses and give the rest to TableCache.
# 124 : 1641 : return sanitized_options.max_open_files - kNumNonTableCacheFiles;
# 125 : 1641 : }
# 126 : :
# 127 : : DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
# 128 : : : env_(raw_options.env),
# 129 : : internal_comparator_(raw_options.comparator),
# 130 : : internal_filter_policy_(raw_options.filter_policy),
# 131 : : options_(SanitizeOptions(dbname, &internal_comparator_,
# 132 : : &internal_filter_policy_, raw_options)),
# 133 : : owns_info_log_(options_.info_log != raw_options.info_log),
# 134 : : owns_cache_(options_.block_cache != raw_options.block_cache),
# 135 : : dbname_(dbname),
# 136 : : table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
# 137 : : db_lock_(nullptr),
# 138 : : shutting_down_(false),
# 139 : : background_work_finished_signal_(&mutex_),
# 140 : : mem_(nullptr),
# 141 : : imm_(nullptr),
# 142 : : has_imm_(false),
# 143 : : logfile_(nullptr),
# 144 : : logfile_number_(0),
# 145 : : log_(nullptr),
# 146 : : seed_(0),
# 147 : : tmp_batch_(new WriteBatch),
# 148 : : background_compaction_scheduled_(false),
# 149 : : manual_compaction_(nullptr),
# 150 : : versions_(new VersionSet(dbname_, &options_, table_cache_,
# 151 : 1641 : &internal_comparator_)) {}
# 152 : :
# 153 : 1641 : DBImpl::~DBImpl() {
# 154 : : // Wait for background work to finish.
# 155 : 1641 : mutex_.Lock();
# 156 : 1641 : shutting_down_.store(true, std::memory_order_release);
# 157 [ + + ]: 1642 : while (background_compaction_scheduled_) {
# 158 : 1 : background_work_finished_signal_.Wait();
# 159 : 1 : }
# 160 : 1641 : mutex_.Unlock();
# 161 : :
# 162 [ + - ]: 1641 : if (db_lock_ != nullptr) {
# 163 : 1641 : env_->UnlockFile(db_lock_);
# 164 : 1641 : }
# 165 : :
# 166 : 1641 : delete versions_;
# 167 [ + - ]: 1641 : if (mem_ != nullptr) mem_->Unref();
# 168 [ + + ]: 1641 : if (imm_ != nullptr) imm_->Unref();
# 169 : 1641 : delete tmp_batch_;
# 170 : 1641 : delete log_;
# 171 : 1641 : delete logfile_;
# 172 : 1641 : delete table_cache_;
# 173 : :
# 174 [ - + ]: 1641 : if (owns_info_log_) {
# 175 : 0 : delete options_.info_log;
# 176 : 0 : }
# 177 [ - + ]: 1641 : if (owns_cache_) {
# 178 : 0 : delete options_.block_cache;
# 179 : 0 : }
# 180 : 1641 : }
# 181 : :
# 182 : 879 : Status DBImpl::NewDB() {
# 183 : 879 : VersionEdit new_db;
# 184 : 879 : new_db.SetComparatorName(user_comparator()->Name());
# 185 : 879 : new_db.SetLogNumber(0);
# 186 : 879 : new_db.SetNextFile(2);
# 187 : 879 : new_db.SetLastSequence(0);
# 188 : :
# 189 : 879 : const std::string manifest = DescriptorFileName(dbname_, 1);
# 190 : 879 : WritableFile* file;
# 191 : 879 : Status s = env_->NewWritableFile(manifest, &file);
# 192 [ - + ]: 879 : if (!s.ok()) {
# 193 : 0 : return s;
# 194 : 0 : }
# 195 : 879 : {
# 196 : 879 : log::Writer log(file);
# 197 : 879 : std::string record;
# 198 : 879 : new_db.EncodeTo(&record);
# 199 : 879 : s = log.AddRecord(record);
# 200 [ + - ]: 879 : if (s.ok()) {
# 201 : 879 : s = file->Close();
# 202 : 879 : }
# 203 : 879 : }
# 204 : 879 : delete file;
# 205 [ + - ]: 879 : if (s.ok()) {
# 206 : : // Make "CURRENT" file that points to the new manifest file.
# 207 : 879 : s = SetCurrentFile(env_, dbname_, 1);
# 208 : 879 : } else {
# 209 : 0 : env_->DeleteFile(manifest);
# 210 : 0 : }
# 211 : 879 : return s;
# 212 : 879 : }
# 213 : :
# 214 : 2545 : void DBImpl::MaybeIgnoreError(Status* s) const {
# 215 [ + - ][ # # ]: 2545 : if (s->ok() || options_.paranoid_checks) {
# 216 : : // No change needed
# 217 : 2545 : } else {
# 218 : 0 : Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
# 219 : 0 : *s = Status::OK();
# 220 : 0 : }
# 221 : 2545 : }
# 222 : :
# 223 : 1720 : void DBImpl::DeleteObsoleteFiles() {
# 224 : 1720 : mutex_.AssertHeld();
# 225 : :
# 226 [ - + ]: 1720 : if (!bg_error_.ok()) {
# 227 : : // After a background error, we don't know whether a new version may
# 228 : : // or may not have been committed, so we cannot safely garbage collect.
# 229 : 0 : return;
# 230 : 0 : }
# 231 : :
# 232 : : // Make a set of all of the live files
# 233 : 1720 : std::set<uint64_t> live = pending_outputs_;
# 234 : 1720 : versions_->AddLiveFiles(&live);
# 235 : :
# 236 : 1720 : std::vector<std::string> filenames;
# 237 : 1720 : env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
# 238 : 1720 : uint64_t number;
# 239 : 1720 : FileType type;
# 240 : 1720 : std::vector<std::string> files_to_delete;
# 241 [ + + ]: 13362 : for (std::string& filename : filenames) {
# 242 [ + + ]: 13362 : if (ParseFileName(filename, &number, &type)) {
# 243 : 10636 : bool keep = true;
# 244 [ - + ]: 10636 : switch (type) {
# 245 [ + + ]: 2490 : case kLogFile:
# 246 [ + + ]: 2490 : keep = ((number >= versions_->LogNumber()) ||
# 247 [ - + ]: 2490 : (number == versions_->PrevLogNumber()));
# 248 : 2490 : break;
# 249 [ + + ]: 3361 : case kDescriptorFile:
# 250 : : // Keep my manifest file, and any newer incarnations'
# 251 : : // (in case there is a race that allows other incarnations)
# 252 : 3361 : keep = (number >= versions_->ManifestFileNumber());
# 253 : 3361 : break;
# 254 [ + + ]: 1702 : case kTableFile:
# 255 : 1702 : keep = (live.find(number) != live.end());
# 256 : 1702 : break;
# 257 [ - + ]: 0 : case kTempFile:
# 258 : : // Any temp files that are currently being written to must
# 259 : : // be recorded in pending_outputs_, which is inserted into "live"
# 260 : 0 : keep = (live.find(number) != live.end());
# 261 : 0 : break;
# 262 [ + + ]: 1720 : case kCurrentFile:
# 263 [ + + ]: 3083 : case kDBLockFile:
# 264 [ - + ]: 3083 : case kInfoLogFile:
# 265 : 3083 : keep = true;
# 266 : 3083 : break;
# 267 : 10636 : }
# 268 : :
# 269 [ + + ]: 10636 : if (!keep) {
# 270 : 2678 : files_to_delete.push_back(std::move(filename));
# 271 [ + + ]: 2678 : if (type == kTableFile) {
# 272 : 267 : table_cache_->Evict(number);
# 273 : 267 : }
# 274 : 2678 : Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
# 275 : 2678 : static_cast<unsigned long long>(number));
# 276 : 2678 : }
# 277 : 10636 : }
# 278 : 13362 : }
# 279 : :
# 280 : : // While deleting all files unblock other threads. All files being deleted
# 281 : : // have unique names which will not collide with newly created files and
# 282 : : // are therefore safe to delete while allowing other threads to proceed.
# 283 : 1720 : mutex_.Unlock();
# 284 [ + + ]: 2678 : for (const std::string& filename : files_to_delete) {
# 285 : 2678 : env_->DeleteFile(dbname_ + "/" + filename);
# 286 : 2678 : }
# 287 : 1720 : mutex_.Lock();
# 288 : 1720 : }
# 289 : :
# 290 : 1641 : Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
# 291 : 1641 : mutex_.AssertHeld();
# 292 : :
# 293 : : // Ignore error from CreateDir since the creation of the DB is
# 294 : : // committed only when the descriptor is created, and this directory
# 295 : : // may already exist from a previous failed creation attempt.
# 296 : 1641 : env_->CreateDir(dbname_);
# 297 : 1641 : assert(db_lock_ == nullptr);
# 298 : 1641 : Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
# 299 [ - + ]: 1641 : if (!s.ok()) {
# 300 : 0 : return s;
# 301 : 0 : }
# 302 : :
# 303 [ + + ]: 1641 : if (!env_->FileExists(CurrentFileName(dbname_))) {
# 304 [ + - ]: 879 : if (options_.create_if_missing) {
# 305 : 879 : s = NewDB();
# 306 [ - + ]: 879 : if (!s.ok()) {
# 307 : 0 : return s;
# 308 : 0 : }
# 309 : 0 : } else {
# 310 : 0 : return Status::InvalidArgument(
# 311 : 0 : dbname_, "does not exist (create_if_missing is false)");
# 312 : 0 : }
# 313 : 762 : } else {
# 314 [ - + ]: 762 : if (options_.error_if_exists) {
# 315 : 0 : return Status::InvalidArgument(dbname_,
# 316 : 0 : "exists (error_if_exists is true)");
# 317 : 0 : }
# 318 : 1641 : }
# 319 : :
# 320 : 1641 : s = versions_->Recover(save_manifest);
# 321 [ - + ]: 1641 : if (!s.ok()) {
# 322 : 0 : return s;
# 323 : 0 : }
# 324 : 1641 : SequenceNumber max_sequence(0);
# 325 : :
# 326 : : // Recover from all newer log files than the ones named in the
# 327 : : // descriptor (new log files may have been added by the previous
# 328 : : // incarnation without registering them in the descriptor).
# 329 : : //
# 330 : : // Note that PrevLogNumber() is no longer used, but we pay
# 331 : : // attention to it in case we are recovering a database
# 332 : : // produced by an older version of leveldb.
# 333 : 1641 : const uint64_t min_log = versions_->LogNumber();
# 334 : 1641 : const uint64_t prev_log = versions_->PrevLogNumber();
# 335 : 1641 : std::vector<std::string> filenames;
# 336 : 1641 : s = env_->GetChildren(dbname_, &filenames);
# 337 [ - + ]: 1641 : if (!s.ok()) {
# 338 : 0 : return s;
# 339 : 0 : }
# 340 : 1641 : std::set<uint64_t> expected;
# 341 : 1641 : versions_->AddLiveFiles(&expected);
# 342 : 1641 : uint64_t number;
# 343 : 1641 : FileType type;
# 344 : 1641 : std::vector<uint64_t> logs;
# 345 [ + + ]: 10133 : for (size_t i = 0; i < filenames.size(); i++) {
# 346 [ + + ]: 8492 : if (ParseFileName(filenames[i], &number, &type)) {
# 347 : 5920 : expected.erase(number);
# 348 [ + + ][ + - ]: 5920 : if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
# [ # # ]
# 349 : 762 : logs.push_back(number);
# 350 : 5920 : }
# 351 : 8492 : }
# 352 [ - + ]: 1641 : if (!expected.empty()) {
# 353 : 0 : char buf[50];
# 354 : 0 : snprintf(buf, sizeof(buf), "%d missing files; e.g.",
# 355 : 0 : static_cast<int>(expected.size()));
# 356 : 0 : return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
# 357 : 0 : }
# 358 : :
# 359 : : // Recover in the order in which the logs were generated
# 360 : 1641 : std::sort(logs.begin(), logs.end());
# 361 [ + + ]: 2403 : for (size_t i = 0; i < logs.size(); i++) {
# 362 : 762 : s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
# 363 : 762 : &max_sequence);
# 364 [ - + ]: 762 : if (!s.ok()) {
# 365 : 0 : return s;
# 366 : 0 : }
# 367 : :
# 368 : : // The previous incarnation may not have written any MANIFEST
# 369 : : // records after allocating this log number. So we manually
# 370 : : // update the file number allocation counter in VersionSet.
# 371 : 762 : versions_->MarkFileNumberUsed(logs[i]);
# 372 : 762 : }
# 373 : :
# 374 [ + + ]: 1641 : if (versions_->LastSequence() < max_sequence) {
# 375 : 762 : versions_->SetLastSequence(max_sequence);
# 376 : 762 : }
# 377 : :
# 378 : 1641 : return Status::OK();
# 379 : 1641 : }
# 380 : :
# 381 : : Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
# 382 : : bool* save_manifest, VersionEdit* edit,
# 383 : 762 : SequenceNumber* max_sequence) {
# 384 : 762 : struct LogReporter : public log::Reader::Reporter {
# 385 : 762 : Env* env;
# 386 : 762 : Logger* info_log;
# 387 : 762 : const char* fname;
# 388 : 762 : Status* status; // null if options_.paranoid_checks==false
# 389 : 762 : void Corruption(size_t bytes, const Status& s) override {
# 390 : 0 : Log(info_log, "%s%s: dropping %d bytes; %s",
# 391 [ # # ]: 0 : (this->status == nullptr ? "(ignoring error) " : ""), fname,
# 392 : 0 : static_cast<int>(bytes), s.ToString().c_str());
# 393 [ # # ][ # # ]: 0 : if (this->status != nullptr && this->status->ok()) *this->status = s;
# 394 : 0 : }
# 395 : 762 : };
# 396 : :
# 397 : 762 : mutex_.AssertHeld();
# 398 : :
# 399 : : // Open the log file
# 400 : 762 : std::string fname = LogFileName(dbname_, log_number);
# 401 : 762 : SequentialFile* file;
# 402 : 762 : Status status = env_->NewSequentialFile(fname, &file);
# 403 [ - + ]: 762 : if (!status.ok()) {
# 404 : 0 : MaybeIgnoreError(&status);
# 405 : 0 : return status;
# 406 : 0 : }
# 407 : :
# 408 : : // Create the log reader.
# 409 : 762 : LogReporter reporter;
# 410 : 762 : reporter.env = env_;
# 411 : 762 : reporter.info_log = options_.info_log;
# 412 : 762 : reporter.fname = fname.c_str();
# 413 [ + - ]: 762 : reporter.status = (options_.paranoid_checks ? &status : nullptr);
# 414 : : // We intentionally make log::Reader do checksumming even if
# 415 : : // paranoid_checks==false so that corruptions cause entire commits
# 416 : : // to be skipped instead of propagating bad information (like overly
# 417 : : // large sequence numbers).
# 418 : 762 : log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
# 419 : 762 : Log(options_.info_log, "Recovering log #%llu",
# 420 : 762 : (unsigned long long)log_number);
# 421 : :
# 422 : : // Read all the records and add to a memtable
# 423 : 762 : std::string scratch;
# 424 : 762 : Slice record;
# 425 : 762 : WriteBatch batch;
# 426 : 762 : int compactions = 0;
# 427 : 762 : MemTable* mem = nullptr;
# 428 [ + + ][ + - ]: 3307 : while (reader.ReadRecord(&record, &scratch) && status.ok()) {
# 429 [ - + ]: 2545 : if (record.size() < 12) {
# 430 : 0 : reporter.Corruption(record.size(),
# 431 : 0 : Status::Corruption("log record too small", fname));
# 432 : 0 : continue;
# 433 : 0 : }
# 434 : 2545 : WriteBatchInternal::SetContents(&batch, record);
# 435 : :
# 436 [ + + ]: 2545 : if (mem == nullptr) {
# 437 : 762 : mem = new MemTable(internal_comparator_);
# 438 : 762 : mem->Ref();
# 439 : 762 : }
# 440 : 2545 : status = WriteBatchInternal::InsertInto(&batch, mem);
# 441 : 2545 : MaybeIgnoreError(&status);
# 442 [ - + ]: 2545 : if (!status.ok()) {
# 443 : 0 : break;
# 444 : 0 : }
# 445 : 2545 : const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
# 446 : 2545 : WriteBatchInternal::Count(&batch) - 1;
# 447 [ + - ]: 2545 : if (last_seq > *max_sequence) {
# 448 : 2545 : *max_sequence = last_seq;
# 449 : 2545 : }
# 450 : :
# 451 [ - + ]: 2545 : if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
# 452 : 0 : compactions++;
# 453 : 0 : *save_manifest = true;
# 454 : 0 : status = WriteLevel0Table(mem, edit, nullptr);
# 455 : 0 : mem->Unref();
# 456 : 0 : mem = nullptr;
# 457 [ # # ]: 0 : if (!status.ok()) {
# 458 : : // Reflect errors immediately so that conditions like full
# 459 : : // file-systems cause the DB::Open() to fail.
# 460 : 0 : break;
# 461 : 0 : }
# 462 : 0 : }
# 463 : 2545 : }
# 464 : :
# 465 : 762 : delete file;
# 466 : :
# 467 : : // See if we should keep reusing the last log file.
# 468 [ + - ][ - + ]: 762 : if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
# [ # # ][ # # ]
# 469 : 0 : assert(logfile_ == nullptr);
# 470 : 0 : assert(log_ == nullptr);
# 471 : 0 : assert(mem_ == nullptr);
# 472 : 0 : uint64_t lfile_size;
# 473 [ # # ][ # # ]: 0 : if (env_->GetFileSize(fname, &lfile_size).ok() &&
# 474 [ # # ]: 0 : env_->NewAppendableFile(fname, &logfile_).ok()) {
# 475 : 0 : Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
# 476 : 0 : log_ = new log::Writer(logfile_, lfile_size);
# 477 : 0 : logfile_number_ = log_number;
# 478 [ # # ]: 0 : if (mem != nullptr) {
# 479 : 0 : mem_ = mem;
# 480 : 0 : mem = nullptr;
# 481 : 0 : } else {
# 482 : : // mem can be nullptr if lognum exists but was empty.
# 483 : 0 : mem_ = new MemTable(internal_comparator_);
# 484 : 0 : mem_->Ref();
# 485 : 0 : }
# 486 : 0 : }
# 487 : 0 : }
# 488 : :
# 489 [ + - ]: 762 : if (mem != nullptr) {
# 490 : : // mem did not get reused; compact it.
# 491 [ + - ]: 762 : if (status.ok()) {
# 492 : 762 : *save_manifest = true;
# 493 : 762 : status = WriteLevel0Table(mem, edit, nullptr);
# 494 : 762 : }
# 495 : 762 : mem->Unref();
# 496 : 762 : }
# 497 : :
# 498 : 762 : return status;
# 499 : 762 : }
# 500 : :
# 501 : : Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
# 502 : 771 : Version* base) {
# 503 : 771 : mutex_.AssertHeld();
# 504 : 771 : const uint64_t start_micros = env_->NowMicros();
# 505 : 771 : FileMetaData meta;
# 506 : 771 : meta.number = versions_->NewFileNumber();
# 507 : 771 : pending_outputs_.insert(meta.number);
# 508 : 771 : Iterator* iter = mem->NewIterator();
# 509 : 771 : Log(options_.info_log, "Level-0 table #%llu: started",
# 510 : 771 : (unsigned long long)meta.number);
# 511 : :
# 512 : 771 : Status s;
# 513 : 771 : {
# 514 : 771 : mutex_.Unlock();
# 515 : 771 : s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
# 516 : 771 : mutex_.Lock();
# 517 : 771 : }
# 518 : :
# 519 : 771 : Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
# 520 : 771 : (unsigned long long)meta.number, (unsigned long long)meta.file_size,
# 521 : 771 : s.ToString().c_str());
# 522 : 771 : delete iter;
# 523 : 771 : pending_outputs_.erase(meta.number);
# 524 : :
# 525 : : // Note that if file_size is zero, the file has been deleted and
# 526 : : // should not be added to the manifest.
# 527 : 771 : int level = 0;
# 528 [ + - ][ + - ]: 771 : if (s.ok() && meta.file_size > 0) {
# 529 : 771 : const Slice min_user_key = meta.smallest.user_key();
# 530 : 771 : const Slice max_user_key = meta.largest.user_key();
# 531 [ + + ]: 771 : if (base != nullptr) {
# 532 : 9 : level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
# 533 : 9 : }
# 534 : 771 : edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
# 535 : 771 : meta.largest);
# 536 : 771 : }
# 537 : :
# 538 : 771 : CompactionStats stats;
# 539 : 771 : stats.micros = env_->NowMicros() - start_micros;
# 540 : 771 : stats.bytes_written = meta.file_size;
# 541 : 771 : stats_[level].Add(stats);
# 542 : 771 : return s;
# 543 : 771 : }
# 544 : :
# 545 : 9 : void DBImpl::CompactMemTable() {
# 546 : 9 : mutex_.AssertHeld();
# 547 : 9 : assert(imm_ != nullptr);
# 548 : :
# 549 : : // Save the contents of the memtable as a new Table
# 550 : 9 : VersionEdit edit;
# 551 : 9 : Version* base = versions_->current();
# 552 : 9 : base->Ref();
# 553 : 9 : Status s = WriteLevel0Table(imm_, &edit, base);
# 554 : 9 : base->Unref();
# 555 : :
# 556 [ + - ][ + + ]: 9 : if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
# 557 : 1 : s = Status::IOError("Deleting DB during memtable compaction");
# 558 : 1 : }
# 559 : :
# 560 : : // Replace immutable memtable with the generated Table
# 561 [ + + ]: 9 : if (s.ok()) {
# 562 : 8 : edit.SetPrevLogNumber(0);
# 563 : 8 : edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
# 564 : 8 : s = versions_->LogAndApply(&edit, &mutex_);
# 565 : 8 : }
# 566 : :
# 567 [ + + ]: 9 : if (s.ok()) {
# 568 : : // Commit to the new state
# 569 : 8 : imm_->Unref();
# 570 : 8 : imm_ = nullptr;
# 571 : 8 : has_imm_.store(false, std::memory_order_release);
# 572 : 8 : DeleteObsoleteFiles();
# 573 : 8 : } else {
# 574 : 1 : RecordBackgroundError(s);
# 575 : 1 : }
# 576 : 9 : }
# 577 : :
# 578 : 0 : void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
# 579 : 0 : int max_level_with_files = 1;
# 580 : 0 : {
# 581 : 0 : MutexLock l(&mutex_);
# 582 : 0 : Version* base = versions_->current();
# 583 [ # # ]: 0 : for (int level = 1; level < config::kNumLevels; level++) {
# 584 [ # # ]: 0 : if (base->OverlapInLevel(level, begin, end)) {
# 585 : 0 : max_level_with_files = level;
# 586 : 0 : }
# 587 : 0 : }
# 588 : 0 : }
# 589 : 0 : TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
# 590 [ # # ]: 0 : for (int level = 0; level < max_level_with_files; level++) {
# 591 : 0 : TEST_CompactRange(level, begin, end);
# 592 : 0 : }
# 593 : 0 : }
# 594 : :
# 595 : : void DBImpl::TEST_CompactRange(int level, const Slice* begin,
# 596 : 0 : const Slice* end) {
# 597 : 0 : assert(level >= 0);
# 598 : 0 : assert(level + 1 < config::kNumLevels);
# 599 : :
# 600 : 0 : InternalKey begin_storage, end_storage;
# 601 : :
# 602 : 0 : ManualCompaction manual;
# 603 : 0 : manual.level = level;
# 604 : 0 : manual.done = false;
# 605 [ # # ]: 0 : if (begin == nullptr) {
# 606 : 0 : manual.begin = nullptr;
# 607 : 0 : } else {
# 608 : 0 : begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
# 609 : 0 : manual.begin = &begin_storage;
# 610 : 0 : }
# 611 [ # # ]: 0 : if (end == nullptr) {
# 612 : 0 : manual.end = nullptr;
# 613 : 0 : } else {
# 614 : 0 : end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
# 615 : 0 : manual.end = &end_storage;
# 616 : 0 : }
# 617 : :
# 618 : 0 : MutexLock l(&mutex_);
# 619 [ # # ][ # # ]: 0 : while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
# 620 [ # # ]: 0 : bg_error_.ok()) {
# 621 [ # # ]: 0 : if (manual_compaction_ == nullptr) { // Idle
# 622 : 0 : manual_compaction_ = &manual;
# 623 : 0 : MaybeScheduleCompaction();
# 624 : 0 : } else { // Running either my compaction or another compaction.
# 625 : 0 : background_work_finished_signal_.Wait();
# 626 : 0 : }
# 627 : 0 : }
# 628 [ # # ]: 0 : if (manual_compaction_ == &manual) {
# 629 : : // Cancel my manual compaction since we aborted early for some reason.
# 630 : 0 : manual_compaction_ = nullptr;
# 631 : 0 : }
# 632 : 0 : }
# 633 : :
# 634 : 0 : Status DBImpl::TEST_CompactMemTable() {
# 635 : : // nullptr batch means just wait for earlier writes to be done
# 636 : 0 : Status s = Write(WriteOptions(), nullptr);
# 637 [ # # ]: 0 : if (s.ok()) {
# 638 : : // Wait until the compaction completes
# 639 : 0 : MutexLock l(&mutex_);
# 640 [ # # ][ # # ]: 0 : while (imm_ != nullptr && bg_error_.ok()) {
# 641 : 0 : background_work_finished_signal_.Wait();
# 642 : 0 : }
# 643 [ # # ]: 0 : if (imm_ != nullptr) {
# 644 : 0 : s = bg_error_;
# 645 : 0 : }
# 646 : 0 : }
# 647 : 0 : return s;
# 648 : 0 : }
# 649 : :
# 650 : 1 : void DBImpl::RecordBackgroundError(const Status& s) {
# 651 : 1 : mutex_.AssertHeld();
# 652 [ + - ]: 1 : if (bg_error_.ok()) {
# 653 : 1 : bg_error_ = s;
# 654 : 1 : background_work_finished_signal_.SignalAll();
# 655 : 1 : }
# 656 : 1 : }
# 657 : :
# 658 : 1748 : void DBImpl::MaybeScheduleCompaction() {
# 659 : 1748 : mutex_.AssertHeld();
# 660 [ - + ]: 1748 : if (background_compaction_scheduled_) {
# 661 : : // Already scheduled
# 662 [ + + ]: 1748 : } else if (shutting_down_.load(std::memory_order_acquire)) {
# 663 : : // DB is being deleted; no more background compactions
# 664 [ - + ]: 1747 : } else if (!bg_error_.ok()) {
# 665 : : // Already got an error; no more changes
# 666 [ + + ][ + - ]: 1747 : } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
# 667 [ + + ]: 1747 : !versions_->NeedsCompaction()) {
# 668 : : // No work to be done
# 669 : 1667 : } else {
# 670 : 80 : background_compaction_scheduled_ = true;
# 671 : 80 : env_->Schedule(&DBImpl::BGWork, this);
# 672 : 80 : }
# 673 : 1748 : }
# 674 : :
# 675 : 80 : void DBImpl::BGWork(void* db) {
# 676 : 80 : reinterpret_cast<DBImpl*>(db)->BackgroundCall();
# 677 : 80 : }
# 678 : :
# 679 : 80 : void DBImpl::BackgroundCall() {
# 680 : 80 : MutexLock l(&mutex_);
# 681 : 80 : assert(background_compaction_scheduled_);
# 682 [ - + ]: 80 : if (shutting_down_.load(std::memory_order_acquire)) {
# 683 : : // No more background work when shutting down.
# 684 [ - + ]: 80 : } else if (!bg_error_.ok()) {
# 685 : : // No more background work after a background error.
# 686 : 80 : } else {
# 687 : 80 : BackgroundCompaction();
# 688 : 80 : }
# 689 : :
# 690 : 80 : background_compaction_scheduled_ = false;
# 691 : :
# 692 : : // Previous compaction may have produced too many files in a level,
# 693 : : // so reschedule another compaction if needed.
# 694 : 80 : MaybeScheduleCompaction();
# 695 : 80 : background_work_finished_signal_.SignalAll();
# 696 : 80 : }
# 697 : :
# 698 : 80 : void DBImpl::BackgroundCompaction() {
# 699 : 80 : mutex_.AssertHeld();
# 700 : :
# 701 [ + + ]: 80 : if (imm_ != nullptr) {
# 702 : 9 : CompactMemTable();
# 703 : 9 : return;
# 704 : 9 : }
# 705 : :
# 706 : 71 : Compaction* c;
# 707 : 71 : bool is_manual = (manual_compaction_ != nullptr);
# 708 : 71 : InternalKey manual_end;
# 709 [ - + ]: 71 : if (is_manual) {
# 710 : 0 : ManualCompaction* m = manual_compaction_;
# 711 : 0 : c = versions_->CompactRange(m->level, m->begin, m->end);
# 712 : 0 : m->done = (c == nullptr);
# 713 [ # # ]: 0 : if (c != nullptr) {
# 714 : 0 : manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
# 715 : 0 : }
# 716 : 0 : Log(options_.info_log,
# 717 : 0 : "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
# 718 [ # # ]: 0 : m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
# 719 [ # # ]: 0 : (m->end ? m->end->DebugString().c_str() : "(end)"),
# 720 [ # # ]: 0 : (m->done ? "(end)" : manual_end.DebugString().c_str()));
# 721 : 71 : } else {
# 722 : 71 : c = versions_->PickCompaction();
# 723 : 71 : }
# 724 : :
# 725 : 71 : Status status;
# 726 [ - + ]: 71 : if (c == nullptr) {
# 727 : : // Nothing to do
# 728 [ + - ][ - + ]: 71 : } else if (!is_manual && c->IsTrivialMove()) {
# 729 : : // Move file to next level
# 730 : 0 : assert(c->num_input_files(0) == 1);
# 731 : 0 : FileMetaData* f = c->input(0, 0);
# 732 : 0 : c->edit()->DeleteFile(c->level(), f->number);
# 733 : 0 : c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
# 734 : 0 : f->largest);
# 735 : 0 : status = versions_->LogAndApply(c->edit(), &mutex_);
# 736 [ # # ]: 0 : if (!status.ok()) {
# 737 : 0 : RecordBackgroundError(status);
# 738 : 0 : }
# 739 : 0 : VersionSet::LevelSummaryStorage tmp;
# 740 : 0 : Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
# 741 : 0 : static_cast<unsigned long long>(f->number), c->level() + 1,
# 742 : 0 : static_cast<unsigned long long>(f->file_size),
# 743 : 0 : status.ToString().c_str(), versions_->LevelSummary(&tmp));
# 744 : 71 : } else {
# 745 : 71 : CompactionState* compact = new CompactionState(c);
# 746 : 71 : status = DoCompactionWork(compact);
# 747 [ - + ]: 71 : if (!status.ok()) {
# 748 : 0 : RecordBackgroundError(status);
# 749 : 0 : }
# 750 : 71 : CleanupCompaction(compact);
# 751 : 71 : c->ReleaseInputs();
# 752 : 71 : DeleteObsoleteFiles();
# 753 : 71 : }
# 754 : 71 : delete c;
# 755 : :
# 756 [ + - ]: 71 : if (status.ok()) {
# 757 : : // Done
# 758 [ # # ]: 71 : } else if (shutting_down_.load(std::memory_order_acquire)) {
# 759 : : // Ignore compaction errors found during shutting down
# 760 : 0 : } else {
# 761 : 0 : Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
# 762 : 0 : }
# 763 : :
# 764 [ - + ]: 71 : if (is_manual) {
# 765 : 0 : ManualCompaction* m = manual_compaction_;
# 766 [ # # ]: 0 : if (!status.ok()) {
# 767 : 0 : m->done = true;
# 768 : 0 : }
# 769 [ # # ]: 0 : if (!m->done) {
# 770 : : // We only compacted part of the requested range. Update *m
# 771 : : // to the range that is left to be compacted.
# 772 : 0 : m->tmp_storage = manual_end;
# 773 : 0 : m->begin = &m->tmp_storage;
# 774 : 0 : }
# 775 : 0 : manual_compaction_ = nullptr;
# 776 : 0 : }
# 777 : 71 : }
# 778 : :
# 779 : 71 : void DBImpl::CleanupCompaction(CompactionState* compact) {
# 780 : 71 : mutex_.AssertHeld();
# 781 [ - + ]: 71 : if (compact->builder != nullptr) {
# 782 : : // May happen if we get a shutdown call in the middle of compaction
# 783 : 0 : compact->builder->Abandon();
# 784 : 0 : delete compact->builder;
# 785 : 71 : } else {
# 786 : 71 : assert(compact->outfile == nullptr);
# 787 : 71 : }
# 788 : 71 : delete compact->outfile;
# 789 [ + + ]: 142 : for (size_t i = 0; i < compact->outputs.size(); i++) {
# 790 : 71 : const CompactionState::Output& out = compact->outputs[i];
# 791 : 71 : pending_outputs_.erase(out.number);
# 792 : 71 : }
# 793 : 71 : delete compact;
# 794 : 71 : }
# 795 : :
# 796 : 71 : Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
# 797 : 71 : assert(compact != nullptr);
# 798 : 71 : assert(compact->builder == nullptr);
# 799 : 71 : uint64_t file_number;
# 800 : 71 : {
# 801 : 71 : mutex_.Lock();
# 802 : 71 : file_number = versions_->NewFileNumber();
# 803 : 71 : pending_outputs_.insert(file_number);
# 804 : 71 : CompactionState::Output out;
# 805 : 71 : out.number = file_number;
# 806 : 71 : out.smallest.Clear();
# 807 : 71 : out.largest.Clear();
# 808 : 71 : compact->outputs.push_back(out);
# 809 : 71 : mutex_.Unlock();
# 810 : 71 : }
# 811 : :
# 812 : : // Make the output file
# 813 : 71 : std::string fname = TableFileName(dbname_, file_number);
# 814 : 71 : Status s = env_->NewWritableFile(fname, &compact->outfile);
# 815 [ + - ]: 71 : if (s.ok()) {
# 816 : 71 : compact->builder = new TableBuilder(options_, compact->outfile);
# 817 : 71 : }
# 818 : 71 : return s;
# 819 : 71 : }
# 820 : :
# 821 : : Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
# 822 : 71 : Iterator* input) {
# 823 : 71 : assert(compact != nullptr);
# 824 : 71 : assert(compact->outfile != nullptr);
# 825 : 71 : assert(compact->builder != nullptr);
# 826 : :
# 827 : 71 : const uint64_t output_number = compact->current_output()->number;
# 828 : 71 : assert(output_number != 0);
# 829 : :
# 830 : : // Check for iterator errors
# 831 : 71 : Status s = input->status();
# 832 : 71 : const uint64_t current_entries = compact->builder->NumEntries();
# 833 [ + - ]: 71 : if (s.ok()) {
# 834 : 71 : s = compact->builder->Finish();
# 835 : 71 : } else {
# 836 : 0 : compact->builder->Abandon();
# 837 : 0 : }
# 838 : 71 : const uint64_t current_bytes = compact->builder->FileSize();
# 839 : 71 : compact->current_output()->file_size = current_bytes;
# 840 : 71 : compact->total_bytes += current_bytes;
# 841 : 71 : delete compact->builder;
# 842 : 71 : compact->builder = nullptr;
# 843 : :
# 844 : : // Finish and check for file errors
# 845 [ + - ]: 71 : if (s.ok()) {
# 846 : 71 : s = compact->outfile->Sync();
# 847 : 71 : }
# 848 [ + - ]: 71 : if (s.ok()) {
# 849 : 71 : s = compact->outfile->Close();
# 850 : 71 : }
# 851 : 71 : delete compact->outfile;
# 852 : 71 : compact->outfile = nullptr;
# 853 : :
# 854 [ + - ][ + - ]: 71 : if (s.ok() && current_entries > 0) {
# 855 : : // Verify that the table is usable
# 856 : 71 : Iterator* iter =
# 857 : 71 : table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
# 858 : 71 : s = iter->status();
# 859 : 71 : delete iter;
# 860 [ + - ]: 71 : if (s.ok()) {
# 861 : 71 : Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
# 862 : 71 : (unsigned long long)output_number, compact->compaction->level(),
# 863 : 71 : (unsigned long long)current_entries,
# 864 : 71 : (unsigned long long)current_bytes);
# 865 : 71 : }
# 866 : 71 : }
# 867 : 71 : return s;
# 868 : 71 : }
# 869 : :
# 870 : 71 : Status DBImpl::InstallCompactionResults(CompactionState* compact) {
# 871 : 71 : mutex_.AssertHeld();
# 872 : 71 : Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
# 873 : 71 : compact->compaction->num_input_files(0), compact->compaction->level(),
# 874 : 71 : compact->compaction->num_input_files(1), compact->compaction->level() + 1,
# 875 : 71 : static_cast<long long>(compact->total_bytes));
# 876 : :
# 877 : : // Add compaction outputs
# 878 : 71 : compact->compaction->AddInputDeletions(compact->compaction->edit());
# 879 : 71 : const int level = compact->compaction->level();
# 880 [ + + ]: 142 : for (size_t i = 0; i < compact->outputs.size(); i++) {
# 881 : 71 : const CompactionState::Output& out = compact->outputs[i];
# 882 : 71 : compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
# 883 : 71 : out.smallest, out.largest);
# 884 : 71 : }
# 885 : 71 : return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
# 886 : 71 : }
# 887 : :
# 888 : 71 : Status DBImpl::DoCompactionWork(CompactionState* compact) {
# 889 : 71 : const uint64_t start_micros = env_->NowMicros();
# 890 : 71 : int64_t imm_micros = 0; // Micros spent doing imm_ compactions
# 891 : :
# 892 : 71 : Log(options_.info_log, "Compacting %d@%d + %d@%d files",
# 893 : 71 : compact->compaction->num_input_files(0), compact->compaction->level(),
# 894 : 71 : compact->compaction->num_input_files(1),
# 895 : 71 : compact->compaction->level() + 1);
# 896 : :
# 897 : 71 : assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
# 898 : 71 : assert(compact->builder == nullptr);
# 899 : 71 : assert(compact->outfile == nullptr);
# 900 [ + - ]: 71 : if (snapshots_.empty()) {
# 901 : 71 : compact->smallest_snapshot = versions_->LastSequence();
# 902 : 71 : } else {
# 903 : 0 : compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
# 904 : 0 : }
# 905 : :
# 906 : 71 : Iterator* input = versions_->MakeInputIterator(compact->compaction);
# 907 : :
# 908 : : // Release mutex while we're actually doing the compaction work
# 909 : 71 : mutex_.Unlock();
# 910 : :
# 911 : 71 : input->SeekToFirst();
# 912 : 71 : Status status;
# 913 : 71 : ParsedInternalKey ikey;
# 914 : 71 : std::string current_user_key;
# 915 : 71 : bool has_current_user_key = false;
# 916 : 71 : SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
# 917 [ + + ][ + - ]: 17643 : while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
# 918 : : // Prioritize immutable compaction work
# 919 [ - + ]: 17572 : if (has_imm_.load(std::memory_order_relaxed)) {
# 920 : 0 : const uint64_t imm_start = env_->NowMicros();
# 921 : 0 : mutex_.Lock();
# 922 [ # # ]: 0 : if (imm_ != nullptr) {
# 923 : 0 : CompactMemTable();
# 924 : : // Wake up MakeRoomForWrite() if necessary.
# 925 : 0 : background_work_finished_signal_.SignalAll();
# 926 : 0 : }
# 927 : 0 : mutex_.Unlock();
# 928 : 0 : imm_micros += (env_->NowMicros() - imm_start);
# 929 : 0 : }
# 930 : :
# 931 : 17572 : Slice key = input->key();
# 932 [ - + ]: 17572 : if (compact->compaction->ShouldStopBefore(key) &&
# 933 [ # # ]: 17572 : compact->builder != nullptr) {
# 934 : 0 : status = FinishCompactionOutputFile(compact, input);
# 935 [ # # ]: 0 : if (!status.ok()) {
# 936 : 0 : break;
# 937 : 0 : }
# 938 : 17572 : }
# 939 : :
# 940 : : // Handle key/value, add to state, etc.
# 941 : 17572 : bool drop = false;
# 942 [ - + ]: 17572 : if (!ParseInternalKey(key, &ikey)) {
# 943 : : // Do not hide error keys
# 944 : 0 : current_user_key.clear();
# 945 : 0 : has_current_user_key = false;
# 946 : 0 : last_sequence_for_key = kMaxSequenceNumber;
# 947 : 17572 : } else {
# 948 [ + + ][ + + ]: 17572 : if (!has_current_user_key ||
# 949 [ + + ]: 17572 : user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
# 950 : 17501 : 0) {
# 951 : : // First occurrence of this user key
# 952 : 15489 : current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
# 953 : 15489 : has_current_user_key = true;
# 954 : 15489 : last_sequence_for_key = kMaxSequenceNumber;
# 955 : 15489 : }
# 956 : :
# 957 [ + + ]: 17572 : if (last_sequence_for_key <= compact->smallest_snapshot) {
# 958 : : // Hidden by an newer entry for same user key
# 959 : 2083 : drop = true; // (A)
# 960 [ + + ]: 15489 : } else if (ikey.type == kTypeDeletion &&
# 961 [ + - ]: 15489 : ikey.sequence <= compact->smallest_snapshot &&
# 962 [ + - ]: 15489 : compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
# 963 : : // For this user key:
# 964 : : // (1) there is no data in higher levels
# 965 : : // (2) data in lower levels will have larger sequence numbers
# 966 : : // (3) data in layers that are being compacted here and have
# 967 : : // smaller sequence numbers will be dropped in the next
# 968 : : // few iterations of this loop (by rule (A) above).
# 969 : : // Therefore this deletion marker is obsolete and can be dropped.
# 970 : 156 : drop = true;
# 971 : 156 : }
# 972 : :
# 973 : 17572 : last_sequence_for_key = ikey.sequence;
# 974 : 17572 : }
# 975 : : #if 0
# 976 : : Log(options_.info_log,
# 977 : : " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
# 978 : : "%d smallest_snapshot: %d",
# 979 : : ikey.user_key.ToString().c_str(),
# 980 : : (int)ikey.sequence, ikey.type, kTypeValue, drop,
# 981 : : compact->compaction->IsBaseLevelForKey(ikey.user_key),
# 982 : : (int)last_sequence_for_key, (int)compact->smallest_snapshot);
# 983 : : #endif
# 984 : :
# 985 [ + + ]: 17572 : if (!drop) {
# 986 : : // Open output file if necessary
# 987 [ + + ]: 15333 : if (compact->builder == nullptr) {
# 988 : 71 : status = OpenCompactionOutputFile(compact);
# 989 [ - + ]: 71 : if (!status.ok()) {
# 990 : 0 : break;
# 991 : 0 : }
# 992 : 15333 : }
# 993 [ + + ]: 15333 : if (compact->builder->NumEntries() == 0) {
# 994 : 71 : compact->current_output()->smallest.DecodeFrom(key);
# 995 : 71 : }
# 996 : 15333 : compact->current_output()->largest.DecodeFrom(key);
# 997 : 15333 : compact->builder->Add(key, input->value());
# 998 : :
# 999 : : // Close output file if it is big enough
# 1000 [ - + ]: 15333 : if (compact->builder->FileSize() >=
# 1001 : 15333 : compact->compaction->MaxOutputFileSize()) {
# 1002 : 0 : status = FinishCompactionOutputFile(compact, input);
# 1003 [ # # ]: 0 : if (!status.ok()) {
# 1004 : 0 : break;
# 1005 : 0 : }
# 1006 : 17572 : }
# 1007 : 15333 : }
# 1008 : :
# 1009 : 17572 : input->Next();
# 1010 : 17572 : }
# 1011 : :
# 1012 [ + - ][ - + ]: 71 : if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
# 1013 : 0 : status = Status::IOError("Deleting DB during compaction");
# 1014 : 0 : }
# 1015 [ + - ][ + - ]: 71 : if (status.ok() && compact->builder != nullptr) {
# 1016 : 71 : status = FinishCompactionOutputFile(compact, input);
# 1017 : 71 : }
# 1018 [ + - ]: 71 : if (status.ok()) {
# 1019 : 71 : status = input->status();
# 1020 : 71 : }
# 1021 : 71 : delete input;
# 1022 : 71 : input = nullptr;
# 1023 : :
# 1024 : 71 : CompactionStats stats;
# 1025 : 71 : stats.micros = env_->NowMicros() - start_micros - imm_micros;
# 1026 [ + + ]: 213 : for (int which = 0; which < 2; which++) {
# 1027 [ + + ]: 409 : for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
# 1028 : 267 : stats.bytes_read += compact->compaction->input(which, i)->file_size;
# 1029 : 267 : }
# 1030 : 142 : }
# 1031 [ + + ]: 142 : for (size_t i = 0; i < compact->outputs.size(); i++) {
# 1032 : 71 : stats.bytes_written += compact->outputs[i].file_size;
# 1033 : 71 : }
# 1034 : :
# 1035 : 71 : mutex_.Lock();
# 1036 : 71 : stats_[compact->compaction->level() + 1].Add(stats);
# 1037 : :
# 1038 [ + - ]: 71 : if (status.ok()) {
# 1039 : 71 : status = InstallCompactionResults(compact);
# 1040 : 71 : }
# 1041 [ - + ]: 71 : if (!status.ok()) {
# 1042 : 0 : RecordBackgroundError(status);
# 1043 : 0 : }
# 1044 : 71 : VersionSet::LevelSummaryStorage tmp;
# 1045 : 71 : Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
# 1046 : 71 : return status;
# 1047 : 71 : }
# 1048 : :
# 1049 : : namespace {
# 1050 : :
# 1051 : : struct IterState {
# 1052 : : port::Mutex* const mu;
# 1053 : : Version* const version GUARDED_BY(mu);
# 1054 : : MemTable* const mem GUARDED_BY(mu);
# 1055 : : MemTable* const imm GUARDED_BY(mu);
# 1056 : :
# 1057 : : IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
# 1058 : 2902 : : mu(mutex), version(version), mem(mem), imm(imm) {}
# 1059 : : };
# 1060 : :
# 1061 : 2902 : static void CleanupIteratorState(void* arg1, void* arg2) {
# 1062 : 2902 : IterState* state = reinterpret_cast<IterState*>(arg1);
# 1063 : 2902 : state->mu->Lock();
# 1064 : 2902 : state->mem->Unref();
# 1065 [ - + ]: 2902 : if (state->imm != nullptr) state->imm->Unref();
# 1066 : 2902 : state->version->Unref();
# 1067 : 2902 : state->mu->Unlock();
# 1068 : 2902 : delete state;
# 1069 : 2902 : }
# 1070 : :
# 1071 : : } // anonymous namespace
# 1072 : :
# 1073 : : Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
# 1074 : : SequenceNumber* latest_snapshot,
# 1075 : 2902 : uint32_t* seed) {
# 1076 : 2902 : mutex_.Lock();
# 1077 : 2902 : *latest_snapshot = versions_->LastSequence();
# 1078 : :
# 1079 : : // Collect together all needed child iterators
# 1080 : 2902 : std::vector<Iterator*> list;
# 1081 : 2902 : list.push_back(mem_->NewIterator());
# 1082 : 2902 : mem_->Ref();
# 1083 [ - + ]: 2902 : if (imm_ != nullptr) {
# 1084 : 0 : list.push_back(imm_->NewIterator());
# 1085 : 0 : imm_->Ref();
# 1086 : 0 : }
# 1087 : 2902 : versions_->current()->AddIterators(options, &list);
# 1088 : 2902 : Iterator* internal_iter =
# 1089 : 2902 : NewMergingIterator(&internal_comparator_, &list[0], list.size());
# 1090 : 2902 : versions_->current()->Ref();
# 1091 : :
# 1092 : 2902 : IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
# 1093 : 2902 : internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
# 1094 : :
# 1095 : 2902 : *seed = ++seed_;
# 1096 : 2902 : mutex_.Unlock();
# 1097 : 2902 : return internal_iter;
# 1098 : 2902 : }
# 1099 : :
# 1100 : 0 : Iterator* DBImpl::TEST_NewInternalIterator() {
# 1101 : 0 : SequenceNumber ignored;
# 1102 : 0 : uint32_t ignored_seed;
# 1103 : 0 : return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
# 1104 : 0 : }
# 1105 : :
# 1106 : 0 : int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
# 1107 : 0 : MutexLock l(&mutex_);
# 1108 : 0 : return versions_->MaxNextLevelOverlappingBytes();
# 1109 : 0 : }
# 1110 : :
# 1111 : : Status DBImpl::Get(const ReadOptions& options, const Slice& key,
# 1112 : 7879843 : std::string* value) {
# 1113 : 7879843 : Status s;
# 1114 : 7879843 : MutexLock l(&mutex_);
# 1115 : 7879843 : SequenceNumber snapshot;
# 1116 [ - + ]: 7879843 : if (options.snapshot != nullptr) {
# 1117 : 0 : snapshot =
# 1118 : 0 : static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
# 1119 : 7879843 : } else {
# 1120 : 7879843 : snapshot = versions_->LastSequence();
# 1121 : 7879843 : }
# 1122 : :
# 1123 : 7879843 : MemTable* mem = mem_;
# 1124 : 7879843 : MemTable* imm = imm_;
# 1125 : 7879843 : Version* current = versions_->current();
# 1126 : 7879843 : mem->Ref();
# 1127 [ + + ]: 7879843 : if (imm != nullptr) imm->Ref();
# 1128 : 7879843 : current->Ref();
# 1129 : :
# 1130 : 7879843 : bool have_stat_update = false;
# 1131 : 7879843 : Version::GetStats stats;
# 1132 : :
# 1133 : : // Unlock while reading from files and memtables
# 1134 : 7879843 : {
# 1135 : 7879843 : mutex_.Unlock();
# 1136 : : // First look in the memtable, then in the immutable memtable (if any).
# 1137 : 7879843 : LookupKey lkey(key, snapshot);
# 1138 [ + + ]: 7879843 : if (mem->Get(lkey, value, &s)) {
# 1139 : : // Done
# 1140 [ + + ][ + + ]: 7534498 : } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
# 1141 : : // Done
# 1142 : 7534196 : } else {
# 1143 : 7534196 : s = current->Get(options, lkey, value, &stats);
# 1144 : 7534196 : have_stat_update = true;
# 1145 : 7534196 : }
# 1146 : 7879843 : mutex_.Lock();
# 1147 : 7879843 : }
# 1148 : :
# 1149 [ + + ][ + + ]: 7879843 : if (have_stat_update && current->UpdateStats(stats)) {
# 1150 : 18 : MaybeScheduleCompaction();
# 1151 : 18 : }
# 1152 : 7879843 : mem->Unref();
# 1153 [ + + ]: 7879843 : if (imm != nullptr) imm->Unref();
# 1154 : 7879843 : current->Unref();
# 1155 : 7879843 : return s;
# 1156 : 7879843 : }
# 1157 : :
# 1158 : 2902 : Iterator* DBImpl::NewIterator(const ReadOptions& options) {
# 1159 : 2902 : SequenceNumber latest_snapshot;
# 1160 : 2902 : uint32_t seed;
# 1161 : 2902 : Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
# 1162 : 2902 : return NewDBIterator(this, user_comparator(), iter,
# 1163 [ - + ]: 2902 : (options.snapshot != nullptr
# 1164 : 2902 : ? static_cast<const SnapshotImpl*>(options.snapshot)
# 1165 : 0 : ->sequence_number()
# 1166 : 2902 : : latest_snapshot),
# 1167 : 2902 : seed);
# 1168 : 2902 : }
# 1169 : :
# 1170 : 268 : void DBImpl::RecordReadSample(Slice key) {
# 1171 : 268 : MutexLock l(&mutex_);
# 1172 [ - + ]: 268 : if (versions_->current()->RecordReadSample(key)) {
# 1173 : 0 : MaybeScheduleCompaction();
# 1174 : 0 : }
# 1175 : 268 : }
# 1176 : :
# 1177 : 0 : const Snapshot* DBImpl::GetSnapshot() {
# 1178 : 0 : MutexLock l(&mutex_);
# 1179 : 0 : return snapshots_.New(versions_->LastSequence());
# 1180 : 0 : }
# 1181 : :
# 1182 : 0 : void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
# 1183 : 0 : MutexLock l(&mutex_);
# 1184 : 0 : snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
# 1185 : 0 : }
# 1186 : :
# 1187 : : // Convenience methods
# 1188 : 0 : Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
# 1189 : 0 : return DB::Put(o, key, val);
# 1190 : 0 : }
# 1191 : :
# 1192 : 0 : Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
# 1193 : 0 : return DB::Delete(options, key);
# 1194 : 0 : }
# 1195 : :
# 1196 : 12725 : Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
# 1197 : 12725 : Writer w(&mutex_);
# 1198 : 12725 : w.batch = updates;
# 1199 : 12725 : w.sync = options.sync;
# 1200 : 12725 : w.done = false;
# 1201 : :
# 1202 : 12725 : MutexLock l(&mutex_);
# 1203 : 12725 : writers_.push_back(&w);
# 1204 [ + - ][ - + ]: 12725 : while (!w.done && &w != writers_.front()) {
# 1205 : 0 : w.cv.Wait();
# 1206 : 0 : }
# 1207 [ - + ]: 12725 : if (w.done) {
# 1208 : 0 : return w.status;
# 1209 : 0 : }
# 1210 : :
# 1211 : : // May temporarily unlock and wait.
# 1212 : 12725 : Status status = MakeRoomForWrite(updates == nullptr);
# 1213 : 12725 : uint64_t last_sequence = versions_->LastSequence();
# 1214 : 12725 : Writer* last_writer = &w;
# 1215 [ + - ][ + - ]: 12725 : if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
# 1216 : 12725 : WriteBatch* write_batch = BuildBatchGroup(&last_writer);
# 1217 : 12725 : WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
# 1218 : 12725 : last_sequence += WriteBatchInternal::Count(write_batch);
# 1219 : :
# 1220 : : // Add to log and apply to memtable. We can release the lock
# 1221 : : // during this phase since &w is currently responsible for logging
# 1222 : : // and protects against concurrent loggers and concurrent writes
# 1223 : : // into mem_.
# 1224 : 12725 : {
# 1225 : 12725 : mutex_.Unlock();
# 1226 : 12725 : status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
# 1227 : 12725 : bool sync_error = false;
# 1228 [ + - ][ + + ]: 12725 : if (status.ok() && options.sync) {
# 1229 : 1588 : status = logfile_->Sync();
# 1230 [ - + ]: 1588 : if (!status.ok()) {
# 1231 : 0 : sync_error = true;
# 1232 : 0 : }
# 1233 : 1588 : }
# 1234 [ + - ]: 12725 : if (status.ok()) {
# 1235 : 12725 : status = WriteBatchInternal::InsertInto(write_batch, mem_);
# 1236 : 12725 : }
# 1237 : 12725 : mutex_.Lock();
# 1238 [ - + ]: 12725 : if (sync_error) {
# 1239 : : // The state of the log file is indeterminate: the log record we
# 1240 : : // just added may or may not show up when the DB is re-opened.
# 1241 : : // So we force the DB into a mode where all future writes fail.
# 1242 : 0 : RecordBackgroundError(status);
# 1243 : 0 : }
# 1244 : 12725 : }
# 1245 [ - + ]: 12725 : if (write_batch == tmp_batch_) tmp_batch_->Clear();
# 1246 : :
# 1247 : 12725 : versions_->SetLastSequence(last_sequence);
# 1248 : 12725 : }
# 1249 : :
# 1250 : 12725 : while (true) {
# 1251 : 12725 : Writer* ready = writers_.front();
# 1252 : 12725 : writers_.pop_front();
# 1253 [ - + ]: 12725 : if (ready != &w) {
# 1254 : 0 : ready->status = status;
# 1255 : 0 : ready->done = true;
# 1256 : 0 : ready->cv.Signal();
# 1257 : 0 : }
# 1258 [ + - ]: 12725 : if (ready == last_writer) break;
# 1259 : 12725 : }
# 1260 : :
# 1261 : : // Notify new head of write queue
# 1262 [ - + ]: 12725 : if (!writers_.empty()) {
# 1263 : 0 : writers_.front()->cv.Signal();
# 1264 : 0 : }
# 1265 : :
# 1266 : 12725 : return status;
# 1267 : 12725 : }
# 1268 : :
# 1269 : : // REQUIRES: Writer list must be non-empty
# 1270 : : // REQUIRES: First writer must have a non-null batch
# 1271 : 12725 : WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
# 1272 : 12725 : mutex_.AssertHeld();
# 1273 : 12725 : assert(!writers_.empty());
# 1274 : 12725 : Writer* first = writers_.front();
# 1275 : 12725 : WriteBatch* result = first->batch;
# 1276 : 12725 : assert(result != nullptr);
# 1277 : :
# 1278 : 12725 : size_t size = WriteBatchInternal::ByteSize(first->batch);
# 1279 : :
# 1280 : : // Allow the group to grow up to a maximum size, but if the
# 1281 : : // original write is small, limit the growth so we do not slow
# 1282 : : // down the small write too much.
# 1283 : 12725 : size_t max_size = 1 << 20;
# 1284 [ + + ]: 12725 : if (size <= (128 << 10)) {
# 1285 : 12693 : max_size = size + (128 << 10);
# 1286 : 12693 : }
# 1287 : :
# 1288 : 12725 : *last_writer = first;
# 1289 : 12725 : std::deque<Writer*>::iterator iter = writers_.begin();
# 1290 : 12725 : ++iter; // Advance past "first"
# 1291 [ - + ]: 12725 : for (; iter != writers_.end(); ++iter) {
# 1292 : 0 : Writer* w = *iter;
# 1293 [ # # ][ # # ]: 0 : if (w->sync && !first->sync) {
# 1294 : : // Do not include a sync write into a batch handled by a non-sync write.
# 1295 : 0 : break;
# 1296 : 0 : }
# 1297 : :
# 1298 [ # # ]: 0 : if (w->batch != nullptr) {
# 1299 : 0 : size += WriteBatchInternal::ByteSize(w->batch);
# 1300 [ # # ]: 0 : if (size > max_size) {
# 1301 : : // Do not make batch too big
# 1302 : 0 : break;
# 1303 : 0 : }
# 1304 : :
# 1305 : : // Append to *result
# 1306 [ # # ]: 0 : if (result == first->batch) {
# 1307 : : // Switch to temporary batch instead of disturbing caller's batch
# 1308 : 0 : result = tmp_batch_;
# 1309 : 0 : assert(WriteBatchInternal::Count(result) == 0);
# 1310 : 0 : WriteBatchInternal::Append(result, first->batch);
# 1311 : 0 : }
# 1312 : 0 : WriteBatchInternal::Append(result, w->batch);
# 1313 : 0 : }
# 1314 : 0 : *last_writer = w;
# 1315 : 0 : }
# 1316 : 12725 : return result;
# 1317 : 12725 : }
# 1318 : :
# 1319 : : // REQUIRES: mutex_ is held
# 1320 : : // REQUIRES: this thread is currently at the front of the writer queue
# 1321 : 12725 : Status DBImpl::MakeRoomForWrite(bool force) {
# 1322 : 12725 : mutex_.AssertHeld();
# 1323 : 12725 : assert(!writers_.empty());
# 1324 : 12725 : bool allow_delay = !force;
# 1325 : 12725 : Status s;
# 1326 : 12734 : while (true) {
# 1327 [ - + ]: 12734 : if (!bg_error_.ok()) {
# 1328 : : // Yield previous error
# 1329 : 0 : s = bg_error_;
# 1330 : 0 : break;
# 1331 [ + - ][ - + ]: 12734 : } else if (allow_delay && versions_->NumLevelFiles(0) >=
# 1332 : 12734 : config::kL0_SlowdownWritesTrigger) {
# 1333 : : // We are getting close to hitting a hard limit on the number of
# 1334 : : // L0 files. Rather than delaying a single write by several
# 1335 : : // seconds when we hit the hard limit, start delaying each
# 1336 : : // individual write by 1ms to reduce latency variance. Also,
# 1337 : : // this delay hands over some CPU to the compaction thread in
# 1338 : : // case it is sharing the same core as the writer.
# 1339 : 0 : mutex_.Unlock();
# 1340 : 0 : env_->SleepForMicroseconds(1000);
# 1341 : 0 : allow_delay = false; // Do not delay a single write more than once
# 1342 : 0 : mutex_.Lock();
# 1343 [ + - ]: 12734 : } else if (!force &&
# 1344 [ + + ]: 12734 : (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
# 1345 : : // There is room in current memtable
# 1346 : 12725 : break;
# 1347 [ - + ]: 12725 : } else if (imm_ != nullptr) {
# 1348 : : // We have filled up the current memtable, but the previous
# 1349 : : // one is still being compacted, so we wait.
# 1350 : 0 : Log(options_.info_log, "Current memtable full; waiting...\n");
# 1351 : 0 : background_work_finished_signal_.Wait();
# 1352 [ - + ]: 9 : } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
# 1353 : : // There are too many level-0 files.
# 1354 : 0 : Log(options_.info_log, "Too many L0 files; waiting...\n");
# 1355 : 0 : background_work_finished_signal_.Wait();
# 1356 : 9 : } else {
# 1357 : : // Attempt to switch to a new memtable and trigger compaction of old
# 1358 : 9 : assert(versions_->PrevLogNumber() == 0);
# 1359 : 9 : uint64_t new_log_number = versions_->NewFileNumber();
# 1360 : 9 : WritableFile* lfile = nullptr;
# 1361 : 9 : s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
# 1362 [ - + ]: 9 : if (!s.ok()) {
# 1363 : : // Avoid chewing through file number space in a tight loop.
# 1364 : 0 : versions_->ReuseFileNumber(new_log_number);
# 1365 : 0 : break;
# 1366 : 0 : }
# 1367 : 9 : delete log_;
# 1368 : 9 : delete logfile_;
# 1369 : 9 : logfile_ = lfile;
# 1370 : 9 : logfile_number_ = new_log_number;
# 1371 : 9 : log_ = new log::Writer(lfile);
# 1372 : 9 : imm_ = mem_;
# 1373 : 9 : has_imm_.store(true, std::memory_order_release);
# 1374 : 9 : mem_ = new MemTable(internal_comparator_);
# 1375 : 9 : mem_->Ref();
# 1376 : 9 : force = false; // Do not force another compaction if have room
# 1377 : 9 : MaybeScheduleCompaction();
# 1378 : 9 : }
# 1379 : 12734 : }
# 1380 : 12725 : return s;
# 1381 : 12725 : }
# 1382 : :
# 1383 : 0 : bool DBImpl::GetProperty(const Slice& property, std::string* value) {
# 1384 : 0 : value->clear();
# 1385 : :
# 1386 : 0 : MutexLock l(&mutex_);
# 1387 : 0 : Slice in = property;
# 1388 : 0 : Slice prefix("leveldb.");
# 1389 [ # # ]: 0 : if (!in.starts_with(prefix)) return false;
# 1390 : 0 : in.remove_prefix(prefix.size());
# 1391 : :
# 1392 [ # # ]: 0 : if (in.starts_with("num-files-at-level")) {
# 1393 : 0 : in.remove_prefix(strlen("num-files-at-level"));
# 1394 : 0 : uint64_t level;
# 1395 [ # # ][ # # ]: 0 : bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
# 1396 [ # # ][ # # ]: 0 : if (!ok || level >= config::kNumLevels) {
# 1397 : 0 : return false;
# 1398 : 0 : } else {
# 1399 : 0 : char buf[100];
# 1400 : 0 : snprintf(buf, sizeof(buf), "%d",
# 1401 : 0 : versions_->NumLevelFiles(static_cast<int>(level)));
# 1402 : 0 : *value = buf;
# 1403 : 0 : return true;
# 1404 : 0 : }
# 1405 [ # # ]: 0 : } else if (in == "stats") {
# 1406 : 0 : char buf[200];
# 1407 : 0 : snprintf(buf, sizeof(buf),
# 1408 : 0 : " Compactions\n"
# 1409 : 0 : "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
# 1410 : 0 : "--------------------------------------------------\n");
# 1411 : 0 : value->append(buf);
# 1412 [ # # ]: 0 : for (int level = 0; level < config::kNumLevels; level++) {
# 1413 : 0 : int files = versions_->NumLevelFiles(level);
# 1414 [ # # ][ # # ]: 0 : if (stats_[level].micros > 0 || files > 0) {
# 1415 : 0 : snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
# 1416 : 0 : files, versions_->NumLevelBytes(level) / 1048576.0,
# 1417 : 0 : stats_[level].micros / 1e6,
# 1418 : 0 : stats_[level].bytes_read / 1048576.0,
# 1419 : 0 : stats_[level].bytes_written / 1048576.0);
# 1420 : 0 : value->append(buf);
# 1421 : 0 : }
# 1422 : 0 : }
# 1423 : 0 : return true;
# 1424 [ # # ]: 0 : } else if (in == "sstables") {
# 1425 : 0 : *value = versions_->current()->DebugString();
# 1426 : 0 : return true;
# 1427 [ # # ]: 0 : } else if (in == "approximate-memory-usage") {
# 1428 : 0 : size_t total_usage = options_.block_cache->TotalCharge();
# 1429 [ # # ]: 0 : if (mem_) {
# 1430 : 0 : total_usage += mem_->ApproximateMemoryUsage();
# 1431 : 0 : }
# 1432 [ # # ]: 0 : if (imm_) {
# 1433 : 0 : total_usage += imm_->ApproximateMemoryUsage();
# 1434 : 0 : }
# 1435 : 0 : char buf[50];
# 1436 : 0 : snprintf(buf, sizeof(buf), "%llu",
# 1437 : 0 : static_cast<unsigned long long>(total_usage));
# 1438 : 0 : value->append(buf);
# 1439 : 0 : return true;
# 1440 : 0 : }
# 1441 : :
# 1442 : 0 : return false;
# 1443 : 0 : }
# 1444 : :
# 1445 : 26 : void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
# 1446 : : // TODO(opt): better implementation
# 1447 : 26 : MutexLock l(&mutex_);
# 1448 : 26 : Version* v = versions_->current();
# 1449 : 26 : v->Ref();
# 1450 : :
# 1451 [ + + ]: 52 : for (int i = 0; i < n; i++) {
# 1452 : : // Convert user_key into a corresponding internal key.
# 1453 : 26 : InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
# 1454 : 26 : InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
# 1455 : 26 : uint64_t start = versions_->ApproximateOffsetOf(v, k1);
# 1456 : 26 : uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
# 1457 [ + - ]: 26 : sizes[i] = (limit >= start ? limit - start : 0);
# 1458 : 26 : }
# 1459 : :
# 1460 : 26 : v->Unref();
# 1461 : 26 : }
# 1462 : :
# 1463 : : // Default implementations of convenience methods that subclasses of DB
# 1464 : : // can call if they wish
# 1465 : 0 : Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
# 1466 : 0 : WriteBatch batch;
# 1467 : 0 : batch.Put(key, value);
# 1468 : 0 : return Write(opt, &batch);
# 1469 : 0 : }
# 1470 : :
# 1471 : 0 : Status DB::Delete(const WriteOptions& opt, const Slice& key) {
# 1472 : 0 : WriteBatch batch;
# 1473 : 0 : batch.Delete(key);
# 1474 : 0 : return Write(opt, &batch);
# 1475 : 0 : }
# 1476 : :
# 1477 : 1641 : DB::~DB() = default;
# 1478 : :
# 1479 : 1641 : Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
# 1480 : 1641 : *dbptr = nullptr;
# 1481 : :
# 1482 : 1641 : DBImpl* impl = new DBImpl(options, dbname);
# 1483 : 1641 : impl->mutex_.Lock();
# 1484 : 1641 : VersionEdit edit;
# 1485 : : // Recover handles create_if_missing, error_if_exists
# 1486 : 1641 : bool save_manifest = false;
# 1487 : 1641 : Status s = impl->Recover(&edit, &save_manifest);
# 1488 [ + - ][ + - ]: 1641 : if (s.ok() && impl->mem_ == nullptr) {
# 1489 : : // Create new log and a corresponding memtable.
# 1490 : 1641 : uint64_t new_log_number = impl->versions_->NewFileNumber();
# 1491 : 1641 : WritableFile* lfile;
# 1492 : 1641 : s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
# 1493 : 1641 : &lfile);
# 1494 [ + - ]: 1641 : if (s.ok()) {
# 1495 : 1641 : edit.SetLogNumber(new_log_number);
# 1496 : 1641 : impl->logfile_ = lfile;
# 1497 : 1641 : impl->logfile_number_ = new_log_number;
# 1498 : 1641 : impl->log_ = new log::Writer(lfile);
# 1499 : 1641 : impl->mem_ = new MemTable(impl->internal_comparator_);
# 1500 : 1641 : impl->mem_->Ref();
# 1501 : 1641 : }
# 1502 : 1641 : }
# 1503 [ + - ][ + - ]: 1641 : if (s.ok() && save_manifest) {
# 1504 : 1641 : edit.SetPrevLogNumber(0); // No older logs needed after recovery.
# 1505 : 1641 : edit.SetLogNumber(impl->logfile_number_);
# 1506 : 1641 : s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
# 1507 : 1641 : }
# 1508 [ + - ]: 1641 : if (s.ok()) {
# 1509 : 1641 : impl->DeleteObsoleteFiles();
# 1510 : 1641 : impl->MaybeScheduleCompaction();
# 1511 : 1641 : }
# 1512 : 1641 : impl->mutex_.Unlock();
# 1513 [ + - ]: 1641 : if (s.ok()) {
# 1514 : 1641 : assert(impl->mem_ != nullptr);
# 1515 : 1641 : *dbptr = impl;
# 1516 : 1641 : } else {
# 1517 : 0 : delete impl;
# 1518 : 0 : }
# 1519 : 1641 : return s;
# 1520 : 1641 : }
# 1521 : :
# 1522 : 1641 : Snapshot::~Snapshot() = default;
# 1523 : :
# 1524 : 29 : Status DestroyDB(const std::string& dbname, const Options& options) {
# 1525 : 29 : Env* env = options.env;
# 1526 : 29 : std::vector<std::string> filenames;
# 1527 : 29 : Status result = env->GetChildren(dbname, &filenames);
# 1528 [ + + ]: 29 : if (!result.ok()) {
# 1529 : : // Ignore error in case directory does not exist
# 1530 : 4 : return Status::OK();
# 1531 : 4 : }
# 1532 : :
# 1533 : 25 : FileLock* lock;
# 1534 : 25 : const std::string lockname = LockFileName(dbname);
# 1535 : 25 : result = env->LockFile(lockname, &lock);
# 1536 [ + - ]: 25 : if (result.ok()) {
# 1537 : 25 : uint64_t number;
# 1538 : 25 : FileType type;
# 1539 [ + + ]: 220 : for (size_t i = 0; i < filenames.size(); i++) {
# 1540 [ + + ]: 195 : if (ParseFileName(filenames[i], &number, &type) &&
# 1541 [ + + ]: 195 : type != kDBLockFile) { // Lock file will be deleted at end
# 1542 : 120 : Status del = env->DeleteFile(dbname + "/" + filenames[i]);
# 1543 [ + - ][ - + ]: 120 : if (result.ok() && !del.ok()) {
# 1544 : 0 : result = del;
# 1545 : 0 : }
# 1546 : 120 : }
# 1547 : 195 : }
# 1548 : 25 : env->UnlockFile(lock); // Ignore error since state is already gone
# 1549 : 25 : env->DeleteFile(lockname);
# 1550 : 25 : env->DeleteDir(dbname); // Ignore error in case dir contains other files
# 1551 : 25 : }
# 1552 : 25 : return result;
# 1553 : 25 : }
# 1554 : :
# 1555 : : } // namespace leveldb
|