Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConfigureAwait(false) And task Extensions #15

Merged
merged 1 commit into from
Feb 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 2 additions & 21 deletions src/KafkaNetClient/BrokerRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaNet
Expand Down Expand Up @@ -158,7 +157,7 @@ private async Task<bool> RefreshTopicMetadata(TimeSpan? cacheExpiration, TimeSpa

var connections = GetConnections();
var metadataRequestTask = _kafkaMetadataProvider.Get(connections, topics);
var metadataResponse = await RequestTopicMetadata(metadataRequestTask, timeout).ConfigureAwait(false);
var metadataResponse = await metadataRequestTask.WithTimeout(timeout).ConfigureAwait(false);

UpdateInternalMetadataCache(metadataResponse);
}
Expand Down Expand Up @@ -200,31 +199,13 @@ private async Task RefreshAllTopicMetadata(TimeSpan timeout)

var connections = GetConnections();
var metadataRequestTask = _kafkaMetadataProvider.Get(connections);
var metadataResponse = await RequestTopicMetadata(metadataRequestTask, timeout).ConfigureAwait(false);
var metadataResponse = await metadataRequestTask.WithTimeout(timeout).ConfigureAwait(false);

UpdateInternalMetadataCache(metadataResponse);
}
}

private async Task<MetadataResponse> RequestTopicMetadata(Task<MetadataResponse> requestTask, TimeSpan timeout)
{
if (requestTask.IsCompleted)
{
return await requestTask.ConfigureAwait(false);
}

var timeoutCancellationTokenSource = new CancellationTokenSource();
Task completedTask = await Task.WhenAny(requestTask, Task.Delay(timeout, timeoutCancellationTokenSource.Token)).ConfigureAwait(false);
if (completedTask == requestTask)
{
timeoutCancellationTokenSource.Cancel(); // cancel timeout task
return await requestTask.ConfigureAwait(false);
}
else
{
throw new Exception("Metadata refresh operation timed out.");
}
}

