简介

LevelDB的官方注释是这么介绍WriteBatch的:

WriteBatch holds a collection of updates to apply atomically to a DB

如何保证原子性可能需要看完对WriteBatch的使用才能理清楚,这里只能确定一个WriteBatch对象可以包含多条更新记录(插入/删除),支持批量写入。

WriteBatch的很多操作是通过辅助类来实现的,辅助类会直接操作WriteBatch的成员变量,本文会先介绍WriteBatch的成员变量和这些辅助类,最后介绍WrteBatch的成员函数

WriteBatch的成员变量

WriteBatch只有一个成员,是个字符串类型。所有更新记录都会编码后写入到这个 rep_ 中。

std::string rep_;

rep_ 的编码格式如下:

字节数 8字节 4字节 变长 变长 变长 变长
内容 sequence count record 1 reocrd 2 record 3 record …

sequence是一个64bit的序列号,每个WriteBatch都有一个唯一序列号。

count为rep_包含的record(更新记录)数量。

count后面是record列表,每个record的编码格式相同,格式如下:

字节数 1字节 变长 key size 变长 value size
内容 ValueType key size key value size value

key size和value size使用的是LevelDB的变长编码格式

WriteBatchInternal 辅助类

WriteBatchInternal没有成员变量,只有成员函数。

SetCount与Count

void WriteBatchInternal::SetCount(WriteBatch* b, int n)
{
  EncodeFixed32(&b->rep_[8], n);
}

int WriteBatchInternal::Count(const WriteBatch* b)
{
  return DecodeFixed32(b->rep_.data() + 8);
}

SetCount调用EncodeFixed32把输入的参数n以小端编码方式写入WriteBatch的rep_的count域。

Count调用DecodeFixed32把rep_的count域解码后返回。

SetSequence与Sequence

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq)
{
  EncodeFixed64(&b->rep_[0], seq);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b)
{
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

SetSequence调用EncodeFixed64把序列号seq以小端编码写入到rep_的sequence域。这里的SequenceNumber其实就是std::uint64_t类型。

Sequence函数调用DecodeFixed64把rep_的sequence域解码后返回。

SetContents与Contents、ByteSize

void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents)
{
  assert(contents.size() >= kHeader);
  b->rep_.assign(contents.data(), contents.size());
}

static Slice WriteBatchInternal::Contents(const WriteBatch* batch){ return Slice(batch->rep_); }

static size_t WriteBatchInternal::ByteSize(const WriteBatch* batch){ return batch->rep_.size(); }

SetContent用conents覆盖了rep_。kHeader的值为12,就是sequence域和count域的总长度,这两个域是必须有的。

Contents把rep_包装成Slice对象返回。Slice在前面的文章提过,当成字符串看待就可以了。

ByteSize返回rep_的size,即rep_的字节数。

Append

void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src)
{
  SetCount(dst, Count(dst) + Count(src));
  assert(src->rep_.size() >= kHeader);
  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}

Append是比较重要的一个函数,有两个参数,dst和src。Append把src的数据附加合并(append)到dst中。

合并后,dst的sequence域不变,dst的count域值等于dst和src的count之和。src的record列表附加合并到了dst的reocrd列表后面。

InsertInto

Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable)
{
  MemTableInserter inserter;
  inserter.sequence_ = WriteBatchInternal::Sequence(b);
  inserter.mem_ = memtable;
  return b->Iterate(&inserter);
}

构造MemTableInserter对象,填充sequence和Memtable对象指针。然后调用WriteBatch的Iterate把当前batch的更新记录插入到memtable中。

MemTableInserter辅助类

MemTableInserter辅助类用于把记录插入MemTable。MemTableInserter继承了Handler,Handler是一个抽象类,只有Put和Delete两个纯虚函数。

Handler使用了模板方法设计模式,这样可以很方便地搞出不同版本的Handler,比如MemTableInserterPro等。

