Skip to content

Commit

Permalink
enhance: record memory size (uncompressed) item for index
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 30, 2024
1 parent dd81a13 commit feb811a
Show file tree
Hide file tree
Showing 64 changed files with 1,499 additions and 1,323 deletions.
30 changes: 15 additions & 15 deletions cmd/tools/migration/meta/210_to_220.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,21 @@ func combineToSegmentIndexesMeta220(segmentIndexes SegmentIndexesMeta210, indexB
}

segmentIndexModel := &model.SegmentIndex{
SegmentID: segID,
CollectionID: record.GetCollectionID(),
PartitionID: record.GetPartitionID(),
NumRows: buildMeta.GetReq().GetNumRows(),
IndexID: indexID,
BuildID: record.GetBuildID(),
NodeID: buildMeta.GetNodeID(),
IndexVersion: buildMeta.GetIndexVersion(),
IndexState: buildMeta.GetState(),
FailReason: buildMeta.GetFailReason(),
IsDeleted: buildMeta.GetMarkDeleted(),
CreatedUTCTime: record.GetCreateTime(),
IndexFileKeys: fileKeys,
IndexSize: buildMeta.GetSerializeSize(),
WriteHandoff: buildMeta.GetState() == commonpb.IndexState_Finished,
SegmentID: segID,
CollectionID: record.GetCollectionID(),
PartitionID: record.GetPartitionID(),
NumRows: buildMeta.GetReq().GetNumRows(),
IndexID: indexID,
BuildID: record.GetBuildID(),
NodeID: buildMeta.GetNodeID(),
IndexVersion: buildMeta.GetIndexVersion(),
IndexState: buildMeta.GetState(),
FailReason: buildMeta.GetFailReason(),
IsDeleted: buildMeta.GetMarkDeleted(),
CreatedUTCTime: record.GetCreateTime(),
IndexFileKeys: fileKeys,
IndexSerializedSize: buildMeta.GetSerializeSize(),
WriteHandoff: buildMeta.GetState() == commonpb.IndexState_Finished,
}
segmentIndexModels.AddRecord(segID, indexID, segmentIndexModel)
}
Expand Down
21 changes: 21 additions & 0 deletions internal/core/src/common/protobuf_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "common/protobuf_utils.h"
#include "common/protobuf_utils_c.h"

static_assert(
sizeof(milvus::ProtoLayout) == sizeof(ProtoLayout),
"Size of milvus::ProtoLayout is not equal to size of ProtoLayoutInterface");

static_assert(alignof(milvus::ProtoLayout) == alignof(ProtoLayout),
"Alignment of milvus::ProtoLayout is not equal to alignment of "
"ProtoLayoutInterface");

ProtoLayoutInterface
CreateProtoLayout() {
auto ptr = new milvus::ProtoLayout();
return reinterpret_cast<ProtoLayoutInterface>(ptr);
}

void
ReleaseProtoLayout(ProtoLayoutInterface proto) {
delete reinterpret_cast<milvus::ProtoLayout*>(proto);
}
33 changes: 33 additions & 0 deletions internal/core/src/common/protobuf_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,37 @@ RepeatedKeyValToMap(
}
return mapping;
}

class ProtoLayout;
using ProtoLayoutPtr = std::unique_ptr<ProtoLayout>;

class ProtoLayout {
public:
static ProtoLayoutPtr
CreateEmpty() {
return std::make_unique<ProtoLayout>();
}

ProtoLayout() : blob(nullptr), size(0) {
}

~ProtoLayout() {
if (blob != nullptr) {
delete[] static_cast<uint8_t*>(blob);
blob = nullptr;
}
}

void
Initialize(size_t initial_size) {
assert(size == 0);
auto new_blob = new uint8_t[initial_size];
this->blob = new_blob;
this->size = initial_size;
}

void* blob;
size_t size;
};

} //namespace milvus
22 changes: 22 additions & 0 deletions internal/core/src/common/protobuf_utils_c.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once
#ifdef __cplusplus
extern "C" {
#endif

typedef struct ProtoLayout {
void* blob;
size_t size;
} ProtoLayout;

typedef ProtoLayout* ProtoLayoutInterface;

ProtoLayoutInterface
CreateProtoLayout();

void
ReleaseProtoLayout(ProtoLayoutInterface proto);

#ifdef __cplusplus
}

