在Leveldb中Compation分为两种,第一种被称为Minor Compation,只有当immutable存在的时候才会触发,将immutable变成sstable,其过程如下:

void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != nullptr);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  // 1. 产生新的sstable文件,因此这里初始化了VersionEdit来记录sstable文件的变化
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    // 2. 应用到VersionSet中。
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // 3. 释放immutable,然后异常一些废弃的文件
    // Commit to the new state
    imm_->Unref();
    imm_ = nullptr;
    has_imm_.store(false, std::memory_order_release);
    RemoveObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}

产生的sstable文件也是分为多个level的。在sstable中查找的时候,先从高level的sstable文件中查找,然后依次从下一个level中进行查找,高level的sstable包含的key范围越广。上面的WriteLevel0Table也并非只是将sstable写入到level0中,而是会通过下面这个函数找到一个合适的level。

int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key,
                                        const Slice& largest_user_key) {
  int level = 0;
  // 1. 先判断level0中的sstable和新增的sstable是否重叠,如果重叠就直接添加进去等着Major Compaction
  if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
    // Push to next level if there is no overlap in next level,
    // and the #bytes overlapping in the level after that are limited.
    InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
    std::vector<FileMetaData*> overlaps;
    // 2. 如果不重叠就继续往下一个level查找,如果下一个level也不重叠,并且下下个level重叠的大小小于 10 * 2M。
    while (level < config::kMaxMemCompactLevel) {
      if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
        break;
      }
      if (level + 2 < config::kNumLevels) {
        // Check that file does not overlap too many grandparent bytes.
        GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
        const int64_t sum = TotalFileSize(overlaps);
        if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
          break;
        }
      }
      // 3. 符合条件递增level,继续查找。
      level++;
    }
  }
  return level;
}

这么做的目的是为了避免Compation,当一个sstable文件准备添加进去的时候,如果和level0并没有任何键重叠,并且上一层也没有重叠,而且上上层重叠大小不超过20M,那么可以直接将文件放到上一层,避免Major Compaction,算是一种优化吧。不过这个条件还是挺苛刻的。后面在谈到Major Compaction的时候会说到leveldb中更多关于Compation的优化。到此为止minor Compation算是讲完了,还是比较简单的,这也是leveldb对于Minor Compation的要求,要求其尽可能短的时间内完成,否则会堵塞正常的写入操作,因此Minor Compation的优先级是高于major compation。

Leveldb中除了上面谈到的Minor Compation外,还有一个最重要的Compation就是Major Compation了,上面说到一次minor compaction的产出是一个0层的sstable文件(也有可能因为优化产生一个高level的文件),其中包含了所有的内存数据。但是若干个0层文件中是可能存在数据overlap的。如果只有minor compaction,那么要查询一个数据的时候,就导致了我们需要遍历所有的level0文件。为此有了Major Compation。通过Major Compation将level0层中重叠的文件进行merge变成不重叠的文件,并放到level1中。以确保level1的文件都是不重叠的,那么我们只需要在level1中搜索依次就可以找到目的文件了。那么到底何时触发Major Compation呢? Major Compation的过程是如何的? 首先来看下Major Compation发生的条件:

  1. 当0层文件数超过预定的上限(默认为4个)
  2. 当level i层文件的总大小超过(10 ^ i) MB;
  3. 当某个文件无效读取的次数过多

每次做完Minor Compation的时候都会调用Finalize来计算Major Compation需要操作的文件和level,计算的条件就是上面的第一条和第二条,下面这个函数会将第一个和第二条件转换分数进行计算。

void VersionSet::Finalize(Version* v) {
  // Precomputed best level for next compaction
  int best_level = -1;
  double best_score = -1;

  for (int level = 0; level < config::kNumLevels - 1; level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      score = v->files_[level].size() /
              static_cast<double>(config::kL0_CompactionTrigger);
    } else {
      // Compute the ratio of current size to size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score =
          static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
    }

    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }

  v->compaction_level_ = best_level;
  v->compaction_score_ = best_score;
}

每次进行数据查询的时候,都会产生stats,这些stats记录了哪个文件未查询到的次数,如果无效读取次数太多就会记录下来,保存在file_to_compact_中,这就是对应到上面的第三个条件。

bool Version::UpdateStats(const GetStats& stats) {
  FileMetaData* f = stats.seek_file;
  if (f != nullptr) {
    f->allowed_seeks--;
    if (f->allowed_seeks <= 0 && file_to_compact_ == nullptr) {
      file_to_compact_ = f;
      file_to_compact_level_ = stats.seek_file_level;
      return true;
    }
  }
  return false;
}

最后通过compaction_score_和file_to_compact_两个值来决定是否需要做Major Compation。

// Returns true iff some level needs a compaction.
  bool NeedsCompaction() const {
    Version* v = current_;
    return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
  }

  void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (background_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    // 如果满足Compation的条件,开始进行下一次Compation
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}

MaybeScheduleCompaction会在以下几个地方执行。

  1. 上一次Compation完成后,会调用MaybeScheduleCompaction,决定是否需要进行下一次Compation
  2. 每次查询完数据后,会调用MaybeScheduleCompaction
  3. Mmemtab切换成immutable的时候
  4. RecordReadSample

说完了Major Compation触发的条件,那么接下来我们来看看Major Compation的过程是如何的?

Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  int level;

  // We prefer compactions triggered by too much data in a level over
  // the compactions triggered by seeks.
  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != nullptr);
  if (size_compaction) {
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level + 1 < config::kNumLevels);
    c = new Compaction(options_, level);

    // Pick the first file that comes after compact_pointer_[level]
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      FileMetaData* f = current_->files_[level][i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  } else if (seek_compaction) {
    level = current_->file_to_compact_level_;
    c = new Compaction(options_, level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  } else {
    return nullptr;
  }

  c->input_version_ = current_;
  c->input_version_->Ref();

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

  SetupOtherInputs(c);

  return c;
}