Skip to content

Commit

Permalink
Issue:(Consumers across multi-partitions / Consumers with Broker fail…
Browse files Browse the repository at this point in the history
…ure #13)[#13]

Change list:
* KafkaTcpSocket:
  * Stop disposing the task _socketTask on KafkaTcpSocket (Microsoft best practice).
  * Return BrokerConnectionException for any exception that is not caused by dispose (previously it was a mix of exception types).
  * Add CancellationToken to NetStream WriteAsync.
  * Change the log level of some messages to debug instead of warn.
  * Add details to the log about which connection wrote it, to identify the broker.
  * Remove some of the log messages.
* KafkaConnection:
  * Add log details to identify the broker.
  * Log only on the first failure/recovery, to remove a lot of unnecessary log messages.
* Consumers
  * Refresh metadata on BrokerConnectionException.
  * Use broker Log.
* BrokerRouter:
  * It internally used a ConcurrentDictionary of connections in a way that created a new connection even when it didn't have to; now it creates one only if necessary.
* ProtocolGateway:
  * Add logging.
  * Add more retries.
* Test:
 * Missing tests:
   * BrokerRouter creates a new connection when the broker IP changes.
   * BrokerRouter does not create a new connection on metadata refresh when nothing changes.
 * Add manual tests that let you play with your Kafka server and see that the client still knows how to recover.
  * ConsumerFailure
  * ManualConsumerFailure
 * Fix the unit tests after the change in the log.
 * In our Kafka we stopped creating topics on demand because it makes a mess, so we ignore all tests that create topics.
  • Loading branch information
erano.of committed Feb 16, 2016
1 parent b11117c commit 9a77581
Show file tree
Hide file tree
Showing 20 changed files with 407 additions and 180 deletions.
33 changes: 13 additions & 20 deletions src/KafkaNetClient/BrokerRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private async Task<bool> RefreshTopicMetadata(TimeSpan? cacheExpiration, TimeSpa
}

_kafkaOptions.Log.DebugFormat("BrokerRouter: Refreshing metadata for topics: {0}", string.Join(",", topics));

var connections = GetConnections();
var metadataRequestTask = _kafkaMetadataProvider.Get(connections, topics);
var metadataResponse = await RequestTopicMetadata(metadataRequestTask, timeout).ConfigureAwait(false);
Expand Down Expand Up @@ -210,7 +210,7 @@ private async Task<MetadataResponse> RequestTopicMetadata(Task<MetadataResponse>
{
if (requestTask.IsCompleted)
{
return await requestTask.ConfigureAwait(false);
return await requestTask.ConfigureAwait(false);
}

var timeoutCancellationTokenSource = new CancellationTokenSource();
Expand Down Expand Up @@ -299,15 +299,10 @@ private void UpdateInternalMetadataCache(MetadataResponse metadata)
{
//if the connection is in our default connection index already, remove it and assign it to the broker index.
IKafkaConnection connection;
if (_defaultConnectionIndex.TryRemove(broker.Endpoint, out connection))
{
UpsertConnectionToBrokerConnectionIndex(broker.Broker.BrokerId, connection);
}
else
{
connection = _kafkaOptions.KafkaConnectionFactory.Create(broker.Endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaxRetry);
UpsertConnectionToBrokerConnectionIndex(broker.Broker.BrokerId, connection);
}
_defaultConnectionIndex.TryRemove(broker.Endpoint, out connection);

Func<int, IKafkaConnection> connectionFactory = (i) => connection ?? _kafkaOptions.KafkaConnectionFactory.Create(broker.Endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaxRetry);
UpsertConnectionToBrokerConnectionIndex(broker.Broker.BrokerId, broker.Endpoint, connectionFactory);
}

foreach (var topic in metadata.Topics)
Expand All @@ -317,20 +312,18 @@ private void UpdateInternalMetadataCache(MetadataResponse metadata)
}
}

private void UpsertConnectionToBrokerConnectionIndex(int brokerId, IKafkaConnection newConnection)
private void UpsertConnectionToBrokerConnectionIndex(int brokerId, KafkaEndpoint brokerEndpoint, Func<int, IKafkaConnection> connectionFactory)
{
//associate the connection with the broker id, and add or update the reference
_brokerConnectionIndex.AddOrUpdate(brokerId,
i => newConnection,
_brokerConnectionIndex.AddOrUpdate(brokerId, connectionFactory,
(i, existingConnection) =>
{
//if a connection changes for a broker close old connection and create a new one
if (existingConnection.Endpoint.Equals(newConnection.Endpoint)) return existingConnection;
_kafkaOptions.Log.WarnFormat("Broker:{0} Uri changed from:{1} to {2}", brokerId, existingConnection.Endpoint, newConnection.Endpoint);
using (existingConnection)
{
return newConnection;
}
if (existingConnection.Endpoint.Equals(brokerEndpoint)) return existingConnection;
_kafkaOptions.Log.WarnFormat("Broker:{0} Uri changed from:{1} to {2}", brokerId, existingConnection.Endpoint, brokerEndpoint);

existingConnection.Dispose();
return connectionFactory(i);
});
}

Expand Down
5 changes: 5 additions & 0 deletions src/KafkaNetClient/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ await Task.Run(() =>
//no message received from server wait a while before we try another long poll
await Task.Delay(_options.BackoffInterval, _disposeToken.Token);
}
catch (BrokerConnectionException ex)
{
needToRefreshMetadata = true;
_options.Log.ErrorFormat(ex.Message);
}
catch (BufferUnderRunException ex)
{
bufferSizeHighWatermark = (int)(ex.RequiredBufferSize * _options.FetchBufferMultiplier) +
Expand Down
75 changes: 46 additions & 29 deletions src/KafkaNetClient/KafkaConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ namespace KafkaNet
public class KafkaConnection : IKafkaConnection
{
private const int DefaultResponseTimeoutMs = 60000;
// Tracks whether the read-polling task is currently failing. It is set when a
// read throws (and the connection is not being disposed) and cleared after the
// next payload is read and correlated successfully. The flag is written from
// the polling task and read through IsOnErrorState(), so it is marked volatile
// to make writes visible to readers on other threads.
private volatile bool _isInErrorState;

/// <summary>
/// Indicates whether the read-polling task's most recent read attempt failed.
/// </summary>
/// <returns>
/// <c>true</c> if the polling task has recorded a read failure and has not yet
/// completed a successful read since; otherwise <c>false</c>.
/// </returns>
public bool IsOnErrorState()
{
    return _isInErrorState;
}

private readonly ConcurrentDictionary<int, AsyncRequestItem> _requestIndex = new ConcurrentDictionary<int, AsyncRequestItem>();
private readonly TimeSpan _responseTimeoutMs;
Expand Down Expand Up @@ -96,7 +102,7 @@ public async Task<List<T>> SendAsync<T>(IKafkaRequest<T> request)
//assign unique correlationId
request.CorrelationId = NextCorrelationId();

_log.DebugFormat("Entered SendAsync for CorrelationId:{0} Connection:{1}", request.CorrelationId, this.Endpoint);
_log.DebugFormat("Entered SendAsync for CorrelationId:{0} Connection:{1} ", request.CorrelationId,Endpoint);
//if response is expected, register a receive data task and send request
if (request.ExpectResponse)
{
Expand Down Expand Up @@ -163,43 +169,54 @@ private void StartReadStreamPoller()
//This thread will poll the receive stream for data, parce a message out
//and trigger an event with the message payload
_connectionReadPollingTask = Task.Run(async () =>
{

try
{
try
{
//only allow one reader to execute, dump out all other requests
if (Interlocked.Increment(ref _ensureOneActiveReader) != 1) return;
//only allow one reader to execute, dump out all other requests
if (Interlocked.Increment(ref _ensureOneActiveReader) != 1) return;

while (_disposeToken.IsCancellationRequested == false)
while (_disposeToken.IsCancellationRequested == false)
{
try
{
try
{
_log.DebugFormat("Awaiting message from: {0}", _client.Endpoint);
var messageSizeResult = await _client.ReadAsync(4, _disposeToken.Token).ConfigureAwait(false);
var messageSize = messageSizeResult.ToInt32();
_log.DebugFormat("Awaiting message from: {0}", _client.Endpoint);
var messageSizeResult = await _client.ReadAsync(4, _disposeToken.Token).ConfigureAwait(false);
var messageSize = messageSizeResult.ToInt32();

_log.DebugFormat("Received message of size: {0} From: {1}", messageSize, _client.Endpoint);
var message = await _client.ReadAsync(messageSize, _disposeToken.Token).ConfigureAwait(false);
_log.DebugFormat("Received message of size: {0} From: {1}", messageSize, _client.Endpoint);
var message = await _client.ReadAsync(messageSize, _disposeToken.Token).ConfigureAwait(false);

CorrelatePayloadToRequest(message);
}
catch (Exception ex)
CorrelatePayloadToRequest(message);
if (_isInErrorState)
_log.InfoFormat("Polling read thread has recovered: {0}", _client.Endpoint);

_isInErrorState = false;
}
catch (Exception ex)
{
//don't record the exception if we are disposing
if (_disposeToken.IsCancellationRequested == false)
{
//don't record the exception if we are disposing
if (_disposeToken.IsCancellationRequested == false)
//TODO being in sync with the byte order on read is important. What happens if this exception causes us to be out of sync?
//record exception and continue to scan for data.

//TODO create an event on kafkaTcpSocket and resume only when the connection is online
if (!_isInErrorState)
{
//TODO being in sync with the byte order on read is important. What happens if this exception causes us to be out of sync?
//record exception and continue to scan for data.
_log.ErrorFormat("Exception occured in polling read thread. Exception={0}", ex);
_log.ErrorFormat("Exception occured in polling read thread {0}: {1}", _client.Endpoint, ex);
_isInErrorState = true;
}
}
}
}
finally
{
Interlocked.Decrement(ref _ensureOneActiveReader);
_log.DebugFormat("Closed down connection to: {0}", _client.Endpoint);
}
});
}
finally
{
Interlocked.Decrement(ref _ensureOneActiveReader);
_log.DebugFormat("Closed down connection to: {0}", _client.Endpoint);
}
});
}

private void CorrelatePayloadToRequest(byte[] payload)
Expand Down Expand Up @@ -253,8 +270,8 @@ private void TriggerMessageTimeout(AsyncRequestItem asyncRequestItem)
else
{
asyncRequestItem.ReceiveTask.TrySetException(new ResponseTimeoutException(
string.Format("Timeout Expired. Client failed to receive a response from server after waiting {0}ms.",
_responseTimeoutMs)));
string.Format("Timeout reached for endpoint {0} (after waiting {1})",
_client.Endpoint, _responseTimeoutMs)));
}
}

Expand Down
Loading

0 comments on commit 9a77581

Please sign in to comment.