Skip to content

Commit

Permalink
Infinite Tracing Batching & Compression (#762)
Browse files Browse the repository at this point in the history
* Infinite Tracing Batching and Compression settings (#756)

* Add compression setting

* Add batching setting

* Infinite Tracing Compression (#758)

* Initial commit

* Add compression option in StreamingRPC

* Add compression default to tests

* Add test to confirm compression settings

* Remove commented out code

* Set compression settings from settings override

* Infinite Tracing Batching (#759)

* Initial infinite tracing batching implementation

* Add RecordSpanBatch method to mock grpc server

* Span batching settings and testing.

Co-authored-by: Lalleh Rafeei <[email protected]>
Co-authored-by: Hannah Stepanek <[email protected]>
Co-authored-by: Uma Annamalai <[email protected]>

* Add final 8t batching tests

* Rename serialization test

* Formatting

* Guard unittests from failing due to batching

* Linting

* Simplify batching algorithm

* Properly wire batching parametrization

* Fix incorrect validator use

* Data loss on reconnect regression testing

Co-authored-by: Uma Annamalai <[email protected]>
Co-authored-by: Hannah Stepanek <[email protected]>

* Test stream buffer batch sizes

* Fix logic in supportability metrics for spans

* Clean up nested conditionals in stream buffer

* Compression parametrization in serialization test

* Formatting

* Update 8t test_no_delay_on_ok

* Update protobufs

* Remove unnecessary patching from test

* Fix waiting in supportability metric tests

* Add sleep to waiting in test

* Reorder sleep and condition check

* Mark no data loss xfail for py2.

* Fix conditional check

* Fix flake8 linter issues

---------

Co-authored-by: Lalleh Rafeei <[email protected]>
Co-authored-by: Hannah Stepanek <[email protected]>
Co-authored-by: Uma Annamalai <[email protected]>
Co-authored-by: Hannah Stepanek <[email protected]>

* Infinite Tracing Supportability Feature Toggle Metrics (#769)

* Add 8T feature toggle supportability metrics

* Remove supportability metrics when 8t is disabled.

* Formatting

---------

Co-authored-by: Lalleh Rafeei <[email protected]>
Co-authored-by: Lalleh Rafeei <[email protected]>
Co-authored-by: Hannah Stepanek <[email protected]>
Co-authored-by: Uma Annamalai <[email protected]>
Co-authored-by: Hannah Stepanek <[email protected]>
  • Loading branch information
6 people authored Feb 16, 2023
1 parent a75c076 commit 59abb6a
Show file tree
Hide file tree
Showing 19 changed files with 1,022 additions and 632 deletions.
49 changes: 40 additions & 9 deletions newrelic/common/streaming_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@
import threading

try:
from newrelic.core.infinite_tracing_pb2 import AttributeValue
from newrelic.core.infinite_tracing_pb2 import AttributeValue, SpanBatch
except:
AttributeValue = None
AttributeValue, SpanBatch = None, None


_logger = logging.getLogger(__name__)


class StreamBuffer(object):
def __init__(self, maxlen):
def __init__(self, maxlen, batching=False):
self._queue = collections.deque(maxlen=maxlen)
self._notify = self.condition()
self._shutdown = False
self._seen = 0
self._dropped = 0
self._settings = None

self.batching = batching

@staticmethod
def condition(*args, **kwargs):
Expand Down Expand Up @@ -66,14 +70,23 @@ def stats(self):

return seen, dropped

def __bool__(self):
return bool(self._queue)

def __len__(self):
return len(self._queue)

def __iter__(self):
return StreamBufferIterator(self)


class StreamBufferIterator(object):
MAX_BATCH_SIZE = 100

def __init__(self, stream_buffer):
self.stream_buffer = stream_buffer
self._notify = self.stream_buffer._notify
self.batching = self.stream_buffer.batching
self._shutdown = False
self._stream = None

Expand All @@ -100,12 +113,30 @@ def __next__(self):
self.shutdown()
raise StopIteration

try:
return self.stream_buffer._queue.popleft()
except IndexError:
pass

if not self.stream_closed() and not self.stream_buffer._queue:
if self.batching:
stream_buffer_len = len(self.stream_buffer)
if stream_buffer_len > self.MAX_BATCH_SIZE:
# Ensure batch size is never more than 100 to prevent issues with serializing large numbers
# of spans causing their age to exceed 10 seconds. That would cause them to be rejected
# by the trace observer.
batch = [self.stream_buffer._queue.popleft() for _ in range(self.MAX_BATCH_SIZE)]
return SpanBatch(spans=batch)
elif stream_buffer_len:
# For small span batches, empty the stream buffer into a list and clear the queue.
# This is only safe to do under the lock, which prevents items being added to the queue.
batch = list(self.stream_buffer._queue)
self.stream_buffer._queue.clear()
return SpanBatch(spans=batch)

else:
# Send items from stream buffer one at a time.
try:
return self.stream_buffer._queue.popleft()
except IndexError:
pass

# Wait until items are added to the stream buffer.
if not self.stream_closed() and not self.stream_buffer:
self._notify.wait()

next = __next__
Expand Down
2 changes: 2 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,8 @@ def _process_configuration(section):
_process_setting(section, "event_harvest_config.harvest_limits.log_event_data", "getint", None)
_process_setting(section, "infinite_tracing.trace_observer_host", "get", None)
_process_setting(section, "infinite_tracing.trace_observer_port", "getint", None)
_process_setting(section, "infinite_tracing.compression", "getboolean", None)
_process_setting(section, "infinite_tracing.batching", "getboolean", None)
_process_setting(section, "infinite_tracing.span_queue_size", "getint", None)
_process_setting(section, "code_level_metrics.enabled", "getboolean", None)

Expand Down
31 changes: 24 additions & 7 deletions newrelic/core/agent_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
try:
import grpc

from newrelic.core.infinite_tracing_pb2 import RecordStatus, Span
from newrelic.core.infinite_tracing_pb2 import RecordStatus, Span, SpanBatch
except Exception:
grpc, RecordStatus, Span = None, None, None
grpc, RecordStatus, Span, SpanBatch = None, None, None, None

_logger = logging.getLogger(__name__)

Expand All @@ -33,7 +33,6 @@ class StreamingRpc(object):
retry will not occur.
"""

PATH = "/com.newrelic.trace.v1.IngestService/RecordSpan"
RETRY_POLICY = (
(15, False),
(15, False),
Expand All @@ -44,7 +43,7 @@ class StreamingRpc(object):
)
OPTIONS = [("grpc.enable_retries", 0)]

def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True):
def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True, compression=None):
self._endpoint = endpoint
self._ssl = ssl
self.metadata = metadata
Expand All @@ -57,17 +56,35 @@ def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True):
self.notify = self.condition()
self.record_metric = record_metric
self.closed = False
# `compression` defaults to None, which is falsy, so omitting it selects NoCompression.
self.compression_setting = grpc.Compression.Gzip if compression else grpc.Compression.NoCompression

if self.batching: # Stream buffer will be sending span batches
self.path = "/com.newrelic.trace.v1.IngestService/RecordSpanBatch"
self.serializer = SpanBatch.SerializeToString
else:
self.path = "/com.newrelic.trace.v1.IngestService/RecordSpan"
self.serializer = Span.SerializeToString

self.create_channel()

@property
def batching(self):
# Determine batching by stream buffer settings
return self.stream_buffer.batching

def create_channel(self):
if self._ssl:
credentials = grpc.ssl_channel_credentials()
self.channel = grpc.secure_channel(self._endpoint, credentials, options=self.OPTIONS)
self.channel = grpc.secure_channel(
self._endpoint, credentials, compression=self.compression_setting, options=self.OPTIONS
)
else:
self.channel = grpc.insecure_channel(self._endpoint, options=self.OPTIONS)
self.channel = grpc.insecure_channel(
self._endpoint, compression=self.compression_setting, options=self.OPTIONS
)

self.rpc = self.channel.stream_stream(self.PATH, Span.SerializeToString, RecordStatus.FromString)
self.rpc = self.channel.stream_stream(self.path, self.serializer, RecordStatus.FromString)

def create_response_iterator(self):
with self.stream_buffer._notify:
Expand Down
89 changes: 61 additions & 28 deletions newrelic/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,30 +520,65 @@ def connect_to_data_collector(self, activate_agent):

self._global_events_account = 0

# Record metrics for how long it took us to connect and how
# many attempts we made. Also record metrics for the final
# successful attempt. If we went through multiple attempts,
# individual details of errors before the final one that
# worked are not recorded as recording them all in the
# initial harvest would possibly skew first harvest metrics
# and cause confusion as we cannot properly mark the time over
# which they were recorded. Make sure we do this before we
# mark the session active so we don't have to grab a lock on
# merging the internal metrics.

with InternalTraceContext(internal_metrics):
# Record metrics for how long it took us to connect and how
# many attempts we made. Also record metrics for the final
# successful attempt. If we went through multiple attempts,
# individual details of errors before the final one that
# worked are not recorded as recording them all in the
# initial harvest would possibly skew first harvest metrics
# and cause confusion as we cannot properly mark the time over
# which they were recorded. Make sure we do this before we
# mark the session active so we don't have to grab a lock on
# merging the internal metrics.

internal_metric(
"Supportability/Python/Application/Registration/Duration", self._period_start - connect_start
)
internal_metric("Supportability/Python/Application/Registration/Attempts", connect_attempts)

# Logging feature toggle supportability metrics
application_logging_metrics = configuration.application_logging.enabled and configuration.application_logging.metrics.enabled
application_logging_forwarding = configuration.application_logging.enabled and configuration.application_logging.forwarding.enabled
application_logging_local_decorating = configuration.application_logging.enabled and configuration.application_logging.local_decorating.enabled
internal_metric("Supportability/Logging/Forwarding/Python/%s" % ("enabled" if application_logging_forwarding else "disabled"), 1)
internal_metric("Supportability/Logging/LocalDecorating/Python/%s" % ("enabled" if application_logging_local_decorating else "disabled"), 1)
internal_metric("Supportability/Logging/Metrics/Python/%s" % ("enabled" if application_logging_metrics else "disabled"), 1)
# Record metrics for feature toggles from settings

# Logging feature toggle metrics
application_logging_metrics = (
configuration.application_logging.enabled and configuration.application_logging.metrics.enabled
)
application_logging_forwarding = (
configuration.application_logging.enabled and configuration.application_logging.forwarding.enabled
)
application_logging_local_decorating = (
configuration.application_logging.enabled and configuration.application_logging.local_decorating.enabled
)
internal_metric(
"Supportability/Logging/Forwarding/Python/%s"
% ("enabled" if application_logging_forwarding else "disabled"),
1,
)
internal_metric(
"Supportability/Logging/LocalDecorating/Python/%s"
% ("enabled" if application_logging_local_decorating else "disabled"),
1,
)
internal_metric(
"Supportability/Logging/Metrics/Python/%s" % ("enabled" if application_logging_metrics else "disabled"),
1,
)

# Infinite tracing feature toggle metrics
infinite_tracing = configuration.infinite_tracing.enabled # Property that checks trace observer host
if infinite_tracing:
infinite_tracing_batching = configuration.infinite_tracing.batching
infinite_tracing_compression = configuration.infinite_tracing.compression
internal_metric(
"Supportability/InfiniteTracing/gRPC/Batching/%s"
% ("enabled" if infinite_tracing_batching else "disabled"),
1,
)
internal_metric(
"Supportability/InfiniteTracing/gRPC/Compression/%s"
% ("enabled" if infinite_tracing_compression else "disabled"),
1,
)

self._stats_engine.merge_custom_metrics(internal_metrics.metrics())

Expand Down Expand Up @@ -724,11 +759,9 @@ def stop_data_samplers(self):

def remove_data_source(self, name):
with self._data_samplers_lock:

data_sampler = [x for x in self._data_samplers if x.name == name]

if len(data_sampler) > 0:

# Should be at most one data sampler for a given name.

data_sampler = data_sampler[0]
Expand All @@ -741,7 +774,6 @@ def remove_data_source(self, name):
data_sampler.stop()

except Exception:

# If sampler has not started yet, it may throw an error.

_logger.debug(
Expand Down Expand Up @@ -1066,7 +1098,6 @@ def harvest(self, shutdown=False, flexible=False):

with InternalTraceContext(internal_metrics):
with InternalTrace("Supportability/Python/Harvest/Calls/" + call_metric):

self._harvest_count += 1

start = time.time()
Expand Down Expand Up @@ -1204,7 +1235,6 @@ def harvest(self, shutdown=False, flexible=False):
stats.reset_synthetics_events()

if configuration.collect_analytics_events and configuration.transaction_events.enabled:

transaction_events = stats.transaction_events

if transaction_events:
Expand Down Expand Up @@ -1235,7 +1265,7 @@ def harvest(self, shutdown=False, flexible=False):
if configuration.infinite_tracing.enabled:
span_stream = stats.span_stream
# Only merge stats as part of default harvest
if span_stream and not flexible:
if span_stream is not None and not flexible:
spans_seen, spans_dropped = span_stream.stats()
spans_sent = spans_seen - spans_dropped

Expand Down Expand Up @@ -1267,7 +1297,6 @@ def harvest(self, shutdown=False, flexible=False):
and configuration.error_collector.capture_events
and configuration.error_collector.enabled
):

error_events = stats.error_events
if error_events:
num_error_samples = error_events.num_samples
Expand All @@ -1289,7 +1318,6 @@ def harvest(self, shutdown=False, flexible=False):
# Send custom events

if configuration.collect_custom_events and configuration.custom_insights_events.enabled:

customs = stats.custom_events

if customs:
Expand All @@ -1309,8 +1337,13 @@ def harvest(self, shutdown=False, flexible=False):

# Send log events

if configuration and configuration.application_logging and configuration.application_logging.enabled and configuration.application_logging.forwarding and configuration.application_logging.forwarding.enabled:

if (
configuration
and configuration.application_logging
and configuration.application_logging.enabled
and configuration.application_logging.forwarding
and configuration.application_logging.forwarding.enabled
):
logs = stats.log_events

if logs:
Expand Down
8 changes: 4 additions & 4 deletions newrelic/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ def _environ_as_mapping(name, default=""):
return result

for item in items.split(";"):

try:
key, value = item.split(":")
except ValueError:
Expand Down Expand Up @@ -731,6 +730,8 @@ def default_host(license_key):

_settings.infinite_tracing.trace_observer_host = os.environ.get("NEW_RELIC_INFINITE_TRACING_TRACE_OBSERVER_HOST", None)
_settings.infinite_tracing.trace_observer_port = _environ_as_int("NEW_RELIC_INFINITE_TRACING_TRACE_OBSERVER_PORT", 443)
_settings.infinite_tracing.compression = _environ_as_bool("NEW_RELIC_INFINITE_TRACING_COMPRESSION", default=True)
_settings.infinite_tracing.batching = _environ_as_bool("NEW_RELIC_INFINITE_TRACING_BATCHING", default=True)
_settings.infinite_tracing.ssl = True
_settings.infinite_tracing.span_queue_size = _environ_as_int("NEW_RELIC_INFINITE_TRACING_SPAN_QUEUE_SIZE", 10000)

Expand Down Expand Up @@ -939,7 +940,6 @@ def global_settings_dump(settings_object=None, serializable=False):
components = urlparse.urlparse(proxy_host)

if components.scheme:

netloc = create_obfuscated_netloc(components.username, components.password, components.hostname, obfuscated)

if components.port:
Expand Down Expand Up @@ -1062,14 +1062,14 @@ def apply_server_side_settings(server_side_config=None, settings=_settings):

# Overlay with agent server side configuration settings.

for (name, value) in agent_config.items():
for name, value in agent_config.items():
apply_config_setting(settings_snapshot, name, value)

# Overlay with global server side configuration settings.
# Global server side configuration always takes precedence over the
# agent server side configuration settings.

for (name, value) in server_side_config.items():
for name, value in server_side_config.items():
apply_config_setting(settings_snapshot, name, value)

event_harvest_config = server_side_config.get("event_harvest_config", {})
Expand Down
Loading

0 comments on commit 59abb6a

Please sign in to comment.