Skip to content

Commit

Permalink
Add starlette and fastapi instrumentation. (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
newrelic-opensource committed Oct 22, 2020
1 parent 90363fa commit f02b59f
Show file tree
Hide file tree
Showing 13 changed files with 379 additions and 63 deletions.
16 changes: 15 additions & 1 deletion newrelic/api/asgi_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import functools

import newrelic.packages.asgiref_compatibility as asgiref_compatibility
Expand Down Expand Up @@ -247,8 +248,21 @@ def __init__(self, application, scope, receive, send):
if self._settings:
self.capture_params = self._settings.capture_params

def __exit__(self, exc, value, tb):
if getattr(value, "_nr_ignored", False):
exc, value, tb = None, None, None
return super(ASGIWebTransaction, self).__exit__(exc, value, tb)

async def send(self, event):
if event["type"] == "http.response.start":
if (
event["type"] == "http.response.body"
and not event.get("more_body", False)
):
try:
return await self._send(event)
finally:
self.__exit__(*sys.exc_info())
elif event["type"] == "http.response.start":
self.process_response(event["status"], event.get("headers", ()))
return await self._send(event)

Expand Down
4 changes: 2 additions & 2 deletions newrelic/common/async_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import time
import newrelic.packages.six as six

from newrelic.common.coroutine import (is_coroutine_function,
from newrelic.common.coroutine import (is_coroutine_callable,
is_asyncio_coroutine, is_generator_function)
from newrelic.common.object_wrapper import ObjectProxy
from newrelic.core.trace_cache import trace_cache
Expand Down Expand Up @@ -159,7 +159,7 @@ def __await__(self):


def async_proxy(wrapped):
if is_coroutine_function(wrapped):
if is_coroutine_callable(wrapped):
return CoroutineProxy
elif is_generator_function(wrapped):
if is_asyncio_coroutine(wrapped):
Expand Down
4 changes: 2 additions & 2 deletions newrelic/common/async_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import textwrap
import functools
from newrelic.common.coroutine import (
is_coroutine_function,
is_coroutine_callable,
is_asyncio_coroutine,
is_generator_function,
)
Expand Down Expand Up @@ -82,7 +82,7 @@ def wrapper(*args, **kwargs):


def async_wrapper(wrapped):
if is_coroutine_function(wrapped):
if is_coroutine_callable(wrapped):
return coroutine_wrapper
elif is_generator_function(wrapped):
if is_asyncio_coroutine(wrapped):
Expand Down
4 changes: 4 additions & 0 deletions newrelic/common/coroutine.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ def is_generator_function(wrapped):

def _iscoroutinefunction_tornado(fn):
return hasattr(fn, '__tornado_coroutine__')


def is_coroutine_callable(wrapped):
return is_coroutine_function(wrapped) or is_coroutine_function(getattr(wrapped, "__call__", None))
23 changes: 23 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2113,6 +2113,10 @@ def _process_module_builtin_defaults():
'newrelic.hooks.framework_falcon',
'instrument_falcon_routing_util')

_process_module_definition('fastapi.routing',
'newrelic.hooks.framework_fastapi',
'instrument_fastapi_routing')

_process_module_definition('flask.app',
'newrelic.hooks.framework_flask',
'instrument_flask_app')
Expand Down Expand Up @@ -2359,6 +2363,25 @@ def _process_module_builtin_defaults():
'newrelic.hooks.external_urllib3',
'instrument_urllib3_connection')

_process_module_definition('starlette.requests',
'newrelic.hooks.framework_starlette',
'instrument_starlette_requests')
_process_module_definition('starlette.routing',
'newrelic.hooks.framework_starlette',
'instrument_starlette_routing')
_process_module_definition('starlette.applications',
'newrelic.hooks.framework_starlette',
'instrument_starlette_applications')
_process_module_definition('starlette.middleware.errors',
'newrelic.hooks.framework_starlette',
'instrument_starlette_middleware_errors')
_process_module_definition('starlette.exceptions',
'newrelic.hooks.framework_starlette',
'instrument_starlette_exceptions')
_process_module_definition('starlette.background',
'newrelic.hooks.framework_starlette',
'instrument_starlette_background_task')

_process_module_definition('uvicorn.config',
'newrelic.hooks.adapter_uvicorn',
'instrument_uvicorn_config')
Expand Down
102 changes: 61 additions & 41 deletions newrelic/core/trace_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ def current_task(asyncio):
if not asyncio:
return

