Skip to content

Commit

Permalink
feat: Add HashStringAllocator::InputStream
Browse files Browse the repository at this point in the history
Summary:
When we get `ByteInputStream` from `HashStringAllocator`, we used to
have to materialize all the byte ranges in a vector, which is not efficient.
This change improve the efficiency by creating a `ByteInputStream` directly over
the linked list of a multi-part allocation.

Differential Revision: D69750088
  • Loading branch information
Yuhta authored and facebook-github-bot committed Feb 17, 2025
1 parent 7b5b4d5 commit 2be69b0
Show file tree
Hide file tree
Showing 17 changed files with 275 additions and 119 deletions.
6 changes: 2 additions & 4 deletions velox/common/file/FileInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ FileInputStream::~FileInputStream() {

void FileInputStream::readNextRange() {
VELOX_CHECK(current_ == nullptr || current_->availableBytes() == 0);
ranges_.clear();
current_ = nullptr;

int32_t readBytes{0};
Expand All @@ -77,9 +76,8 @@ void FileInputStream::readNextRange() {
}
}

ranges_.resize(1);
ranges_[0] = {buffer()->asMutable<uint8_t>(), readBytes, 0};
current_ = ranges_.data();
range_ = {buffer()->asMutable<uint8_t>(), readBytes, 0};
current_ = &range_;
fileOffset_ += readBytes;

updateStats(readBytes, readTimeNs);
Expand Down
2 changes: 2 additions & 0 deletions velox/common/file/FileInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ class FileInputStream : public ByteInputStream {
folly::SemiFuture<uint64_t> readAheadWait_{
folly::SemiFuture<uint64_t>::makeEmpty()};

ByteRange range_;

Stats stats_;
};
} // namespace facebook::velox::common
18 changes: 5 additions & 13 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,9 @@ class ByteInputStream {
current_->position += sizeof(T);
return folly::loadUnaligned<T>(source);
}
// The number straddles two buffers. We read byte by byte and make a
// little-endian uint64_t. The bytes can be cast to any integer or floating
// point type since the wire format has the machine byte order.
static_assert(sizeof(T) <= sizeof(uint64_t));
union {
uint64_t bits;
T typed;
} value{};
for (int32_t i = 0; i < sizeof(T); ++i) {
value.bits |= static_cast<uint64_t>(readByte()) << (i * 8);
}
return value.typed;
T value;
readBytes(&value, sizeof(T));
return value;
}

template <typename Char>
Expand All @@ -222,7 +213,6 @@ class ByteInputStream {
protected:
// Points to the current buffered byte range.
ByteRange* current_{nullptr};
std::vector<ByteRange> ranges_;
};

/// Read-only input stream backed by a set of buffers.
Expand Down Expand Up @@ -268,6 +258,8 @@ class BufferInputStream : public ByteInputStream {
const std::vector<ByteRange>& ranges() const {
return ranges_;
}

std::vector<ByteRange> ranges_;
};

template <>
Expand Down
29 changes: 2 additions & 27 deletions velox/common/memory/HashStringAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,31 +165,6 @@ void HashStringAllocator::freeToPool(void* ptr, size_t size) {
pool()->free(ptr, size);
}

// static
std::unique_ptr<ByteInputStream> HashStringAllocator::prepareRead(
const Header* begin,
size_t maxBytes) {
std::vector<ByteRange> ranges;
auto* header = const_cast<Header*>(begin);

size_t totalBytes{0};
for (;;) {
ranges.push_back(ByteRange{
reinterpret_cast<uint8_t*>(header->begin()), header->usableSize(), 0});
totalBytes += ranges.back().size;
if (!header->isContinued()) {
break;
}

if (totalBytes >= maxBytes) {
break;
}

header = header->nextContinued();
}
return std::make_unique<BufferInputStream>(std::move(ranges));
}

HashStringAllocator::Position HashStringAllocator::newWrite(
ByteOutputStream& stream,
int32_t preferredSize) {
Expand Down Expand Up @@ -363,9 +338,9 @@ StringView HashStringAllocator::contiguousString(
return view;
}

auto stream = prepareRead(headerOf(view.data()));
InputStream stream(headerOf(view.data()));
storage.resize(view.size());
stream->readBytes(storage.data(), view.size());
stream.ByteInputStream::readBytes(storage.data(), view.size());
return StringView(storage);
}

