From 42db63c19b508505cc91da9c7fab9376335b030f Mon Sep 17 00:00:00 2001 From: hai007 Date: Mon, 23 Dec 2024 22:52:45 -0800 Subject: [PATCH] Add support for reading multiple frames from one BlockingQueueStream buffer PiperOrigin-RevId: 709247147 --- Package.swift | 1 + connections/implementation/flags/BUILD | 1 + internal/platform/BUILD | 4 ++ internal/platform/blocking_queue_stream.cc | 46 +++++++++--- internal/platform/blocking_queue_stream.h | 15 ++-- .../platform/blocking_queue_stream_test.cc | 71 +++++++++++++++++++ 6 files changed, 125 insertions(+), 13 deletions(-) create mode 100644 internal/platform/blocking_queue_stream_test.cc diff --git a/Package.swift b/Package.swift index 4cae5eb91c..c4975a01a7 100644 --- a/Package.swift +++ b/Package.swift @@ -558,6 +558,7 @@ let package = Package( "internal/platform/direct_executor_test.cc", "internal/platform/borrowable_test.cc", "internal/platform/implementation/windows/http_loader_test.cc", + "internal/platform/blocking_queue_stream_test.cc", "internal/network/utils_test.cc", "internal/network/url_test.cc", "internal/network/http_response_test.cc", diff --git a/connections/implementation/flags/BUILD b/connections/implementation/flags/BUILD index 9f42e7124f..960c477e52 100644 --- a/connections/implementation/flags/BUILD +++ b/connections/implementation/flags/BUILD @@ -21,6 +21,7 @@ cc_library( visibility = [ "//connections:__subpackages__", "//googlemac/iPhone/Shared/Identity/SmartSetup:__subpackages__", + "//internal/platform:__subpackages__", "//internal/platform/implementation:__subpackages__", "//location/nearby/cpp:__subpackages__", "//location/nearby/sharing:__subpackages__", diff --git a/internal/platform/BUILD b/internal/platform/BUILD index 6b7b3aefe4..4dcb6ddbca 100644 --- a/internal/platform/BUILD +++ b/internal/platform/BUILD @@ -271,6 +271,7 @@ cc_library( deps = [ ":base", ":util", + "//connections/implementation/flags:connections_flags", "//internal/base:files", "//internal/crypto_cros", "//internal/flags:nearby_flags", @@ -459,6 +460,7 @@ cc_test( "ble_connection_info_test.cc", "ble_test.cc", "ble_v2_test.cc", + "blocking_queue_stream_test.cc", "bluetooth_adapter_test.cc", "bluetooth_classic_test.cc", "bluetooth_connection_info_test.cc", @@ -476,6 +478,8 @@ cc_test( ":connection_info", ":test_util", ":types", + "//connections/implementation/flags:connections_flags", + "//internal/flags:nearby_flags", "//internal/platform/implementation:comm", "//internal/platform/implementation/g3", # build_cleaner: keep "//proto:connections_enums_cc_proto", diff --git a/internal/platform/blocking_queue_stream.cc b/internal/platform/blocking_queue_stream.cc index 005355eebc..ffe33751df 100644 --- a/internal/platform/blocking_queue_stream.cc +++ b/internal/platform/blocking_queue_stream.cc @@ -14,7 +14,9 @@ #include "internal/platform/blocking_queue_stream.h" +#include #include +#include #include "internal/platform/byte_array.h" #include "internal/platform/exception.h" @@ -24,26 +26,48 @@ namespace nearby { BlockingQueueStream::BlockingQueueStream() { - NEARBY_LOGS(INFO) << "Create a BlockingQueueStream with size " + LOG(INFO) << "Create a BlockingQueueStream with size " << FeatureFlags::GetInstance() .GetFlags() .blocking_queue_stream_queue_capacity; } ExceptionOr BlockingQueueStream::Read(std::int64_t size) { + if (!is_multiplex_enabled_) { + LOG(INFO) << "Multiplex is not enabled, drop the read."; + return ExceptionOr(Exception::kExecution); + } + if (is_closed_) { - NEARBY_LOGS(INFO) + LOG(INFO) << "Failed to read BlockingQueueStream because it was closed."; return ExceptionOr(Exception::kInterrupted); } - NEARBY_LOGS(INFO) << "BlockingQueueStream expect to read " << size - << " bytes"; - return ExceptionOr(blocking_queue_.Take()); + + ByteArray bytes = queue_head_.Empty() ? blocking_queue_.Take() : queue_head_; + if (bytes == queue_end_) { + LOG(INFO) << "BlockingQueueStream is Interrupted."; + return ExceptionOr(Exception::kInterrupted); + } + + int copy_len = std::min(size, bytes.size()); + ByteArray buffer; + buffer.SetData(bytes.data(), copy_len); + if (copy_len < bytes.size()) { + queue_head_ = ByteArray(bytes.data() + copy_len, bytes.size() - copy_len); + } else { + queue_head_ = ByteArray(); + } + return ExceptionOr(std::move(buffer)); } void BlockingQueueStream::Write(const ByteArray& bytes) { + if (!is_multiplex_enabled_) { + LOG(INFO) << "Multiplex is not enabled, drop the write."; + return; + } if (is_closed_) { - NEARBY_LOGS(INFO) + LOG(INFO) << "Failed to write BlockingQueueStream because it was closed."; return; } @@ -54,18 +78,22 @@ void BlockingQueueStream::Write(const ByteArray& bytes) { } Exception BlockingQueueStream::Close() { + if (!is_multiplex_enabled_) { + LOG(INFO) << "Multiplex is not enabled, drop the write."; + return {Exception::kExecution}; + } if (is_closed_) { - NEARBY_LOGS(INFO) << "InputBlockingQueueStream has already been closed."; + LOG(INFO) << "InputBlockingQueueStream has already been closed."; return {Exception::kSuccess}; } if (is_writing_) { - NEARBY_LOGS(INFO) + LOG(INFO) << "BlockingQueueStream is waiting for writing, read first to unblock"; blocking_queue_.TryTake(); } blocking_queue_.TryPut(queue_end_); is_closed_ = true; - NEARBY_LOGS(INFO) << "InputBlockingQueueStream is closed."; + LOG(INFO) << "InputBlockingQueueStream is closed."; return {Exception::kSuccess}; } diff --git a/internal/platform/blocking_queue_stream.h b/internal/platform/blocking_queue_stream.h index 72d76c1461..79030bc959 100644 --- a/internal/platform/blocking_queue_stream.h +++ b/internal/platform/blocking_queue_stream.h @@ -17,6 +17,8 @@ #include +#include "connections/implementation/flags/nearby_connections_feature_flags.h" +#include "internal/flags/nearby_flags.h" #include "internal/platform/array_blocking_queue.h" #include "internal/platform/byte_array.h" #include "internal/platform/exception.h" @@ -39,10 +41,15 @@ class BlockingQueueStream : public InputStream { private: mutable Mutex mutex_; - ArrayBlockingQueue blocking_queue_{FeatureFlags::GetInstance() - .GetFlags() - .blocking_queue_stream_queue_capacity}; - ByteArray queue_end_{0}; + bool is_multiplex_enabled_ = NearbyFlags::GetInstance().GetBoolFlag( + connections::config_package_nearby::nearby_connections_feature:: + kEnableMultiplex); + ArrayBlockingQueue blocking_queue_{ + FeatureFlags::GetInstance() + .GetFlags() + .blocking_queue_stream_queue_capacity}; + ByteArray queue_head_; + ByteArray queue_end_ = ByteArray(); bool is_writing_ = false; bool is_closed_ = false; }; diff --git a/internal/platform/blocking_queue_stream_test.cc b/internal/platform/blocking_queue_stream_test.cc new file mode 100644 index 0000000000..bf1d73efd6 --- /dev/null +++ b/internal/platform/blocking_queue_stream_test.cc @@ -0,0 +1,71 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "internal/platform/blocking_queue_stream.h" + +#include "gtest/gtest.h" +#include "connections/implementation/flags/nearby_connections_feature_flags.h" +#include "internal/flags/nearby_flags.h" +#include "internal/platform/byte_array.h" +#include "internal/platform/exception.h" + +namespace nearby { +namespace { + +TEST(BlockingQueueStreamTest, ReadSuccess) { + bool is_multiplex_enabled = NearbyFlags::GetInstance().GetBoolFlag( + connections::config_package_nearby::nearby_connections_feature:: + kEnableMultiplex); + NearbyFlags::GetInstance().OverrideBoolFlagValue( + connections::config_package_nearby::nearby_connections_feature:: + kEnableMultiplex, true); + + BlockingQueueStream stream; + ByteArray bytes = ByteArray("test1test2test3"); + stream.Write(bytes); + ExceptionOr result = stream.Read(5); + EXPECT_EQ(result.result(), ByteArray("test1")); + result = stream.Read(5); + EXPECT_EQ(result.result(), ByteArray("test2")); + result = stream.Read(5); + EXPECT_EQ(result.result(), ByteArray("test3")); + stream.Close(); + + NearbyFlags::GetInstance().OverrideBoolFlagValue( + connections::config_package_nearby::nearby_connections_feature:: + kEnableMultiplex, is_multiplex_enabled); +} + +TEST(BlockingQueueStreamTest, MultiplexDisabled) { + bool is_multiplex_enabled = NearbyFlags::GetInstance().GetBoolFlag( + connections::config_package_nearby::nearby_connections_feature:: + kEnableMultiplex); + NearbyFlags::GetInstance().OverrideBoolFlagValue( + connections::config_package_nearby::nearby_connections_feature:: + kEnableMultiplex, false); + + BlockingQueueStream stream; + ByteArray bytes = ByteArray("test1test2test3"); + stream.Write(bytes); + ExceptionOr result = stream.Read(5); + EXPECT_EQ(result, ExceptionOr(Exception::kExecution)); + stream.Close(); + + NearbyFlags::GetInstance().OverrideBoolFlagValue( + connections::config_package_nearby::nearby_connections_feature:: + kEnableMultiplex, is_multiplex_enabled); +} + +} // namespace +} // namespace nearby