diff --git a/newrelic/core/agent_streaming.py b/newrelic/core/agent_streaming.py index bf5c483ba2..b25bad0f3f 100644 --- a/newrelic/core/agent_streaming.py +++ b/newrelic/core/agent_streaming.py @@ -17,7 +17,8 @@ try: import grpc - from newrelic.core.infinite_tracing_pb2 import Span, RecordStatus + + from newrelic.core.infinite_tracing_pb2 import RecordStatus, Span except ImportError: grpc = None @@ -33,14 +34,19 @@ class StreamingRpc(object): """ PATH = "/com.newrelic.trace.v1.IngestService/RecordSpan" + RETRY_POLICY = ( + (15, False), + (15, False), + (30, False), + (60, False), + (120, False), + (300, True), + ) + OPTIONS = [("grpc.enable_retries", 0)] def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True): - if ssl: - credentials = grpc.ssl_channel_credentials() - channel = grpc.secure_channel(endpoint, credentials) - else: - channel = grpc.insecure_channel(endpoint) - self.channel = channel + self._endpoint = endpoint + self._ssl = ssl self.metadata = metadata self.request_iterator = stream_buffer self.response_processing_thread = threading.Thread( @@ -48,10 +54,19 @@ def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True): ) self.response_processing_thread.daemon = True self.notify = self.condition() - self.rpc = self.channel.stream_stream( - self.PATH, Span.SerializeToString, RecordStatus.FromString - ) self.record_metric = record_metric + self.closed = False + + self.create_channel() + + def create_channel(self): + if self._ssl: + credentials = grpc.ssl_channel_credentials() + self.channel = grpc.secure_channel(self._endpoint, credentials, options=self.OPTIONS) + else: + self.channel = grpc.insecure_channel(self._endpoint, options=self.OPTIONS) + + self.rpc = self.channel.stream_stream(self.PATH, Span.SerializeToString, RecordStatus.FromString) @staticmethod def condition(*args, **kwargs): @@ -63,6 +78,7 @@ def close(self): if self.channel: channel = self.channel self.channel = None + self.closed = True self.notify.notify_all() if channel: @@ -80,6 +96,7 @@ def connect(self): def process_responses(self): response_iterator = None + retry = 0 while True: with self.notify: if self.channel and response_iterator: @@ -112,21 +129,42 @@ def process_responses(self): ) break - _logger.warning( - "Streaming RPC closed. " - "Will attempt to reconnect in 15 seconds. " - "Code: %s Details: %s", - code, - details, - ) - self.notify.wait(15) + # Unpack retry policy settings + if retry >= len(self.RETRY_POLICY): + retry_time, error = self.RETRY_POLICY[-1] + else: + retry_time, error = self.RETRY_POLICY[retry] + retry += 1 + + # Emit appropriate retry logs + if not error: + _logger.warning( + "Streaming RPC closed. Will attempt to reconnect in %d seconds. Check the prior log entries and remedy any issue as necessary, or if the problem persists, report this problem to New Relic support for further investigation. Code: %s Details: %s", + retry_time, + code, + details, + ) + else: + _logger.error( + "Streaming RPC closed after additional attempts. Will attempt to reconnect in %d seconds. Please report this problem to New Relic support for further investigation. Code: %s Details: %s", + retry_time, + code, + details, + ) + + # Reconnect channel with backoff + self.channel.close() + self.notify.wait(retry_time) + if self.closed: + break + else: + _logger.debug("Attempting to reconnect Streaming RPC.") + self.create_channel() - if not self.channel: + if self.closed: break - response_iterator = self.rpc( - self.request_iterator, metadata=self.metadata - ) + response_iterator = self.rpc(self.request_iterator, metadata=self.metadata) _logger.info("Streaming RPC connect completed.") try: diff --git a/setup.py b/setup.py index f42bbf55ab..de7422255e 100644 --- a/setup.py +++ b/setup.py @@ -156,7 +156,7 @@ def build_extension(self, ext): "newrelic": ["newrelic.ini", "version.txt", "packages/urllib3/LICENSE.txt", "common/cacert.pem"], }, scripts=["scripts/newrelic-admin"], - extras_require={"infinite-tracing": ["grpcio<1.40", "protobuf<4"]}, + extras_require={"infinite-tracing": ["grpcio", "protobuf<4"]}, ) if with_setuptools: diff --git a/tox.ini b/tox.ini index 1ed0a52ccf..c43880ac3d 100644 --- a/tox.ini +++ b/tox.ini @@ -114,7 +114,9 @@ envlist = python-framework_fastapi-{py36,py37,py38,py39,py310}, python-framework_flask-{pypy,py27}-flask0012, python-framework_flask-{pypy,py27,py36,py37,py38,py39,py310,pypy3}-flask0101, - python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest,master}, + ;temporarily disabling tests on flask master + ; python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest,master}, + python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest}, python-framework_graphene-{py27,py36,py37,py38,py39,py310,pypy,pypy3}-graphenelatest, python-framework_graphene-py37-graphene{0200,0201}, python-framework_graphql-{py27,py36,py37,py38,py39,py310,pypy,pypy3}-graphql02,