Skip to content

Commit

Permalink
Add event resources to the event cleanup cycle (#16907)
Browse files Browse the repository at this point in the history
Co-authored-by: Fabian Nobis <[email protected]>
  • Loading branch information
NobisIndustries and Fabian Nobis authored Jan 30, 2025
1 parent ab71dae commit c4ba671
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 21 deletions.
17 changes: 13 additions & 4 deletions src/prefect/server/events/services/event_persister.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,25 @@ async def trim() -> None:

try:
async with db.session_context() as session:
result = await session.execute(
resource_result = await session.execute(
sa.delete(db.EventResource).where(
db.EventResource.occurred < older_than
)
)
event_result = await session.execute(
sa.delete(db.Event).where(db.Event.occurred < older_than)
)
await session.commit()
if result.rowcount:

if resource_result.rowcount or event_result.rowcount:
logger.debug(
"Trimmed %s events older than %s.", result.rowcount, older_than
"Trimmed %s events and %s event resources older than %s.",
event_result.rowcount,
resource_result.rowcount,
older_than,
)
except Exception:
logger.exception("Error trimming events", exc_info=True)
logger.exception("Error trimming events and resources", exc_info=True)

async def flush_periodically():
try:
Expand Down
38 changes: 21 additions & 17 deletions tests/events/server/storage/test_event_persister.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from datetime import timedelta
from typing import TYPE_CHECKING, AsyncGenerator, Optional, Sequence
from typing import AsyncGenerator, Optional, Sequence
from uuid import UUID, uuid4

import pytest
Expand All @@ -9,6 +9,7 @@
from sqlalchemy.ext.asyncio import AsyncSession

from prefect.server.database import PrefectDBInterface, db_injector
from prefect.server.database.orm_models import ORMEventResource
from prefect.server.events.filters import EventFilter
from prefect.server.events.schemas.events import ReceivedEvent
from prefect.server.events.services import event_persister
Expand All @@ -17,9 +18,6 @@
from prefect.settings import PREFECT_EVENTS_RETENTION_PERIOD, temporary_settings
from prefect.types import DateTime

if TYPE_CHECKING:
from prefect.server.database.orm_models import ORMEventResource


@db_injector
async def get_event(db: PrefectDBInterface, id: UUID) -> Optional[ReceivedEvent]:
Expand All @@ -34,13 +32,12 @@ async def get_event(db: PrefectDBInterface, id: UUID) -> Optional[ReceivedEvent]


async def get_resources(
session: AsyncSession, id: UUID, db: PrefectDBInterface
session: AsyncSession, event_id_filter: Optional[UUID], db: PrefectDBInterface
) -> Sequence["ORMEventResource"]:
result = await session.execute(
sa.select(db.EventResource)
.where(db.EventResource.event_id == id)
.order_by(db.EventResource.resource_id)
)
query = sa.select(db.EventResource).order_by(db.EventResource.resource_id)
if event_id_filter:
query = query.where(db.EventResource.event_id == event_id_filter)
result = await session.execute(query)
return result.scalars().all()


Expand Down Expand Up @@ -295,8 +292,7 @@ async def test_flushes_messages_periodically(


async def test_trims_messages_periodically(
event: ReceivedEvent,
session: AsyncSession,
event: ReceivedEvent, session: AsyncSession, db: PrefectDBInterface
):
await write_events(
session,
Expand All @@ -314,21 +310,29 @@ async def test_trims_messages_periodically(

five_days_ago = DateTime.now("UTC") - timedelta(days=5)

initial_events, total, _ = await query_events(session, filter=EventFilter())
assert total == 10
initial_events, event_count, _ = await query_events(session, filter=EventFilter())
assert event_count == 10
assert len(initial_events) == 10
assert any(event.occurred < five_days_ago for event in initial_events)
assert any(event.occurred >= five_days_ago for event in initial_events)

initial_resources = await get_resources(session, None, db)
assert len(initial_resources) == 40
assert any(resource.occurred < five_days_ago for resource in initial_resources)
assert any(resource.occurred >= five_days_ago for resource in initial_resources)

with temporary_settings({PREFECT_EVENTS_RETENTION_PERIOD: timedelta(days=5)}):
async with event_persister.create_handler(
flush_every=timedelta(seconds=0.001),
trim_every=timedelta(seconds=0.001),
):
await asyncio.sleep(0.1) # this is 100x the time necessary

remaining_events, total, _ = await query_events(session, filter=EventFilter())
assert total == 5
remaining_events, event_count, _ = await query_events(session, filter=EventFilter())
assert event_count == 5
assert len(remaining_events) == 5

assert all(event.occurred >= five_days_ago for event in remaining_events)

remaining_resources = await get_resources(session, None, db)
assert len(remaining_resources) == 20
assert all(resource.occurred >= five_days_ago for resource in remaining_resources)

0 comments on commit c4ba671

Please sign in to comment.