Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add HashStringAllocator::InputStream #12364

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions velox/common/file/FileInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ FileInputStream::~FileInputStream() {

void FileInputStream::readNextRange() {
VELOX_CHECK(current_ == nullptr || current_->availableBytes() == 0);
ranges_.clear();
current_ = nullptr;

int32_t readBytes{0};
Expand All @@ -77,9 +76,8 @@ void FileInputStream::readNextRange() {
}
}

ranges_.resize(1);
ranges_[0] = {buffer()->asMutable<uint8_t>(), readBytes, 0};
current_ = ranges_.data();
range_ = {buffer()->asMutable<uint8_t>(), readBytes, 0};
current_ = &range_;
fileOffset_ += readBytes;

updateStats(readBytes, readTimeNs);
Expand Down
2 changes: 2 additions & 0 deletions velox/common/file/FileInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ class FileInputStream : public ByteInputStream {
folly::SemiFuture<uint64_t> readAheadWait_{
folly::SemiFuture<uint64_t>::makeEmpty()};

ByteRange range_;

Stats stats_;
};
} // namespace facebook::velox::common
18 changes: 5 additions & 13 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,9 @@ class ByteInputStream {
current_->position += sizeof(T);
return folly::loadUnaligned<T>(source);
}
// The number straddles two buffers. We read byte by byte and make a
// little-endian uint64_t. The bytes can be cast to any integer or floating
// point type since the wire format has the machine byte order.
static_assert(sizeof(T) <= sizeof(uint64_t));
union {
uint64_t bits;
T typed;
} value{};
for (int32_t i = 0; i < sizeof(T); ++i) {
value.bits |= static_cast<uint64_t>(readByte()) << (i * 8);
}
return value.typed;
T value;
readBytes(&value, sizeof(T));
return value;
}

template <typename Char>
Expand All @@ -222,7 +213,6 @@ class ByteInputStream {
protected:
// Points to the current buffered byte range.
ByteRange* current_{nullptr};
std::vector<ByteRange> ranges_;
};

/// Read-only input stream backed by a set of buffers.
Expand Down Expand Up @@ -268,6 +258,8 @@ class BufferInputStream : public ByteInputStream {
const std::vector<ByteRange>& ranges() const {
return ranges_;
}

std::vector<ByteRange> ranges_;
};

template <>
Expand Down
29 changes: 2 additions & 27 deletions velox/common/memory/HashStringAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,31 +165,6 @@ void HashStringAllocator::freeToPool(void* ptr, size_t size) {
pool()->free(ptr, size);
}

// static
std::unique_ptr<ByteInputStream> HashStringAllocator::prepareRead(
const Header* begin,
size_t maxBytes) {
std::vector<ByteRange> ranges;
auto* header = const_cast<Header*>(begin);

size_t totalBytes{0};
for (;;) {
ranges.push_back(ByteRange{
reinterpret_cast<uint8_t*>(header->begin()), header->usableSize(), 0});
totalBytes += ranges.back().size;
if (!header->isContinued()) {
break;
}

if (totalBytes >= maxBytes) {
break;
}

header = header->nextContinued();
}
return std::make_unique<BufferInputStream>(std::move(ranges));
}

HashStringAllocator::Position HashStringAllocator::newWrite(
ByteOutputStream& stream,
int32_t preferredSize) {
Expand Down Expand Up @@ -363,9 +338,9 @@ StringView HashStringAllocator::contiguousString(
return view;
}

auto stream = prepareRead(headerOf(view.data()));
InputStream stream(headerOf(view.data()));
storage.resize(view.size());
stream->readBytes(storage.data(), view.size());
stream.ByteInputStream::readBytes(storage.data(), view.size());
return StringView(storage);
}

Expand Down
150 changes: 143 additions & 7 deletions velox/common/memory/HashStringAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,7 @@ class HashStringAllocator : public StreamArena {
return header->size() + kHeaderSize;
}

/// Returns ByteInputStream over the data in the range of 'header' and
/// possible continuation ranges.
/// @param maxBytes If provided, the returned stream will cover at most that
/// many bytes.
static std::unique_ptr<ByteInputStream> prepareRead(
const Header* header,
size_t maxBytes = std::numeric_limits<size_t>::max());
class InputStream;

