From c4ba671256052758aa6ff6d9394c68728345e414 Mon Sep 17 00:00:00 2001 From: Fabian <77135162+NobisIndustries@users.noreply.github.com> Date: Thu, 30 Jan 2025 19:37:43 +0100 Subject: [PATCH] Add event resources to the event cleanup cycle (#16907) Co-authored-by: Fabian Nobis --- .../server/events/services/event_persister.py | 17 +++++++-- .../server/storage/test_event_persister.py | 38 ++++++++++--------- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/src/prefect/server/events/services/event_persister.py b/src/prefect/server/events/services/event_persister.py index 23060715bb6f..d15ef6972555 100644 --- a/src/prefect/server/events/services/event_persister.py +++ b/src/prefect/server/events/services/event_persister.py @@ -127,16 +127,25 @@ async def trim() -> None: try: async with db.session_context() as session: - result = await session.execute( + resource_result = await session.execute( + sa.delete(db.EventResource).where( + db.EventResource.occurred < older_than + ) + ) + event_result = await session.execute( sa.delete(db.Event).where(db.Event.occurred < older_than) ) await session.commit() - if result.rowcount: + + if resource_result.rowcount or event_result.rowcount: logger.debug( - "Trimmed %s events older than %s.", result.rowcount, older_than + "Trimmed %s events and %s event resources older than %s.", + event_result.rowcount, + resource_result.rowcount, + older_than, ) except Exception: - logger.exception("Error trimming events", exc_info=True) + logger.exception("Error trimming events and resources", exc_info=True) async def flush_periodically(): try: diff --git a/tests/events/server/storage/test_event_persister.py b/tests/events/server/storage/test_event_persister.py index 53058e59b2a1..fec6a80a769e 100644 --- a/tests/events/server/storage/test_event_persister.py +++ b/tests/events/server/storage/test_event_persister.py @@ -1,6 +1,6 @@ import asyncio from datetime import timedelta -from typing import TYPE_CHECKING, AsyncGenerator, Optional, Sequence +from typing import AsyncGenerator, Optional, Sequence from uuid import UUID, uuid4 import pytest @@ -9,6 +9,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from prefect.server.database import PrefectDBInterface, db_injector +from prefect.server.database.orm_models import ORMEventResource from prefect.server.events.filters import EventFilter from prefect.server.events.schemas.events import ReceivedEvent from prefect.server.events.services import event_persister @@ -17,9 +18,6 @@ from prefect.settings import PREFECT_EVENTS_RETENTION_PERIOD, temporary_settings from prefect.types import DateTime -if TYPE_CHECKING: - from prefect.server.database.orm_models import ORMEventResource - @db_injector async def get_event(db: PrefectDBInterface, id: UUID) -> Optional[ReceivedEvent]: @@ -34,13 +32,12 @@ async def get_event(db: PrefectDBInterface, id: UUID) -> Optional[ReceivedEvent] async def get_resources( - session: AsyncSession, id: UUID, db: PrefectDBInterface + session: AsyncSession, event_id_filter: Optional[UUID], db: PrefectDBInterface ) -> Sequence["ORMEventResource"]: - result = await session.execute( - sa.select(db.EventResource) - .where(db.EventResource.event_id == id) - .order_by(db.EventResource.resource_id) - ) + query = sa.select(db.EventResource).order_by(db.EventResource.resource_id) + if event_id_filter: + query = query.where(db.EventResource.event_id == event_id_filter) + result = await session.execute(query) return result.scalars().all() @@ -295,8 +292,7 @@ async def test_flushes_messages_periodically( async def test_trims_messages_periodically( - event: ReceivedEvent, - session: AsyncSession, + event: ReceivedEvent, session: AsyncSession, db: PrefectDBInterface ): await write_events( session, @@ -314,12 +310,17 @@ async def test_trims_messages_periodically( five_days_ago = DateTime.now("UTC") - timedelta(days=5) - initial_events, total, _ = await query_events(session, filter=EventFilter()) - assert total == 10 + initial_events, event_count, _ = await query_events(session, filter=EventFilter()) + assert event_count == 10 assert len(initial_events) == 10 assert any(event.occurred < five_days_ago for event in initial_events) assert any(event.occurred >= five_days_ago for event in initial_events) + initial_resources = await get_resources(session, None, db) + assert len(initial_resources) == 40 + assert any(resource.occurred < five_days_ago for resource in initial_resources) + assert any(resource.occurred >= five_days_ago for resource in initial_resources) + with temporary_settings({PREFECT_EVENTS_RETENTION_PERIOD: timedelta(days=5)}): async with event_persister.create_handler( flush_every=timedelta(seconds=0.001), @@ -327,8 +328,11 @@ async def test_trims_messages_periodically( ): await asyncio.sleep(0.1) # this is 100x the time necessary - remaining_events, total, _ = await query_events(session, filter=EventFilter()) - assert total == 5 + remaining_events, event_count, _ = await query_events(session, filter=EventFilter()) + assert event_count == 5 assert len(remaining_events) == 5 - assert all(event.occurred >= five_days_ago for event in remaining_events) + + remaining_resources = await get_resources(session, None, db) + assert len(remaining_resources) == 20 + assert all(resource.occurred >= five_days_ago for resource in remaining_resources)