#endif
10 changes: 6 additions & 4 deletions internal/core/src/index/BitmapIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,18 +286,20 @@ BitmapIndex<T>::Serialize(const Config& config) {
}

template <typename T>
BinarySet
CreateIndexResultPtr
BitmapIndex<T>::Upload(const Config& config) {
auto binary_set = Serialize(config);

file_manager_->AddFile(binary_set);

auto remote_path_to_size = file_manager_->GetRemotePathsToFileSize();
BinarySet ret;
std::vector<SerializedIndexFileInfo> index_files;
index_files.reserve(remote_path_to_size.size());
for (auto& file : remote_path_to_size) {
ret.Append(file.first, nullptr, file.second);
index_files.emplace_back(file.first, file.second);
}
return ret;
return std::make_unique<CreateIndexResult>(
file_manager_->GetAddedTotalMemSize(), std::move(index_files));
}

template <typename T>
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/index/BitmapIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class BitmapIndex : public ScalarIndex<T> {
return Count();
}

BinarySet
CreateIndexResultPtr
Upload(const Config& config = {}) override;

const bool
Expand Down
53 changes: 53 additions & 0 deletions internal/core/src/index/CreateIndexResult.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include "index/CreateIndexResult.h"

namespace milvus::index {
CreateIndexResult::CreateIndexResult(
int64_t mem_size,
std::vector<SerializedIndexFileInfo>&& serialized_index_infos)
: mem_size_(mem_size), serialized_index_infos_(serialized_index_infos) {
}

void
CreateIndexResult::AppendSerializedIndexFileInfo(
SerializedIndexFileInfo&& info) {
serialized_index_infos_.push_back(std::move(info));
}

void
CreateIndexResult::SerializeAt(milvus::ProtoLayout* layout) {
milvus::proto::cgo::CreateIndexResult result;
result.set_mem_size(mem_size_);
for (auto& info : serialized_index_infos_) {
auto serialized_info = result.add_serialized_index_infos();
serialized_info->set_file_name(info.file_name);
serialized_info->set_file_size(info.file_size);
}
layout->Initialize(result.ByteSizeLong());
AssertInfo(result.SerializeToArray(layout->blob, layout->size),
"unmarshal CreateIndexResult failed");
}

std::vector<std::string>
CreateIndexResult::GetIndexFiles() const {
std::vector<std::string> files;
for (auto& info : serialized_index_infos_) {
files.push_back(info.file_name);
}
return files;
}

int64_t
CreateIndexResult::GetMemSize() const {
return mem_size_;
}

int64_t
CreateIndexResult::GetSerializedSize() const {
int64_t size = 0;
for (auto& info : serialized_index_infos_) {
size += info.file_size;
}
return size;
}

} // namespace milvus::index
48 changes: 48 additions & 0 deletions internal/core/src/index/CreateIndexResult.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <string>
#include <vector>
#include "common/protobuf_utils.h"
#include "pb/cgo_msg.pb.h"

namespace milvus::index {

class SerializedIndexFileInfo {
public:
SerializedIndexFileInfo(const std::string& file_name, int64_t file_size)
: file_name(file_name), file_size(file_size) {
}

std::string file_name;
int64_t file_size;
};

class CreateIndexResult {
public:
CreateIndexResult(
int64_t mem_size,
std::vector<SerializedIndexFileInfo>&& serialized_index_infos);

void
AppendSerializedIndexFileInfo(SerializedIndexFileInfo&& info);

void
SerializeAt(milvus::ProtoLayout* layout);

std::vector<std::string>
GetIndexFiles() const;

int64_t
GetMemSize() const;

int64_t
GetSerializedSize() const;

private:
int64_t mem_size_;
std::vector<SerializedIndexFileInfo> serialized_index_infos_;
};

using CreateIndexResultPtr = std::unique_ptr<CreateIndexResult>;

} // namespace milvus::index
6 changes: 3 additions & 3 deletions internal/core/src/index/HybridScalarIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,17 +298,17 @@ HybridScalarIndex<T>::SerializeIndexType() {
}

