Every time LevelDB performs a write, it first appends the data to the log file and only then inserts it into the Memtable. This guards against data loss when the process crashes or the machine loses power while the Memtable write is in flight. The log file is organized into blocks of 32 KB each, and one block can hold multiple records. Each record consists of four parts: Checksum, Length, RecordType, and Data, laid out as in the figure below:

[Figure: journal.jpeg — log record layout: Checksum | Length | RecordType | Data]

The possible values of RecordType are as follows:

enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,

  kFullType = 1,

  // For fragments
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};

The purpose of RecordType is to handle payloads that do not fit in a single 32 KB block (or in the space remaining in the current block): such data has to be written as multiple physical records, and RecordType marks whether a record is complete or a fragment, and for fragments, which one is the first, which are in the middle, and which one is the last. With these markers, multiple fragments can be reassembled into one complete logical record. The Checksum part is easy to understand: it stores the checksum of the current record, used to verify data integrity. The Length part stores the size of the Data section. LevelDB fixes Checksum at 4 bytes (exactly matching the needs of the crc32 algorithm), Length at 2 bytes (two bytes can express lengths up to 64 KB, which is more than enough since a fragment never exceeds the 32 KB block size), and RecordType at 1 byte. Together these three parts form the fixed header, 7 bytes in total. The following method writes a single physical record:

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
                                  size_t length) {
  assert(length <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + length <= kBlockSize);

  // Format the header:
  // 1. Encode the length (2 bytes, little-endian) and the record type into
  //    header bytes 4..6; bytes 0..3 are reserved for the crc computed below.
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(length & 0xff);
  buf[5] = static_cast<char>(length >> 8);
  buf[6] = static_cast<char>(t);
  // 2. Compute the crc32 of the record type and the payload, mask it for
  //    storage, and encode it into the first 4 bytes of the header.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
  crc = crc32c::Mask(crc);  // Adjust for storage
  EncodeFixed32(buf, crc);
  // 3. Write the header, then the payload, then flush.
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, length));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + length;
  return s;
}
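
EmitPhysicalRecord only writes one physical fragment. The splitting of a logical record into kFull/kFirst/kMiddle/kLast fragments happens one level up, in log::Writer::AddRecord (from log_writer.cc, comments lightly expanded). It also shows a detail worth noting: when fewer than 7 bytes remain in a block, the leftover bytes are zero-filled as a trailer and writing moves on to the next block.

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record.
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Less than a header's worth of space left: zero-fill the trailer
      // and switch to a new block.
      if (leftover > 0) {
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    // The fragment is as much of the payload as fits in this block.
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    // Pick the RecordType based on whether this fragment is the
    // beginning and/or the end of the logical record.
    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}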

That completes the analysis of the log write path. Next, let's look at how the WAL is actually used; below is LevelDB's batched write implementation.

Writes in LevelDB are always batched: even the single-write API is internally implemented on top of the batch write path.

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}
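
The same batching is available to applications directly: several operations can be grouped into one WriteBatch, and they will share a single WAL record and a contiguous sequence-number range. A minimal usage sketch (it assumes db is an already-opened leveldb::DB*):

#include "leveldb/db.h"
#include "leveldb/write_batch.h"

// Group several updates atomically: the whole batch becomes one WAL
// record and is applied to the memtable as a unit.
leveldb::WriteBatch batch;
batch.Put("name", "leveldb");
batch.Delete("stale-key");
leveldb::Status s = db->Write(leveldb::WriteOptions(), &batch);

Internally, all of these paths converge on DBImpl::Write: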
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  ......
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      // 1. Write the WAL first
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
      // 2. Then apply the batch to the Memtable
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }
  ......

As the code above shows, a single WAL record actually contains every key/value pair in the WriteBatch. That raises a question: after reading a record back from the WAL and recovering its payload, how do we extract the individual key/value pairs? The WAL does not know or care how many key/value pairs the data contains; it is only responsible for storing and returning byte payloads, and decoding them is left entirely to the caller (a recovery-side sketch follows below). This also makes the WAL reusable elsewhere: the Data section can be in any format, and in LevelDB it carries not only key/value batches but also the Manifest.

When is a WAL file deleted? Under sustained writes a single WAL file would grow without bound, and a huge WAL also makes crash recovery slow. Moreover, once a compaction has persisted a set of key/value pairs to disk, the corresponding WAL data must be cleaned up, otherwise recovery would replay it and duplicate the data. As described in the Memtable section, when a Memtable reaches the size threshold it is frozen into an immutable memtable that can be read but not written; at that moment the corresponding WAL file is frozen as well and a new WAL file is created. Only after the immutable memtable has been compacted and written to disk can its WAL file be safely deleted.
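Returning to the decoding question: the answer lives in WriteBatch itself. Its serialized representation, the bytes that WriteBatchInternal::Contents handed to the WAL above, starts with an 8-byte sequence number and a 4-byte operation count, followed by the tagged Put/Delete entries, so recovery can simply hand each WAL payload back to a WriteBatch. A condensed sketch of that path, based on DBImpl::RecoverLogFile (the log::Reader loop and status checks are elided):

// One WAL payload is exactly one serialized WriteBatch.  `record` is the
// Data part of a log record as returned by log::Reader::ReadRecord, and
// `mem` is the MemTable being rebuilt during recovery.
WriteBatch batch;
WriteBatchInternal::SetContents(&batch, record);         // adopt the raw bytes
Status s = WriteBatchInternal::InsertInto(&batch, mem);  // replay each Put/Delete
const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
                                WriteBatchInternal::Count(&batch) - 1;

With decoding settled, the remaining piece is the WAL file lifecycle; the memtable switchover described above is implemented in MakeRoomForWrite.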

Status DBImpl::MakeRoomForWrite(bool force) {
  ......
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\\n");
      background_work_finished_signal_.Wait();
    } else {
      // Freeze the current memtable into an immutable one, allocate a new
      // WAL file number, open the new file, and use it for all later writes.
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

MakeRoomForWrite is called on every write. It checks whether the current Memtable has grown past the size threshold (options_.write_buffer_size); if it has, the Memtable is switched to an immutable one and a new WAL file is opened for subsequent records.
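
That threshold is an ordinary open-time option which defaults to 4 MB, so the frequency of memtable freezes and WAL rotations is tunable. A minimal configuration sketch (/tmp/testdb is just an example path):

#include "leveldb/db.h"

// Tuning the memtable threshold: a larger write buffer means fewer
// memtable freezes and WAL rotations, at the cost of more memory and
// a longer log replay on recovery.
leveldb::Options options;
options.create_if_missing = true;
options.write_buffer_size = 8 * 1024 * 1024;  // default is 4 MB

leveldb::DB* db = nullptr;
leveldb::Status s = leveldb::DB::Open(options, "/tmp/testdb", &db);

Next, let's look at the Compaction implementation.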

void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != nullptr);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  // Write the contents of the immutable memtable out as a new sstable
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();
    imm_ = nullptr;
    has_imm_.store(false, std::memory_order_release);
    // After the compaction completes, delete files that are no longer
    // needed, including the old WAL files.
    RemoveObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}
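
The actual file deletion happens in RemoveObsoleteFiles: because CompactMemTable recorded the current logfile_number_ through edit.SetLogNumber, any log file with a smaller number is no longer needed. The kLogFile case of that function (excerpted from db_impl.cc) states the rule explicitly:

// Excerpt from DBImpl::RemoveObsoleteFiles: for each file in the DB
// directory, decide whether it still has to be kept.
case kLogFile:
  // Keep the current WAL and (while recovery is in progress) the
  // previous one; older logs are fully covered by flushed sstables
  // and can be deleted.
  keep = ((number >= versions_->LogNumber()) ||
          (number == versions_->PrevLogNumber()));
  break;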