Expand Down
158 changes: 151 additions & 7 deletions velox/common/memory/HashStringAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,7 @@ class HashStringAllocator : public StreamArena {
return header->size() + kHeaderSize;
}

/// Returns ByteInputStream over the data in the range of 'header' and
/// possible continuation ranges.
/// @param maxBytes If provided, the returned stream will cover at most that
/// many bytes.
static std::unique_ptr<ByteInputStream> prepareRead(
const Header* header,
size_t maxBytes = std::numeric_limits<size_t>::max());
class InputStream;

/// Returns the number of payload bytes between 'header->begin()' and
/// 'position'.
Expand Down Expand Up @@ -516,6 +510,156 @@ class HashStringAllocator : public StreamArena {
State state_;
};

/// A ByteInputStream over the data in the range of begin and possible
/// continuation ranges.
class HashStringAllocator::InputStream : public ByteInputStream {
public:
explicit InputStream(const Header* begin)
: begin_(const_cast<Header*>(begin)), header_(begin_) {
resetRange();
current_ = &range_;
}

InputStream(const InputStream& other) {
*this = other;
}

InputStream& operator=(const InputStream& other) {
begin_ = other.begin_;
header_ = other.header_;
range_ = other.range_;
current_ = &range_;
return *this;
}

size_t size() const final {
auto* header = begin_;
size_t total = 0;
for (;;) {
total += header->usableSize();
if (!header->isContinued()) {
break;
}
header = header->nextContinued();
}
return total;
}

bool atEnd() const final {
return range_.position == range_.size && !header_->isContinued();
}

std::streampos tellp() const final {
auto* header = begin_;
int64_t pos = 0;
while (header != header_) {
pos += header->usableSize();
header = header->nextContinued();
}
return pos + range_.position;
}

void seekp(std::streampos pos) final {
header_ = begin_;
resetRange();
skipImpl(pos);
}

void skip(int32_t size) final {
nextRange();
skipImpl(size);
}

size_t remainingSize() const final {
return size() - tellp();
}

uint8_t readByte() final {
uint8_t byte;
readBytes(&byte, 1);
return byte;
}

void readBytes(uint8_t* bytes, int32_t size) final {
nextRange();
for (;;) {
auto available = range_.size - range_.position;
if (size <= available) {
std::memcpy(bytes, range_.buffer + range_.position, size);
range_.position += size;
return;
}
std::memcpy(bytes, range_.buffer + range_.position, available);
bytes += available;
size -= available;
if (!header_->isContinued()) {
VELOX_CHECK_EQ(size, 0, "Reading past end of stream");
range_.position = range_.size;
return;
}
header_ = header_->nextContinued();
resetRange();
}
}

std::string_view nextView(int32_t size) final {
if (atEnd()) {
return {};
}
nextRange();
size = std::min(size, range_.size - range_.position);
std::string_view result(
reinterpret_cast<char*>(range_.buffer) + range_.position, size);
range_.position += size;
return result;
}

std::string toString() const final {
return fmt::format(
"HashStringAllocator::InputStream: begin_={} header_={} range_={}",
begin_->toString(),
header_->toString(),
range_.toString());
}

private:
void resetRange() {
VELOX_DCHECK_GT(header_->usableSize(), 0);
range_.buffer = reinterpret_cast<uint8_t*>(header_->begin());
range_.size = header_->usableSize();
range_.position = 0;
}

void nextRange() {
if (range_.position == range_.size && header_->isContinued()) {
header_ = header_->nextContinued();
resetRange();
}
}

void skipImpl(int64_t size) {
for (;;) {
auto available = range_.size - range_.position;
if (size <= available) {
range_.position += size;
return;
}
size -= available;
if (!header_->isContinued()) {
VELOX_CHECK_EQ(size, 0, "Seeking past end of stream");
range_.position = range_.size;
return;
}
header_ = header_->nextContinued();
resetRange();
}
}

Header* begin_;
Header* header_;
ByteRange range_;
};

/// Utility for keeping track of allocation between two points in time. A
/// counter on a row supplied at construction is incremented by the change in
/// allocation between construction and destruction. This is a scoped guard to
Expand Down
68 changes: 59 additions & 9 deletions velox/common/memory/tests/HashStringAllocatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,15 @@ TEST_F(HashStringAllocatorTest, finishWrite) {
replaceStart.offset() + 4);

