From 6fe14ef7d572fbfcfe2ee55a7622fecc6c33afa4 Mon Sep 17 00:00:00 2001 From: "David Pryor (dapryor)" Date: Wed, 28 Sep 2022 17:15:50 -0400 Subject: [PATCH] expanded `load_celery_current_and_parent_ids` to be able to utilize the internally generated celery_id * Added optional argument `use_internal_celery_task_id` to `load_celery_current_and_parent_ids` * When `use_internal_celery_task_id` is set to True, the internal celery task ID will be used and the generator function will be ignored * Updated README with information on this new argument --- README.md | 9 +++++++++ asgi_correlation_id/extensions/celery.py | 9 ++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 1aff9d1..55b5271 100644 --- a/README.md +++ b/README.md @@ -484,6 +484,15 @@ load_correlation_ids() + load_celery_current_and_parent_ids() ``` +If you wish to correlate celery task IDs through the IDs found in your broker, use the `use_internal_celery_task_id` argument on `load_celery_current_and_parent_ids` +```diff +from asgi_correlation_id.extensions.celery import load_correlation_ids, load_celery_current_and_parent_ids + +load_correlation_ids() ++ load_celery_current_and_parent_ids(use_internal_celery_task_id=True) +``` +Note: `load_celery_current_and_parent_ids` will ignore the `generator` argument when `use_internal_celery_task_id` is set to `True` + To set up the additional log filters, update your log config like this: ```diff diff --git a/asgi_correlation_id/extensions/celery.py b/asgi_correlation_id/extensions/celery.py index 7656fb4..bdd443a 100644 --- a/asgi_correlation_id/extensions/celery.py +++ b/asgi_correlation_id/extensions/celery.py @@ -65,7 +65,9 @@ def cleanup(**kwargs: Any) -> None: def load_celery_current_and_parent_ids( - header_key: str = 'CELERY_PARENT_ID', generator: Callable[[], str] = uuid_hex_generator_fn + header_key: str = 'CELERY_PARENT_ID', + generator: Callable[[], str] = uuid_hex_generator_fn, + use_internal_celery_task_id: bool = False, ) -> None: """ Configure Celery event hooks for generating tracing IDs with depth. @@ -88,7 +90,7 @@ def publish_task_from_worker_or_request(headers: Dict[str, str], **kwargs: Any) headers[header_key] = current @task_prerun.connect(weak=False) - def worker_prerun(task: 'Task', **kwargs: Any) -> None: + def worker_prerun(task_id: str, task: 'Task', **kwargs: Any) -> None: """ Set current ID, and parent ID if it exists. """ @@ -96,7 +98,8 @@ def worker_prerun(task: 'Task', **kwargs: Any) -> None: if parent_id: celery_parent_id.set(parent_id) - celery_current_id.set(generator()) + celery_id = task_id if use_internal_celery_task_id else generator() + celery_current_id.set(celery_id) @task_postrun.connect(weak=False) def clean_up(**kwargs: Any) -> None: