Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce unnecessary MutableCFOptions copies and parameters #13301

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1160,12 +1160,13 @@ MemTable* ColumnFamilyData::ConstructNewMemtable(
write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
void ColumnFamilyData::CreateNewMemtable(SequenceNumber earliest_seq) {
if (mem_ != nullptr) {
delete mem_->Unref();
}
SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
// NOTE: db mutex must be locked for SetMemtable, so safe for
// GetLatestMutableCFOptions
SetMemtable(ConstructNewMemtable(GetLatestMutableCFOptions(), earliest_seq));
mem_->Ref();
}

Expand Down
5 changes: 3 additions & 2 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,11 @@ class ColumnFamilyData {
uint64_t OldestLogToKeep();

// See Memtable constructor for explanation of earliest_seq param.
// `mutable_cf_options` might need to be a saved copy if calling this without
// holding the DB mutex.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
void CreateNewMemtable(SequenceNumber earliest_seq);

TableCache* table_cache() const { return table_cache_.get(); }
BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); }
Expand Down
18 changes: 8 additions & 10 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -902,8 +902,7 @@ Status CompactionJob::Run() {
return status;
}

Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
bool* compaction_released) {
Status CompactionJob::Install(bool* compaction_released) {
assert(compact_);

AutoThreadOperationStageUpdater stage_updater(
Expand All @@ -919,7 +918,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
compaction_stats_);

if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options, compaction_released);
status = InstallCompactionResults(compaction_released);
}
if (!versions_->io_status().ok()) {
io_status_ = versions_->io_status();
Expand Down Expand Up @@ -1800,8 +1799,7 @@ Status CompactionJob::FinishCompactionOutputFile(
return s;
}

Status CompactionJob::InstallCompactionResults(
const MutableCFOptions& mutable_cf_options, bool* compaction_released) {
Status CompactionJob::InstallCompactionResults(bool* compaction_released) {
assert(compact_);

db_mutex_->AssertHeld();
Expand Down Expand Up @@ -1890,11 +1888,11 @@ Status CompactionJob::InstallCompactionResults(
*compaction_released = true;
};

return versions_->LogAndApply(
compaction->column_family_data(), mutable_cf_options, read_options,
write_options, edit, db_mutex_, db_directory_,
/*new_descriptor_log=*/false,
/*column_family_options=*/nullptr, manifest_wcb);
return versions_->LogAndApply(compaction->column_family_data(), read_options,
write_options, edit, db_mutex_, db_directory_,
/*new_descriptor_log=*/false,
/*column_family_options=*/nullptr,
manifest_wcb);
}

void CompactionJob::RecordCompactionIOStats() {
Expand Down
6 changes: 2 additions & 4 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ class CompactionJob {
// Add compaction input/output to the current version
// Releases compaction file through Compaction::ReleaseCompactionFiles().
// Sets *compaction_released to true if compaction is released.
Status Install(const MutableCFOptions& mutable_cf_options,
bool* compaction_released);
Status Install(bool* compaction_released);

// Return the IO status
IOStatus io_status() const { return io_status_; }
Expand Down Expand Up @@ -282,8 +281,7 @@ class CompactionJob {
const Slice& next_table_min_key,
const Slice* comp_start_user_key,
const Slice* comp_end_user_key);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options,
bool* compaction_released);
Status InstallCompactionResults(bool* compaction_released);
Status OpenCompactionOutputFile(SubcompactionState* sub_compact,
CompactionOutputs& outputs);

Expand Down
7 changes: 3 additions & 4 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ class CompactionJobTestBase : public testing::Test {

mutex_.Lock();
EXPECT_OK(versions_->LogAndApply(
versions_->GetColumnFamilySet()->GetDefault(), mutable_cf_options_,
read_options_, write_options_, &edit, &mutex_, nullptr));
versions_->GetColumnFamilySet()->GetDefault(), read_options_,
write_options_, &edit, &mutex_, nullptr));
mutex_.Unlock();
}

Expand Down Expand Up @@ -684,8 +684,7 @@ class CompactionJobTestBase : public testing::Test {
ASSERT_OK(compaction_job.io_status());
mutex_.Lock();
bool compaction_released = false;
ASSERT_OK(compaction_job.Install(cfd->GetLatestMutableCFOptions(),
&compaction_released));
ASSERT_OK(compaction_job.Install(&compaction_released));
ASSERT_OK(compaction_job.io_status());
mutex_.Unlock();
log_buffer.FlushBufferToLog();
Expand Down
63 changes: 25 additions & 38 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,8 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {
static_cast_with_check<ColumnFamilyHandleImpl>(default_cf_handle_);
assert(cfh);
ColumnFamilyData* cfd = cfh->cfd();
s = versions_->LogAndApply(cfd, cfd->GetLatestMutableCFOptions(),
read_options, write_options, &edit, &mutex_,
directories_.GetDbDir());
s = versions_->LogAndApply(cfd, read_options, write_options, &edit,
&mutex_, directories_.GetDbDir());
if (!s.ok()) {
io_s = versions_->io_status();
if (!io_s.ok()) {
Expand Down Expand Up @@ -1290,7 +1289,7 @@ Status DBImpl::SetOptions(
}

InstrumentedMutexLock ol(&options_mutex_);
MutableCFOptions new_options_copy;
MutableCFOptions new_options_copy; // For logging outside of DB mutex
Status s;
Status persist_options_status;
SuperVersionContext sv_context(/* create_superversion */ true);
Expand All @@ -1302,9 +1301,8 @@ Status DBImpl::SetOptions(
new_options_copy = cfd->GetLatestMutableCFOptions();
// Append new version to recompute compaction score.
VersionEdit dummy_edit;
s = versions_->LogAndApply(cfd, new_options_copy, read_options,
write_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
s = versions_->LogAndApply(cfd, read_options, write_options, &dummy_edit,
&mutex_, directories_.GetDbDir());
if (!versions_->io_status().ok()) {
assert(!s.ok());
error_handler_.SetBGError(versions_->io_status(),
Expand Down Expand Up @@ -3652,9 +3650,9 @@ Status DBImpl::CreateColumnFamilyImpl(const ReadOptions& read_options,
write_thread_.EnterUnbatched(&w, &mutex_);
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options),
read_options, write_options, &edit, &mutex_,
directories_.GetDbDir(), false, &cf_options);
s = versions_->LogAndApply(nullptr, read_options, write_options, &edit,
&mutex_, directories_.GetDbDir(), false,
&cf_options);
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
Expand Down Expand Up @@ -3762,9 +3760,8 @@ Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
// we drop column family from a single write thread
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
s = versions_->LogAndApply(cfd, cfd->GetLatestMutableCFOptions(),
read_options, write_options, &edit, &mutex_,
directories_.GetDbDir());
s = versions_->LogAndApply(cfd, read_options, write_options, &edit,
&mutex_, directories_.GetDbDir());
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
Expand Down Expand Up @@ -4969,9 +4966,8 @@ Status DBImpl::DEPRECATED_DeleteFile(std::string name) {
}
edit.SetColumnFamily(cfd->GetID());
edit.DeleteFile(level, number);
status = versions_->LogAndApply(cfd, cfd->GetLatestMutableCFOptions(),
read_options, write_options, &edit, &mutex_,
directories_.GetDbDir());
status = versions_->LogAndApply(cfd, read_options, write_options, &edit,
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(
cfd, job_context.superversion_contexts.data());
Expand Down Expand Up @@ -5081,9 +5077,8 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
return status;
}
input_version->Ref();
status = versions_->LogAndApply(cfd, cfd->GetLatestMutableCFOptions(),
read_options, write_options, &edit, &mutex_,
directories_.GetDbDir());
status = versions_->LogAndApply(cfd, read_options, write_options, &edit,
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(
cfd, job_context.superversion_contexts.data());
Expand Down Expand Up @@ -6129,14 +6124,12 @@ Status DBImpl::IngestExternalFiles(
ReadOptions read_options;
read_options.fill_cache = args[0].options.fill_cache;
autovector<ColumnFamilyData*> cfds_to_commit;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<autovector<VersionEdit*>> edit_lists;
uint32_t num_entries = 0;
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
assert(!cfd->IsDropped());
cfds_to_commit.push_back(cfd);
mutable_cf_options_list.push_back(&cfd->GetLatestMutableCFOptions());
autovector<VersionEdit*> edit_list;
edit_list.push_back(ingestion_jobs[i].edit());
edit_lists.push_back(edit_list);
Expand All @@ -6151,10 +6144,10 @@ Status DBImpl::IngestExternalFiles(
}
assert(0 == num_entries);
}
status = versions_->LogAndApply(
cfds_to_commit, mutable_cf_options_list, read_options, write_options,
status =
versions_->LogAndApply(cfds_to_commit, read_options, write_options,

edit_lists, &mutex_, directories_.GetDbDir());
edit_lists, &mutex_, directories_.GetDbDir());
// It is safe to update VersionSet last seqno here after LogAndApply since
// LogAndApply persists last sequence number from VersionEdits,
// which are from file's largest seqno and not from VersionSet.
Expand Down Expand Up @@ -6305,11 +6298,9 @@ Status DBImpl::CreateColumnFamilyWithImport(
// and this will overwrite the external file. To protect the external
// file, we have to make sure the file number will never being reused.
next_file_number = versions_->FetchAddFileNumber(total_file_num);
MutableCFOptions mutable_cf_options_copy =
cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(cfd, mutable_cf_options_copy,
read_options, write_options, &dummy_edit,
&mutex_, directories_.GetDbDir());
status =
versions_->LogAndApply(cfd, read_options, write_options, &dummy_edit,
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx);
}
Expand Down Expand Up @@ -6344,11 +6335,9 @@ Status DBImpl::CreateColumnFamilyWithImport(

// Install job edit [Mutex will be unlocked here]
if (status.ok()) {
MutableCFOptions mutable_cf_options_copy =
cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(
cfd, mutable_cf_options_copy, read_options, write_options,
import_job.edit(), &mutex_, directories_.GetDbDir());
status = versions_->LogAndApply(cfd, read_options, write_options,
import_job.edit(), &mutex_,
directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_context);
}
Expand Down Expand Up @@ -6768,15 +6757,13 @@ Status DBImpl::ReserveFileNumbersBeforeIngestion(
pending_output_elem.reset(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
*next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
MutableCFOptions mutable_cf_options_copy = cfd->GetLatestMutableCFOptions();
VersionEdit dummy_edit;
// If crash happen after a hard link established, Recover function may
// reuse the file number that has already assigned to the internal file,
// and this will overwrite the external file. To protect the external
// file, we have to make sure the file number will never being reused.
s = versions_->LogAndApply(cfd, mutable_cf_options_copy, read_options,
write_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
s = versions_->LogAndApply(cfd, read_options, write_options, &dummy_edit,
&mutex_, directories_.GetDbDir());
if (s.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx);
}
Expand Down
2 changes: 0 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1451,7 +1451,6 @@ class DBImpl : public DB {
uint32_t size = static_cast<uint32_t>(map_.size());
map_.emplace(cfd->GetID(), size);
cfds_.emplace_back(cfd);
mutable_cf_opts_.emplace_back(&cfd->GetLatestMutableCFOptions());
edit_lists_.emplace_back(autovector<VersionEdit*>());
}
uint32_t i = map_[cfd->GetID()];
Expand All @@ -1460,7 +1459,6 @@ class DBImpl : public DB {

std::unordered_map<uint32_t, uint32_t> map_; // cf_id to index;
autovector<ColumnFamilyData*> cfds_;
autovector<const MutableCFOptions*> mutable_cf_opts_;
autovector<autovector<VersionEdit*>> edit_lists_;
// All existing data files (SST files and Blob files) found during DB::Open.
std::vector<std::string> existing_data_files_;
Expand Down
Loading
Loading