From 042835cd9d8e4062e44e867c4d9b44dc62e1a069 Mon Sep 17 00:00:00 2001 From: Guogang Li Date: Mon, 21 Oct 2024 15:11:16 -0700 Subject: [PATCH] Fixed the bug in payload transfer PiperOrigin-RevId: 688288286 --- connections/implementation/payload_manager.cc | 289 +++++++++--------- 1 file changed, 136 insertions(+), 153 deletions(-) diff --git a/connections/implementation/payload_manager.cc b/connections/implementation/payload_manager.cc index baa043bb45..a5dcb9efd2 100644 --- a/connections/implementation/payload_manager.cc +++ b/connections/implementation/payload_manager.cc @@ -82,11 +82,10 @@ bool PayloadManager::SendPayloadLoop( // Update the still-active recipients of this payload. if (available_endpoint_ids.empty()) { - LOG(INFO) - << "PayloadManager short-circuiting payload_id=" - << pending_payload.GetInternalPayload()->GetId() << " after sending " - << next_chunk_offset - << " bytes because none of the endpoints are available anymore."; + LOG(INFO) << "PayloadManager short-circuiting payload_id=" + << pending_payload.GetInternalPayload()->GetId() + << " after sending " << next_chunk_offset + << " bytes because none of the endpoints are available anymore."; return false; } @@ -94,9 +93,8 @@ bool PayloadManager::SendPayloadLoop( // notify the remaining recipients. if (pending_payload.IsLocallyCanceled()) { LOG(INFO) << "Aborting send of payload_id=" - << pending_payload.GetInternalPayload()->GetId() - << " at offset " << next_chunk_offset - << " since it is marked canceled."; + << pending_payload.GetInternalPayload()->GetId() << " at offset " + << next_chunk_offset << " since it is marked canceled."; HandleFinishedOutgoingPayload(client, available_endpoint_ids, payload_header, next_chunk_offset, location::nearby::proto::connections:: @@ -113,9 +111,9 @@ bool PayloadManager::SendPayloadLoop( pending_payload.GetInternalPayload()->SkipToOffset(resume_offset); if (!real_offset.ok()) { // Stop sending since it may cause remote file merging failed. - LOG(WARNING) << "PayloadManager failed to skip offset " - << resume_offset << " on payload_id " - << pending_payload.GetInternalPayload()->GetId(); + LOG(WARNING) << "PayloadManager failed to skip offset " << resume_offset + << " on payload_id " + << pending_payload.GetInternalPayload()->GetId(); HandleFinishedOutgoingPayload( client, available_endpoint_ids, payload_header, next_chunk_offset, location::nearby::proto::connections::PayloadStatus::LOCAL_ERROR); @@ -145,7 +143,7 @@ bool PayloadManager::SendPayloadLoop( pending_payload.GetInternalPayload()->GetTotalSize() < next_chunk_offset) { LOG(INFO) << "Payload xfer failed: payload_id=" - << pending_payload.GetInternalPayload()->GetId(); + << pending_payload.GetInternalPayload()->GetId(); HandleFinishedOutgoingPayload( client, available_endpoint_ids, payload_header, next_chunk_offset, location::nearby::proto::connections::PayloadStatus::LOCAL_ERROR); @@ -163,8 +161,8 @@ bool PayloadManager::SendPayloadLoop( // Check whether at least one endpoint failed. if (!failed_endpoint_ids.empty()) { LOG(INFO) << "Payload xfer: endpoints failed: payload_id=" - << payload_header.id() << "; endpoint_ids={" - << ToString(failed_endpoint_ids) << "}", + << payload_header.id() << "; endpoint_ids={" + << ToString(failed_endpoint_ids) << "}", HandleFinishedOutgoingPayload(client, failed_endpoint_ids, payload_header, next_chunk_offset, location::nearby::proto::connections:: @@ -197,8 +195,8 @@ bool PayloadManager::SendPayloadLoop( if (!next_chunk_size) { // That was the last chunk, we're outta here. LOG(INFO) << "Payload xfer done: payload_id=" - << pending_payload.GetInternalPayload()->GetId() - << "; size=" << next_chunk_offset; + << pending_payload.GetInternalPayload()->GetId() + << "; size=" << next_chunk_offset; ThroughputRecorderContainer::GetInstance() .GetTPRecorder(pending_payload.GetInternalPayload()->GetId(), PayloadDirection::OUTGOING_PAYLOAD) @@ -333,8 +331,8 @@ void PayloadManager::CancelAllPayloads() { } if (shutdown_barrier_) { LOG(INFO) << "PayloadManager: waiting for pending outgoing " - "payloads; self=" - << this; + "payloads; self=" + << this; shutdown_barrier_->Await(); } } @@ -350,8 +348,7 @@ PayloadManager::~PayloadManager() { ThroughputRecorderContainer::GetInstance().Shutdown(); DisconnectFromEndpointManager(); CancelAllPayloads(); - LOG(INFO) << "PayloadManager: turn down payload executors; self=" - << this; + LOG(INFO) << "PayloadManager: turn down payload executors; self=" << this; bytes_payload_executor_.Shutdown(); stream_payload_executor_.Shutdown(); file_payload_executor_.Shutdown(); @@ -362,16 +359,14 @@ PayloadManager::~PayloadManager() { RunOnStatusUpdateThread( "~payload-manager", [this, &stop_latch]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { - LOG(INFO) << "PayloadManager: stop tracking payloads; self=" - << this; + LOG(INFO) << "PayloadManager: stop tracking payloads; self=" << this; MutexLock lock(&mutex_); pending_payloads_.StopTrackingAllPayloads(); stop_latch.CountDown(); }); stop_latch.Await(); - LOG(INFO) << "PayloadManager: turn down notification executor; self=" - << this; + LOG(INFO) << "PayloadManager: turn down notification executor; self=" << this; // Stop all the ongoing Runnables (as gracefully as possible). payload_status_update_executor_.Shutdown(); @@ -391,8 +386,7 @@ void PayloadManager::SendPayload(ClientProxy* client, const EndpointIds& endpoint_ids, Payload payload) { if (shutdown_.Get()) return; - LOG(INFO) << "SendPayload: endpoint_ids={" << ToString(endpoint_ids) - << "}"; + LOG(INFO) << "SendPayload: endpoint_ids={" << ToString(endpoint_ids) << "}"; // Before transfer to internal payload, retrieves the Payload size for // analytics. std::int64_t payload_total_size; @@ -417,10 +411,10 @@ void PayloadManager::SendPayload(ClientProxy* client, RecordInvalidPayloadAnalytics(client, endpoint_ids, payload.GetId(), payload.GetType(), payload.GetOffset(), payload_total_size); - LOG(INFO) - << "PayloadManager failed to determine the right executor for " - "outgoing payload_id=" - << payload.GetId() << ", payload_type=" << ToString(payload.GetType()); + LOG(INFO) << "PayloadManager failed to determine the right executor for " + "outgoing payload_id=" + << payload.GetId() + << ", payload_type=" << ToString(payload.GetType()); return; } @@ -436,57 +430,56 @@ void PayloadManager::SendPayload(ClientProxy* client, Payload::Id payload_id = CreateOutgoingPayload(std::move(payload), endpoint_ids); - executor->Execute( - "send-payload", [this, client, endpoint_ids, payload_id, payload_type, - resume_offset, payload_total_size]() { - if (shutdown_.Get()) return; - PendingPayloadHandle pending_payload = GetPayload(payload_id); - if (!pending_payload) { - RecordInvalidPayloadAnalytics(client, endpoint_ids, payload_id, - payload_type, resume_offset, - payload_total_size); - LOG(INFO) - << "PayloadManager failed to create InternalPayload for outgoing " - "payload_id=" - << payload_id << ", payload_type=" << ToString(payload_type) - << ", aborting sendPayload()."; - return; - } - auto* internal_payload = pending_payload->GetInternalPayload(); - if (!internal_payload) return; - - RecordPayloadStartedAnalytics(client, endpoint_ids, payload_id, - payload_type, resume_offset, - internal_payload->GetTotalSize()); - - PayloadTransferFrame::PayloadHeader payload_header{ - CreatePayloadHeader(*internal_payload, resume_offset, - internal_payload->GetParentFolder(), - internal_payload->GetFileName())}; - - bool should_continue = true; - std::int64_t next_chunk_offset = 0; - int index = 0; - - ThroughputRecorderContainer::GetInstance() - .GetTPRecorder(payload_id, PayloadDirection::OUTGOING_PAYLOAD) - ->Start(payload_type, PayloadDirection::OUTGOING_PAYLOAD); - while (should_continue && !shutdown_.Get()) { - should_continue = - SendPayloadLoop(client, *pending_payload, payload_header, - next_chunk_offset, resume_offset, index); - index++; - } + executor->Execute("send-payload", [this, client, endpoint_ids, payload_id, + payload_type, resume_offset, + payload_total_size]() { + if (shutdown_.Get()) return; + PendingPayloadHandle pending_payload = GetPayload(payload_id); + if (!pending_payload) { + RecordInvalidPayloadAnalytics(client, endpoint_ids, payload_id, + payload_type, resume_offset, + payload_total_size); + LOG(INFO) + << "PayloadManager failed to create InternalPayload for outgoing " + "payload_id=" + << payload_id << ", payload_type=" << ToString(payload_type) + << ", aborting sendPayload()."; + return; + } + auto* internal_payload = pending_payload->GetInternalPayload(); + if (!internal_payload) return; - RunOnStatusUpdateThread("destroy-payload", - [this, payload_id]() - RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { - DestroyPendingPayload(payload_id); - }); - }); + RecordPayloadStartedAnalytics(client, endpoint_ids, payload_id, + payload_type, resume_offset, + internal_payload->GetTotalSize()); + + PayloadTransferFrame::PayloadHeader payload_header{CreatePayloadHeader( + *internal_payload, resume_offset, internal_payload->GetParentFolder(), + internal_payload->GetFileName())}; + + bool should_continue = true; + std::int64_t next_chunk_offset = 0; + int index = 0; + + ThroughputRecorderContainer::GetInstance() + .GetTPRecorder(payload_id, PayloadDirection::OUTGOING_PAYLOAD) + ->Start(payload_type, PayloadDirection::OUTGOING_PAYLOAD); + while (should_continue && !shutdown_.Get()) { + should_continue = + SendPayloadLoop(client, *pending_payload, payload_header, + next_chunk_offset, resume_offset, index); + index++; + } + + RunOnStatusUpdateThread("destroy-payload", + [this, payload_id]() + RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { + DestroyPendingPayload(payload_id); + }); + }); LOG(INFO) << "PayloadManager: xfer scheduled: self=" << this - << "; payload_id=" << payload_id - << ", payload_type=" << ToString(payload_type); + << "; payload_id=" << payload_id + << ", payload_type=" << ToString(payload_type); } PayloadManager::PendingPayloadHandle PayloadManager::GetPayload( @@ -498,17 +491,16 @@ Status PayloadManager::CancelPayload(ClientProxy* client, Payload::Id payload_id) { PendingPayloadHandle canceled_payload = GetPayload(payload_id); if (!canceled_payload) { - LOG(INFO) << "Client requested cancel for unknown payload_id=" - << payload_id << ", ignoring."; + LOG(INFO) << "Client requested cancel for unknown payload_id=" << payload_id + << ", ignoring."; return {Status::kPayloadUnknown}; } // Mark the payload as canceled. canceled_payload->MarkLocallyCanceled(); LOG(INFO) << "Cancelling " - << (canceled_payload->IsIncoming() ? "incoming" - : "outgoing") - << " payload_id=" << payload_id << " at request of client."; + << (canceled_payload->IsIncoming() ? "incoming" : "outgoing") + << " payload_id=" << payload_id << " at request of client."; // Return SUCCESS immediately. Remaining cleanup and updates will be sent // in SendPayload() or OnIncomingFrame() @@ -547,8 +539,8 @@ void PayloadManager::OnIncomingFrame( switch (frame.packet_type()) { case PayloadTransferFrame::CONTROL: - LOG(INFO) << "PayloadManager::OnIncomingFrame [CONTROL]: self=" - << this << "; endpoint_id=" << from_endpoint_id; + LOG(INFO) << "PayloadManager::OnIncomingFrame [CONTROL]: self=" << this + << "; endpoint_id=" << from_endpoint_id; ProcessControlPacket(to_client, from_endpoint_id, frame); break; case PayloadTransferFrame::DATA: @@ -557,14 +549,13 @@ void PayloadManager::OnIncomingFrame( break; case PayloadTransferFrame::PAYLOAD_ACK: LOG(INFO) << "[safe-to-disconnect][PAYLOAD_RECEIVED_ACK] sender " - "received payload ack from " - << from_endpoint_id; + "received payload ack from " + << from_endpoint_id; ProcessPayloadAckPacket(from_endpoint_id, frame); break; default: - LOG(WARNING) - << "PayloadManager: invalid frame; remote endpoint: self=" << this - << "; endpoint_id=" << from_endpoint_id; + LOG(WARNING) << "PayloadManager: invalid frame; remote endpoint: self=" + << this << "; endpoint_id=" << from_endpoint_id; break; } } @@ -766,7 +757,7 @@ PayloadManager::PendingPayloadHandle PayloadManager::CreateIncomingPayload( void PayloadManager::OnPendingPayloadDestroy(const PendingPayload* payload) { LOG(INFO) << "PayloadManager: destroying " << payload->ToString() - << " self=" << this; + << " self=" << this; ThroughputRecorderContainer::GetInstance().StopTPRecorder( payload->GetId(), payload->IsIncoming() ? PayloadDirection::INCOMING_PAYLOAD @@ -880,8 +871,8 @@ void PayloadManager::SendPayloadReceivedAck(ClientProxy* client, endpoint_manager_->SendPayloadAck(pending_payload.GetId(), {endpoint_id}); LOG(INFO) << "[safe-to-disconnect] Send " - "PAYLOAD_RECEIVED_ACK frame to: " - << endpoint_id << " done"; + "PAYLOAD_RECEIVED_ACK frame to: " + << endpoint_id << " done"; }); // Send the PAYLOAD_RECEIVED_ACK to the remote endpoint for the sender asap. LOG(INFO) << "[safe-to-disconnect] " << pending_payload.GetId() @@ -899,24 +890,24 @@ bool PayloadManager::WaitForReceivedAck( } LOG(INFO) << "[safe-to-disconnect] Last Chunk, sender wait for " - "PAYLOAD_RECEIVED_ACK frame from: " - << endpoint_id; + "PAYLOAD_RECEIVED_ACK frame from: " + << endpoint_id; while (true) { PendingPayloadHandle latest_pending_payload = GetPayload(payload_header.id()); // Make sure we're still tracking this payload and its associated endpoint. if (!latest_pending_payload) { LOG(INFO) << "[safe-to-disconnect] short-circuiting " - "latest_pending_payload is null for " - << payload_header.id() << ", stop wait ack."; + "latest_pending_payload is null for " + << payload_header.id() << ", stop wait ack."; return false; } auto* endpoint_info = latest_pending_payload->GetEndpoint(endpoint_id); if (endpoint_info == nullptr) { LOG(INFO) << "[safe-to-disconnect] short-circuiting " - "endpointInfo is null for " - << payload_header.id() << ", stop wait ack."; + "endpointInfo is null for " + << payload_header.id() << ", stop wait ack."; return false; } @@ -927,8 +918,8 @@ bool PayloadManager::WaitForReceivedAck( location::nearby::proto::connections:: PayloadStatus::LOCAL_CANCELLATION); LOG(INFO) << "[safe-to-disconnect] short-circuiting local " - "payload cancellation for " - << payload_header.id() << ", stop wait ack."; + "payload cancellation for " + << payload_header.id() << ", stop wait ack."; return false; } // Remote payload cancellation, etc @@ -938,8 +929,8 @@ bool PayloadManager::WaitForReceivedAck( client, {endpoint_id}, payload_header, payload_chunk_offset, EndpointInfoStatusToPayloadStatus(endpoint_info->status.Get())); LOG(INFO) << "[safe-to-disconnect] short-circuiting remote " - "payload cancellation for " - << payload_header.id() << ", stop wait ack."; + "payload cancellation for " + << payload_header.id() << ", stop wait ack."; return false; } { @@ -975,7 +966,7 @@ bool PayloadManager::WaitForReceivedAck( " received payload ack from " << endpoint_id << ", end with timeout."; return false; - } + } } } return true; @@ -1011,9 +1002,8 @@ void PayloadManager::HandleFinishedOutgoingPayload( break; case location::nearby::proto::connections::PayloadStatus:: LOCAL_CANCELLATION: - LOG(INFO) - << "Sending PAYLOAD_CANCEL to receiver side; payload_id=" - << payload_header.id(); + LOG(INFO) << "Sending PAYLOAD_CANCEL to receiver side; payload_id=" + << payload_header.id(); SendControlMessage( finished_endpoint_ids, payload_header, num_bytes_successfully_transferred, @@ -1033,10 +1023,9 @@ void PayloadManager::HandleFinishedOutgoingPayload( // No special handling needed for these. break; default: - LOG(INFO) - << "PayloadManager: Unhandled finished outgoing payload with " - "payload_status=" - << status; + LOG(INFO) << "PayloadManager: Unhandled finished outgoing payload with " + "payload_status=" + << status; break; } } @@ -1062,8 +1051,7 @@ void PayloadManager::HandleFinishedIncomingPayload( break; default: LOG(INFO) << "Unhandled finished incoming payload_id=" - << payload_header.id() - << " with payload_status=" << status; + << payload_header.id() << " with payload_status=" << status; break; } } @@ -1101,7 +1089,7 @@ void PayloadManager::HandleSuccessfulOutgoingChunk( FILE) { if (!is_last_chunk && payload_chunk_offset != 0) { LOG(INFO) << "Skip the outgoing chunk update with offset=" - << payload_chunk_offset; + << payload_chunk_offset; client->GetAnalyticsRecorder().OnPayloadChunkSent( endpoint_id, payload_header.id(), payload_chunk_body_size); return; @@ -1111,10 +1099,9 @@ void PayloadManager::HandleSuccessfulOutgoingChunk( PendingPayloadHandle pending_payload = GetPayload(payload_header.id()); if (!pending_payload || !pending_payload->GetEndpoint(endpoint_id)) { - LOG(INFO) - << "HandleSuccessfulOutgoingChunk: endpoint not found: " - "endpoint_id=" - << endpoint_id; + LOG(INFO) << "HandleSuccessfulOutgoingChunk: endpoint not found: " + "endpoint_id=" + << endpoint_id; return; } @@ -1185,7 +1172,7 @@ void PayloadManager::HandleSuccessfulIncomingChunk( FILE) { if (!is_last_chunk && payload_chunk_offset != 0) { LOG(INFO) << "Skip the incoming chunk update with offset=" - << payload_chunk_offset; + << payload_chunk_offset; client->GetAnalyticsRecorder().OnPayloadChunkReceived( endpoint_id, payload_header.id(), payload_chunk_body_size); return; @@ -1211,6 +1198,7 @@ void PayloadManager::HandleSuccessfulIncomingChunk( // Analyze the success. if (is_last_chunk) { + DestroyPendingPayload(payload_header.id()); client->GetAnalyticsRecorder().OnIncomingPayloadDone( endpoint_id, payload_header.id(), location::nearby::proto::connections::SUCCESS); @@ -1237,7 +1225,7 @@ void PayloadManager::ProcessDataPacket( // We explicitly deny payloads with ID 0. if (payload_header.id() == 0) { LOG(WARNING) << "Denying payload with ID 0 for endpoint_id=" - << from_endpoint_id << ", aborting receipt."; + << from_endpoint_id << ", aborting receipt."; // Send the error to the remote endpoint. SendControlMessage({from_endpoint_id}, payload_header, payload_chunk.offset(), @@ -1266,11 +1254,10 @@ void PayloadManager::ProcessDataPacket( pending_payload = CreateIncomingPayload(payload_transfer_frame, from_endpoint_id); if (!pending_payload) { - LOG(WARNING) - << "PayloadManager failed to create InternalPayload from " - "PayloadTransferFrame with payload_id=" - << payload_header.id() << " and type " << payload_header.type() - << ", aborting receipt."; + LOG(WARNING) << "PayloadManager failed to create InternalPayload from " + "PayloadTransferFrame with payload_id=" + << payload_header.id() << " and type " + << payload_header.type() << ", aborting receipt."; // Send the error to the remote endpoint. SendControlMessage({from_endpoint_id}, payload_header, payload_chunk.offset(), @@ -1284,10 +1271,9 @@ void PayloadManager::ProcessDataPacket( pending_payload = GetPayload(payload_id)]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { if (!pending_payload) return; - LOG(INFO) - << "PayloadManager received new payload_id=" - << pending_payload->GetInternalPayload()->GetId() - << " from endpoint_id=" << from_endpoint_id; + LOG(INFO) << "PayloadManager received new payload_id=" + << pending_payload->GetInternalPayload()->GetId() + << " from endpoint_id=" << from_endpoint_id; to_client->OnPayload( from_endpoint_id, pending_payload->GetInternalPayload()->ReleasePayload()); @@ -1298,16 +1284,14 @@ void PayloadManager::ProcessDataPacket( if (!pending_payload) { LOG(WARNING) << "ProcessDataPacket: [missing] endpoint_id=" - << from_endpoint_id - << "; payload_id=" << payload_header.id(); + << from_endpoint_id << "; payload_id=" << payload_header.id(); return; } if (pending_payload->IsLocallyCanceled()) { // This incoming payload was canceled by the client. Drop this frame and // do all the cleanup. See go/nc-cancel-payload - LOG(INFO) << "ProcessDataPacket: [cancel] endpoint_id=" - << from_endpoint_id - << "; payload_id=" << pending_payload->GetId(); + LOG(INFO) << "ProcessDataPacket: [cancel] endpoint_id=" << from_endpoint_id + << "; payload_id=" << pending_payload->GetId(); HandleFinishedIncomingPayload(to_client, from_endpoint_id, payload_header, payload_chunk.offset(), location::nearby::proto::connections:: @@ -1331,8 +1315,8 @@ void PayloadManager::ProcessDataPacket( ->AttachNextChunk(ByteArray(std::move(*payload_chunk.mutable_body()))) .Raised()) { LOG(ERROR) << "ProcessDataPacket: [data: error] endpoint_id=" - << from_endpoint_id - << "; payload_id=" << pending_payload->GetId(); + << from_endpoint_id + << "; payload_id=" << pending_payload->GetId(); HandleFinishedIncomingPayload( to_client, from_endpoint_id, payload_header, payload_chunk.offset(), location::nearby::proto::connections::PayloadStatus::LOCAL_ERROR); @@ -1369,8 +1353,8 @@ void PayloadManager::ProcessControlPacket( PendingPayloadHandle pending_payload = GetPayload(payload_header.id()); if (!pending_payload) { LOG(INFO) << "Got ControlMessage for unknown payload_id=" - << payload_header.id() - << ", ignoring: " << control_message.event(); + << payload_header.id() + << ", ignoring: " << control_message.event(); return; } @@ -1378,7 +1362,7 @@ void PayloadManager::ProcessControlPacket( case PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED: if (pending_payload->IsIncoming()) { LOG(INFO) << "Incoming PAYLOAD_CANCELED: from endpoint_id=" - << from_endpoint_id << "; self=" << this; + << from_endpoint_id << "; self=" << this; // No need to mark the pending payload as cancelled, since this is a // remote cancellation for an incoming payload -- we handle everything // inline here. @@ -1388,7 +1372,7 @@ void PayloadManager::ProcessControlPacket( ControlMessageEventToPayloadStatus(control_message.event())); } else { LOG(INFO) << "Outgoing PAYLOAD_CANCELED: from endpoint_id=" - << from_endpoint_id << "; self=" << this; + << from_endpoint_id << "; self=" << this; // Mark the payload as canceled *for this endpoint*. pending_payload->SetEndpointStatusFromControlMessage(from_endpoint_id, control_message); @@ -1411,9 +1395,9 @@ void PayloadManager::ProcessControlPacket( } break; default: - LOG(INFO) << "Unhandled control message " - << control_message.event() << " for payload_id=" - << pending_payload->GetInternalPayload()->GetId(); + LOG(INFO) << "Unhandled control message " << control_message.event() + << " for payload_id=" + << pending_payload->GetInternalPayload()->GetId(); break; } } @@ -1425,16 +1409,16 @@ void PayloadManager::ProcessPayloadAckPacket( PendingPayloadHandle pending_payload = GetPayload(payload_header.id()); if (!pending_payload) { LOG(INFO) << "[safe-to-disconnect][PAYLOAD_RECEIVED_ACK] " - "short-circuiting got payload " - "ack for unknown payload " - << payload_header.id() << ", ignoring"; + "short-circuiting got payload " + "ack for unknown payload " + << payload_header.id() << ", ignoring"; return; } if (pending_payload->IsIncoming()) { LOG(INFO) << "[safe-to-disconnect][PAYLOAD_RECEIVED_ACK] " - "short-circuiting got Payload " - "ack for incoming payload " - << payload_header.id() << ", ignoring"; + "short-circuiting got Payload " + "ack for incoming payload " + << payload_header.id() << ", ignoring"; } LOG(INFO) << "[safe-to-disconnect][PAYLOAD_RECEIVED_ACK] sender received payload " @@ -1503,9 +1487,8 @@ PayloadManager::EndpointInfo::ControlMessageEventToEndpointInfoStatus( case PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED: return Status::kCanceled; default: - LOG(INFO) - << "Unknown EndpointInfo.Status for ControlMessage.EventType " - << event; + LOG(INFO) << "Unknown EndpointInfo.Status for ControlMessage.EventType " + << event; return Status::kUnknown; } }