Skip to content

Commit

Permalink
Merge pull request #15 from gigya/AddTaskExtensionAndConfigureAwait
Browse files Browse the repository at this point in the history
 ConfigureAwait(false) And  task Extensions
  • Loading branch information
randa1 committed Feb 17, 2016
2 parents 9a77581 + a3f3a73 commit bf64974
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 29 deletions.
23 changes: 2 additions & 21 deletions src/KafkaNetClient/BrokerRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaNet
Expand Down Expand Up @@ -158,7 +157,7 @@ private async Task<bool> RefreshTopicMetadata(TimeSpan? cacheExpiration, TimeSpa

var connections = GetConnections();
var metadataRequestTask = _kafkaMetadataProvider.Get(connections, topics);
var metadataResponse = await RequestTopicMetadata(metadataRequestTask, timeout).ConfigureAwait(false);
var metadataResponse = await metadataRequestTask.WithTimeout(timeout).ConfigureAwait(false);

UpdateInternalMetadataCache(metadataResponse);
}
Expand Down Expand Up @@ -200,31 +199,13 @@ private async Task RefreshAllTopicMetadata(TimeSpan timeout)

var connections = GetConnections();
var metadataRequestTask = _kafkaMetadataProvider.Get(connections);
var metadataResponse = await RequestTopicMetadata(metadataRequestTask, timeout).ConfigureAwait(false);
var metadataResponse = await metadataRequestTask.WithTimeout(timeout).ConfigureAwait(false);

UpdateInternalMetadataCache(metadataResponse);
}
}

private async Task<MetadataResponse> RequestTopicMetadata(Task<MetadataResponse> requestTask, TimeSpan timeout)
{
if (requestTask.IsCompleted)
{
return await requestTask.ConfigureAwait(false);
}

var timeoutCancellationTokenSource = new CancellationTokenSource();
Task completedTask = await Task.WhenAny(requestTask, Task.Delay(timeout, timeoutCancellationTokenSource.Token)).ConfigureAwait(false);
if (completedTask == requestTask)
{
timeoutCancellationTokenSource.Cancel(); // cancel timeout task
return await requestTask.ConfigureAwait(false);
}
else
{
throw new Exception("Metadata refresh operation timed out.");
}
}

