leveldb对外提供的写入接口是Put,Put的实现如下:

Status DB::Put(const WriteOptions &opt, const Slice &key, const Slice &value)
{
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

WriteBatch是一个比较简单的类,只有一个string类型的成员变量rep_。提供了插入、删除、遍历等操作,详细实现可以参考LevelDB源码解析(5) WriteBatch。这里可以把WriteBatch理解为一个key-value的容器,一个WriteBatch可能包含多个key-value。

最终写入是通过Write函数来实现的,Writer的主流程如下:

leveldb_Write主流程.png

函数声明

Status DBImpl::Write(const WriteOptions &options, WriteBatch *updates)

  • options:用于设置一些写入时的参数
  • updates:要写入的数据

Writer结构

Writer类的定义如下:

struct DBImpl::Writer
{
  explicit Writer(port::Mutex *mu) : batch(nullptr), sync(false), done(false), cv(mu)
  {}

  Status status;
  WriteBatch *batch;
  bool sync;
  bool done;
  port::CondVar cv;
};

Writer类是写入任务的载体,只有5个成员变量:

  • status:存储写入的状态
  • batch:指向要写入的数据
  • sync:表示是否要把预写日志(Write Ahead Log, WAL)Sync到磁盘上
  • done:表示Writer是否完成
  • cv:对条件变量的封装

构造Writer的逻辑很简单,设置batch和sync,然后把done设置为false,表示当前任务未完成。

Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;

加入任务队列并等待

MutexLock l(&mutex_);
writers_.push_back(&w);
while ( !w.done && &w != writers_.front())
{
  w.cv.Wait();
}
if ( w.done )
{
  return w.status;
}
  • Step1:加锁,mutex保证了同一时刻只有一个写线程能够执行。
  • Step2:把任务w加入到任务队列writers_中。
  • Step3:基于条件变量,循环检查w是否被完成。(其他Write线程如果先执行了,可能会把队列中多个任务一起完成),如果执行完成进入Step4,如果未完成进入Step5。这里能够检查,说明条件变量被Signal唤醒,并且拿到mutex锁了。
  • Step4:结束执行,返回保存在w.status中的执行结果。
  • Step5:判断当前任务w是否在writers_的队列头,如果是,那么本线程应当开始执行,如果不是,那么继续等待。

上面的逻辑意味着,如果一个Write线程能够执行,必须同时满足以下两个条件:

  1. 当前线程的任务没有被完成
  2. 当前任务位于队列头

Memtable腾空间

Status status = MakeRoomForWrite(updates == nullptr);

腾空间是调用MakeRoomForWrite来完成的,虽然只有一行代码,但是是最复杂的一部分了。虽然实现复杂,但是作用很简单,就是确保Memtable有足够空间可以写入,如果Memtable空间不足,就需要创建新的Memtable,旧的Memtable变成Immutable Memtable,Immutable Memtable可能要落盘。详细逻辑下一篇文章介绍。

合并Writer

把并发的多个Writer任务合并,一次完成写入可以降低写入开销,提高写操作的吞吐量,因为进入到这里是在加锁状态,一次最多一个Write线程进入到这里,所以可以安全地对任务队列writers_里的Writer进行合并。

uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);

合并的主要逻辑在BuildBatchGroup里面,BuildBatchGroup合并是是按顺序合并的,即从队列的头部开始,往后遍历,只要碰到第一个不符合合并要求Writer,就结束合并。合并完成后,last_writer指向最后一个被合并的Writer。如果合并的任务中只有当前线程的任务,即没有合并,那么last_writer指向的就是当前线程的任务w。合并后的任务保存在返回的write_batch中。

LastSequence返回下一个可用的sequence,SetSequence在write_batch中写入本次的sequence。这个sequence是递增的,每写入一个key-value就会加1。Count返回write_batch中的key-value个数,last_sequence加上这个count就是下一次写入时应该使用的sequence了。这个sequence在写入成功后会被保存到版本信息库versions_中。

代码如下,核心逻辑上面介绍过了,详细逻辑直接看注释吧。

WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer)
{
  mutex_.AssertHeld();
  assert(!writers_.empty());

  //从第一个write开始
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != nullptr);

  //返回rep_的size
  size_t size = WriteBatchInternal::ByteSize(first->batch);

  //如果第一个writer的batch_size小于阈值,就把max_size调低,避免合并影响比较小的key-value的写入耗时。
  //如果key-value的size比较下,那么max_size不变,合并的writer数量可能非常多,影响当前写的的key-value的响应耗时。
  size_t max_size = 1 << 20;
  if( size <= (128 << 10)) // 128 << 10 ==  1 << 17
  {
    max_size = size + (128 << 10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  //从第二个开始遍历(第一个已经在result中了)
  ++iter;
  for( ; iter != writers_.end(); ++iter )
  {
    Writer* w = *iter;
    if( w->sync && !first->sync )
    {
      //如果第一个writer不sync,而当前writer要求sync,那么停止合并。
      break;
    }

    if( w->batch != nullptr )
    {
      size += WriteBatchInternal::ByteSize(w->batch);
      if( size > max_size )
      {
        // 合并后的size超过max_size,停止合并
        break;
      }

      if( result == first->batch )
      {
        // 判断为真,说明至少有两个writer可以合并(包括第一个),那么久使用tmp_batch_来承载合并结果,并把第一个batch加入到tmp_batch_中
        // 如果直接往result里面添加,那么会影响到调用方传入的write batch。
        // 如果只有一个writer(即不需要合并),那么返回的就是第一个writer的batch。
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }

    //把last_writer指向新加入的Writer
    *last_writer = w;
  }

  //最后返回result,这个result可能只有第一个writer的batch,也可能是有多个w的batch合体
  return result;
}

result是第1个到第N个batch的合并结果,N可能是1,即result只包含第一个writer的结果。last_writer指向result并入的最后一个batch对应的Writer。

只有一个batch时,tmp_batch_ != result,反之tmp_batch_ == result。三种情况会导致result只有一个batch:

  1. writers_只有一个Writer。
  2. writers_的第二个Writer是要求sync的,但是第一个Writer不要求sync,那么不能合并,遍历到第二个Writer时就结束了。
  3. writers_的第一个和第二个Writer的合并size超过了max_size,提前终止合并。

写操作流水和写入Memtable

  // 这里解锁是为了让其他非Write的写线程可以拿到mutex,
  // 其他Write线程即使拿到了锁,也会发现自己的Writer不在队列头,所以不会执行。
  mutex_.Unlock();
  
  status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
  bool sync_error = false;
  if ( status.ok() && options.sync )
  {
    status = logfile_->Sync();
    if ( !status.ok())
    {
      sync_error = true;
    }
  }

  if ( status.ok())
  {
    status = WriteBatchInternal::InsertInto(write_batch, mem_);
  }
  mutex_.Lock();

写入分为两步,第一步是写预写日志(Write Ahead Log, WAL),这个主要目的是故障恢复,因为新写入的key-value不会直接写入到磁盘里,而是先放到Memtable中,写满后会转为Immutable Memtable,最后才会写到磁盘上。如果出现故障,那么Memtable和Immutable Memtable中的数据就会丢失,所以每次写key-value时会先写WAL并且落盘。然后才会写入到Memtable。

在看这里的时候产生一个奇怪的想法,分享一下吧,想法是如果还没写WAL或者没写完就宕机了,这些key-value数据不也丢失了吗?后来想了一下,确实是这样,但是没关系,因为并没有告诉调用方写入成功了,在调用方的视角里写入就是失败的。而Memtable和Immutable Table中的数据在调用方视角是已经写成功了,那么这部分的数据需要保证故障后能够恢复,不然数据就不完整了。

写WAL是调用AddRecord来实现的,这个函数会把write_batch的完整数据都写入到磁盘里。如果options设置了sync,那么每次AddReocrd之后都要调用Sync做同步,强制刷磁盘,这个也就是普遍意义上的Sync了。如果sync为false,AddRecord实际上还是会在内存中缓存一下的,不保证每次写磁盘。调用方可以根据需要设置sync选项,比如数据丢失可以接受,那么就可以把sync设置为false,以获得更好的写性能。

第二步是写入Memtable,调用WriteBatch的接口把WriteBatch的所有key-value插入到Memtable中,具体的实现在LevelDB源码解析(5) WriteBatch中介绍了,这里不说了。

写完后就是错误检查和一些收尾工作,代码如下:

  // 重新加锁
  mutex_.Lock();
  if( sync_error )
  {
    // WAL落盘出错,让后续的写操作都失败(还没没有看到回复机制,不知道有没有)
    RecordBackgroundError(status);
  }
  // write_batch有可能是使用tmp_batch_来返回的,写入成功后要把tmp_batch_情况,以便下次写入
  if( write_batch == tmp_batch_ )
  {
    tmp_batch_->Clear();
  }
  // 设置Sequence
  versions_->SetLastSequence(last_sequence);

唤醒被条件变量阻塞的其他线程

  while ( true )
  {
    Writer *ready = writers_.front();
    writers_.pop_front();
    if ( ready != &w )
    {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if( ready == last_writer )
    {
      break;
    }
  }
  if( !writers_.empty())
  {
    writers_.front()->cv.Signal();
  }
  return status;

从队列头开始遍历检查Writer,直到碰到last_writer。本次写入的数据就是由这些任务合并而来的。把遍历到的Writer出队,并设置status,把done设置为true,并Signal唤醒该线程。被唤醒的线程拿到锁后,就会发现自己的Writer已经被完成了,直接返回。

显然,当前线程不需要被Signal,也不需要同步状态,所以上面的操作跳过了当前线程的任务.

最后检查任务队列writers_是否为空,如果不为空,就唤醒队头的Writer对应的线程,开始下一轮写入。

最后返回status,Writer主流程到此结束,返回时MutexLock的析构函数会触发mutex的解锁操作,释放锁。锁被释放后,其他Write线程就可以拿到锁启动了。