From a3f3a7318aece2fc59899dec94f6bfa91081ea35 Mon Sep 17 00:00:00 2001 From: "erano.of" Date: Tue, 16 Feb 2016 16:55:25 +0200 Subject: [PATCH] add ConfigureAwait(false) . add task Extensions --- src/KafkaNetClient/BrokerRouter.cs | 23 +------ src/KafkaNetClient/Common/TaskExtensions.cs | 74 +++++++++++++++++++++ src/KafkaNetClient/KafkaMetadataProvider.cs | 2 +- src/KafkaNetClient/KafkaNetClient.csproj | 1 + src/KafkaNetClient/KafkaTcpSocket.cs | 2 +- src/KafkaNetClient/MetadataQueries.cs | 2 +- src/KafkaNetClient/Producer.cs | 6 +- src/KafkaNetClient/ProtocolGateway.cs | 4 +- 8 files changed, 85 insertions(+), 29 deletions(-) create mode 100644 src/KafkaNetClient/Common/TaskExtensions.cs diff --git a/src/KafkaNetClient/BrokerRouter.cs b/src/KafkaNetClient/BrokerRouter.cs index e22e771a..10a78c84 100644 --- a/src/KafkaNetClient/BrokerRouter.cs +++ b/src/KafkaNetClient/BrokerRouter.cs @@ -5,7 +5,6 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Threading; using System.Threading.Tasks; namespace KafkaNet @@ -158,7 +157,7 @@ private async Task RefreshTopicMetadata(TimeSpan? cacheExpiration, TimeSpa var connections = GetConnections(); var metadataRequestTask = _kafkaMetadataProvider.Get(connections, topics); - var metadataResponse = await RequestTopicMetadata(metadataRequestTask, timeout).ConfigureAwait(false); + var metadataResponse = await metadataRequestTask.WithTimeout(timeout).ConfigureAwait(false); UpdateInternalMetadataCache(metadataResponse); } @@ -200,31 +199,13 @@ private async Task RefreshAllTopicMetadata(TimeSpan timeout) var connections = GetConnections(); var metadataRequestTask = _kafkaMetadataProvider.Get(connections); - var metadataResponse = await RequestTopicMetadata(metadataRequestTask, timeout).ConfigureAwait(false); + var metadataResponse = await metadataRequestTask.WithTimeout(timeout).ConfigureAwait(false); UpdateInternalMetadataCache(metadataResponse); } } - private async Task RequestTopicMetadata(Task requestTask, TimeSpan timeout) - { - if (requestTask.IsCompleted) - { - return await requestTask.ConfigureAwait(false); - } - var timeoutCancellationTokenSource = new CancellationTokenSource(); - Task completedTask = await Task.WhenAny(requestTask, Task.Delay(timeout, timeoutCancellationTokenSource.Token)).ConfigureAwait(false); - if (completedTask == requestTask) - { - timeoutCancellationTokenSource.Cancel(); // cancel timeout task - return await requestTask.ConfigureAwait(false); - } - else - { - throw new Exception("Metadata refresh operation timed out."); - } - } private TopicSearchResult SearchCacheForTopics(IEnumerable topics, TimeSpan? expiration) { diff --git a/src/KafkaNetClient/Common/TaskExtensions.cs b/src/KafkaNetClient/Common/TaskExtensions.cs new file mode 100644 index 00000000..b845eb7d --- /dev/null +++ b/src/KafkaNetClient/Common/TaskExtensions.cs @@ -0,0 +1,74 @@ +using System; +using System.Diagnostics.CodeAnalysis; +using System.Threading; +using System.Threading.Tasks; + +namespace KafkaNet.Common +{ + /// + /// Utility functions for dealing with Task's. + /// + /// + /// This is copy of orleans TaskExtensions here:https://github.com/dotnet/orleans/blob/master/src/Orleans/Async/TaskExtensions.cs#L218 + /// + + public static class TaskExtensions + { + + /// + /// Observes and ignores a potential exception on a given Task. + /// If a Task fails and throws an exception which is never observed, it will be caught by the .NET finalizer thread. + /// This function awaits the given task and if the exception is thrown, it observes this exception and simply ignores it. + /// This will prevent the escalation of this exception to the .NET finalizer thread. + /// + /// The task to be ignored. + [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "ignored")] + public static void Ignore(this Task task) + { + if (task.IsCompleted) + { + var ignored = task.Exception; + } + else + { + task.ContinueWith( + t => { var ignored = t.Exception; }, + CancellationToken.None, + TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + } + } + + /// + /// This will apply a timeout delay to the task, allowing us to exit early + /// + /// The task we will timeout after timeSpan + /// Amount of time to wait before timing out + /// If we time out we will get this exception + /// The value of the completed task + + public static async Task WithTimeout(this Task taskToComplete, TimeSpan timeSpan) + { + if (taskToComplete.IsCompleted) + { + return await taskToComplete; + } + + var timeoutCancellationTokenSource = new CancellationTokenSource(); + var completedTask = await Task.WhenAny(taskToComplete, Task.Delay(timeSpan, timeoutCancellationTokenSource.Token)); + + // We got done before the timeout, or were able to complete before this code ran, return the result + if (taskToComplete == completedTask) + { + timeoutCancellationTokenSource.Cancel(); + // Await this so as to propagate the exception correctly + return await taskToComplete; + } + + // We did not complete before the timeout, we fire and forget to ensure we observe any exceptions that may occur + taskToComplete.Ignore(); + throw new TimeoutException(String.Format("WithTimeout has timed out after {0}.", timeSpan)); + } + + } +} diff --git a/src/KafkaNetClient/KafkaMetadataProvider.cs b/src/KafkaNetClient/KafkaMetadataProvider.cs index 2fa94d76..8e206f8e 100644 --- a/src/KafkaNetClient/KafkaMetadataProvider.cs +++ b/src/KafkaNetClient/KafkaMetadataProvider.cs @@ -107,7 +107,7 @@ private async Task GetMetadataResponse(IKafkaConnection[] conn { try { - var response = await conn.SendAsync(request); + var response = await conn.SendAsync(request).ConfigureAwait(false); if (response != null && response.Count > 0) { return response.FirstOrDefault(); diff --git a/src/KafkaNetClient/KafkaNetClient.csproj b/src/KafkaNetClient/KafkaNetClient.csproj index 6e4f2463..0f747a0c 100644 --- a/src/KafkaNetClient/KafkaNetClient.csproj +++ b/src/KafkaNetClient/KafkaNetClient.csproj @@ -49,6 +49,7 @@ + diff --git a/src/KafkaNetClient/KafkaTcpSocket.cs b/src/KafkaNetClient/KafkaTcpSocket.cs index 45700884..06715f45 100644 --- a/src/KafkaNetClient/KafkaTcpSocket.cs +++ b/src/KafkaNetClient/KafkaTcpSocket.cs @@ -199,7 +199,7 @@ private async Task ProcessNetworkstreamTasks(NetworkStream netStream) //Exception need to thrown immediately and not depend on the next task var readTask = ProcessNetworkstreamsSendTask(netStream); var sendTask = ProcessNetworkstreamTasksReadTask(netStream); - await Task.WhenAny(readTask, sendTask); + await Task.WhenAny(readTask, sendTask).ConfigureAwait(false); if (_disposeToken.IsCancellationRequested) return; await ThrowTaskExceptionIfFaulted(readTask); await ThrowTaskExceptionIfFaulted(sendTask); diff --git a/src/KafkaNetClient/MetadataQueries.cs b/src/KafkaNetClient/MetadataQueries.cs index 17c7006d..0768d5b9 100644 --- a/src/KafkaNetClient/MetadataQueries.cs +++ b/src/KafkaNetClient/MetadataQueries.cs @@ -26,7 +26,7 @@ public MetadataQueries(IBrokerRouter brokerRouter) /// public async Task> GetTopicOffsetAsync(string topic, int maxOffsets = 2, int time = -1) { - await _brokerRouter.RefreshMissingTopicMetadata(topic); + await _brokerRouter.RefreshMissingTopicMetadata(topic).ConfigureAwait(false); var topicMetadata = GetTopicFromCache(topic); //send the offset request to each partition leader diff --git a/src/KafkaNetClient/Producer.cs b/src/KafkaNetClient/Producer.cs index 66a3bcdb..58a79b71 100644 --- a/src/KafkaNetClient/Producer.cs +++ b/src/KafkaNetClient/Producer.cs @@ -136,7 +136,7 @@ public Task SendMessageAsync(string topic, int partition, par public async Task SendMessageAsync(Message messages, string topic, int partition, Int16 acks = 1) { - var result = await SendMessageAsync(topic, new Message[] { messages }, partition: partition, acks: acks); + var result = await SendMessageAsync(topic, new Message[] { messages }, partition: partition, acks: acks).ConfigureAwait(false); return result.FirstOrDefault(); } @@ -284,7 +284,7 @@ private async Task ProduceAndSendBatchAsync(List messages, Cancell BrokerRouter.Log.ErrorFormat("Exception[{0}] stacktrace[{1}]", ex.Message, ex.StackTrace); } - await SetResult(sendTasks); + await SetResult(sendTasks).ConfigureAwait(false); Interlocked.Add(ref _inFlightMessageCount, messages.Count * -1); } } @@ -296,7 +296,7 @@ private async Task SetResult(List sendTasks) try { //all ready done don't need to await but it none blocking syntext - var batchResult = await sendTask.Task; + var batchResult = await sendTask.Task.ConfigureAwait(false); var numberOfMessage = sendTask.MessagesSent.Count; for (int i = 0; i < numberOfMessage; i++) { diff --git a/src/KafkaNetClient/ProtocolGateway.cs b/src/KafkaNetClient/ProtocolGateway.cs index d18f49fb..58687855 100644 --- a/src/KafkaNetClient/ProtocolGateway.cs +++ b/src/KafkaNetClient/ProtocolGateway.cs @@ -54,7 +54,7 @@ public async Task SendProtocolRequest(IKafkaRequest request, string top //find route it can chage after Metadata Refresh var route = _brokerRouter.SelectBrokerRouteFromLocalCache(topic, partition); - var responses = await route.Connection.SendAsync(request); + var responses = await route.Connection.SendAsync(request).ConfigureAwait(false); response = responses.FirstOrDefault(); //this can happened if you send ProduceRequest with ack level=0 @@ -92,7 +92,7 @@ public async Task SendProtocolRequest(IKafkaRequest request, string top _brokerRouter.Log.WarnFormat("ProtocolGateway error sending request, retrying (attempt number {0}): {1}", retryTime, errorDetails); if (needToRefreshTopicMetadata && hasMoreRetry) { - await _brokerRouter.RefreshTopicMetadata(topic); + await _brokerRouter.RefreshTopicMetadata(topic).ConfigureAwait(false); } else {