Skip to content

Commit

Permalink
fix: Search for pk using raw data to reduce the overhead caused by vi…
Browse files Browse the repository at this point in the history
…ews (#37202)

issue: #37152

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Nov 5, 2024
1 parent 0645d46 commit 625b617
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 207 deletions.
37 changes: 35 additions & 2 deletions internal/core/src/common/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,46 @@ class StringChunk : public Chunk {
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
}

std::string_view
operator[](const int i) const {
if (i < 0 || i > row_nums_) {
PanicInfo(ErrorCode::OutOfRange, "index out of range");
}

return {data_ + offsets_[i], offsets_[i + 1] - offsets_[i]};
}

std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringViews();

int
binary_search_string(std::string_view target) {
// only supported sorted pk
int left = 0;
int right = row_nums_ - 1; // `right` should be num_rows_ - 1
int result =
-1; // Initialize result to store the first occurrence index

while (left <= right) {
int mid = left + (right - left) / 2;
std::string_view midString = (*this)[mid];
if (midString == target) {
result = mid; // Store the index of match
right = mid - 1; // Continue searching in the left half
} else if (midString < target) {
// midString < target
left = mid + 1;
} else {
// midString > target
right = mid - 1;
}
}
return result;
}

const char*
ValueAt(int64_t idx) const override {
PanicInfo(ErrorCode::Unsupported,
"StringChunk::ValueAt is not supported");
return (*this)[idx].data();
}

uint64_t*
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/mmap/ChunkedColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ class ChunkedVariableColumn : public ChunkedColumnBase {
->StringViews();
}

std::shared_ptr<Chunk>
GetChunk(int64_t chunk_id) const {
return chunks_[chunk_id];
}

BufferView
GetBatchBuffer(int64_t start_offset, int64_t length) override {
if (start_offset < 0 || start_offset > num_rows_ ||
Expand Down
24 changes: 24 additions & 0 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,30 @@ class SingleChunkVariableColumn : public SingleChunkColumnBase {
return ViewType(pos + sizeof(uint32_t), size);
}

int
binary_search_string(std::string_view target) {
int left = 0;
int right = num_rows_ - 1; // `right` should be num_rows_ - 1
int result =
-1; // Initialize result to store the first occurrence index

while (left <= right) {
int mid = left + (right - left) / 2;
std::string_view midString = this->RawAt(mid);
if (midString == target) {
result = mid; // Store the index of match
right = mid - 1; // Continue searching in the left half
} else if (midString < target) {
// midString < target
left = mid + 1;
} else {
// midString > target
right = mid - 1;
}
}
return result;
}

std::string_view
RawAt(const int i) const {
return std::string_view((*this)[i]);
Expand Down
182 changes: 40 additions & 142 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -863,16 +863,17 @@ ChunkedSegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset,

auto bitmap_holder = std::shared_ptr<DeletedRecord::TmpBitmap>();

if (!is_sorted_by_pk_) {
bitmap_holder = get_deleted_bitmap(del_barrier,
ins_barrier,
deleted_record_,
insert_record_,
timestamp);
} else {
bitmap_holder = get_deleted_bitmap_s(
del_barrier, ins_barrier, deleted_record_, timestamp);
}
auto search_fn = [this](const PkType& pk, int64_t barrier) {
return this->search_pk(pk, barrier);
};
bitmap_holder = get_deleted_bitmap(del_barrier,
ins_barrier,
deleted_record_,
insert_record_,
timestamp,
is_sorted_by_pk_,
search_fn);

if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
Expand Down Expand Up @@ -1182,72 +1183,34 @@ ChunkedSegmentSealedImpl::check_search(const query::Plan* plan) const {
std::vector<SegOffset>
ChunkedSegmentSealedImpl::search_pk(const PkType& pk,
Timestamp timestamp) const {
auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
AssertInfo(pk_field_id.get() != -1, "Primary key is -1");
auto pk_column = fields_.at(pk_field_id);
std::vector<SegOffset> pk_offsets;
switch (schema_->get_fields().at(pk_field_id).get_data_type()) {
case DataType::INT64: {
auto target = std::get<int64_t>(pk);
// get int64 pks
auto num_chunk = pk_column->num_chunks();
for (int i = 0; i < num_chunk; ++i) {
auto src = reinterpret_cast<const int64_t*>(pk_column->Data(i));
auto chunk_row_num = pk_column->chunk_row_nums(i);
auto it = std::lower_bound(
src,
src + chunk_row_num,
target,
[](const int64_t& elem, const int64_t& value) {
return elem < value;
});
for (; it != src + chunk_row_num && *it == target; it++) {
auto offset = it - src;
if (insert_record_.timestamps_[offset] <= timestamp) {
pk_offsets.emplace_back(offset);
}
}
}
break;
}
case DataType::VARCHAR: {
auto target = std::get<std::string>(pk);
// get varchar pks
auto var_column =
std::dynamic_pointer_cast<ChunkedVariableColumn<std::string>>(
pk_column);
auto num_chunk = var_column->num_chunks();
for (int i = 0; i < num_chunk; ++i) {
auto views = var_column->StringViews(i).first;
auto it = std::lower_bound(views.begin(), views.end(), target);
for (; it != views.end() && *it == target; it++) {
auto offset = std::distance(views.begin(), it);
if (insert_record_.timestamps_[offset] <= timestamp) {
pk_offsets.emplace_back(offset);
}
}
}
break;
}
default: {
PanicInfo(
DataTypeInvalid,
fmt::format(
"unsupported type {}",
schema_->get_fields().at(pk_field_id).get_data_type()));
}
if (!is_sorted_by_pk_) {
return insert_record_.search_pk(pk, timestamp);
}

return pk_offsets;
return search_sorted_pk(pk, [this, timestamp](int64_t offset) {
return insert_record_.timestamps_[offset] <= timestamp;
});
}

std::vector<SegOffset>
ChunkedSegmentSealedImpl::search_pk(const PkType& pk,
int64_t insert_barrier) const {
if (!is_sorted_by_pk_) {
return insert_record_.search_pk(pk, insert_barrier);
}
return search_sorted_pk(pk, [insert_barrier](int64_t offset) {
return offset < insert_barrier;
});
}

template <typename Condition>
std::vector<SegOffset>
ChunkedSegmentSealedImpl::search_sorted_pk(const PkType& pk,
Condition condition) const {
auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
AssertInfo(pk_field_id.get() != -1, "Primary key is -1");
auto pk_column = fields_.at(pk_field_id);
std::vector<SegOffset> pk_offsets;

switch (schema_->get_fields().at(pk_field_id).get_data_type()) {
case DataType::INT64: {
auto target = std::get<int64_t>(pk);
Expand All @@ -1264,9 +1227,10 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk,
[](const int64_t& elem, const int64_t& value) {
return elem < value;
});
for (; it != src + chunk_row_num && *it == target; it++) {
for (; it != src + pk_column->NumRows() && *it == target;
++it) {
auto offset = it - src;
if (offset < insert_barrier) {
if (condition(offset)) {
pk_offsets.emplace_back(offset);
}
}
Expand All @@ -1283,11 +1247,14 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk,

auto num_chunk = var_column->num_chunks();
for (int i = 0; i < num_chunk; ++i) {
auto views = var_column->StringViews(i).first;
auto it = std::lower_bound(views.begin(), views.end(), target);
for (; it != views.end() && *it == target; it++) {
auto offset = std::distance(views.begin(), it);
if (offset < insert_barrier) {
// TODO @xiaocai2333, @sunby: chunk need to record the min/max.
auto string_chunk = std::dynamic_pointer_cast<StringChunk>(
var_column->GetChunk(i));
auto offset = string_chunk->binary_search_string(target);
for (; offset != -1 && offset < var_column->NumRows() &&
var_column->RawAt(offset) == target;
++offset) {
if (condition(offset)) {
pk_offsets.emplace_back(offset);
}
}
Expand All @@ -1306,75 +1273,6 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk,
return pk_offsets;
}

std::shared_ptr<DeletedRecord::TmpBitmap>
ChunkedSegmentSealedImpl::get_deleted_bitmap_s(
int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
Timestamp query_timestamp) const {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
auto current = delete_record.clone_lru_entry(
insert_barrier, del_barrier, old_del_barrier, hit_cache);
if (hit_cache) {
return current;
}

auto bitmap = current->bitmap_ptr;

int64_t start, end;
if (del_barrier < old_del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
// so these deletion records do not take effect in query/search
// so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
// for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old_del_barrier;
} else {
// the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
// for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
start = old_del_barrier;
end = del_barrier;
}

// Avoid invalid calculations when there are a lot of repeated delete pks
std::unordered_map<PkType, Timestamp> delete_timestamps;
for (auto del_index = start; del_index < end; ++del_index) {
auto pk = delete_record.pks()[del_index];
auto timestamp = delete_record.timestamps()[del_index];

delete_timestamps[pk] = timestamp > delete_timestamps[pk]
? timestamp
: delete_timestamps[pk];
}

for (auto& [pk, timestamp] : delete_timestamps) {
auto segOffsets = search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();

// The deletion record do not take effect in search/query,
// and reset bitmap to 0
if (timestamp > query_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// Insert after delete with same pk, delete will not task effect on this insert record,
// and reset bitmap to 0
if (insert_record_.timestamps_[offset.get()] >= timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// insert data corresponding to the insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}

delete_record.insert_lru_entry(current);
return current;
}

std::pair<std::vector<OffsetMap::OffsetType>, bool>
ChunkedSegmentSealedImpl::find_first(int64_t limit,
const BitsetType& bitset) const {
Expand Down
8 changes: 3 additions & 5 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,9 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const;

std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap_s(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
Timestamp query_timestamp) const;
template <typename Condition>
std::vector<SegOffset>
search_sorted_pk(const PkType& pk, Condition condition) const;

std::unique_ptr<DataArray>
get_vector(FieldId field_id,
Expand Down
Loading

0 comments on commit 625b617

Please sign in to comment.