private TopicSearchResult SearchCacheForTopics(IEnumerable<string> topics, TimeSpan? expiration)
{
Expand Down
74 changes: 74 additions & 0 deletions src/KafkaNetClient/Common/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaNet.Common
{
/// <summary>
/// Utility functions for dealing with Task's.
/// </summary>
/// <remarks>
/// This is copy of orleans TaskExtensions here:https://github.com/dotnet/orleans/blob/master/src/Orleans/Async/TaskExtensions.cs#L218
/// </remarks>

public static class TaskExtensions
{

/// <summary>
/// Observes and ignores a potential exception on a given Task.
/// If a Task fails and throws an exception which is never observed, it will be caught by the .NET finalizer thread.
/// This function awaits the given task and if the exception is thrown, it observes this exception and simply ignores it.
/// This will prevent the escalation of this exception to the .NET finalizer thread.
/// </summary>
/// <param name="task">The task to be ignored.</param>
[SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "ignored")]
public static void Ignore(this Task task)
{
if (task.IsCompleted)
{
var ignored = task.Exception;
}
else
{
task.ContinueWith(
t => { var ignored = t.Exception; },
CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
}

/// <summary>
/// This will apply a timeout delay to the task, allowing us to exit early
/// </summary>
/// <param name="taskToComplete">The task we will timeout after timeSpan</param>
/// <param name="timeout">Amount of time to wait before timing out</param>
/// <exception cref="TimeoutException">If we time out we will get this exception</exception>
/// <returns>The value of the completed task</returns>

public static async Task<T> WithTimeout<T>(this Task<T> taskToComplete, TimeSpan timeSpan)
{
if (taskToComplete.IsCompleted)
{
return await taskToComplete;
}

var timeoutCancellationTokenSource = new CancellationTokenSource();
var completedTask = await Task.WhenAny(taskToComplete, Task.Delay(timeSpan, timeoutCancellationTokenSource.Token));

// We got done before the timeout, or were able to complete before this code ran, return the result
if (taskToComplete == completedTask)
{
timeoutCancellationTokenSource.Cancel();
// Await this so as to propagate the exception correctly
return await taskToComplete;
}

// We did not complete before the timeout, we fire and forget to ensure we observe any exceptions that may occur
taskToComplete.Ignore();
throw new TimeoutException(String.Format("WithTimeout has timed out after {0}.", timeSpan));
}

}
}
2 changes: 1 addition & 1 deletion src/KafkaNetClient/KafkaMetadataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private async Task<MetadataResponse> GetMetadataResponse(IKafkaConnection[] conn
{
try
{
var response = await conn.SendAsync(request);
var response = await conn.SendAsync(request).ConfigureAwait(false);
if (response != null && response.Count > 0)
{
return response.FirstOrDefault();
Expand Down
1 change: 1 addition & 0 deletions src/KafkaNetClient/KafkaNetClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
<Compile Include="Common\AsyncCollection.cs" />
<Compile Include="Common\ConcurrentCircularBuffer.cs" />
<Compile Include="Common\KafkaMessagePacker.cs" />
<Compile Include="Common\TaskExtensions.cs" />
<Compile Include="Interfaces\IManualConsumer.cs" />
<Compile Include="ManualConsumer.cs" />
<Compile Include="Model\StatisticsTrackerOptions.cs" />
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaNetClient/KafkaTcpSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private async Task ProcessNetworkstreamTasks(NetworkStream netStream)
//Exception need to thrown immediately and not depend on the next task
var readTask = ProcessNetworkstreamsSendTask(netStream);
var sendTask = ProcessNetworkstreamTasksReadTask(netStream);
await Task.WhenAny(readTask, sendTask);
await Task.WhenAny(readTask, sendTask).ConfigureAwait(false);
if (_disposeToken.IsCancellationRequested) return;
await ThrowTaskExceptionIfFaulted(readTask);
await ThrowTaskExceptionIfFaulted(sendTask);
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaNetClient/MetadataQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public MetadataQueries(IBrokerRouter brokerRouter)
/// <returns></returns>
public async Task<List<OffsetResponse>> GetTopicOffsetAsync(string topic, int maxOffsets = 2, int time = -1)
{
await _brokerRouter.RefreshMissingTopicMetadata(topic);
await _brokerRouter.RefreshMissingTopicMetadata(topic).ConfigureAwait(false);
var topicMetadata = GetTopicFromCache(topic);

//send the offset request to each partition leader
Expand Down
6 changes: 3 additions & 3 deletions src/KafkaNetClient/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public Task<ProduceResponse[]> SendMessageAsync(string topic, int partition, par

public async Task<ProduceResponse> SendMessageAsync(Message messages, string topic, int partition, Int16 acks = 1)
{
var result = await SendMessageAsync(topic, new Message[] { messages }, partition: partition, acks: acks);
var result = await SendMessageAsync(topic, new Message[] { messages }, partition: partition, acks: acks).ConfigureAwait(false);
return result.FirstOrDefault();
}

Expand Down Expand Up @@ -284,7 +284,7 @@ private async Task ProduceAndSendBatchAsync(List<TopicMessage> messages, Cancell
BrokerRouter.Log.ErrorFormat("Exception[{0}] stacktrace[{1}]", ex.Message, ex.StackTrace);
}

await SetResult(sendTasks);
await SetResult(sendTasks).ConfigureAwait(false);
Interlocked.Add(ref _inFlightMessageCount, messages.Count * -1);
}
}
Expand All @@ -296,7 +296,7 @@ private async Task SetResult(List<BrokerRouteSendBatch> sendTasks)
try
{
//all ready done don't need to await but it none blocking syntext
var batchResult = await sendTask.Task;
var batchResult = await sendTask.Task.ConfigureAwait(false);
var numberOfMessage = sendTask.MessagesSent.Count;
for (int i = 0; i < numberOfMessage; i++)
{
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaNetClient/ProtocolGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public async Task<T> SendProtocolRequest<T>(IKafkaRequest<T> request, string top

//find route it can chage after Metadata Refresh
var route = _brokerRouter.SelectBrokerRouteFromLocalCache(topic, partition);
var responses = await route.Connection.SendAsync(request);
var responses = await route.Connection.SendAsync(request).ConfigureAwait(false);
response = responses.FirstOrDefault();

//this can happened if you send ProduceRequest with ack level=0
Expand Down Expand Up @@ -92,7 +92,7 @@ public async Task<T> SendProtocolRequest<T>(IKafkaRequest<T> request, string top
_brokerRouter.Log.WarnFormat("ProtocolGateway error sending request, retrying (attempt number {0}): {1}", retryTime, errorDetails);
if (needToRefreshTopicMetadata && hasMoreRetry)
{
await _brokerRouter.RefreshTopicMetadata(topic);
await _brokerRouter.RefreshTopicMetadata(topic).ConfigureAwait(false);
}
else
{
Expand Down

0 comments on commit bf64974

Please sign in to comment.