Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENG-6313] allow raw data to expire #826

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions how-to/use-the-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ query params:
- `record_identifier` (required): a source-specific identifier for the metadata record (no format restrictions) -- sending another record with the same `record_identifier` is considered a full update (only the most recent is used)
- `nonurgent`: if present (regardless of value), ingestion may be given a lower priority -- recommended for bulk or background operations
- `is_supplementary`: if present (regardless of value), this record's metadata will be added to all pre-existing index-cards from the same user with the same `focus_iri` (if any), but will not get an index-card of its own nor affect the last-updated timestamp (e.g. in OAI-PMH) of the index-cards it supplements
- `expiration_date`: optional date (in format `YYYY-MM-DD`) on and after which the record is no longer valid and will be removed

## Deleting index-cards

Expand Down
4 changes: 4 additions & 0 deletions project/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ def split(string, delim):
'task': 'share.tasks.harvest',
'schedule': 120,
},
'Expel expired data': {
'task': 'trove.digestive_tract.task__expel_expired_data',
'schedule': crontab(hour=0, minute=0), # every day at midnight UTC
},
}

if not DEBUG:
Expand Down
4 changes: 1 addition & 3 deletions share/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
SourceUniqueIdentifier,
)
from trove import digestive_tract
from trove.models import Indexcard


class ShareAdminSite(admin.AdminSite):
Expand Down Expand Up @@ -292,8 +291,7 @@ def reingest(self, request, queryset):
def delete_cards_for_suid(self, request, queryset):
for suid in queryset:
FormattedMetadataRecord.objects.delete_formatted_records(suid)
for _indexcard in Indexcard.objects.filter(source_record_suid__in=queryset):
_indexcard.pls_delete()
digestive_tract.expel_suid(suid)

