Skip to content

Commit

Permalink
expanded load_celery_current_and_parent_ids to be able to utilize t…
Browse files Browse the repository at this point in the history
…he internally generated celery_id

* Added optional argument `use_internal_celery_task_id` to `load_celery_current_and_parent_ids`
* When `use_internal_celery_task_id` is set to True, the internal celery task ID will be used and the generator function will be ignored
* Updated README with information on this new argument
  • Loading branch information
David Pryor (dapryor) committed Sep 28, 2022
1 parent ec96801 commit 6fe14ef
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,15 @@ load_correlation_ids()
+ load_celery_current_and_parent_ids()
```

If you wish to correlate celery task IDs through the IDs found in your broker, use the `use_internal_celery_task_id` argument on `load_celery_current_and_parent_ids`
```diff
from asgi_correlation_id.extensions.celery import load_correlation_ids, load_celery_current_and_parent_ids

load_correlation_ids()
+ load_celery_current_and_parent_ids(use_internal_celery_task_id=True)
```
Note: `load_celery_current_and_parent_ids` will ignore the `generator` argument when `use_internal_celery_task_id` is set to `True`

To set up the additional log filters, update your log config like this:

```diff
Expand Down
9 changes: 6 additions & 3 deletions asgi_correlation_id/extensions/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ def cleanup(**kwargs: Any) -> None:


def load_celery_current_and_parent_ids(
header_key: str = 'CELERY_PARENT_ID', generator: Callable[[], str] = uuid_hex_generator_fn
header_key: str = 'CELERY_PARENT_ID',
generator: Callable[[], str] = uuid_hex_generator_fn,
use_internal_celery_task_id: bool = False,
) -> None:
"""
Configure Celery event hooks for generating tracing IDs with depth.
Expand All @@ -88,15 +90,16 @@ def publish_task_from_worker_or_request(headers: Dict[str, str], **kwargs: Any)
headers[header_key] = current

@task_prerun.connect(weak=False)
def worker_prerun(task: 'Task', **kwargs: Any) -> None:
def worker_prerun(task_id: str, task: 'Task', **kwargs: Any) -> None:
"""
Set current ID, and parent ID if it exists.
"""
parent_id = task.request.get(header_key)
if parent_id:
celery_parent_id.set(parent_id)

celery_current_id.set(generator())
celery_id = task_id if use_internal_celery_task_id else generator()
celery_current_id.set(celery_id)

@task_postrun.connect(weak=False)
def clean_up(**kwargs: Any) -> None:
Expand Down

0 comments on commit 6fe14ef

Please sign in to comment.