Leveldb每次在进行数据写入的时候都会先将数据写到日志文件中,然后再往Memtable中写,这样做的目的是为了防止在写入Memtable的过程中因为进程异常、系统掉电等情况导致数据丢失。 日志文件是按照Block来组织的,每个Block大小为32K,一个Block中会包含多个记录。每条记录都包含了四个部分,分别是: Checksum、Length、RecordType、Data。格式如下图:
其中这里的RecordType
其取值范围如下:
enum RecordType {
// Zero is reserved for preallocated files
kZeroType = 0,
kFullType = 1,
// For fragments
kFirstType = 2,
kMiddleType = 3,
kLastType = 4
};
这个RecordType的目的主要是为了解决那些单条数据超过32K大小的问题,对于这类数据只能分成多条记录来写,因此有了RecordType来表示当前的记录是否是完整的,还是片段,如果是片段,那那个是第一个片段、那些是中间片段、那个是最后一个片段,通过RecordType就可以将多个片段组装一条完整的记录。Chechsum部分很好理解,就是记录了当前记录的校验和,用来做数据正确性校验的。Length部分则是用来标识。Leveldb规定Checksum占4个字节(正好满足crc32算法的需求),Length占2个字节(2^16次方正好是32K,一个block的大小,所以是足够的)、 RecordType占用1个字节。这三个部分被称为固定的Header,总共占用7个字节,下面这个方法就是用来写入一条完整记录的方法。
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
size_t length) {
assert(length <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + length <= kBlockSize);
// Format the header
// 1. 先写入header和RecordType
char buf[kHeaderSize];
buf[4] = static_cast<char>(length & 0xff);
buf[5] = static_cast<char>(length >> 8);
buf[6] = static_cast<char>(t);
// 2. 开始计算crc32,然后写入
// Compute the crc of the record type and the payload.
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);
// 3. 最后写入Data
// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
s = dest_->Append(Slice(ptr, length));
if (s.ok()) {
s = dest_->Flush();
}
}
block_offset_ += kHeaderSize + length;
return s;
}
到此为止日志的写入过程就分析完了,接下来我们看下WAL实际是如何使用的,下面是Leveldb批量写的实现。
Leveldb的写入总是批量的,即使有单次写入的API,最终在内部使用的时候,也是使用批量写的方式来做的。
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; batch.Put(key, value); return Write(opt, &batch); }
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
......
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
// 1. 先写WAL
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
// 2. 再写入Memtable
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
......
通过上面的代码我们可以看到,实际上一条记录中的数据包含了WriteBatch中的多个key/value,那么问题来了,我通过WAL读取到记录后,也成功拿到了保存的数据,那么我如何从这个数据中获取到多个key/value呢?WAL是不管数据中有多少key/value的,他只负责存储和读取数据,至于读到的数据如何解析成多个key/value,这个是交由使用者来处理的。WAL可以在多个地方被使用,其Data部分的格式可以是任意的,在Leveldb中不仅仅是用来存放key/value,还被用来存放Manifest了。WAL文件何时删除呢? 如果一直持续的写入那么WAL文件迟早会撑暴。而且WAL文件那么大,在出现故障的时候恢复也是一个问题,另外在Leveldb进行Compaction后部分key/value将会落盘,那么WAL中对应的数据也应该要被清理掉。否则就会导致数据重复的问题。在Memtable
部分我们介绍过,Memtable
的大小到达一个阀值时会被冻结变成immutable
,只能被读取无法进行写入,这个时候对应的WAL文件也会被冻结,然后创建一个新的WAL文件,等immutable
进行Compaction后被写入到磁盘时,对应的WAL文件才能安全的删除。
Status DBImpl::MakeRoomForWrite(bool force) {
......
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
break;
} else if (imm_ != nullptr) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\\n");
background_work_finished_signal_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\\n");
background_work_finished_signal_.Wait();
} else {
// 创建immutable,获取新的WAL文件名,然后重新打开,后续使用新的WAL文件。
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}
MakeRoomForWrite
每次在发生写入的时候都会调用,会判断当前的Memtable大小是否超过阀值,如果超过了就切换成功immutable,并重新打开一个新的WAL文件用于记录。
接着我们来看下Compaction的实现。
void DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != nullptr);
// Save the contents of the memtable as a new Table
VersionEdit edit;
Version* base = versions_->current();
base->Ref();
// 将Memtable写成sstable
Status s = WriteLevel0Table(imm_, &edit, base);
base->Unref();
if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
s = Status::IOError("Deleting DB during memtable compaction");
}
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit, &mutex_);
}
if (s.ok()) {
// Commit to the new state
imm_->Unref();
imm_ = nullptr;
has_imm_.store(false, std::memory_order_release);
// Compaction完成后,删除不需要的文件,包含了WAL文件。
RemoveObsoleteFiles();
} else {
RecordBackgroundError(s);
}
}