current_task = getattr(asyncio, 'current_task', None)
current_task = getattr(asyncio, "current_task", None)
if current_task is None:
current_task = getattr(asyncio.Task, 'current_task', None)
current_task = getattr(asyncio.Task, "current_task", None)

try:
return current_task()
Expand All @@ -52,9 +52,9 @@ def all_tasks(asyncio):
if not asyncio:
return

all_tasks = getattr(asyncio, 'all_tasks', None)
all_tasks = getattr(asyncio, "all_tasks", None)
if all_tasks is None:
all_tasks = getattr(asyncio.Task, 'all_tasks', None)
all_tasks = getattr(asyncio.Task, "all_tasks", None)

try:
return all_tasks()
Expand All @@ -63,15 +63,14 @@ def all_tasks(asyncio):


def get_event_loop(task):
get_loop = getattr(task, 'get_loop', None)
get_loop = getattr(task, "get_loop", None)
if get_loop:
return get_loop()
else:
return getattr(task, '_loop', None)
return getattr(task, "_loop", None)


class cached_module(object):

def __init__(self, module_path, name=None):
self.module_path = module_path
self.name = name or module_path
Expand Down Expand Up @@ -172,9 +171,9 @@ def active_threads(self):
transaction = trace and trace.transaction
if transaction is not None:
if transaction.background_task:
yield transaction, thread_id, 'BACKGROUND', frame
yield transaction, thread_id, "BACKGROUND", frame
else:
yield transaction, thread_id, 'REQUEST', frame
yield transaction, thread_id, "REQUEST", frame
else:
# Note that there may not always be a thread object.
# This is because thread could have been created direct
Expand All @@ -183,10 +182,10 @@ def active_threads(self):
# obtain a name for as being 'OTHER'.

thread = threading._active.get(thread_id)
if thread is not None and thread.getName().startswith('NR-'):
yield None, thread_id, 'AGENT', frame
if thread is not None and thread.getName().startswith("NR-"):
yield None, thread_id, "AGENT", frame
else:
yield None, thread_id, 'OTHER', frame
yield None, thread_id, "OTHER", frame

# Now yield up those corresponding to greenlets. Right now only
# doing this for greenlets in which any active transactions are
Expand All @@ -202,11 +201,9 @@ def active_threads(self):
gr = transaction._greenlet()
if gr and gr.gr_frame is not None:
if transaction.background_task:
yield (transaction, thread_id,
'BACKGROUND', gr.gr_frame)
yield (transaction, thread_id, "BACKGROUND", gr.gr_frame)
else:
yield (transaction, thread_id,
'REQUEST', gr.gr_frame)
yield (transaction, thread_id, "REQUEST", gr.gr_frame)

def prepare_for_root(self):
"""Updates the cache state so that a new root can be created if the
Expand All @@ -217,7 +214,7 @@ def prepare_for_root(self):
if not trace:
return None

if not hasattr(trace, '_task'):
if not hasattr(trace, "_task"):
return trace

task = current_task(self.asyncio)
Expand All @@ -243,15 +240,16 @@ def save_trace(self, trace):

if thread_id in self._cache:
cache_root = self._cache[thread_id].root
if (cache_root and cache_root is not trace.root and
not cache_root.exited):
if cache_root and cache_root is not trace.root and not cache_root.exited:
# Cached trace exists and has a valid root still
_logger.error('Runtime instrumentation error. Attempt to '
'save a trace from an inactive transaction. '
'Report this issue to New Relic support.\n%s',
''.join(traceback.format_stack()[:-1]))
_logger.error(
"Runtime instrumentation error. Attempt to "
"save a trace from an inactive transaction. "
"Report this issue to New Relic support.\n%s",
"".join(traceback.format_stack()[:-1]),
)

raise TraceCacheActiveTraceError('transaction already active')
raise TraceCacheActiveTraceError("transaction already active")

self._cache[thread_id] = trace

Expand All @@ -266,21 +264,39 @@ def save_trace(self, trace):

trace._greenlet = None

if hasattr(sys, '_current_frames'):
if hasattr(sys, "_current_frames"):
if thread_id not in sys._current_frames():
if self.greenlet:
trace._greenlet = weakref.ref(self.greenlet.getcurrent())

if self.asyncio and not hasattr(trace, '_task'):
if self.asyncio and not hasattr(trace, "_task"):
task = current_task(self.asyncio)
trace._task = task

def thread_start(self, trace):
current_thread_id = self.current_thread_id()
if current_thread_id not in self._cache:
self._cache[current_thread_id] = trace
else:
_logger.error(
"Runtime instrumentation error. An active "
"trace already exists in the cache on thread_id %s. Report "
"this issue to New Relic support.\n ", current_thread_id
)
return None

return current_thread_id

def thread_stop(self, thread_id):
if thread_id:
self._cache.pop(thread_id, None)

def pop_current(self, trace):
"""Restore the trace's parent under the thread ID of the current
executing thread."""

if hasattr(trace, '_task'):
delattr(trace, '_task')
if hasattr(trace, "_task"):
delattr(trace, "_task")

thread_id = trace.thread_id
parent = trace.parent
Expand All @@ -294,7 +310,7 @@ def complete_root(self, root):
"""

