diff --git a/newrelic/core/attribute.py b/newrelic/core/attribute.py index 8cc6aff46d..fb36808ecf 100644 --- a/newrelic/core/attribute.py +++ b/newrelic/core/attribute.py @@ -48,6 +48,8 @@ "aws.lambda.eventSource.arn", "aws.operation", "aws.requestId", + "cloud.account.id", + "cloud.region", "code.filepath", "code.function", "code.lineno", @@ -59,8 +61,8 @@ "enduser.id", "error.class", "error.expected", - "error.message", "error.group.name", + "error.message", "graphql.field.name", "graphql.field.parentType", "graphql.field.path", @@ -74,6 +76,8 @@ "llm", "message.queueName", "message.routingKey", + "messaging.destination.name", + "messaging.system", "peer.address", "peer.hostname", "request.headers.accept", @@ -86,10 +90,7 @@ "response.headers.contentLength", "response.headers.contentType", "response.status", - "messaging.system", - "cloud.region", - "cloud.account.id", - "messaging.destination.name", + "server.address", ) ) diff --git a/newrelic/hooks/messagebroker_pika.py b/newrelic/hooks/messagebroker_pika.py index 5396e38070..624243a855 100644 --- a/newrelic/hooks/messagebroker_pika.py +++ b/newrelic/hooks/messagebroker_pika.py @@ -31,11 +31,25 @@ wrap_object, ) -_START_KEY = "_nr_start_time" KWARGS_ERROR = "Supportability/hooks/pika/kwargs_error" -def _add_consume_rabbitmq_trace(transaction, method, properties, nr_start_time, queue_name=None): +def instance_info(channel): + # Only host is currently used, so we only extract that. + try: + connection = channel.connection + if not hasattr(connection, "params") and hasattr(connection, "_impl"): + # Check for _impl attribute used by BlockingConnection to wrap actual connection objects + connection = connection._impl + + host = connection.params.host + except Exception: + host = None + + return host + + +def _add_consume_rabbitmq_trace(transaction, method, properties, nr_start_time, queue_name=None, channel=None): routing_key = None if hasattr(method, "routing_key"): routing_key = method.routing_key @@ -79,7 +93,16 @@ def _add_consume_rabbitmq_trace(transaction, method, properties, nr_start_time, params=params, ) trace.__enter__() + + # Set start time and attributes after trace has started trace.start_time = nr_start_time + + # Extract host from channel to add as an agent attribute + host = instance_info(channel) + if trace and host: + trace._add_agent_attribute("server.address", host) + + # Exit trace immediately and complete trace.__exit__(None, None, None) @@ -126,9 +149,15 @@ def _nr_wrapper_basic_publish(wrapped, instance, args, kwargs): destination_name=exchange or "Default", params=params, source=wrapped, - ): + ) as trace: cat_headers = MessageTrace.generate_request_headers(transaction) properties.headers.update(cat_headers) + + # Extract host from channel to add as an agent attribute + host = instance_info(instance) + if trace and host: + trace._add_agent_attribute("server.address", host) + return wrapped(*args, **kwargs) @@ -144,9 +173,15 @@ def callback_wrapper(callback, _instance, _args, _kwargs): if not _kwargs: method, properties = _args[1:3] start_time = getattr(callback_wrapper, "_nr_start_time", None) + channel = getattr(callback_wrapper, "_nr_channel", None) _add_consume_rabbitmq_trace( - transaction, method=method, properties=properties, nr_start_time=start_time, queue_name=queue + transaction, + method=method, + properties=properties, + nr_start_time=start_time, + queue_name=queue, + channel=channel, ) else: m = transaction._transaction_metrics.get(KWARGS_ERROR, 0) @@ -155,7 +190,12 @@ def callback_wrapper(callback, _instance, _args, _kwargs): name = callable_name(callback) return FunctionTraceWrapper(callback, name=name)(*_args, **_kwargs) - callback_wrapper._nr_start_time = time.time() + try: + callback_wrapper._nr_start_time = time.time() + callback_wrapper._nr_channel = instance + except Exception: + pass + queue, args, kwargs = wrap_get(callback_wrapper, *args, **kwargs) return wrapped(*args, **kwargs) @@ -266,6 +306,11 @@ def _possibly_create_traces(yielded): ) bt.__enter__() + # Extract host from channel to add as an agent attribute + host = instance_info(instance) + if bt and host: + bt._add_agent_attribute("server.address", host) + return bt def _generator(generator): @@ -376,6 +421,11 @@ def callback_wrapper(wrapped, instance, args, kwargs): m = mt._transaction_metrics.get(KWARGS_ERROR, 0) mt._transaction_metrics[KWARGS_ERROR] = m + 1 + # Extract host from channel to add as an agent attribute + host = instance_info(channel) + if mt and host: + mt._add_agent_attribute("server.address", host) + return wrapped(*args, **kwargs) queue, args, kwargs = wrap_consume(callback_wrapper, *args, **kwargs) diff --git a/tests/messagebroker_pika/test_pika_async_connection_consume.py b/tests/messagebroker_pika/test_pika_async_connection_consume.py index 15d4b97358..88d7b9f34b 100644 --- a/tests/messagebroker_pika/test_pika_async_connection_consume.py +++ b/tests/messagebroker_pika/test_pika_async_connection_consume.py @@ -33,12 +33,14 @@ from testing_support.db_settings import rabbitmq_settings from testing_support.fixtures import ( capture_transaction_metrics, + dt_enabled, function_not_called, override_application_settings, ) from testing_support.validators.validate_code_level_metrics import ( validate_code_level_metrics, ) +from testing_support.validators.validate_span_events import validate_span_events from testing_support.validators.validate_transaction_metrics import ( validate_transaction_metrics, ) @@ -98,13 +100,19 @@ def handle_callback_exception(self, *args, **kwargs): @parametrized_connection @pytest.mark.parametrize("callback_as_partial", [True, False]) +@dt_enabled @validate_code_level_metrics( "test_pika_async_connection_consume.test_async_connection_basic_get_inside_txn.", "on_message", py2_namespace="test_pika_async_connection_consume", ) +@validate_span_events( + count=1, + exact_intrinsics={"name": "MessageBroker/RabbitMQ/Exchange/Consume/Named/%s" % EXCHANGE}, + exact_agents={"server.address": DB_SETTINGS["host"]}, +) @validate_transaction_metrics( - ("test_pika_async_connection_consume:" "test_async_connection_basic_get_inside_txn"), + "test_pika_async_connection_consume:test_async_connection_basic_get_inside_txn", scoped_metrics=_test_select_conn_basic_get_inside_txn_metrics, rollup_metrics=_test_select_conn_basic_get_inside_txn_metrics, background_task=True, @@ -192,7 +200,7 @@ def on_open_connection(connection): ) @parametrized_connection @validate_transaction_metrics( - ("test_pika_async_connection_consume:" "test_async_connection_basic_get_inside_txn_no_callback"), + "test_pika_async_connection_consume:test_async_connection_basic_get_inside_txn_no_callback", scoped_metrics=_test_select_conn_basic_get_inside_txn_no_callback_metrics, rollup_metrics=_test_select_conn_basic_get_inside_txn_no_callback_metrics, background_task=True, @@ -228,7 +236,7 @@ def on_open_connection(connection): @parametrized_connection @pytest.mark.parametrize("callback_as_partial", [True, False]) @validate_transaction_metrics( - ("test_pika_async_connection_consume:" "test_async_connection_basic_get_empty"), + "test_pika_async_connection_consume:test_async_connection_basic_get_empty", scoped_metrics=_test_async_connection_basic_get_empty_metrics, rollup_metrics=_test_async_connection_basic_get_empty_metrics, background_task=True, @@ -285,7 +293,7 @@ def on_open_connection(connection): @parametrized_connection @validate_transaction_metrics( - ("test_pika_async_connection_consume:" "test_async_connection_basic_consume_inside_txn"), + "test_pika_async_connection_consume:test_async_connection_basic_consume_inside_txn", scoped_metrics=_test_select_conn_basic_consume_in_txn_metrics, rollup_metrics=_test_select_conn_basic_consume_in_txn_metrics, background_task=True, @@ -361,7 +369,7 @@ def on_open_connection(connection): @parametrized_connection @validate_transaction_metrics( - ("test_pika_async_connection_consume:" "test_async_connection_basic_consume_two_exchanges"), + "test_pika_async_connection_consume:test_async_connection_basic_consume_two_exchanges", scoped_metrics=_test_select_conn_basic_consume_two_exchanges, rollup_metrics=_test_select_conn_basic_consume_two_exchanges, background_task=True, diff --git a/tests/messagebroker_pika/test_pika_blocking_connection_consume.py b/tests/messagebroker_pika/test_pika_blocking_connection_consume.py index a161607366..07de09f201 100644 --- a/tests/messagebroker_pika/test_pika_blocking_connection_consume.py +++ b/tests/messagebroker_pika/test_pika_blocking_connection_consume.py @@ -20,10 +20,11 @@ from compat import basic_consume from conftest import BODY, CORRELATION_ID, EXCHANGE, HEADERS, QUEUE, REPLY_TO from testing_support.db_settings import rabbitmq_settings -from testing_support.fixtures import capture_transaction_metrics +from testing_support.fixtures import capture_transaction_metrics, dt_enabled from testing_support.validators.validate_code_level_metrics import ( validate_code_level_metrics, ) +from testing_support.validators.validate_span_events import validate_span_events from testing_support.validators.validate_transaction_metrics import ( validate_transaction_metrics, ) @@ -48,17 +49,23 @@ _test_blocking_connection_basic_get_metrics = [ ("MessageBroker/RabbitMQ/Exchange/Produce/Named/%s" % EXCHANGE, None), ("MessageBroker/RabbitMQ/Exchange/Consume/Named/%s" % EXCHANGE, 1), - (("Function/pika.adapters.blocking_connection:" "_CallbackResult.set_value_once"), 1), + ("Function/pika.adapters.blocking_connection:_CallbackResult.set_value_once", 1), ] +@dt_enabled @validate_transaction_metrics( - ("test_pika_blocking_connection_consume:" "test_blocking_connection_basic_get"), + "test_pika_blocking_connection_consume:test_blocking_connection_basic_get", scoped_metrics=_test_blocking_connection_basic_get_metrics, rollup_metrics=_test_blocking_connection_basic_get_metrics, background_task=True, ) @validate_tt_collector_json(message_broker_params=_message_broker_tt_params) +@validate_span_events( + count=1, + exact_intrinsics={"name": "MessageBroker/RabbitMQ/Exchange/Consume/Named/%s" % EXCHANGE}, + exact_agents={"server.address": DB_SETTINGS["host"]}, +) @background_task() def test_blocking_connection_basic_get(producer): with pika.BlockingConnection(pika.ConnectionParameters(DB_SETTINGS["host"])) as connection: @@ -75,7 +82,7 @@ def test_blocking_connection_basic_get(producer): @validate_transaction_metrics( - ("test_pika_blocking_connection_consume:" "test_blocking_connection_basic_get_empty"), + "test_pika_blocking_connection_consume:test_blocking_connection_basic_get_empty", scoped_metrics=_test_blocking_connection_basic_get_empty_metrics, rollup_metrics=_test_blocking_connection_basic_get_empty_metrics, background_task=True, @@ -137,13 +144,14 @@ def test_basic_get(): ) ) else: - _txn_name = "test_pika_blocking_connection_consume:" "on_message" + _txn_name = "test_pika_blocking_connection_consume:on_message" _test_blocking_conn_basic_consume_no_txn_metrics.append( ("Function/test_pika_blocking_connection_consume:on_message", None) ) @pytest.mark.parametrize("as_partial", [True, False]) +@dt_enabled @validate_code_level_metrics( "test_pika_blocking_connection_consume.test_blocking_connection_basic_consume_outside_transaction.", "on_message", @@ -157,6 +165,11 @@ def test_basic_get(): group="Message/RabbitMQ/Exchange/%s" % EXCHANGE, ) @validate_tt_collector_json(message_broker_params=_message_broker_tt_params) +@validate_span_events( + count=1, + exact_intrinsics={"name": "Message/RabbitMQ/Exchange/%s/%s" % (EXCHANGE, _txn_name)}, + exact_agents={"server.address": DB_SETTINGS["host"]}, +) def test_blocking_connection_basic_consume_outside_transaction(producer, as_partial): def on_message(channel, method_frame, header_frame, body): assert hasattr(method_frame, "_nr_start_time") @@ -200,13 +213,14 @@ def on_message(channel, method_frame, header_frame, body): @pytest.mark.parametrize("as_partial", [True, False]) +@dt_enabled @validate_code_level_metrics( "test_pika_blocking_connection_consume.test_blocking_connection_basic_consume_inside_txn.", "on_message", py2_namespace="test_pika_blocking_connection_consume", ) @validate_transaction_metrics( - ("test_pika_blocking_connection_consume:" "test_blocking_connection_basic_consume_inside_txn"), + "test_pika_blocking_connection_consume:test_blocking_connection_basic_consume_inside_txn", scoped_metrics=_test_blocking_conn_basic_consume_in_txn_metrics, rollup_metrics=_test_blocking_conn_basic_consume_in_txn_metrics, background_task=True, @@ -257,7 +271,7 @@ def on_message(channel, method_frame, header_frame, body): @pytest.mark.parametrize("as_partial", [True, False]) @validate_transaction_metrics( - ("test_pika_blocking_connection_consume:" "test_blocking_connection_basic_consume_stopped_txn"), + "test_pika_blocking_connection_consume:test_blocking_connection_basic_consume_stopped_txn", scoped_metrics=_test_blocking_conn_basic_consume_stopped_txn_metrics, rollup_metrics=_test_blocking_conn_basic_consume_stopped_txn_metrics, background_task=True, diff --git a/tests/messagebroker_pika/test_pika_blocking_connection_consume_generator.py b/tests/messagebroker_pika/test_pika_blocking_connection_consume_generator.py index 816b28323d..94f1088b18 100644 --- a/tests/messagebroker_pika/test_pika_blocking_connection_consume_generator.py +++ b/tests/messagebroker_pika/test_pika_blocking_connection_consume_generator.py @@ -42,7 +42,7 @@ @validate_transaction_metrics( - ("test_pika_blocking_connection_consume_generator:" "test_blocking_connection_consume_break"), + "test_pika_blocking_connection_consume_generator:test_blocking_connection_consume_break", scoped_metrics=_test_blocking_connection_consume_metrics, rollup_metrics=_test_blocking_connection_consume_metrics, background_task=True, @@ -59,7 +59,7 @@ def test_blocking_connection_consume_break(producer): @validate_transaction_metrics( - ("test_pika_blocking_connection_consume_generator:" "test_blocking_connection_consume_connection_close"), + "test_pika_blocking_connection_consume_generator:test_blocking_connection_consume_connection_close", scoped_metrics=_test_blocking_connection_consume_metrics, rollup_metrics=_test_blocking_connection_consume_metrics, background_task=True, @@ -83,7 +83,7 @@ def test_blocking_connection_consume_connection_close(producer): @validate_transaction_metrics( - ("test_pika_blocking_connection_consume_generator:" "test_blocking_connection_consume_timeout"), + "test_pika_blocking_connection_consume_generator:test_blocking_connection_consume_timeout", scoped_metrics=_test_blocking_connection_consume_metrics, rollup_metrics=_test_blocking_connection_consume_metrics, background_task=True, @@ -107,7 +107,7 @@ def test_blocking_connection_consume_timeout(producer): @validate_transaction_metrics( - ("test_pika_blocking_connection_consume_generator:" "test_blocking_connection_consume_exception_in_for_loop"), + "test_pika_blocking_connection_consume_generator:test_blocking_connection_consume_exception_in_for_loop", scoped_metrics=_test_blocking_connection_consume_metrics, rollup_metrics=_test_blocking_connection_consume_metrics, background_task=True, @@ -122,7 +122,7 @@ def test_blocking_connection_consume_exception_in_for_loop(producer): # We should still create the metric in this case even if there is # an exception for result in channel.consume(QUEUE): - 1 / 0 + 1 / 0 # noqa except ZeroDivisionError: # Expected error pass @@ -140,7 +140,7 @@ def test_blocking_connection_consume_exception_in_for_loop(producer): @validate_transaction_metrics( - ("test_pika_blocking_connection_consume_generator:" "test_blocking_connection_consume_exception_in_generator"), + "test_pika_blocking_connection_consume_generator:test_blocking_connection_consume_exception_in_generator", scoped_metrics=_test_blocking_connection_consume_empty_metrics, rollup_metrics=_test_blocking_connection_consume_empty_metrics, background_task=True, @@ -172,7 +172,7 @@ def test_blocking_connection_consume_exception_in_generator(): @validate_transaction_metrics( - ("test_pika_blocking_connection_consume_generator:" "test_blocking_connection_consume_many"), + "test_pika_blocking_connection_consume_generator:test_blocking_connection_consume_many", scoped_metrics=_test_blocking_connection_consume_many_metrics, rollup_metrics=_test_blocking_connection_consume_many_metrics, background_task=True, @@ -193,7 +193,7 @@ def test_blocking_connection_consume_many(produce_five): @validate_transaction_metrics( - ("test_pika_blocking_connection_consume_generator:" "test_blocking_connection_consume_using_methods"), + "test_pika_blocking_connection_consume_generator:test_blocking_connection_consume_using_methods", scoped_metrics=_test_blocking_connection_consume_metrics, rollup_metrics=_test_blocking_connection_consume_metrics, background_task=True, @@ -316,7 +316,7 @@ def test_blocking_connection_consume_using_methods_outside_txn(producer): @validate_transaction_metrics( - ("test_pika_blocking_connection_consume_generator:" "test_blocking_connection_consume_exception_on_creation"), + "test_pika_blocking_connection_consume_generator:test_blocking_connection_consume_exception_on_creation", scoped_metrics=_test_blocking_connection_consume_empty_metrics, rollup_metrics=_test_blocking_connection_consume_empty_metrics, background_task=True, diff --git a/tests/messagebroker_pika/test_pika_instance_info.py b/tests/messagebroker_pika/test_pika_instance_info.py new file mode 100644 index 0000000000..3111a4976e --- /dev/null +++ b/tests/messagebroker_pika/test_pika_instance_info.py @@ -0,0 +1,96 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pika +import pika.adapters +import pika.adapters.tornado_connection +import pytest +from testing_support.db_settings import rabbitmq_settings + +from newrelic.hooks.messagebroker_pika import instance_info + +DB_SETTINGS = rabbitmq_settings()[0] +EXPECTED_HOST = DB_SETTINGS["host"] +CONNECTION_PARAMS = [ + pika.ConnectionParameters(host=DB_SETTINGS["host"], port=DB_SETTINGS["port"]), + pika.URLParameters("amqp://%s:%s" % (DB_SETTINGS["host"], DB_SETTINGS["port"])), +] + + +@pytest.mark.parametrize("params", CONNECTION_PARAMS) +def test_instance_info_blocking_connection(params): + with pika.BlockingConnection(params) as connection: + channel = connection.channel() + host = instance_info(channel) + assert host == EXPECTED_HOST + + +@pytest.mark.parametrize("params", CONNECTION_PARAMS) +def test_instance_info_select_connection(params): + _channel = [] + + def on_open(connection): + connection.channel(on_open_callback=on_channel_open) + + def on_channel_open(channel): + _channel.append(channel) + + connection.close() + connection.ioloop.stop() + + parameters = pika.ConnectionParameters(DB_SETTINGS["host"]) + connection = pika.SelectConnection(parameters=parameters, on_open_callback=on_open) + + try: + connection.ioloop.start() + except: + connection.close() + # Start the IOLoop again so Pika can communicate, it will stop on its + # own when the connection is closed + connection.ioloop.start() + raise + + channel = _channel.pop() + host = instance_info(channel) + assert host == EXPECTED_HOST + + +@pytest.mark.parametrize("params", CONNECTION_PARAMS) +def test_instance_info_tornado_connection(params): + _channel = [] + + def on_open(connection): + connection.channel(on_open_callback=on_channel_open) + + def on_channel_open(channel): + _channel.append(channel) + + connection.close() + connection.ioloop.stop() + + parameters = pika.ConnectionParameters(DB_SETTINGS["host"]) + connection = pika.adapters.tornado_connection.TornadoConnection(parameters=parameters, on_open_callback=on_open) + + try: + connection.ioloop.start() + except: + connection.close() + # Start the IOLoop again so Pika can communicate, it will stop on its + # own when the connection is closed + connection.ioloop.start() + raise + + channel = _channel.pop() + host = instance_info(channel) + assert host == EXPECTED_HOST diff --git a/tests/messagebroker_pika/test_pika_produce.py b/tests/messagebroker_pika/test_pika_produce.py index dbc9af030a..ab1a747467 100644 --- a/tests/messagebroker_pika/test_pika_produce.py +++ b/tests/messagebroker_pika/test_pika_produce.py @@ -15,10 +15,11 @@ import pika import pytest from testing_support.db_settings import rabbitmq_settings -from testing_support.fixtures import override_application_settings +from testing_support.fixtures import dt_enabled, override_application_settings from testing_support.validators.validate_messagebroker_headers import ( validate_messagebroker_headers, ) +from testing_support.validators.validate_span_events import validate_span_events from testing_support.validators.validate_transaction_metrics import ( validate_transaction_metrics, ) @@ -62,6 +63,7 @@ def cache_pika_headers(wrapped, instance, args, kwargs): ] +@dt_enabled @validate_transaction_metrics( "test_pika_produce:test_blocking_connection", scoped_metrics=_test_blocking_connection_metrics, @@ -72,6 +74,11 @@ def cache_pika_headers(wrapped, instance, args, kwargs): message_broker_params=_message_broker_tt_included_params, message_broker_forgone_params=_message_broker_tt_forgone_params, ) +@validate_span_events( + count=2, + exact_intrinsics={"name": "MessageBroker/RabbitMQ/Exchange/Produce/Named/Default"}, + exact_agents={"server.address": DB_SETTINGS["host"]}, +) @background_task() @validate_messagebroker_headers @cache_pika_headers @@ -334,6 +341,7 @@ def test_blocking_connection_two_exchanges(): ] +@dt_enabled @validate_transaction_metrics( "test_pika_produce:test_select_connection", scoped_metrics=_test_select_connection_metrics, @@ -344,6 +352,11 @@ def test_blocking_connection_two_exchanges(): message_broker_params=_message_broker_tt_included_params, message_broker_forgone_params=_message_broker_tt_forgone_params, ) +@validate_span_events( + count=1, + exact_intrinsics={"name": "MessageBroker/RabbitMQ/Exchange/Produce/Named/Default"}, + exact_agents={"server.address": DB_SETTINGS["host"]}, +) @background_task() @validate_messagebroker_headers @cache_pika_headers @@ -379,6 +392,7 @@ def on_channel_open(channel): ] +@dt_enabled @validate_transaction_metrics( "test_pika_produce:test_tornado_connection", scoped_metrics=_test_tornado_connection_metrics, @@ -389,6 +403,11 @@ def on_channel_open(channel): message_broker_params=_message_broker_tt_included_params, message_broker_forgone_params=_message_broker_tt_forgone_params, ) +@validate_span_events( + count=1, + exact_intrinsics={"name": "MessageBroker/RabbitMQ/Exchange/Produce/Named/Default"}, + exact_agents={"server.address": DB_SETTINGS["host"]}, +) @background_task() @validate_messagebroker_headers @cache_pika_headers diff --git a/tests/messagebroker_pika/test_pika_supportability.py b/tests/messagebroker_pika/test_pika_supportability.py index 5ab56f0b42..dacd68ca9f 100644 --- a/tests/messagebroker_pika/test_pika_supportability.py +++ b/tests/messagebroker_pika/test_pika_supportability.py @@ -56,7 +56,7 @@ def _create_channel(self, channel_number, on_open_callback): @validate_transaction_metrics( - ("test_pika_supportability:" "test_select_connection_supportability_in_txn"), + "test_pika_supportability:test_select_connection_supportability_in_txn", scoped_metrics=(), rollup_metrics=_test_select_connection_supportability_metrics, background_task=True,