In LevelDB, compaction comes in two kinds. The first is called Minor Compaction. It is triggered only when an immutable memtable exists, and it turns that immutable memtable into an sstable. The process is as follows:
void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != nullptr);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  // 1. A new sstable file is produced, so a VersionEdit is prepared here
  //    to record the change in the set of sstable files.
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    // 2. Apply the edit to the VersionSet.
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // 3. Release the immutable memtable, then remove obsolete files.
    // Commit to the new state
    imm_->Unref();
    imm_ = nullptr;
    has_imm_.store(false, std::memory_order_release);
    RemoveObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}
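For context, the WriteLevel0Table call in step 1 does roughly the following. This is a condensed sketch of the leveldb function; pending-output bookkeeping, logging, and compaction-stat accounting are omitted:

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) {
  mutex_.AssertHeld();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  Iterator* iter = mem->NewIterator();
  Status s;
  {
    mutex_.Unlock();
    // Dump the memtable contents into a new sstable file on disk.
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
    mutex_.Lock();
  }
  delete iter;

  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    if (base != nullptr) {
      // The new table is not hard-wired to level 0; see the next function.
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    // Record the new file in the VersionEdit.
    edit->AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest);
  }
  return s;
}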
The sstable files produced are organized into multiple levels. When looking up a key among the sstables, the search starts at level 0 and then proceeds through level 1, level 2, and so on; the higher the level number, the wider the key range its sstables cover and the older the data they hold. Note that WriteLevel0Table above does not simply write the new sstable into level 0; it chooses a suitable level through the following function.
int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key,
                                        const Slice& largest_user_key) {
  int level = 0;
  // 1. First check whether the new sstable overlaps the sstables in level 0.
  //    If it does, simply put it in level 0 and let a later Major Compaction
  //    deal with it.
  if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
    // Push to next level if there is no overlap in next level,
    // and the #bytes overlapping in the level after that are limited.
    InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
    std::vector<FileMetaData*> overlaps;
    // 2. If there is no overlap, keep probing downward: push to the next
    //    level as long as that level does not overlap either and the bytes
    //    overlapping in the level after that stay below 10 * 2MB (default).
    while (level < config::kMaxMemCompactLevel) {
      if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
        break;
      }
      if (level + 2 < config::kNumLevels) {
        // Check that file does not overlap too many grandparent bytes.
        GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
        const int64_t sum = TotalFileSize(overlaps);
        if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
          break;
        }
      }
      // 3. Conditions satisfied: bump the level and keep probing.
      level++;
    }
  }
  return level;
}
The purpose of this is to avoid compaction work. When an sstable is about to be added, if none of its keys overlap level 0, the next level down does not overlap either, and its overlap with the level after that is no more than 20MB, then the file can be placed directly into that deeper level, sparing a future Major Compaction. It is a nice optimization, although the conditions are fairly strict. More of leveldb's compaction optimizations will come up when we get to Major Compaction. That wraps up Minor Compaction, which is quite simple by design: leveldb requires it to finish in as short a time as possible, because otherwise it blocks normal writes, and for that reason Minor Compaction has a higher priority than Major Compaction.
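Where do the 20MB limit and the maximum push-down depth come from? They are defined by the following pieces of the leveldb source (slightly condensed here); with the default max_file_size of 2MB, MaxGrandParentOverlapBytes works out to 20MB:

// dbformat.h: a freshly dumped memtable can be pushed down at most to level 2.
static const int kMaxMemCompactLevel = 2;

// version_set.cc: stop pushing down once the overlap with the grandparent
// level exceeds 10 target file sizes, i.e. 10 * 2MB = 20MB by default.
static size_t TargetFileSize(const Options* options) {
  return options->max_file_size;
}

static int64_t MaxGrandParentOverlapBytes(const Options* options) {
  return 10 * TargetFileSize(options);
}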
Besides the Minor Compaction described above, LevelDB has an even more important compaction: Major Compaction. As mentioned, one Minor Compaction produces a single level-0 sstable file (or, thanks to the optimization above, possibly a file in a higher level) containing all of the in-memory data. However, several level-0 files may overlap one another in key range. If we only ever did Minor Compaction, a lookup would have to scan every level-0 file. Major Compaction exists to fix this: it merges the overlapping level-0 files into non-overlapping files and moves them into level 1, guaranteeing that the files within level 1 never overlap, so a single search within level 1 is enough to locate the target file. So when exactly is a Major Compaction triggered, and how does it proceed? Let's first look at the trigger conditions: (1) level 0 has accumulated too many files (config::kL0_CompactionTrigger, 4 by default); (2) the total byte size of some level exceeds that level's size limit (MaxBytesForLevel); (3) some file has been seeked without a hit too many times (its allowed_seeks budget runs out).
Every time a Minor Compaction finishes, Finalize is called to compute which level a Major Compaction should work on. The computation is based on the first and second conditions above; the function below turns both of them into a score.
void VersionSet::Finalize(Version* v) {
  // Precomputed best level for next compaction
  int best_level = -1;
  double best_score = -1;

  for (int level = 0; level < config::kNumLevels - 1; level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      score = v->files_[level].size() /
              static_cast<double>(config::kL0_CompactionTrigger);
    } else {
      // Compute the ratio of current size to size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score =
          static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
    }

    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }

  v->compaction_level_ = best_level;
  v->compaction_score_ = best_score;
}
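For the non-zero levels, the size limit used in the else branch grows tenfold per level: 10MB for level 1, 100MB for level 2, and so on. The helper below is from leveldb's version_set.cc, so a score of 1 or more means that level is over its budget:

static double MaxBytesForLevel(const Options* options, int level) {
  // Note: the result for level zero is not really used since we set
  // the level-0 compaction threshold based on number of files.

  // Result for both level-0 and level-1
  double result = 10. * 1048576.0;
  while (level > 1) {
    result *= 10;
    level--;
  }
  return result;
}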
Every lookup also produces a GetStats record of the file that was searched first but did not contain the key. UpdateStats charges that file one seek, and once a file's allowed_seeks budget is used up it is remembered in file_to_compact_; this corresponds to the third condition above.
bool Version::UpdateStats(const GetStats& stats) {
  FileMetaData* f = stats.seek_file;
  if (f != nullptr) {
    f->allowed_seeks--;
    if (f->allowed_seeks <= 0 && file_to_compact_ == nullptr) {
      file_to_compact_ = f;
      file_to_compact_level_ = stats.seek_file_level;
      return true;
    }
  }
  return false;
}
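The allowed_seeks budget itself is assigned when a file is first added to a version, in VersionSet::Builder::Apply. Condensed from the leveldb source, the idea is that one seek costs roughly as much as compacting 16KB of data, so a file earns one allowed seek per 16KB, with a floor of 100:

// In VersionSet::Builder::Apply (condensed): grant roughly one seek per
// 16KB of file data before this file becomes a compaction candidate,
// but never fewer than 100 seeks.
f->allowed_seeks = static_cast<int>(f->file_size / 16384U);
if (f->allowed_seeks < 100) f->allowed_seeks = 100;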
Finally, compaction_score_ and file_to_compact_ together decide whether a Major Compaction is needed.
// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
  Version* v = current_;
  return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
}

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (background_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    // The compaction conditions are met: schedule the next compaction.
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}
MaybeScheduleCompaction is invoked from several places: in DB::Open after recovery, on the write path in MakeRoomForWrite once an immutable memtable has been created, on the read path in Get and RecordReadSample when UpdateStats reports a seek-triggered compaction, and again at the end of BackgroundCall after a compaction finishes, so that further work keeps getting scheduled as long as some level still needs it.
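The last of those call sites is what keeps compactions running back to back: after each background compaction, the worker re-checks whether more work is needed. The snippet below is a lightly condensed version of DBImpl::BGWork / DBImpl::BackgroundCall from the leveldb source:

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(background_compaction_scheduled_);
  if (shutting_down_.load(std::memory_order_acquire)) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();  // performs the Minor or Major Compaction work
  }

  background_compaction_scheduled_ = false;

  // The previous compaction may have produced too many files in some level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  background_work_finished_signal_.SignalAll();
}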
Now that we have covered what triggers a Major Compaction, let's look at how one actually proceeds. The first step is picking the input files:
Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  int level;

  // We prefer compactions triggered by too much data in a level over
  // the compactions triggered by seeks.
  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != nullptr);
  if (size_compaction) {
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level + 1 < config::kNumLevels);
    c = new Compaction(options_, level);

    // Pick the first file that comes after compact_pointer_[level]
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      FileMetaData* f = current_->files_[level][i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  } else if (seek_compaction) {
    level = current_->file_to_compact_level_;
    c = new Compaction(options_, level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  } else {
    return nullptr;
  }

  c->input_version_ = current_;
  c->input_version_->Ref();

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

  SetupOtherInputs(c);
  return c;
}
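Two details of PickCompaction are worth a note. compact_pointer_[level] remembers the largest key compacted at that level so far, so size-triggered compactions rotate round-robin through a level's key space instead of always picking the same file. SetupOtherInputs then gathers the overlapping files from level+1 into c->inputs_[1], and it is also where the pointer gets advanced; the relevant tail of that function (from the leveldb source) is:

// Update the place where we will do the next compaction for this level.
// We update this immediately instead of waiting for the VersionEdit
// to be applied so that if the compaction fails, we will try a different
// key range next time.
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);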