背景

LevelDB每次写入key-value都是写入到内存中的Memtable中的,但是Memtable的空间不是无限的,Memtable写满后,就需要调用MakeRoomForWrite把Memtable转存为Immutable Memtable,并创建新的Memtable来存储写入数据。必要时还会调度后台线程把Immutable Memtable落盘,以及合并SST文件。

MakeRoomForWrite

Status DBImpl::MakeRoomForWrite(bool force)
{
  mutex_.AssertHeld();

  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while ( true )
  {
    //Step0
    if ( !bg_error_.ok())
    {
      //Step1
      s = bg_error_;
      break;
    } else if ( allow_delay && versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger )
    {
      //Step2
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if ( !force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size))
    {
      //Step3
      // There is room in current memtable
      break;
    } else if ( imm_ != nullptr )
    {
      //Step4
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if ( versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger )
    {
      //Step5
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else
    {
      //Step6
      assert(versions_->PrevLogNumber() == 0);

      uint64_t new_log_number = versions_->NewFileNumber();

      WritableFile *lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if ( !s.ok())
      {
        versions_->ReuseFileNumber(new_log_number);
        break;
      }

      delete log_; //log::Writer* log_;
      delete logfile_; //WritableFile
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

force表示是否强制把Memtable转存为Immutable Memtable。allow_delay的初始值和force相反,表示是否允许delay。如果force为true,allow_delay就是false。在Writer主流程中调用MakeRoomForWrite时,force一定为false。MakeRoomForWrite的主要逻辑都在while循环中,为了方便表述,我们把循环内部的起始位置定为Step0。

  • Step1:检查后台线程是否发生错误,如果发生错误,直接返回错误Status。
  • Step2:如果允许delay,且L0层文件数量超过慢写阈值(默认为8),就等待1ms,然后把allow_delay置为false,回到Step0。allow_delay在下次执行时就是false了,所以慢写最多执行一次,避免Write主流程被阻塞太久。
  • Step3:如果Memtable未写满(空间占用没有超过阈值),说明当前Memtable是有空间写入的。如果force为false,即不强制转存Memtable,那么结束循环,继续使用当前的Memtable。
  • Step4:能走到这一步,要么force为true,要么当前Memtable已经写满了。都需要把Memtable转为Immutable Memtable。所以检查imm_是否为null,不为null的话,就要等待后台程序先把当前Immutable Memtable落盘(落盘后imm_会被置为null)。所以要在条件变量background_work_finished_signal_上Wait。被条件变量唤醒后将回到Step0重新执行。重新执行后会再来到Step4,如果imm_为空了,就进入Step5进行判断,如果imm_不为空,那么继续Wait。
  • Step5:能走到这里说明需要转存Memtable,并且Immutable Memtable已经落盘了(imm_为null)。判断L0层文件数量是否超过停写阈值(默认为12),如果超过了,就停止写入,并在background_work_finished_signal_上Wait,被唤醒后回到Step0。因为读操作需要检查L0层的所有文件,所以L0层文件数量过多会降低读的速度。Step2和Step5的目的都是为了避免L0层的文件数量过多,所以要减慢或者停止写入,给后台程序足够的时间去完成文件合并,不然L0层文件数量就会无限膨胀。
  • Step6:走到这一步,说明需要转存Memtable,且imm_为null。那么就把Memtable转存为Immutable Memtable,然后创建一个新的Memtable。每个Memtable都有一个WAL日志文件,所以会为这个新的Memtable创建新的WAL日志文件。MaybeScheduleCompaction尝试调度后台线程。imm_落盘和SST文件合并都是由后台线程完成的。

MaybeScheduleCompaction

void DBImpl::MaybeScheduleCompaction()
{
  mutex_.AssertHeld();
  if( background_compaction_scheduled_ )
  {
    // Already scheduled
  }else if( shutting_down_.load(std::memory_order_acquire))
  {
    // DB is being deleted; no more background compactions
  }else if( !bg_error_.ok())
  {
    // Already got an error; no more changes
  }else if( imm_ == nullptr && manual_compaction_ == nullptr && !versions_->NeedsCompaction())
  {
    // No work to be done
  }else
  {
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}

先检查线程是否已经被调度了,如果已经被调度了,就直接退出。如果DB已经被关闭,那么就不调度了。如果后台线程出错,也不调度。

前面几个都是常规检查,第三个判断是关键逻辑。只有三个子条件都会true,才会直接返回,只要任意一个子条件为false,就会走到else语句调度后台线程。即:

  • 如果imm_不为空,调度
  • 如果设置了手动合并,调度
  • 如果版本系统认为需要合并,调度。
void DBImpl::BGWork(void* db)
{
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

任务队列池存储的是BGWork函数指针,参数是DBImpl对象。执行任务也就是执行BGWork,而BGWork实际执行的是BackgroundCall。

BackgroundCall

void DBImpl::BackgroundCall()
{
  MutexLock l(&mutex_);
  assert(background_compaction_scheduled_);
  if( shutting_down_.load(std::memory_order_acquire))
  {
    // No more background work when shutting down.
  }else if( !bg_error_.ok())
  {
    // No more background work after a background error.
  }else
  {
    BackgroundCompaction();
  }

  background_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  background_work_finished_signal_.SignalAll();
}

BackgroundCall先判断DB是否被关闭,后台线程有无错误,如果没有问题,就调用BackgroundCompaction完成后台合并(详见LevelDB源码解析(13) BackgroundCompaction SST文件合并),完成BackgroundCompaction后把background_compaction_scheduled_置为false,允许其他线程发起新的调度。然后再调用MaybeScheduleCompaction检查是否需要再次调度。最后唤醒等待条件变量的线程。比如在MakeRoomForWrite中Wait的线程。

后台线程是不存在并发的,同一时刻只会有一个后台线程在执行。后台线程和Write线程存在并发竞争,所以在关键区域要使用成员变量mutex_加锁。