// Read back long and short strings.
auto inputStream = HSA::prepareRead(longStart.header);
HSA::InputStream inputStream(longStart.header);

std::string copy;
copy.resize(longString.size());
inputStream->readBytes(copy.data(), copy.size());
inputStream.ByteInputStream::readBytes(copy.data(), copy.size());
ASSERT_EQ(copy, longString);

copy.resize(4);
inputStream->readBytes(copy.data(), 4);
inputStream.ByteInputStream::readBytes(copy.data(), 4);
ASSERT_EQ(copy, "abcd");

auto allocatedBytes = allocator_->checkConsistency();
Expand All @@ -277,10 +277,10 @@ TEST_F(HashStringAllocatorTest, finishWrite) {
stream.appendStringView(largeString);
allocator_->finishWrite(stream, 0);

auto inStream = HSA::prepareRead(start.header);
HSA::InputStream inStream(start.header);
std::string copy;
copy.resize(largeString.size());
inStream->readBytes(copy.data(), copy.size());
inStream.ByteInputStream::readBytes(copy.data(), copy.size());
ASSERT_EQ(copy, largeString);
allocatedBytes = allocator_->checkConsistency();
ASSERT_EQ(allocatedBytes, allocator_->currentBytes());
Expand Down Expand Up @@ -396,10 +396,10 @@ TEST_F(HashStringAllocatorTest, rewrite) {
stream.appendOne(67890LL);
position = allocator_->finishWrite(stream, 0).second;
EXPECT_EQ(3 * sizeof(int64_t), HSA::offset(header, position));
auto inStream = HSA::prepareRead(header);
EXPECT_EQ(123456789012345LL, inStream->read<int64_t>());
EXPECT_EQ(12345LL, inStream->read<int64_t>());
EXPECT_EQ(67890LL, inStream->read<int64_t>());
HSA::InputStream inStream(header);
EXPECT_EQ(123456789012345LL, inStream.read<int64_t>());
EXPECT_EQ(12345LL, inStream.read<int64_t>());
EXPECT_EQ(67890LL, inStream.read<int64_t>());
}
// The stream contains 3 int64_t's.
auto end = HSA::seek(header, 3 * sizeof(int64_t));
Expand Down Expand Up @@ -765,5 +765,55 @@ TEST_F(HashStringAllocatorTest, freezeAndExecute) {
// mutable.
allocator_->freezeAndExecute([&]() { allocator_->currentBytes(); });
}

TEST_F(HashStringAllocatorTest, inputStream) {
std::string expected;
ByteOutputStream out(allocator_.get());
auto start = allocator_->newWrite(out, 1);
out.appendStringView(std::string_view("a"));
expected += "a";
auto last = allocator_->finishWrite(out, 0).second;
for (int i = 1; i < 10; ++i) {
allocator_->extendWrite(last, out);
std::string data(i + 1, 'a' + i);
out.appendStringView(data);
expected += data;
last = allocator_->finishWrite(out, 0).second;
}
ASSERT_TRUE(start.header->isContinued());
HSA::InputStream in(start.header);
ASSERT_GE(in.size(), out.size());
ASSERT_EQ(in.tellp(), 0);
ASSERT_FALSE(in.atEnd());
for (int i = 10, j = 0; i >= 1; --i) {
if (i % 2 == 0) {
char actual[10];
in.ByteInputStream::readBytes(actual, i);
ASSERT_LE(j + i, expected.size());
ASSERT_EQ(
std::string_view(actual, i),
std::string_view(expected.data() + j, i));
} else {
in.skip(i);
}
j += i;
}
ASSERT_EQ(in.tellp(), 55);
ASSERT_EQ(in.size(), 55 + in.remainingSize());
in.seekp(5);
ASSERT_EQ(in.tellp(), 5);
ASSERT_FALSE(in.atEnd());
for (int j = 5; j < expected.size();) {
auto actual = in.nextView(5);
auto size = std::min(actual.size(), expected.size() - j);
ASSERT_EQ(
actual.substr(0, size), std::string_view(expected.data() + j, size));
j += size;
}
in.skip(in.remainingSize());
ASSERT_TRUE(in.atEnd());
ASSERT_EQ(in.tellp(), in.size());
}

} // namespace
} // namespace facebook::velox
Loading

0 comments on commit 2be69b0

Please sign in to comment.