Skip to content

Commit

Permalink
Fix gRPC URI Detection (#1137)
Browse files Browse the repository at this point in the history
* Fix gRPC URI detection

* [Mega-Linter] Apply linters fixes

* Bump tests

---------

Co-authored-by: TimPansino <[email protected]>
  • Loading branch information
TimPansino and TimPansino authored May 8, 2024
1 parent 8884497 commit df2b159
Showing 1 changed file with 45 additions and 78 deletions.
123 changes: 45 additions & 78 deletions newrelic/hooks/framework_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,30 @@
import time

from newrelic.api.external_trace import ExternalTrace
from newrelic.api.web_transaction import WebTransactionWrapper
from newrelic.api.transaction import current_transaction
from newrelic.api.time_trace import notice_error
from newrelic.common.object_wrapper import wrap_function_wrapper
from newrelic.api.transaction import current_transaction
from newrelic.api.web_transaction import WebTransactionWrapper
from newrelic.common.object_names import callable_name
from newrelic.common.object_wrapper import wrap_function_wrapper


def _get_uri_method(instance, *args, **kwargs):
target = instance._channel.target().decode('utf-8')
method = instance._method.decode('utf-8').lstrip('/')
uri = 'grpc://%s/%s' % (target, method)
target = instance._channel.target().decode("utf-8").lstrip("dns:///")
method = instance._method.decode("utf-8").lstrip("/")
uri = "grpc://%s/%s" % (target, method)
return (uri, method)


def _prepare_request(
transaction, guid, request,
timeout=None, metadata=None, *args, **kwargs):
def _prepare_request(transaction, guid, request, timeout=None, metadata=None, *args, **kwargs):
metadata = metadata and list(metadata) or []
dt_metadata = transaction._create_distributed_trace_data_with_guid(guid)
metadata.extend(
transaction._generate_distributed_trace_headers(dt_metadata)
)
metadata.extend(transaction._generate_distributed_trace_headers(dt_metadata))
args = (request, timeout, metadata) + args
return args, kwargs


def _prepare_request_stream(
transaction, guid, request_iterator, *args, **kwargs):
return _prepare_request(
transaction, guid, request_iterator, *args, **kwargs)
def _prepare_request_stream(transaction, guid, request_iterator, *args, **kwargs):
return _prepare_request(transaction, guid, request_iterator, *args, **kwargs)


def wrap_call(module, object_path, prepare):
Expand All @@ -56,7 +50,7 @@ def _call_wrapper(wrapped, instance, args, kwargs):
return wrapped(*args, **kwargs)

uri, method = _get_uri_method(instance)
with ExternalTrace('gRPC', uri, method, source=wrapped):
with ExternalTrace("gRPC", uri, method, source=wrapped):
args, kwargs = prepare(transaction, None, *args, **kwargs)
return wrapped(*args, **kwargs)

Expand All @@ -70,13 +64,13 @@ def _future_wrapper(wrapped, instance, args, kwargs):
if transaction is None:
return wrapped(*args, **kwargs)

guid = '%016x' % random.getrandbits(64)
guid = "%016x" % random.getrandbits(64)
uri, method = _get_uri_method(instance)

args, kwargs = prepare(transaction, guid, *args, **kwargs)
future = wrapped(*args, **kwargs)
future._nr_guid = guid
future._nr_args = {"library": 'gRPC', "url": uri, "method": method, "source": wrapped}
future._nr_args = {"library": "gRPC", "url": uri, "method": method, "source": wrapped}
future._nr_start_time = time.time()

# In non-streaming responses, result is typically called instead of
Expand All @@ -89,16 +83,16 @@ def _future_wrapper(wrapped, instance, args, kwargs):


def wrap_next(_wrapped, _instance, _args, _kwargs):
_nr_args = getattr(_instance, '_nr_args', None)
_nr_args = getattr(_instance, "_nr_args", None)
if not _nr_args:
return _wrapped(*_args, **_kwargs)

try:
return _wrapped(*_args, **_kwargs)
except Exception:
delattr(_instance, '_nr_args')
_nr_start_time = getattr(_instance, '_nr_start_time', 0.0)
_nr_guid = getattr(_instance, '_nr_guid', None)
delattr(_instance, "_nr_args")
_nr_start_time = getattr(_instance, "_nr_start_time", 0.0)
_nr_guid = getattr(_instance, "_nr_guid", None)

with ExternalTrace(**_nr_args) as t:
t.start_time = _nr_start_time or t.start_time
Expand All @@ -107,12 +101,12 @@ def wrap_next(_wrapped, _instance, _args, _kwargs):


def wrap_result(_wrapped, _instance, _args, _kwargs):
_nr_args = getattr(_instance, '_nr_args', None)
_nr_args = getattr(_instance, "_nr_args", None)
if not _nr_args:
return _wrapped(*_args, **_kwargs)
delattr(_instance, '_nr_args')
_nr_start_time = getattr(_instance, '_nr_start_time', 0.0)
_nr_guid = getattr(_instance, '_nr_guid', None)
delattr(_instance, "_nr_args")
_nr_start_time = getattr(_instance, "_nr_start_time", 0.0)
_nr_guid = getattr(_instance, "_nr_guid", None)

try:
result = _wrapped(*_args, **_kwargs)
Expand All @@ -136,31 +130,22 @@ def grpc_web_transaction(wrapped, instance, args, kwargs):
rpc_event, behavior = _bind_transaction_args(*args, **kwargs)
behavior_name = callable_name(behavior)

call_details = (
getattr(rpc_event, 'call_details', None) or
getattr(rpc_event, 'request_call_details', None))
call_details = getattr(rpc_event, "call_details", None) or getattr(rpc_event, "request_call_details", None)

metadata = (
getattr(rpc_event, 'invocation_metadata', None) or
getattr(rpc_event, 'request_metadata', None))
metadata = getattr(rpc_event, "invocation_metadata", None) or getattr(rpc_event, "request_metadata", None)

host = port = None
if call_details:
try:
host, port = call_details.host.split(b':', 1)
host, port = call_details.host.split(b":", 1)
except Exception:
pass

request_path = call_details.method

return WebTransactionWrapper(
wrapped,
name=behavior_name,
request_path=request_path,
host=host,
port=port,
headers=metadata,
source=behavior)(*args, **kwargs)
wrapped, name=behavior_name, request_path=request_path, host=host, port=port, headers=metadata, source=behavior
)(*args, **kwargs)