namespace
{
  class MemTableInserter : public WriteBatch::Handler
  {
  public:
    SequenceNumber sequence_;
    MemTable* mem_;

    void Put(const Slice& key, const Slice& value) override
    {
      mem_->Add(sequence_, kTypeValue, key, value);
      sequence_++;
    }

    void Delete(const Slice& key) override
    {
      mem_->Add(sequence_, kTypeDeletion, key, Slice());
      sequence_++;
    }
  };
}

笔者看到代码的时候非常困惑,为什么要写一个没有名字的namespace呢?特意去查了一下,这种叫做匿名namespace,可以让namespace内部的函数、类、对象等只能在当前文件里被访问,和C语言的static作用域类似。

成员变量

MemTableInserter包含两个成员变量,sequence_和mem_。sequence_是插入Memtable时要填写的序列号。mem_是Memtable对象指针。

Put

调用Memtable的Add函数,添加一条记录,记录类型为kTypeValue,表示这是一个插入记录。

Delete

调用Memtable的Add函数,添加一条记录,记录类型为kTypeDeletion,表示这是一个删除记录。

WriteBatch 成员函数

Put

void WriteBatch::Put(const Slice& key, const Slice& value)
{
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

首先是把rep_的count域加1。然后把ValueType、key和value编码成一个record,写到rep_的后面。

Delete

void WriteBatch::Delete(const Slice& key)
{
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeDeletion));
  PutLengthPrefixedSlice(&rep_, key);
}

LevelDB中删除元素是通过添加一条删除记录来实现的,所以删除还是添加一条记录,和Put的区别有两个,一个是record的ValueType是kTypeDeletetion,另一个是表示删除的record只有key,没有value。

Append

void WriteBatch::Append(const WriteBatch& source)
{
  WriteBatchInternal::Append(this, &source);
}

调用WriteBatchInternal的Append,把source合并到当前的WriteBatch对象。

Iterate

Status WriteBatch::Iterate(Handler* handler) const
{
  Slice input(rep_);
  if( input.size() < kHeader )
  {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);
  Slice key, value;
  int found = 0;
  while( !input.empty())
  {
    found++;
    char tag = input[0];
    input.remove_prefix(1);
    switch( tag )
    {
      case kTypeValue:
        if( GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value))
        {
          handler->Put(key, value);
        }else
        {
          return Status::Corruption("bad WriteBatch Put");
        }
        break;
      case kTypeDeletion:
        if( GetLengthPrefixedSlice(&input, &key))
        {
          handler->Delete(key);
        }else
        {
          return Status::Corruption("bad WriteBatch Delete");
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if( found != WriteBatchInternal::Count(this))
  {
    return Status::Corruption("WriteBatch has wrong count");
  }else
  {
    return Status::OK();
  }
}

上面的代码挺多,但是逻辑很简单,就是遍历。把WriteBatch的rep_解码,对每个record,根据ValueType域,调用handler的Put添加插入记录或者调用Delete添加删除记录。

遍历过程中和遍历完成后都会进行一些校验,如果发现数据有问题,就返回一个”Corruption”状态。如果数据ok,返回一个”OK”状态。

ApproximateSize 与 Clear

ApproximateSize返回rep_的size,Clear清除rep_的record列表,并把sequence域和count域置0。

为什么要独立出来一个WriteBatchInternal

从代码功能来看,作者把对WriteBatch的rep_的解码和编码工作都收纳到了WriteBatchInternal里,而WriteBatch只有供外部使用的封装接口。使让代码结构更加清晰。

但是有一个例外,WriteBatch的Iterate成员函数是有对rep_进行解码的。按照上面的想法,Iterate应当由WriteBatchInternal来实现,WriteBatch再调用才对。可能是笔者对这里的理解还有些偏差,欢迎在评论区发表想法。

源码版本

https://github.com/google/leveldb/releases/tag/1.22