if hasattr(root, '_task'):
if hasattr(root, "_task"):
if root.has_outstanding_children():
task_ids = (id(task) for task in all_tasks(self.asyncio))

Expand All @@ -319,17 +335,19 @@ def complete_root(self, root):
if thread_id not in self._cache:
thread_id = self.current_thread_id()
if thread_id not in self._cache:
raise TraceCacheNoActiveTraceError('no active trace')
raise TraceCacheNoActiveTraceError("no active trace")

current = self._cache.get(thread_id)

if root is not current:
_logger.error('Runtime instrumentation error. Attempt to '
'drop the root when it is not the current '
'trace. Report this issue to New Relic support.\n%s',
''.join(traceback.format_stack()[:-1]))
_logger.error(
"Runtime instrumentation error. Attempt to "
"drop the root when it is not the current "
"trace. Report this issue to New Relic support.\n%s",
"".join(traceback.format_stack()[:-1]),
)

raise RuntimeError('not the current trace')
raise RuntimeError("not the current trace")

del self._cache[thread_id]
root._greenlet = None
Expand All @@ -354,26 +372,28 @@ def record_event_loop_wait(self, start_time, end_time):
roots = set()
seen = set()

task = getattr(transaction.root_span, '_task', None)
task = getattr(transaction.root_span, "_task", None)
loop = get_event_loop(task)

for trace in self._cache.values():
if trace in seen:
continue

# If the trace is on a different transaction and it's asyncio
if (trace.transaction is not transaction and
getattr(trace, '_task', None) is not None and
get_event_loop(trace._task) is loop and
trace._is_leaf()):
if (
trace.transaction is not transaction
and getattr(trace, "_task", None) is not None
and get_event_loop(trace._task) is loop
and trace._is_leaf()
):
trace.exclusive -= duration
roots.add(trace.root)
seen.add(trace)

seen = None

for root in roots:
guid = '%016x' % random.getrandbits(64)
guid = "%016x" % random.getrandbits(64)
node = LoopNode(
fetch_name=fetch_name,
start_time=start_time,
Expand Down
4 changes: 2 additions & 2 deletions newrelic/hooks/adapter_gunicorn.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

from newrelic.api.wsgi_application import WSGIApplicationWrapper
from newrelic.common.object_wrapper import wrap_out_function
from newrelic.common.coroutine import (is_coroutine_function,
from newrelic.common.coroutine import (is_coroutine_callable,
is_asyncio_coroutine)


def is_coroutine(fn):
return is_coroutine_function(fn) or is_asyncio_coroutine(fn)
return is_coroutine_callable(fn) or is_asyncio_coroutine(fn)


def _nr_wrapper_Application_wsgi_(application):
Expand Down
4 changes: 2 additions & 2 deletions newrelic/hooks/framework_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from newrelic.api.function_trace import function_trace
from newrelic.api.transaction import current_transaction, ignore_transaction
from newrelic.api.web_transaction import web_transaction
from newrelic.common.async_wrapper import is_coroutine_function, async_wrapper
from newrelic.common.async_wrapper import is_coroutine_callable, async_wrapper
from newrelic.common.object_names import callable_name
from newrelic.common.object_wrapper import (wrap_function_wrapper,
function_wrapper, ObjectProxy)
Expand Down Expand Up @@ -219,7 +219,7 @@ def _nr_aiohttp_add_cat_headers_(wrapped, instance, args, kwargs):
tmp = instance.headers
instance.headers = HeaderProxy(tmp, cat_headers)

if is_coroutine_function(wrapped):
if is_coroutine_callable(wrapped):
@asyncio.coroutine
def new_coro():
try:
Expand Down
Loading

0 comments on commit f02b59f

Please sign in to comment.