Skip to content

Commit

Permalink
Add support for reading multiple frames from one BlockingQueueStream …
Browse files Browse the repository at this point in the history
…buffer

PiperOrigin-RevId: 704581842
  • Loading branch information
hai007 authored and copybara-github committed Dec 10, 2024
1 parent 2352d73 commit 25395c5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
35 changes: 26 additions & 9 deletions internal/platform/blocking_queue_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

#include "internal/platform/blocking_queue_stream.h"

#include <algorithm>
#include <cstdint>
#include <utility>

#include "internal/platform/byte_array.h"
#include "internal/platform/exception.h"
Expand All @@ -24,26 +26,41 @@
namespace nearby {

BlockingQueueStream::BlockingQueueStream() {
NEARBY_LOGS(INFO) << "Create a BlockingQueueStream with size "
LOG(INFO) << "Create a BlockingQueueStream with size "
<< FeatureFlags::GetInstance()
.GetFlags()
.blocking_queue_stream_queue_capacity;
}

ExceptionOr<ByteArray> BlockingQueueStream::Read(std::int64_t size) {
if (is_closed_) {
NEARBY_LOGS(INFO)
LOG(INFO)
<< "Failed to read BlockingQueueStream because it was closed.";
return ExceptionOr<ByteArray>(Exception::kInterrupted);
}
NEARBY_LOGS(INFO) << "BlockingQueueStream expect to read " << size
<< " bytes";
return ExceptionOr<ByteArray>(blocking_queue_.Take());

ByteArray bytes = queue_head_.Empty() ? blocking_queue_.Take() : queue_head_;
if (bytes == queue_end_) {
LOG(INFO) << "BlockingQueueStream is Interrupted.";
return ExceptionOr<ByteArray>(Exception::kInterrupted);
}

LOG(INFO) << "BlockingQueueStream is reading " << size << " bytes from "
<< bytes.size() << " bytes buffer";
int copy_len = std::min<int>(size, bytes.size());
ByteArray buffer;
buffer.SetData(bytes.data(), copy_len);
if (copy_len < bytes.size()) {
queue_head_ = ByteArray(bytes.data() + copy_len, bytes.size() - copy_len);
} else {
queue_head_ = ByteArray();
}
return ExceptionOr<ByteArray>(std::move(buffer));
}

void BlockingQueueStream::Write(const ByteArray& bytes) {
if (is_closed_) {
NEARBY_LOGS(INFO)
LOG(INFO)
<< "Failed to write BlockingQueueStream because it was closed.";
return;
}
Expand All @@ -55,17 +72,17 @@ void BlockingQueueStream::Write(const ByteArray& bytes) {

Exception BlockingQueueStream::Close() {
if (is_closed_) {
NEARBY_LOGS(INFO) << "InputBlockingQueueStream has already been closed.";
LOG(INFO) << "InputBlockingQueueStream has already been closed.";
return {Exception::kSuccess};
}
if (is_writing_) {
NEARBY_LOGS(INFO)
LOG(INFO)
<< "BlockingQueueStream is waiting for writing, read first to unblock";
blocking_queue_.TryTake();
}
blocking_queue_.TryPut(queue_end_);
is_closed_ = true;
NEARBY_LOGS(INFO) << "InputBlockingQueueStream is closed.";
LOG(INFO) << "InputBlockingQueueStream is closed.";
return {Exception::kSuccess};
}

Expand Down
3 changes: 2 additions & 1 deletion internal/platform/blocking_queue_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class BlockingQueueStream : public InputStream {
ArrayBlockingQueue<ByteArray> blocking_queue_{FeatureFlags::GetInstance()
.GetFlags()
.blocking_queue_stream_queue_capacity};
ByteArray queue_end_{0};
ByteArray queue_head_;
ByteArray queue_end_ = ByteArray();
bool is_writing_ = false;
bool is_closed_ = false;
};
Expand Down

0 comments on commit 25395c5

Please sign in to comment.