template <typename T>
BinarySet
CreateIndexResultPtr
HybridScalarIndex<T>::Upload(const Config& config) {
auto internal_index = GetInternalIndex();
auto index_ret = internal_index->Upload(config);

auto index_type_ret = SerializeIndexType();

for (auto& [key, value] : index_type_ret.binary_map_) {
index_ret.Append(key, value);
index_ret->AppendSerializedIndexFileInfo(
SerializedIndexFileInfo(key, value->size));
}

return index_ret;
}

Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/index/HybridScalarIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class HybridScalarIndex : public ScalarIndex<T> {
return internal_index_->HasRawData();
}

BinarySet
CreateIndexResultPtr
Upload(const Config& config = {}) override;

private:
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/Tracer.h"
#include "common/Types.h"
#include "index/Meta.h"
#include "index/CreateIndexResult.h"

namespace milvus::index {

Expand Down Expand Up @@ -57,7 +58,7 @@ class IndexBase {
virtual int64_t
Count() = 0;

virtual BinarySet
virtual CreateIndexResultPtr
Upload(const Config& config = {}) = 0;

virtual const bool
Expand Down
20 changes: 12 additions & 8 deletions internal/core/src/index/InvertedIndexTantivy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ InvertedIndexTantivy<T>::Serialize(const Config& config) {
}

template <typename T>
BinarySet
CreateIndexResultPtr
InvertedIndexTantivy<T>::Upload(const Config& config) {
finish();

Expand All @@ -151,20 +151,24 @@ InvertedIndexTantivy<T>::Upload(const Config& config) {
}
}

BinarySet ret;

auto remote_paths_to_size = disk_file_manager_->GetRemotePathsToFileSize();
for (auto& file : remote_paths_to_size) {
ret.Append(file.first, nullptr, file.second);
}

auto binary_set = Serialize(config);
mem_file_manager_->AddFile(binary_set);
auto remote_mem_path_to_size =
mem_file_manager_->GetRemotePathsToFileSize();

std::vector<SerializedIndexFileInfo> index_files;
index_files.reserve(remote_paths_to_size.size() +
remote_mem_path_to_size.size());
for (auto& file : remote_paths_to_size) {
index_files.emplace_back(file.first, file.second);
}
for (auto& file : remote_mem_path_to_size) {
ret.Append(file.first, nullptr, file.second);
index_files.emplace_back(file.first, file.second);
}
return ret;
return std::make_unique<CreateIndexResult>(
mem_file_manager_->GetAddedTotalMemSize(), std::move(index_files));
}

template <typename T>
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/index/InvertedIndexTantivy.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
BinarySet
Serialize(const Config& config) override;

BinarySet
CreateIndexResultPtr
Upload(const Config& config = {}) override;

/*
Expand Down
11 changes: 6 additions & 5 deletions internal/core/src/index/ScalarIndexSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,19 @@ ScalarIndexSort<T>::Serialize(const Config& config) {
}

template <typename T>
BinarySet
CreateIndexResultPtr
ScalarIndexSort<T>::Upload(const Config& config) {
auto binary_set = Serialize(config);
file_manager_->AddFile(binary_set);

auto remote_paths_to_size = file_manager_->GetRemotePathsToFileSize();
BinarySet ret;
std::vector<SerializedIndexFileInfo> index_files;
index_files.reserve(remote_paths_to_size.size());
for (auto& file : remote_paths_to_size) {
ret.Append(file.first, nullptr, file.second);
index_files.emplace_back(file.first, file.second);
}

return ret;
return std::make_unique<CreateIndexResult>(
file_manager_->GetAddedTotalMemSize(), std::move(index_files));
}

template <typename T>
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/index/ScalarIndexSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class ScalarIndexSort : public ScalarIndex<T> {
return (int64_t)data_.size();
}

BinarySet
CreateIndexResultPtr
Upload(const Config& config = {}) override;

const bool
Expand Down
Loading

0 comments on commit feb811a

Please sign in to comment.