def get_search_results(self, request, queryset, search_term):
if not search_term:
Expand Down
18 changes: 18 additions & 0 deletions share/migrations/0075_rawdatum_expiration_date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.25 on 2024-10-14 15:52

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the optional ``expiration_date`` column to ``share.RawDatum``.

    A null value means the datum never expires.
    """

    dependencies = [
        ('share', '0074_sourceuniqueidentifier_is_supplementary'),
    ]

    operations = [
        migrations.AddField(
            model_name='rawdatum',
            name='expiration_date',
            field=models.DateField(
                blank=True,
                null=True,
                help_text='An (optional) date after which this datum is no longer valid.',
            ),
        ),
    ]
17 changes: 17 additions & 0 deletions share/migrations/0076_rawdatum_share_rawdatum_expiration_idx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from django.db import migrations, models
from django.contrib.postgres.operations import AddIndexConcurrently


class Migration(migrations.Migration):
    """Add an index on ``RawDatum.expiration_date``, built concurrently."""

    # Concurrent index creation cannot run inside a transaction,
    # so this migration must be non-atomic (avoids locking the table).
    atomic = False

    dependencies = [
        ('share', '0075_rawdatum_expiration_date'),
    ]

    operations = [
        AddIndexConcurrently(
            model_name='rawdatum',
            index=models.Index(
                fields=['expiration_date'],
                name='share_rawdatum_expiration_idx',
            ),
        ),
    ]
27 changes: 25 additions & 2 deletions share/models/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,23 @@ def store_data(self, config, fetch_result):

return rd

def store_datum_for_suid(self, *, suid, datum: str, mediatype, datestamp: datetime.datetime):
def store_datum_for_suid(
self,
*,
suid,
datum: str,
mediatype: str | None, # `None` indicates sharev2-legacy ingestion
datestamp: datetime.datetime,
expiration_date: datetime.date | None = None,
):
_raw, _raw_created = self.get_or_create(
suid=suid,
sha256=hashlib.sha256(datum.encode()).hexdigest(),
defaults={
'datum': datum,
'mediatype': mediatype,
'datestamp': datestamp,
'expiration_date': expiration_date,
},
)
if not _raw_created:
Expand All @@ -371,10 +380,11 @@ def store_datum_for_suid(self, *, suid, datum: str, mediatype, datestamp: dateti
logger.critical(_msg)
sentry_sdk.capture_message(_msg)
_raw.mediatype = mediatype
_raw.expiration_date = expiration_date
# keep the latest datestamp
if (not _raw.datestamp) or (datestamp > _raw.datestamp):
_raw.datestamp = datestamp
_raw.save(update_fields=('mediatype', 'datestamp'))
_raw.save(update_fields=('mediatype', 'datestamp', 'expiration_date'))
return _raw

def latest_by_suid_id(self, suid_id) -> models.QuerySet:
Expand Down Expand Up @@ -420,6 +430,11 @@ class RawDatum(models.Model):
'This may be, but is not limited to, a deletion, modification, publication, or creation datestamp. '
'Ideally, this datetime should be appropriate for determining the chronological order its data will be applied.'
))
expiration_date = models.DateField(
null=True,
blank=True,
help_text='An (optional) date after which this datum is no longer valid.',
)

date_modified = models.DateTimeField(auto_now=True, editable=False)
date_created = models.DateTimeField(auto_now_add=True, editable=False)
Expand Down Expand Up @@ -447,11 +462,19 @@ def is_latest(self):
.exists()
)

@property
def is_expired(self) -> bool:
    """Whether this datum's expiration date has arrived.

    A datum with no `expiration_date` never expires; otherwise it is
    considered expired on the expiration date itself and every day after.
    """
    if self.expiration_date is None:
        return False
    return self.expiration_date <= datetime.date.today()

class Meta:
    # at most one datum per (suid, content-hash) pair -- re-sending identical
    # content for the same suid updates the existing row instead of adding one
    unique_together = ('suid', 'sha256')
    verbose_name_plural = 'Raw Data'
    indexes = [
        models.Index(fields=['no_output'], name='share_rawda_no_outp_f0330f_idx'),
        # supports date-range queries when sweeping for expired data
        models.Index(fields=['expiration_date'], name='share_rawdatum_expiration_idx'),
    ]

class JSONAPIMeta(BaseJSONAPIMeta):
Expand Down
13 changes: 13 additions & 0 deletions tests/share/models/test_rawdata.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import pytest
import hashlib

Expand Down Expand Up @@ -65,3 +66,15 @@ def test_store_data_dedups_complex(self, source_config):
assert rd2.created is False
assert rd1.date_modified < rd2.date_modified
assert rd1.date_created == rd2.date_created

def test_is_expired(self):
    """`is_expired` is False with no expiration_date, True once the date arrives."""
    datum = RawDatum()
    assert datum.expiration_date is None
    assert not datum.is_expired
    today = datetime.date.today()
    cases = [
        (datetime.date(today.year - 1, today.month, today.day), True),   # past
        (datetime.date(today.year, today.month, today.day), True),       # today
        (datetime.date(today.year + 1, today.month, today.day), False),  # future
    ]
    for expiration, expected in cases:
        datum.expiration_date = expiration
        assert datum.is_expired == expected
207 changes: 207 additions & 0 deletions tests/trove/digestive_tract/test_expel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import datetime
from unittest import mock

from django.test import TestCase
from primitive_metadata import primitive_rdf as rdf

from share import models as share_db
from tests import factories
from trove import digestive_tract
from trove import models as trove_db


_BLARG = rdf.IriNamespace('https://blarg.example/')


class TestDigestiveTractExpel(TestCase):
    """Exercise the expel paths of `trove.digestive_tract`.

    Fixture: two independently ingested index-cards (focus_1 and focus_2),
    each with latest/archived rdf and one derived card, plus one
    supplementary record attached to indexcard_1. Index-update
    notifications and the re-derive task are patched out so each test can
    assert exactly which side effects were triggered.
    """

    @classmethod
    def setUpTestData(cls):
        cls.focus_1 = _BLARG.this1
        cls.focus_2 = _BLARG.this2
        cls.raw_1, cls.indexcard_1 = _setup_ingested(cls.focus_1)
        cls.raw_2, cls.indexcard_2 = _setup_ingested(cls.focus_2)
        cls.raw_supp = _setup_supplementary(cls.focus_1, cls.raw_1.suid, cls.indexcard_1)

    def setUp(self):
        super().setUp()
        # collects ids of indexcards whose index-update would have been announced
        self.notified_indexcard_ids = set()
        self.enterContext(mock.patch(
            'share.search.index_messenger.IndexMessenger.notify_indexcard_update',
            new=self._replacement_notify_indexcard_update,
        ))
        # patched so tests can assert whether a re-derive was enqueued
        self.mock_derive_task = self.enterContext(mock.patch('trove.digestive_tract.task__derive'))

    def _replacement_notify_indexcard_update(self, indexcards, **kwargs):
        # stand-in for IndexMessenger.notify_indexcard_update; records ids only
        self.notified_indexcard_ids.update(_card.id for _card in indexcards)

    def enterContext(self, context_manager):
        # TestCase.enterContext added in python3.11 -- implementing here until then
        result = context_manager.__enter__()
        self.addCleanup(lambda: context_manager.__exit__(None, None, None))
        return result

    def test_setup(self):
        """Sanity-check the fixture before anything is expelled."""
        self.indexcard_1.refresh_from_db()
        self.indexcard_2.refresh_from_db()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3)
        self.assertEqual(share_db.RawDatum.objects.count(), 3)
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self.assertEqual(self.indexcard_1.archived_rdf_set.count(), 1)
        self.assertEqual(self.indexcard_2.archived_rdf_set.count(), 1)
        self.assertEqual(self.indexcard_1.supplementary_rdf_set.count(), 1)
        self.assertEqual(self.indexcard_2.supplementary_rdf_set.count(), 0)
        self.assertEqual(self.indexcard_1.derived_indexcard_set.count(), 1)
        self.assertEqual(self.indexcard_2.derived_indexcard_set.count(), 1)
        # neither notified indexes nor enqueued re-derive
        self.assertEqual(self.notified_indexcard_ids, set())
        self.mock_derive_task.delay.assert_not_called()

    def test_expel(self):
        """expel() resolves (user, record_identifier) to a suid and delegates."""
        with mock.patch('trove.digestive_tract.expel_suid') as _mock_expel_suid:
            _user = self.raw_1.suid.source_config.source.user
            digestive_tract.expel(from_user=_user, record_identifier=self.raw_1.suid.identifier)
        _mock_expel_suid.assert_called_once_with(self.raw_1.suid)

    def test_expel_suid(self):
        """Expelling a primary suid deletes its card's latest rdf and derived
        cards (archives and supplements are kept) and notifies the indexes."""
        digestive_tract.expel_suid(self.raw_1.suid)
        self.indexcard_1.refresh_from_db()
        self.indexcard_2.refresh_from_db()
        self.assertIsNotNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3)
        self.assertEqual(share_db.RawDatum.objects.count(), 3)
        with self.assertRaises(trove_db.LatestIndexcardRdf.DoesNotExist):
            self.indexcard_1.latest_rdf  # deleted
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self.assertEqual(self.indexcard_1.archived_rdf_set.count(), 1)  # not deleted
        self.assertEqual(self.indexcard_2.archived_rdf_set.count(), 1)
        self.assertEqual(self.indexcard_1.supplementary_rdf_set.count(), 1)  # not deleted
        self.assertEqual(self.indexcard_2.supplementary_rdf_set.count(), 0)
        self.assertEqual(self.indexcard_1.derived_indexcard_set.count(), 0)  # deleted
        self.assertEqual(self.indexcard_2.derived_indexcard_set.count(), 1)
        # notified indexes of update; did not enqueue re-derive
        self.assertEqual(self.notified_indexcard_ids, {self.indexcard_1.id})
        self.mock_derive_task.delay.assert_not_called()

    def test_expel_supplementary_suid(self):
        """Expelling a supplementary suid removes only its supplementary rdf
        and enqueues a re-derive of the supplemented card (no index notify)."""
        digestive_tract.expel_suid(self.raw_supp.suid)
        self.indexcard_1.refresh_from_db()
        self.indexcard_2.refresh_from_db()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3)
        self.assertEqual(share_db.RawDatum.objects.count(), 3)
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self.assertEqual(self.indexcard_1.archived_rdf_set.count(), 1)
        self.assertEqual(self.indexcard_2.archived_rdf_set.count(), 1)
        self.assertEqual(self.indexcard_1.supplementary_rdf_set.count(), 0)  # deleted
        self.assertEqual(self.indexcard_2.supplementary_rdf_set.count(), 0)
        self.assertEqual(self.indexcard_1.derived_indexcard_set.count(), 1)
        self.assertEqual(self.indexcard_2.derived_indexcard_set.count(), 1)
        # did not notify indexes of update; did enqueue re-derive
        self.assertEqual(self.notified_indexcard_ids, set())
        self.mock_derive_task.delay.assert_called_once_with(self.indexcard_1.id)

    def test_expel_expired_task(self):
        """The celery task delegates to expel_expired_data with today's date."""
        with mock.patch('trove.digestive_tract.expel_expired_data') as _mock_expel_expired:
            digestive_tract.task__expel_expired_data.apply()
        _mock_expel_expired.assert_called_once_with(datetime.date.today())

    def test_expel_expired(self):
        """A raw datum expiring today gets its card marked deleted, with the
        same cleanup and index notification as an explicit expel."""
        _today = datetime.date.today()
        self.raw_2.expiration_date = _today
        self.raw_2.save()
        digestive_tract.expel_expired_data(_today)
        self.indexcard_1.refresh_from_db()
        self.indexcard_2.refresh_from_db()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNotNone(self.indexcard_2.deleted)  # marked deleted
        self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3)
        self.assertEqual(share_db.RawDatum.objects.count(), 3)
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        with self.assertRaises(trove_db.LatestIndexcardRdf.DoesNotExist):
            self.indexcard_2.latest_rdf  # deleted
        self.assertEqual(self.indexcard_1.archived_rdf_set.count(), 1)
        self.assertEqual(self.indexcard_2.archived_rdf_set.count(), 1)  # not deleted
        self.assertEqual(self.indexcard_1.supplementary_rdf_set.count(), 1)
        self.assertEqual(self.indexcard_2.supplementary_rdf_set.count(), 0)  # deleted
        self.assertEqual(self.indexcard_1.derived_indexcard_set.count(), 1)
        self.assertEqual(self.indexcard_2.derived_indexcard_set.count(), 0)  # deleted
        # notified indexes of update; did not enqueue re-derive
        self.assertEqual(self.notified_indexcard_ids, {self.indexcard_2.id})
        self.mock_derive_task.delay.assert_not_called()

    def test_expel_expired_supplement(self):
        """An expired supplementary datum removes only its supplementary rdf
        and enqueues a re-derive of the supplemented card."""
        _today = datetime.date.today()
        self.raw_supp.expiration_date = _today
        self.raw_supp.save()
        digestive_tract.expel_expired_data(_today)
        self.indexcard_1.refresh_from_db()
        self.indexcard_2.refresh_from_db()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3)
        self.assertEqual(share_db.RawDatum.objects.count(), 3)
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self.assertEqual(self.indexcard_1.archived_rdf_set.count(), 1)
        self.assertEqual(self.indexcard_2.archived_rdf_set.count(), 1)
        self.assertEqual(self.indexcard_1.supplementary_rdf_set.count(), 0)  # deleted
        self.assertEqual(self.indexcard_2.supplementary_rdf_set.count(), 0)
        self.assertEqual(self.indexcard_1.derived_indexcard_set.count(), 1)
        self.assertEqual(self.indexcard_2.derived_indexcard_set.count(), 1)
        # did not notify indexes of update; did enqueue re-derive
        self.assertEqual(self.notified_indexcard_ids, set())
        self.mock_derive_task.delay.assert_called_once_with(self.indexcard_1.id)


