Skip to content

Commit

Permalink
Add asyncpg instrumentation. Closes: #15
Browse files Browse the repository at this point in the history
  • Loading branch information
a-feld committed Sep 21, 2020
1 parent 92e3701 commit 2cb4a44
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 0 deletions.
7 changes: 7 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2269,6 +2269,13 @@ def _process_module_builtin_defaults():
'newrelic.hooks.database_psycopg2cffi',
'instrument_psycopg2cffi_extensions')

_process_module_definition('asyncpg.connect_utils',
'newrelic.hooks.database_asyncpg',
'instrument_asyncpg_connect_utils')
_process_module_definition('asyncpg.protocol',
'newrelic.hooks.database_asyncpg',
'instrument_asyncpg_protocol')

_process_module_definition('postgresql.driver.dbapi20',
'newrelic.hooks.database_postgresql',
'instrument_postgresql_driver_dbapi20')
Expand Down
1 change: 1 addition & 0 deletions newrelic/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
'aws.lambda.coldStart',
'aws.lambda.eventSource.arn',
'db.instance',
'db.operation',
'db.statement',
'error.class',
'error.message',
Expand Down
3 changes: 3 additions & 0 deletions newrelic/core/database_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ def _parse_alter(sql):
'alter': None,
'commit': None,
'rollback': None,
'begin': None,
'prepare': None,
'copy': None,
}

_parse_operation_p = r'(\w+)'
Expand Down
5 changes: 5 additions & 0 deletions newrelic/core/datastore_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,8 @@ def trace_node(self, stats, root, connections):
return newrelic.core.trace_node.TraceNode(start_time=start_time,
end_time=end_time, name=name, params=params, children=children,
label=None)

def span_event(self, *args, **kwargs):
if self.operation:
self.agent_attributes["db.operation"] = self.operation
return super(DatastoreNode, self).span_event(*args, **kwargs)
146 changes: 146 additions & 0 deletions newrelic/hooks/database_asyncpg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from newrelic.api.database_trace import (
DatabaseTrace,
enable_datastore_instance_feature,
register_database_client,
)
from newrelic.api.datastore_trace import DatastoreTrace
from newrelic.common.object_wrapper import ObjectProxy, wrap_function_wrapper


class PostgresApi(object):
@staticmethod
def _instance_info(addr, connected_fut, con_params, *args, **kwargs):
if isinstance(addr, str):
host = "localhost"
port = addr
else:
host, port = addr

return (host, port, getattr(con_params, "database", None))

@classmethod
def instance_info(cls, args, kwargs):
return cls._instance_info(*args, **kwargs)


register_database_client(
PostgresApi,
"Postgres",
quoting_style="single+dollar",
instance_info=PostgresApi.instance_info,
)
enable_datastore_instance_feature(PostgresApi)


class ProtocolProxy(ObjectProxy):
async def bind_execute(self, state, *args, **kwargs):
with DatabaseTrace(
state.query,
dbapi2_module=PostgresApi,
connect_params=getattr(self, "_nr_connect_params", None),
):
return await self.__wrapped__.bind_execute(state, *args, **kwargs)

async def bind_execute_many(self, state, *args, **kwargs):
with DatabaseTrace(
state.query,
dbapi2_module=PostgresApi,
connect_params=getattr(self, "_nr_connect_params", None),
):
return await self.__wrapped__.bind_execute_many(state, *args, **kwargs)

async def bind(self, state, *args, **kwargs):
with DatabaseTrace(
state.query,
dbapi2_module=PostgresApi,
connect_params=getattr(self, "_nr_connect_params", None),
):
return await self.__wrapped__.bind(state, *args, **kwargs)

async def execute(self, state, *args, **kwargs):
with DatabaseTrace(
state.query,
dbapi2_module=PostgresApi,
connect_params=getattr(self, "_nr_connect_params", None),
):
return await self.__wrapped__.execute(state, *args, **kwargs)

async def query(self, query, *args, **kwargs):
with DatabaseTrace(
query,
dbapi2_module=PostgresApi,
connect_params=getattr(self, "_nr_connect_params", None),
):
return await self.__wrapped__.query(query, *args, **kwargs)

async def prepare(self, stmt_name, query, *args, **kwargs):
with DatabaseTrace(
"PREPARE {stmt_name} FROM '{query}'".format(
stmt_name=stmt_name, query=query
),
dbapi2_module=PostgresApi,
connect_params=getattr(self, "_nr_connect_params", None),
):
return await self.__wrapped__.prepare(stmt_name, query, *args, **kwargs)

async def copy_in(self, copy_stmt, *args, **kwargs):
with DatabaseTrace(
copy_stmt,
dbapi2_module=PostgresApi,
connect_params=getattr(self, "_nr_connect_params", None),
):
return await self.__wrapped__.copy_in(copy_stmt, *args, **kwargs)

async def copy_out(self, copy_stmt, *args, **kwargs):
with DatabaseTrace(
copy_stmt,
dbapi2_module=PostgresApi,
connect_params=getattr(self, "_nr_connect_params", None),
):
return await self.__wrapped__.copy_out(copy_stmt, *args, **kwargs)


def proxy_protocol(wrapped, instance, args, kwargs):
proxy = ProtocolProxy(wrapped(*args, **kwargs))
proxy._nr_connect_params = (args, kwargs)
return proxy


def wrap_connect(wrapped, instance, args, kwargs):
host = port = database_name = None
if "addr" in kwargs:
host, port, database_name = PostgresApi._instance_info(
kwargs["addr"], None, kwargs.get("params")
)

with DatastoreTrace(
PostgresApi._nr_database_product,
None,
"connect",
host=host,
port_path_or_id=port,
database_name=database_name,
):
return wrapped(*args, **kwargs)


def instrument_asyncpg_protocol(module):
wrap_function_wrapper(module, "Protocol", proxy_protocol)


def instrument_asyncpg_connect_utils(module):
wrap_function_wrapper(module, "_connect_addr", wrap_connect)

0 comments on commit 2cb4a44

Please sign in to comment.