private TopicSearchResult SearchCacheForTopics(IEnumerable<string> topics, TimeSpan? expiration)
{
Expand Down
74 changes: 74 additions & 0 deletions src/KafkaNetClient/Common/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaNet.Common
{
/// <summary>
/// Utility functions for dealing with Task's.
/// </summary>
/// <remarks>
/// This is copy of orleans TaskExtensions here:https://github.com/dotnet/orleans/blob/master/src/Orleans/Async/TaskExtensions.cs#L218
/// </remarks>

public static class TaskExtensions
{

/// <summary>
/// Observes and ignores a potential exception on a given Task.
/// If a Task fails and throws an exception which is never observed, it will be caught by the .NET finalizer thread.
/// This function awaits the given task and if the exception is thrown, it observes this exception and simply ignores it.
/// This will prevent the escalation of this exception to the .NET finalizer thread.
/// </summary>
/// <param name="task">The task to be ignored.</param>
[SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "ignored")]
public static void Ignore(this Task task)
{
if (task.IsCompleted)
{
var ignored = task.Exception;
}
else
{
task.ContinueWith(
t => { var ignored = t.Exception; },
CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
}

/// <summary>
/// This will apply a timeout delay to the task, allowing us to exit early
/// </summary>
/// <param name="taskToComplete">The task we will timeout after timeSpan</param>
/// <param name="timeout">Amount of time to wait before timing out</param>
/// <exception cref="TimeoutException">If we time out we will get this exception</exception>
/// <returns>The value of the completed task</returns>

public static async Task<T> WithTimeout<T>(this Task<T> taskToComplete, TimeSpan timeSpan)
{
if (taskToComplete.IsCompleted)
{
return await taskToComplete;
}

var timeoutCancellationTokenSource = new CancellationTokenSource();
var completedTask = await Task.WhenAny(taskToComplete, Task.Delay(timeSpan, timeoutCancellationTokenSource.Token));

// We got done before the timeout, or were able to complete before this code ran, return the result
if (taskToComplete == completedTask)
{
timeoutCancellationTokenSource.Cancel();
// Await this so as to propagate the exception correctly
return await taskToComplete;
}

// We did not complete before the timeout, we fire and forget to ensure we observe any exceptions that may occur
taskToComplete.Ignore();
throw new TimeoutException(String.Format("WithTimeout has timed out after {0}.", timeSpan));
}

}
}
2 changes: 1 addition & 1 deletion src/KafkaNetClient/KafkaMetadataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private async Task<MetadataResponse> GetMetadataResponse(IKafkaConnection[] conn
{
try
{
var response = await conn.SendAsync(request);
var response = await conn.SendAsync(request).ConfigureAwait(false);
if (response != null && response.Count > 0)
{
return response.FirstOrDefault();
Expand Down
1 change: 1 addition & 0 deletions src/KafkaNetClient/KafkaNetClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
<Compile Include="Common\AsyncCollection.cs" />
<Compile Include="Common\ConcurrentCircularBuffer.cs" />
<Compile Include="Common\KafkaMessagePacker.cs" />
<Compile Include="Common\TaskExtensions.cs" />
<Compile Include="Interfaces\IManualConsumer.cs" />
<Compile Include="ManualConsumer.cs" />
<Compile Include="Model\StatisticsTrackerOptions.cs" />
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaNetClient/KafkaTcpSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private async Task ProcessNetworkstreamTasks(NetworkStream netStream)
//Exception need to thrown immediately and not depend on the next task
var readTask = ProcessNetworkstreamsSendTask(netStream);
var sendTask = ProcessNetworkstreamTasksReadTask(netStream);
await Task.WhenAny(readTask, sendTask);
await Task.WhenAny(readTask, sendTask).ConfigureAwait(false);
if (_disposeToken.IsCancellationRequested) return;
await ThrowTaskExceptionIfFaulted(readTask);
await ThrowTaskExceptionIfFaulted(sendTask);
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaNetClient/MetadataQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public MetadataQueries(IBrokerRouter brokerRouter)
/// <returns></returns>
public async Task<List<OffsetResponse>> GetTopicOffsetAsync(string topic, int maxOffsets = 2, int time = -1)
{
await _brokerRouter.RefreshMissingTopicMetadata(topic);
await _brokerRouter.RefreshMissingTopicMetadata(topic).ConfigureAwait(false);
var topicMetadata = GetTopicFromCache(topic);

//send the offset request to each partition leader
Expand Down
6 changes: 3 additions & 3 deletions src/KafkaNetClient/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public Task<ProduceResponse[]> SendMessageAsync(string topic, int partition, par

public async Task<ProduceResponse> SendMessageAsync(Message messages, string topic, int partition, Int16 acks = 1)
{
var result = await SendMessageAsync(topic, new Message[] { messages }, partition: partition, acks: acks);
var result = await SendMessageAsync(topic, new Message[] { messages }, partition: partition, acks: acks).ConfigureAwait(false);
return result.FirstOrDefault();
}

Expand Down Expand Up @@ -284,7 +284,7 @@ private async Task ProduceAndSendBatchAsync(List<TopicMessage> messages, Cancell
BrokerRouter.Log.ErrorFormat("Exception[{0}] stacktrace[{1}]", ex.Message, ex.StackTrace);
}

await SetResult(sendTasks);
await SetResult(sendTasks).ConfigureAwait(false);
Interlocked.Add(ref _inFlightMessageCount, messages.Count * -1);
}
}
Expand All @@ -296,7 +296,7 @@ private async Task SetResult(List<BrokerRouteSendBatch> sendTasks)
try
{
//all ready done don't need to await but it none blocking syntext
var batchResult = await sendTask.Task;
var batchResult = await sendTask.Task.ConfigureAwait(false);
var numberOfMessage = sendTask.MessagesSent.Count;
for (int i = 0; i < numberOfMessage; i++)
{
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaNetClient/ProtocolGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public async Task<T> SendProtocolRequest<T>(IKafkaRequest<T> request, string top

//find route it can chage after Metadata Refresh
var route = _brokerRouter.SelectBrokerRouteFromLocalCache(topic, partition);
var responses = await route.Connection.SendAsync(request);
var responses = await route.Connection.SendAsync(request).ConfigureAwait(false);
response = responses.FirstOrDefault();

//this can happened if you send ProduceRequest with ack level=0
Expand Down Expand Up @@ -92,7 +92,7 @@ public async Task<T> SendProtocolRequest<T>(IKafkaRequest<T> request, string top
_brokerRouter.Log.WarnFormat("ProtocolGateway error sending request, retrying (attempt number {0}): {1}", retryTime, errorDetails);
if (needToRefreshTopicMetadata && hasMoreRetry)
{
await _brokerRouter.RefreshTopicMetadata(topic);
await _brokerRouter.RefreshTopicMetadata(topic).ConfigureAwait(false);
}
else
{
Expand Down