From 25395c563354ddeea108f4463158d6e4f6c32dfb Mon Sep 17 00:00:00 2001 From: hai007 Date: Tue, 10 Dec 2024 00:16:44 -0800 Subject: [PATCH] Add support for reading multiple frames from one BlockingQueueStream buffer PiperOrigin-RevId: 704581842 --- internal/platform/blocking_queue_stream.cc | 35 ++++++++++++++++------ internal/platform/blocking_queue_stream.h | 3 +- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/internal/platform/blocking_queue_stream.cc b/internal/platform/blocking_queue_stream.cc index 005355eebc..c0f5a8f80c 100644 --- a/internal/platform/blocking_queue_stream.cc +++ b/internal/platform/blocking_queue_stream.cc @@ -14,7 +14,9 @@ #include "internal/platform/blocking_queue_stream.h" +#include #include +#include #include "internal/platform/byte_array.h" #include "internal/platform/exception.h" @@ -24,7 +26,7 @@ namespace nearby { BlockingQueueStream::BlockingQueueStream() { - NEARBY_LOGS(INFO) << "Create a BlockingQueueStream with size " + LOG(INFO) << "Create a BlockingQueueStream with size " << FeatureFlags::GetInstance() .GetFlags() .blocking_queue_stream_queue_capacity; @@ -32,18 +34,33 @@ BlockingQueueStream::BlockingQueueStream() { ExceptionOr BlockingQueueStream::Read(std::int64_t size) { if (is_closed_) { - NEARBY_LOGS(INFO) + LOG(INFO) << "Failed to read BlockingQueueStream because it was closed."; return ExceptionOr(Exception::kInterrupted); } - NEARBY_LOGS(INFO) << "BlockingQueueStream expect to read " << size - << " bytes"; - return ExceptionOr(blocking_queue_.Take()); + + ByteArray bytes = queue_head_.Empty() ? blocking_queue_.Take() : queue_head_; + if (bytes == queue_end_) { + LOG(INFO) << "BlockingQueueStream is Interrupted."; + return ExceptionOr(Exception::kInterrupted); + } + + LOG(INFO) << "BlockingQueueStream is reading " << size << " bytes from " + << bytes.size() << " bytes buffer"; + int copy_len = std::min(size, bytes.size()); + ByteArray buffer; + buffer.SetData(bytes.data(), copy_len); + if (copy_len < bytes.size()) { + queue_head_ = ByteArray(bytes.data() + copy_len, bytes.size() - copy_len); + } else { + queue_head_ = ByteArray(); + } + return ExceptionOr(std::move(buffer)); } void BlockingQueueStream::Write(const ByteArray& bytes) { if (is_closed_) { - NEARBY_LOGS(INFO) + LOG(INFO) << "Failed to write BlockingQueueStream because it was closed."; return; } @@ -55,17 +72,17 @@ void BlockingQueueStream::Write(const ByteArray& bytes) { Exception BlockingQueueStream::Close() { if (is_closed_) { - NEARBY_LOGS(INFO) << "InputBlockingQueueStream has already been closed."; + LOG(INFO) << "InputBlockingQueueStream has already been closed."; return {Exception::kSuccess}; } if (is_writing_) { - NEARBY_LOGS(INFO) + LOG(INFO) << "BlockingQueueStream is waiting for writing, read first to unblock"; blocking_queue_.TryTake(); } blocking_queue_.TryPut(queue_end_); is_closed_ = true; - NEARBY_LOGS(INFO) << "InputBlockingQueueStream is closed."; + LOG(INFO) << "InputBlockingQueueStream is closed."; return {Exception::kSuccess}; } diff --git a/internal/platform/blocking_queue_stream.h b/internal/platform/blocking_queue_stream.h index 72d76c1461..b7f2652ca3 100644 --- a/internal/platform/blocking_queue_stream.h +++ b/internal/platform/blocking_queue_stream.h @@ -42,7 +42,8 @@ class BlockingQueueStream : public InputStream { ArrayBlockingQueue blocking_queue_{FeatureFlags::GetInstance() .GetFlags() .blocking_queue_stream_queue_capacity}; - ByteArray queue_end_{0}; + ByteArray queue_head_; + ByteArray queue_end_ = ByteArray(); bool is_writing_ = false; bool is_closed_ = false; };