def _setup_ingested(focus_iri: str):
    """Build a fully-ingested fixture for `focus_iri`.

    Creates a suid, a raw datum, an indexcard with latest and archived rdf,
    and one derived indexcard. Returns `(raw_datum, indexcard)`.
    """
    focus_identifier = trove_db.ResourceIdentifier.objects.get_or_create_for_iri(focus_iri)
    suid = factories.SourceUniqueIdentifierFactory(focus_identifier=focus_identifier)
    raw_datum = factories.RawDatumFactory(suid=suid)
    indexcard = trove_db.Indexcard.objects.create(source_record_suid=raw_datum.suid)
    indexcard.focus_identifier_set.add(focus_identifier)
    latest_rdf = trove_db.LatestIndexcardRdf.objects.create(
        indexcard=indexcard,
        from_raw_datum=raw_datum,
        focus_iri=focus_iri,
        rdf_as_turtle='...',
    )
    trove_db.ArchivedIndexcardRdf.objects.create(
        indexcard=indexcard,
        from_raw_datum=raw_datum,
        focus_iri=focus_iri,
        rdf_as_turtle=latest_rdf.rdf_as_turtle,
    )
    deriver_identifier = trove_db.ResourceIdentifier.objects.get_or_create_for_iri(_BLARG.deriver)
    trove_db.DerivedIndexcard.objects.create(
        upriver_indexcard=indexcard,
        deriver_identifier=deriver_identifier,
        derived_checksum_iri='...',
        derived_text='...',
    )
    return raw_datum, indexcard