/// Returns the number of payload bytes between 'header->begin()' and
/// 'position'.
Expand Down Expand Up @@ -516,6 +510,148 @@ class HashStringAllocator : public StreamArena {
State state_;
};

/// A ByteInputStream over the data in the range of begin and possible
/// continuation ranges.
class HashStringAllocator::InputStream : public ByteInputStream {
public:
explicit InputStream(const Header* begin)
: begin_(const_cast<Header*>(begin)), header_(begin_) {
resetRange();
current_ = &range_;
}

InputStream(const InputStream& other) {
*this = other;
}

InputStream& operator=(const InputStream& other) {
begin_ = other.begin_;
header_ = other.header_;
range_ = other.range_;
current_ = &range_;
return *this;
}

size_t size() const final {
auto* header = begin_;
size_t total = 0;
for (;;) {
total += header->usableSize();
if (!header->isContinued()) {
break;
}
header = header->nextContinued();
}
return total;
}

bool atEnd() const final {
return range_.position == range_.size && !header_->isContinued();
}

std::streampos tellp() const final {
auto* header = begin_;
int64_t pos = 0;
while (header != header_) {
pos += header->usableSize();
header = header->nextContinued();
}
return pos + range_.position;
}

void seekp(std::streampos pos) final {
header_ = begin_;
resetRange();
skipImpl(pos);
}

void skip(int32_t size) final {
advanceRangeIfNeed();
skipImpl(size);
}

size_t remainingSize() const final {
return size() - tellp();
}

uint8_t readByte() final {
uint8_t byte;
readBytes(&byte, 1);
return byte;
}

void readBytes(uint8_t* bytes, int32_t size) final {
advanceRangeIfNeed();
for (;;) {
auto available = range_.size - range_.position;
if (size <= available) {
std::memcpy(bytes, range_.buffer + range_.position, size);
range_.position += size;
return;
}
std::memcpy(bytes, range_.buffer + range_.position, available);
bytes += available;
size -= available;
VELOX_CHECK(header_->isContinued(), "Reading past end of stream");
header_ = header_->nextContinued();
resetRange();
}
}

std::string_view nextView(int32_t size) final {
if (atEnd()) {
return {};
}
advanceRangeIfNeed();
size = std::min(size, range_.size - range_.position);
std::string_view result(
reinterpret_cast<char*>(range_.buffer) + range_.position, size);
range_.position += size;
return result;
}

std::string toString() const final {
return fmt::format(
"HashStringAllocator::InputStream: begin_={} header_={} range_={}",
begin_->toString(),
header_->toString(),
range_.toString());
}

private:
void resetRange() {
VELOX_DCHECK_GT(header_->usableSize(), 0);
range_.buffer = reinterpret_cast<uint8_t*>(header_->begin());
range_.size = header_->usableSize();
range_.position = 0;
}

void advanceRangeIfNeed() {
if (range_.position == range_.size && header_->isContinued()) {
header_ = header_->nextContinued();
resetRange();
}
}

void skipImpl(int64_t size) {
for (;;) {
auto available = range_.size - range_.position;
if (size <= available) {
range_.position += size;
return;
}
size -= available;
VELOX_CHECK(header_->isContinued(), "Seeking past end of stream");
header_ = header_->nextContinued();
resetRange();
}
}

Header* begin_;
Header* header_;
ByteRange range_;
};

/// Utility for keeping track of allocation between two points in time. A
/// counter on a row supplied at construction is incremented by the change in
/// allocation between construction and destruction. This is a scoped guard to
Expand Down
68 changes: 59 additions & 9 deletions velox/common/memory/tests/HashStringAllocatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,15 @@ TEST_F(HashStringAllocatorTest, finishWrite) {
replaceStart.offset() + 4);

// Read back long and short strings.
auto inputStream = HSA::prepareRead(longStart.header);
HSA::InputStream inputStream(longStart.header);

