Skip to content

Commit

Permalink
Add support for reading multiple frames from one BlockingQueueStream …
Browse files Browse the repository at this point in the history
…buffer

PiperOrigin-RevId: 704581842
  • Loading branch information
hai007 authored and copybara-github committed Dec 24, 2024
1 parent c9fbf6b commit 4d78f2a
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 13 deletions.
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ let package = Package(
"internal/platform/direct_executor_test.cc",
"internal/platform/borrowable_test.cc",
"internal/platform/implementation/windows/http_loader_test.cc",
"internal/platform/blocking_queue_stream_test.cc",
"internal/network/utils_test.cc",
"internal/network/url_test.cc",
"internal/network/http_response_test.cc",
Expand Down
1 change: 1 addition & 0 deletions connections/implementation/flags/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ cc_library(
visibility = [
"//connections:__subpackages__",
"//googlemac/iPhone/Shared/Identity/SmartSetup:__subpackages__",
"//internal/platform:__subpackages__",
"//internal/platform/implementation:__subpackages__",
"//location/nearby/cpp:__subpackages__",
"//location/nearby/sharing:__subpackages__",
Expand Down
4 changes: 4 additions & 0 deletions internal/platform/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ cc_library(
deps = [
":base",
":util",
"//connections/implementation/flags:connections_flags",
"//internal/base:files",
"//internal/crypto_cros",
"//internal/flags:nearby_flags",
Expand Down Expand Up @@ -459,6 +460,7 @@ cc_test(
"ble_connection_info_test.cc",
"ble_test.cc",
"ble_v2_test.cc",
"blocking_queue_stream_test.cc",
"bluetooth_adapter_test.cc",
"bluetooth_classic_test.cc",
"bluetooth_connection_info_test.cc",
Expand All @@ -476,6 +478,8 @@ cc_test(
":connection_info",
":test_util",
":types",
"//connections/implementation/flags:connections_flags",
"//internal/flags:nearby_flags",
"//internal/platform/implementation:comm",
"//internal/platform/implementation/g3", # build_cleaner: keep
"//proto:connections_enums_cc_proto",
Expand Down
46 changes: 37 additions & 9 deletions internal/platform/blocking_queue_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

#include "internal/platform/blocking_queue_stream.h"

#include <algorithm>
#include <cstdint>
#include <utility>

#include "internal/platform/byte_array.h"
#include "internal/platform/exception.h"
Expand All @@ -24,26 +26,48 @@
namespace nearby {

BlockingQueueStream::BlockingQueueStream() {
NEARBY_LOGS(INFO) << "Create a BlockingQueueStream with size "
LOG(INFO) << "Create a BlockingQueueStream with size "
<< FeatureFlags::GetInstance()
.GetFlags()
.blocking_queue_stream_queue_capacity;
}

ExceptionOr<ByteArray> BlockingQueueStream::Read(std::int64_t size) {
if (!is_multiplex_enabled_) {
LOG(INFO) << "Multiplex is not enabled, drop the read.";
return ExceptionOr<ByteArray>(Exception::kExecution);
}

if (is_closed_) {
NEARBY_LOGS(INFO)
LOG(INFO)
<< "Failed to read BlockingQueueStream because it was closed.";
return ExceptionOr<ByteArray>(Exception::kInterrupted);
}
NEARBY_LOGS(INFO) << "BlockingQueueStream expect to read " << size
<< " bytes";
return ExceptionOr<ByteArray>(blocking_queue_.Take());

ByteArray bytes = queue_head_.Empty() ? blocking_queue_.Take() : queue_head_;
if (bytes == queue_end_) {
LOG(INFO) << "BlockingQueueStream is Interrupted.";
return ExceptionOr<ByteArray>(Exception::kInterrupted);
}

int copy_len = std::min<int>(size, bytes.size());
ByteArray buffer;
buffer.SetData(bytes.data(), copy_len);
if (copy_len < bytes.size()) {
queue_head_ = ByteArray(bytes.data() + copy_len, bytes.size() - copy_len);
} else {
queue_head_ = ByteArray();
}
return ExceptionOr<ByteArray>(std::move(buffer));
}

void BlockingQueueStream::Write(const ByteArray& bytes) {
if (!is_multiplex_enabled_) {
LOG(INFO) << "Multiplex is not enabled, drop the write.";
return;
}
if (is_closed_) {
NEARBY_LOGS(INFO)
LOG(INFO)
<< "Failed to write BlockingQueueStream because it was closed.";
return;
}
Expand All @@ -54,18 +78,22 @@ void BlockingQueueStream::Write(const ByteArray& bytes) {
}

Exception BlockingQueueStream::Close() {
if (!is_multiplex_enabled_) {
LOG(INFO) << "Multiplex is not enabled, drop the write.";
return {Exception::kExecution};
}
if (is_closed_) {
NEARBY_LOGS(INFO) << "InputBlockingQueueStream has already been closed.";
LOG(INFO) << "InputBlockingQueueStream has already been closed.";
return {Exception::kSuccess};
}
if (is_writing_) {
NEARBY_LOGS(INFO)
LOG(INFO)
<< "BlockingQueueStream is waiting for writing, read first to unblock";
blocking_queue_.TryTake();
}
blocking_queue_.TryPut(queue_end_);
is_closed_ = true;
NEARBY_LOGS(INFO) << "InputBlockingQueueStream is closed.";
LOG(INFO) << "InputBlockingQueueStream is closed.";
return {Exception::kSuccess};
}

Expand Down
15 changes: 11 additions & 4 deletions internal/platform/blocking_queue_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <cstdint>

#include "connections/implementation/flags/nearby_connections_feature_flags.h"
#include "internal/flags/nearby_flags.h"
#include "internal/platform/array_blocking_queue.h"
#include "internal/platform/byte_array.h"
#include "internal/platform/exception.h"
Expand All @@ -39,10 +41,15 @@ class BlockingQueueStream : public InputStream {

private:
mutable Mutex mutex_;
ArrayBlockingQueue<ByteArray> blocking_queue_{FeatureFlags::GetInstance()
.GetFlags()
.blocking_queue_stream_queue_capacity};
ByteArray queue_end_{0};
bool is_multiplex_enabled_ = NearbyFlags::GetInstance().GetBoolFlag(
connections::config_package_nearby::nearby_connections_feature::
kEnableMultiplex);
ArrayBlockingQueue<ByteArray> blocking_queue_{
FeatureFlags::GetInstance()
.GetFlags()
.blocking_queue_stream_queue_capacity};
ByteArray queue_head_;
ByteArray queue_end_ = ByteArray();
bool is_writing_ = false;
bool is_closed_ = false;
};
Expand Down
71 changes: 71 additions & 0 deletions internal/platform/blocking_queue_stream_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "internal/platform/blocking_queue_stream.h"

#include "gtest/gtest.h"
#include "connections/implementation/flags/nearby_connections_feature_flags.h"
#include "internal/flags/nearby_flags.h"
#include "internal/platform/byte_array.h"
#include "internal/platform/exception.h"

namespace nearby {
namespace {

TEST(BlockingQueueStreamTest, ReadSuccess) {
bool is_multiplex_enabled = NearbyFlags::GetInstance().GetBoolFlag(
connections::config_package_nearby::nearby_connections_feature::
kEnableMultiplex);
NearbyFlags::GetInstance().OverrideBoolFlagValue(
connections::config_package_nearby::nearby_connections_feature::
kEnableMultiplex, true);

BlockingQueueStream stream;
ByteArray bytes = ByteArray("test1test2test3");
stream.Write(bytes);
ExceptionOr<ByteArray> result = stream.Read(5);
EXPECT_EQ(result.result(), ByteArray("test1"));
result = stream.Read(5);
EXPECT_EQ(result.result(), ByteArray("test2"));
result = stream.Read(5);
EXPECT_EQ(result.result(), ByteArray("test3"));
stream.Close();

NearbyFlags::GetInstance().OverrideBoolFlagValue(
connections::config_package_nearby::nearby_connections_feature::
kEnableMultiplex, is_multiplex_enabled);
}

TEST(BlockingQueueStreamTest, MultiplexDisabled) {
bool is_multiplex_enabled = NearbyFlags::GetInstance().GetBoolFlag(
connections::config_package_nearby::nearby_connections_feature::
kEnableMultiplex);
NearbyFlags::GetInstance().OverrideBoolFlagValue(
connections::config_package_nearby::nearby_connections_feature::
kEnableMultiplex, false);

BlockingQueueStream stream;
ByteArray bytes = ByteArray("test1test2test3");
stream.Write(bytes);
ExceptionOr<ByteArray> result = stream.Read(5);
EXPECT_EQ(result, ExceptionOr<ByteArray>(Exception::kExecution));
stream.Close();

NearbyFlags::GetInstance().OverrideBoolFlagValue(
connections::config_package_nearby::nearby_connections_feature::
kEnableMultiplex, is_multiplex_enabled);
}

} // namespace
} // namespace nearby

0 comments on commit 4d78f2a

Please sign in to comment.