def _setup_supplementary(focus_iri, main_suid, indexcard):
    """Attach a supplementary record to an existing indexcard.

    Creates a supplementary suid (same focus and source-config as `main_suid`),
    a raw datum for it, and its supplementary rdf. Returns the raw datum.
    """
    supplementary_suid = factories.SourceUniqueIdentifierFactory(
        focus_identifier=main_suid.focus_identifier,
        source_config=main_suid.source_config,
        is_supplementary=True,
    )
    supplementary_raw = factories.RawDatumFactory(suid=supplementary_suid)
    trove_db.SupplementaryIndexcardRdf.objects.create(
        indexcard=indexcard,
        from_raw_datum=supplementary_raw,
        supplementary_suid=supplementary_suid,
        focus_iri=focus_iri,
        rdf_as_turtle='...',
    )
    return supplementary_raw
12 changes: 12 additions & 0 deletions tests/trove/digestive_tract/test_extract.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import datetime
from django.test import TestCase
from primitive_metadata import primitive_rdf as rdf

from tests import factories
from trove import digestive_tract
from trove import exceptions as trove_exceptions
from trove import models as trove_db


Expand Down Expand Up @@ -128,3 +130,13 @@ def test_extract_empty_supplementary(self):
(_indexcard,) = digestive_tract.extract(_empty_raw)
self.assertEqual(_indexcard.id, _orig_indexcard.id)
self.assertFalse(_orig_indexcard.supplementary_rdf_set.exists())

def test_extract_expired(self):
    """A datum expiring today is already expired and must not be extracted."""
    expired_today = datetime.date.today()
    self.raw.expiration_date = expired_today
    with self.assertRaises(trove_exceptions.CannotDigestExpiredDatum):
        digestive_tract.extract(self.raw)

def test_extract_expired_supplement(self):
    """An expired supplementary datum must not be extracted either."""
    expired_today = datetime.date.today()
    self.supplementary_raw.expiration_date = expired_today
    with self.assertRaises(trove_exceptions.CannotDigestExpiredDatum):
        digestive_tract.extract(self.supplementary_raw)
Loading
Loading