std::string copy;
copy.resize(longString.size());
inputStream->readBytes(copy.data(), copy.size());
inputStream.ByteInputStream::readBytes(copy.data(), copy.size());
ASSERT_EQ(copy, longString);

copy.resize(4);
inputStream->readBytes(copy.data(), 4);
inputStream.ByteInputStream::readBytes(copy.data(), 4);
ASSERT_EQ(copy, "abcd");

auto allocatedBytes = allocator_->checkConsistency();
Expand All @@ -277,10 +277,10 @@ TEST_F(HashStringAllocatorTest, finishWrite) {
stream.appendStringView(largeString);
allocator_->finishWrite(stream, 0);

auto inStream = HSA::prepareRead(start.header);
HSA::InputStream inStream(start.header);
std::string copy;
copy.resize(largeString.size());
inStream->readBytes(copy.data(), copy.size());
inStream.ByteInputStream::readBytes(copy.data(), copy.size());
ASSERT_EQ(copy, largeString);
allocatedBytes = allocator_->checkConsistency();
ASSERT_EQ(allocatedBytes, allocator_->currentBytes());
Expand Down Expand Up @@ -396,10 +396,10 @@ TEST_F(HashStringAllocatorTest, rewrite) {
stream.appendOne(67890LL);
position = allocator_->finishWrite(stream, 0).second;
EXPECT_EQ(3 * sizeof(int64_t), HSA::offset(header, position));
auto inStream = HSA::prepareRead(header);
EXPECT_EQ(123456789012345LL, inStream->read<int64_t>());
EXPECT_EQ(12345LL, inStream->read<int64_t>());
EXPECT_EQ(67890LL, inStream->read<int64_t>());
HSA::InputStream inStream(header);
EXPECT_EQ(123456789012345LL, inStream.read<int64_t>());
EXPECT_EQ(12345LL, inStream.read<int64_t>());
EXPECT_EQ(67890LL, inStream.read<int64_t>());
}
// The stream contains 3 int64_t's.
auto end = HSA::seek(header, 3 * sizeof(int64_t));
Expand Down Expand Up @@ -765,5 +765,55 @@ TEST_F(HashStringAllocatorTest, freezeAndExecute) {
// mutable.
allocator_->freezeAndExecute([&]() { allocator_->currentBytes(); });
}

TEST_F(HashStringAllocatorTest, inputStream) {
std::string expected;
ByteOutputStream out(allocator_.get());
auto start = allocator_->newWrite(out, 1);
out.appendStringView(std::string_view("a"));
expected += "a";
auto last = allocator_->finishWrite(out, 0).second;
for (int i = 1; i < 10; ++i) {
allocator_->extendWrite(last, out);
std::string data(i + 1, 'a' + i);
out.appendStringView(data);
expected += data;
last = allocator_->finishWrite(out, 0).second;
}
ASSERT_TRUE(start.header->isContinued());
HSA::InputStream in(start.header);
ASSERT_GE(in.size(), out.size());
ASSERT_EQ(in.tellp(), 0);
ASSERT_FALSE(in.atEnd());
for (int i = 10, j = 0; i >= 1; --i) {
if (i % 2 == 0) {
char actual[10];
in.ByteInputStream::readBytes(actual, i);
ASSERT_LE(j + i, expected.size());
ASSERT_EQ(
std::string_view(actual, i),
std::string_view(expected.data() + j, i));
} else {
in.skip(i);
}
j += i;
}
ASSERT_EQ(in.tellp(), 55);
ASSERT_EQ(in.size(), 55 + in.remainingSize());
in.seekp(5);
ASSERT_EQ(in.tellp(), 5);
ASSERT_FALSE(in.atEnd());
for (int j = 5; j < expected.size();) {
auto actual = in.nextView(5);
auto size = std::min(actual.size(), expected.size() - j);
ASSERT_EQ(
actual.substr(0, size), std::string_view(expected.data() + j, size));
j += size;
}
in.skip(in.remainingSize());
ASSERT_TRUE(in.atEnd());
ASSERT_EQ(in.tellp(), in.size());
}

} // namespace
} // namespace facebook::velox
Loading
Loading