def _trailing_metadata(state, *args, **kwargs):
Expand All @@ -185,44 +170,26 @@ def _nr_wrap_abort(wrapped, instance, args, kwargs):


def instrument_grpc__channel(module):
wrap_call(module, '_UnaryUnaryMultiCallable.__call__',
_prepare_request)
wrap_call(module, '_UnaryUnaryMultiCallable.with_call',
_prepare_request)
wrap_future(module, '_UnaryUnaryMultiCallable.future',
_prepare_request)
wrap_future(module, '_UnaryStreamMultiCallable.__call__',
_prepare_request)
wrap_call(module, '_StreamUnaryMultiCallable.__call__',
_prepare_request_stream)
wrap_call(module, '_StreamUnaryMultiCallable.with_call',
_prepare_request_stream)
wrap_future(module, '_StreamUnaryMultiCallable.future',
_prepare_request_stream)
wrap_future(module, '_StreamStreamMultiCallable.__call__',
_prepare_request_stream)
if hasattr(module, '_MultiThreadedRendezvous'):
wrap_function_wrapper(module, '_MultiThreadedRendezvous.result',
wrap_result)
wrap_function_wrapper(module, '_MultiThreadedRendezvous._next',
wrap_next)
wrap_call(module, "_UnaryUnaryMultiCallable.__call__", _prepare_request)
wrap_call(module, "_UnaryUnaryMultiCallable.with_call", _prepare_request)
wrap_future(module, "_UnaryUnaryMultiCallable.future", _prepare_request)
wrap_future(module, "_UnaryStreamMultiCallable.__call__", _prepare_request)
wrap_call(module, "_StreamUnaryMultiCallable.__call__", _prepare_request_stream)
wrap_call(module, "_StreamUnaryMultiCallable.with_call", _prepare_request_stream)
wrap_future(module, "_StreamUnaryMultiCallable.future", _prepare_request_stream)
wrap_future(module, "_StreamStreamMultiCallable.__call__", _prepare_request_stream)
if hasattr(module, "_MultiThreadedRendezvous"):
wrap_function_wrapper(module, "_MultiThreadedRendezvous.result", wrap_result)
wrap_function_wrapper(module, "_MultiThreadedRendezvous._next", wrap_next)
else:
wrap_function_wrapper(module, '_Rendezvous.result',
wrap_result)
wrap_function_wrapper(module, '_Rendezvous._next',
wrap_next)
wrap_function_wrapper(module, '_Rendezvous.cancel',
wrap_result)
wrap_function_wrapper(module, "_Rendezvous.result", wrap_result)
wrap_function_wrapper(module, "_Rendezvous._next", wrap_next)
wrap_function_wrapper(module, "_Rendezvous.cancel", wrap_result)


def instrument_grpc_server(module):
wrap_function_wrapper(module, '_unary_response_in_pool',
grpc_web_transaction)
wrap_function_wrapper(module, '_stream_response_in_pool',
grpc_web_transaction)
wrap_function_wrapper(module, '_completion_code',
_nr_wrap_status_code)
wrap_function_wrapper(module, '_abortion_code',
_nr_wrap_status_code)
wrap_function_wrapper(module, '_abort',
_nr_wrap_abort)
wrap_function_wrapper(module, "_unary_response_in_pool", grpc_web_transaction)
wrap_function_wrapper(module, "_stream_response_in_pool", grpc_web_transaction)
wrap_function_wrapper(module, "_completion_code", _nr_wrap_status_code)
wrap_function_wrapper(module, "_abortion_code", _nr_wrap_status_code)
wrap_function_wrapper(module, "_abort", _nr_wrap_abort)

0 comments on commit df2b159

Please sign in to comment.