From 6dd1ecd91299b54213c824d954b26a9d655c3589 Mon Sep 17 00:00:00 2001 From: jtotoole Date: Thu, 28 Oct 2021 13:01:39 -0600 Subject: [PATCH 1/2] start on media merge app --- apps/merge-media-sources/.idea/.gitignore | 8 + .../inspectionProfiles/profiles_settings.xml | 6 + .../.idea/merge-media-sources.iml | 15 ++ apps/merge-media-sources/.idea/misc.xml | 4 + apps/merge-media-sources/.idea/modules.xml | 8 + .../.idea/sqlDataSources.xml | 23 ++ .../merge-media-sources/.idea/sqldialects.xml | 7 + apps/merge-media-sources/.idea/vcs.xml | 7 + apps/merge-media-sources/Dockerfile | 25 ++ apps/merge-media-sources/README.md | 11 + .../bin/feeds_rabbitmq_worker.py | 42 ++++ .../bin/feeds_workflow_worker.py | 32 +++ .../docker-compose.tests.yml | 135 +++++++++++ apps/merge-media-sources/src/__init__.py | 0 .../src/python/__init__.py | 0 .../python/merge_media_sources/__init__.py | 0 .../src/python/merge_media_sources/config.py | 6 + .../merge_media_sources/feeds_workflow.py | 54 +++++ .../feeds_workflow_interface.py | 219 ++++++++++++++++++ .../tests/python/__init__.py | 0 .../tests/python/test_feeds_merge_workflow.py | 104 +++++++++ .../tests/python/test_media_merge_workflow.py | 0 22 files changed, 706 insertions(+) create mode 100644 apps/merge-media-sources/.idea/.gitignore create mode 100644 apps/merge-media-sources/.idea/inspectionProfiles/profiles_settings.xml create mode 100644 apps/merge-media-sources/.idea/merge-media-sources.iml create mode 100644 apps/merge-media-sources/.idea/misc.xml create mode 100644 apps/merge-media-sources/.idea/modules.xml create mode 100644 apps/merge-media-sources/.idea/sqlDataSources.xml create mode 100644 apps/merge-media-sources/.idea/sqldialects.xml create mode 100644 apps/merge-media-sources/.idea/vcs.xml create mode 100644 apps/merge-media-sources/Dockerfile create mode 100644 apps/merge-media-sources/README.md create mode 100755 apps/merge-media-sources/bin/feeds_rabbitmq_worker.py create mode 100755 apps/merge-media-sources/bin/feeds_workflow_worker.py create mode 100644 apps/merge-media-sources/docker-compose.tests.yml create mode 100644 apps/merge-media-sources/src/__init__.py create mode 100644 apps/merge-media-sources/src/python/__init__.py create mode 100644 apps/merge-media-sources/src/python/merge_media_sources/__init__.py create mode 100644 apps/merge-media-sources/src/python/merge_media_sources/config.py create mode 100644 apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py create mode 100644 apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py create mode 100644 apps/merge-media-sources/tests/python/__init__.py create mode 100644 apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py create mode 100644 apps/merge-media-sources/tests/python/test_media_merge_workflow.py diff --git a/apps/merge-media-sources/.idea/.gitignore b/apps/merge-media-sources/.idea/.gitignore new file mode 100644 index 0000000000..13566b81b0 --- /dev/null +++ b/apps/merge-media-sources/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/apps/merge-media-sources/.idea/inspectionProfiles/profiles_settings.xml b/apps/merge-media-sources/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000000..105ce2da2d --- /dev/null +++ b/apps/merge-media-sources/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ 
+ + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/merge-media-sources.iml b/apps/merge-media-sources/.idea/merge-media-sources.iml new file mode 100644 index 0000000000..5fdd65ba2a --- /dev/null +++ b/apps/merge-media-sources/.idea/merge-media-sources.iml @@ -0,0 +1,15 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/misc.xml b/apps/merge-media-sources/.idea/misc.xml new file mode 100644 index 0000000000..927f93c5cb --- /dev/null +++ b/apps/merge-media-sources/.idea/misc.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/modules.xml b/apps/merge-media-sources/.idea/modules.xml new file mode 100644 index 0000000000..63b5104821 --- /dev/null +++ b/apps/merge-media-sources/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/sqlDataSources.xml b/apps/merge-media-sources/.idea/sqlDataSources.xml new file mode 100644 index 0000000000..90ffac82a3 --- /dev/null +++ b/apps/merge-media-sources/.idea/sqlDataSources.xml @@ -0,0 +1,23 @@ + + + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/sqldialects.xml b/apps/merge-media-sources/.idea/sqldialects.xml new file mode 100644 index 0000000000..533abce227 --- /dev/null +++ b/apps/merge-media-sources/.idea/sqldialects.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/vcs.xml b/apps/merge-media-sources/.idea/vcs.xml new file mode 100644 index 0000000000..a4647a1c0e --- /dev/null +++ b/apps/merge-media-sources/.idea/vcs.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/Dockerfile b/apps/merge-media-sources/Dockerfile new file mode 100644 index 0000000000..771be001bf --- /dev/null +++ b/apps/merge-media-sources/Dockerfile @@ -0,0 +1,25 @@ +# +# Identify duplicate feeds or media sources and merge them into one entry, updating +# the feed and media source references to point to the new entry. +# + +FROM gcr.io/mcback/common:latest + +# Copy sources +COPY src/ /opt/mediacloud/src/merge-media-sources/ +ENV PERL5LIB="/opt/mediacloud/src/merge-media-sources/perl:${PERL5LIB}" \ + PYTHONPATH="/opt/mediacloud/src/merge-media-sources/python:${PYTHONPATH}" + +# Copy worker script +COPY bin /opt/mediacloud/bin + +USER mediacloud + +# Set a failing CMD because we'll be using the same image to run: +# +# * "rabbitmq_worker.py" - processes Celery jobs, starts Temporal workflows for those; +# * "workflow_worker.py" - runs Temporal workflows. +# +# so the user is expected to set "command" in docker-compose.yml to run a specific worker. 
+# +CMD ["SET_CONTAINER_COMMAND_TO_ONE_OF_THE_WORKERS"] diff --git a/apps/merge-media-sources/README.md b/apps/merge-media-sources/README.md new file mode 100644 index 0000000000..c09329192a --- /dev/null +++ b/apps/merge-media-sources/README.md @@ -0,0 +1,11 @@ +# Merging media sources + +## TODO + +* Create sample database with fake data +* Test running the same activity multiple times +* If an activity throws an exception, its message should get printed out to the console as well (in addition to + Temporal's log) +* Track failed workflows / activities in Munin +* Instead (in addition to) of setting `workflow_run_timeout` in `test_workflow.py`, limit retries of the individual + activities too so that when they fail, we'd get a nice error message printed to the test log diff --git a/apps/merge-media-sources/bin/feeds_rabbitmq_worker.py b/apps/merge-media-sources/bin/feeds_rabbitmq_worker.py new file mode 100755 index 0000000000..79460e6e84 --- /dev/null +++ b/apps/merge-media-sources/bin/feeds_rabbitmq_worker.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 + +import asyncio + +from mediawords.job import JobBroker +from mediawords.util.log import create_logger +from mediawords.util.perl import decode_object_from_bytes_if_needed +from mediawords.workflow.client import workflow_client + +# noinspection PyPackageRequirements +from temporal.workflow import WorkflowClient, WorkflowOptions + +from merge_media_sources.workflow_interface import FeedsMergeWorkflow + +log = create_logger(__name__) + + +async def _start_workflow(parent_feeds_id: int, child_feeds_id: int) -> None: + log.info(f"Starting a workflow to merge feed {child_feeds_id} into {parent_feeds_id}...") + + client = workflow_client() + workflow: FeedsMergeWorkflow = client.new_workflow_stub( + cls=FeedsMergeWorkflow, + workflow_options=WorkflowOptions(workflow_id=str(child_feeds_id)), + ) + + # Fire and forget as the workflow will do everything (including adding a extraction job) itself + await WorkflowClient.start(workflow.merge_feeds, child_feeds_id, parent_feeds_id) + + log.info(f"Started a workflow to merge feed {child_feeds_id} into {parent_feeds_id}...") + + + def run_merge_feeds(parent_feeds_id: int, child_feeds_id: int) -> None: + # todo: some stuff + + + asyncio.run(_start_workflow(feeds_id=child_feeds_id, parent_feeds_id=parent_feeds_id)) + + +if __name__ == '__main__': + app = JobBroker(queue_name='MediaWords::Job::Feeds::MergeFeeds') + app.start_worker(handler=run_merge_feeds) diff --git a/apps/merge-media-sources/bin/feeds_workflow_worker.py b/apps/merge-media-sources/bin/feeds_workflow_worker.py new file mode 100755 index 0000000000..79f7f2d15d --- /dev/null +++ b/apps/merge-media-sources/bin/feeds_workflow_worker.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 + +import asyncio + +# noinspection PyPackageRequirements +from temporal.workerfactory import WorkerFactory + +from mediawords.util.log import create_logger +from mediawords.workflow.client import workflow_client + +from merge_media_sources.workflow import MergeMediaWorkflowImpl, MergeMediaActivitiesImpl +from merge_media_sources.workflow_interface import TASK_QUEUE, MergeMediaActivities + +log = create_logger(__name__) + + +async def _start_worker(): + client = workflow_client() + factory = WorkerFactory(client=client, namespace=client.namespace) + worker = factory.new_worker(task_queue=TASK_QUEUE) + worker.register_activities_implementation( + activities_instance=MergeMediaActivitiesImpl(), + activities_cls_name=MergeMediaActivities.__name__, + ) + 
worker.register_workflow_implementation_type(impl_cls=MergeMediaWorkflowImpl) + factory.start() + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + asyncio.ensure_future(_start_worker()) + loop.run_forever() diff --git a/apps/merge-media-sources/docker-compose.tests.yml b/apps/merge-media-sources/docker-compose.tests.yml new file mode 100644 index 0000000000..c52e805d9f --- /dev/null +++ b/apps/merge-media-sources/docker-compose.tests.yml @@ -0,0 +1,135 @@ +version: "3.7" + +services: + + merge-media-sources: + image: gcr.io/mcback/merge-media-sources:latest + init: true + stop_signal: SIGKILL + volumes: + - type: bind + source: ./bin/ + target: /opt/mediacloud/bin/ + - type: bind + source: ./src/ + target: /opt/mediacloud/src/merge-media-sources/ + - type: bind + source: ./tests/ + target: /opt/mediacloud/tests/ + - type: bind + source: ./../common/src/ + target: /opt/mediacloud/src/common/ + depends_on: + - postgresql-pgbouncer + - rabbitmq-server + - temporal-server + + # Not needed for running the test but useful for debugging, demos + # and such + # - temporal-webapp + + postgresql-pgbouncer: + image: gcr.io/mcback/postgresql-pgbouncer:latest + init: true + stop_signal: SIGKILL + expose: + - 6432 + volumes: + - type: bind + source: ./../postgresql-pgbouncer/conf/ + target: /etc/pgbouncer/ + depends_on: + - postgresql-server + + postgresql-server: + image: gcr.io/mcback/postgresql-server:latest + init: true + stop_signal: SIGKILL + expose: + - 5432 + volumes: + - type: bind + source: ./../postgresql-server/bin/ + target: /opt/mediacloud/bin/ + - type: bind + source: ./../postgresql-server/pgmigrate/ + target: /opt/postgresql-server/pgmigrate/ + - type: bind + source: ./../postgresql-base/etc/postgresql/ + target: /etc/postgresql/ + + rabbitmq-server: + image: gcr.io/mcback/rabbitmq-server:latest + init: true + stop_signal: SIGKILL + expose: + - 5672 + - 15672 + volumes: + - type: bind + source: ./../rabbitmq-server/conf/ + target: /etc/rabbitmq/ + + temporal-server: + image: gcr.io/mcback/temporal-server:latest + init: true + stop_signal: SIGKILL + depends_on: + - temporal-postgresql + - temporal-elasticsearch + expose: + - 6933 + - 6934 + - 6935 + - 6939 + - 7233 + - 7234 + - 7235 + - 7239 + volumes: + - type: bind + source: ./../temporal-server/bin/ + target: /opt/temporal-server/bin/ + - type: bind + source: ./../temporal-server/config/dynamicconfig.yaml + target: /opt/temporal-server/config/dynamicconfig.yaml + - type: bind + source: ./../temporal-server/config/mediacloud_template.yaml + target: /opt/temporal-server/config/mediacloud_template.yaml + + temporal-postgresql: + image: gcr.io/mcback/temporal-postgresql:latest + init: true + stop_signal: SIGKILL + expose: + - 5432 + volumes: + - type: bind + source: ./../temporal-postgresql/bin/ + target: /opt/temporal-postgresql/bin/ + - type: bind + source: ./../postgresql-base/etc/postgresql/ + target: /etc/postgresql/ + + temporal-elasticsearch: + image: gcr.io/mcback/temporal-elasticsearch:latest + init: true + stop_signal: SIGKILL + expose: + - "9200" + - "9300" + volumes: + - type: bind + source: ./../elasticsearch-base/bin/elasticsearch.sh + target: /opt/elasticsearch/bin/elasticsearch.sh + # Not mounting config as it gets concatenated into a single file + + # temporal-webapp: + # image: gcr.io/mcback/temporal-webapp:latest + # init: true + # stop_signal: SIGKILL + # expose: + # - "8088" + # ports: + # # Expose to host for debugging + # - "8088:8088" diff --git a/apps/merge-media-sources/src/__init__.py 
b/apps/merge-media-sources/src/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/merge-media-sources/src/python/__init__.py b/apps/merge-media-sources/src/python/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/merge-media-sources/src/python/merge_media_sources/__init__.py b/apps/merge-media-sources/src/python/merge_media_sources/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/merge-media-sources/src/python/merge_media_sources/config.py b/apps/merge-media-sources/src/python/merge_media_sources/config.py new file mode 100644 index 0000000000..c946212fb5 --- /dev/null +++ b/apps/merge-media-sources/src/python/merge_media_sources/config.py @@ -0,0 +1,6 @@ +import abc + +from mediawords.util.config import env_value, file_with_env_value + + +class MergeMediaConfig(object, metaclass=abc.ABCMeta): diff --git a/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py new file mode 100644 index 0000000000..7e1deee21b --- /dev/null +++ b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py @@ -0,0 +1,54 @@ +import os +import tempfile +from typing import Optional + +# noinspection PyPackageRequirements +from temporal.workflow import Workflow + +from mediawords.db import connect_to_db_or_raise +from mediawords.job import JobBroker +from mediawords.util.parse_json import encode_json, decode_json +from mediawords.util.config.common import RabbitMQConfig +from mediawords.util.log import create_logger +from mediawords.workflow.exceptions import McProgrammingError, McTransientError, McPermanentError + +from .feeds_workflow_interface import FeedsMergeWorkflow, FeedsMergeActivities + +log = create_logger(__name__) + + +class FeedsMergeActivitiesImpl(FeedsMergeActivities): + """Activities implementation.""" + + async def + + async def identify_story_bcp47_language_code(self, stories_id: int) -> Optional[str]: + log.info(f"Identifying story language for story {stories_id}...") + + db = connect_to_db_or_raise() + + story = db.find_by_id(table='stories', object_id=stories_id) + if not story: + raise McPermanentError(f"Story {stories_id} was not found.") + + # Podcast episodes typically come with title and description set so try guessing from that + story_title = story['title'] + story_description = html_strip(story['description']) + sample_text = f"{story_title}\n{story_description}" + + bcp_47_language_code = None + if identification_would_be_reliable(text=sample_text): + iso_639_1_language_code = language_code_for_text(text=sample_text) + + # Convert to BCP 47 identifier + bcp_47_language_code = iso_639_1_code_to_bcp_47_identifier( + iso_639_1_code=iso_639_1_language_code, + url_hint=story['url'], + ) + + log.info(f"Language code for story {stories_id} is {bcp_47_language_code}") + + return bcp_47_language_code + + + await self.activities.add_to_extraction_queue(stories_id) diff --git a/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py new file mode 100644 index 0000000000..5634db917b --- /dev/null +++ b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py @@ -0,0 +1,219 @@ +import dataclasses +from datetime import timedelta +from typing import Optional + +# noinspection PyPackageRequirements +from temporal.activity_method import activity_method, 
RetryParameters +# noinspection PyPackageRequirements +from temporal.workflow import workflow_method + +from mediawords.workflow.exceptions import McPermanentError + +from .config import MergeMediaConfig +# TODO: create config file? + +TASK_QUEUE = "merge-media-sources" +"""Temporal task queue.""" + +DEFAULT_RETRY_PARAMETERS = RetryParameters( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2, + maximum_interval=timedelta(hours=2), + maximum_attempts=1000, + non_retryable_error_types=[ + McPermanentError.__name__, + ], +) + + +class MergeMediaActivities(object): + + @classmethod + def _create_config(cls) -> MergeMediaConfig: + """ + Create and return configuration instance to be used for running the workflow. + + Might get overridden in case some configuration changes have to be made while running the tests. + """ + return MergeMediaConfig() + + def __init__(self): + super().__init__() + self.config = self._create_config() + + @activity_method( + task_queue=TASK_QUEUE, + start_to_close_timeout=timedelta(seconds=60), + retry_parameters=DEFAULT_RETRY_PARAMETERS, + ) + async def identify_story_bcp47_language_code(self, stories_id: int) -> Optional[str]: + """ + Guess BCP 47 language code of a story. + + https://cloud.google.com/speech-to-text/docs/languages + + :param stories_id: Story to guess the language code for. + :return: BCP 47 language code (e.g. 'en-US') or None if the language code could not be determined. + """ + raise NotImplementedError + + @activity_method( + task_queue=TASK_QUEUE, + start_to_close_timeout=timedelta(seconds=60), + retry_parameters=DEFAULT_RETRY_PARAMETERS, + ) + async def determine_best_enclosure(self, stories_id: int) -> Optional[StoryEnclosureDict]: + """ + Fetch a list of story enclosures, determine which one looks like a podcast episode the most. + + Uses or similar tag. + + :param stories_id: Story to fetch the enclosures for. + :return: Best enclosure metadata object (as dict), or None if no best enclosure could be determined. + """ + raise NotImplementedError + + @activity_method( + task_queue=TASK_QUEUE, + # With a super-slow server, it's probably reasonable to expect that it might take a few hours to fetch a single + # episode + start_to_close_timeout=timedelta(hours=2), + retry_parameters=dataclasses.replace( + DEFAULT_RETRY_PARAMETERS, + + # Wait for a minute before trying again + initial_interval=timedelta(minutes=1), + + # Hope for the server to resurrect in a week + maximum_interval=timedelta(weeks=1), + + # Don't kill ourselves trying to hit a permanently dead server + maximum_attempts=50, + ), + ) + async def fetch_enclosure_to_gcs(self, stories_id: int, enclosure: StoryEnclosureDict) -> None: + """ + Fetch enclosure and store it to GCS as an episode. + + Doesn't do transcoding or anything because transcoding or any subsequent steps might fail, and if they do, we + want to have the raw episode fetched and safely stored somewhere. + + :param stories_id: Story to fetch the enclosure for. + :param enclosure: Enclosure to fetch (as dict). 
+ """ + raise NotImplementedError + + @activity_method( + task_queue=TASK_QUEUE, + + # Let's expect super long episodes or super slow servers + start_to_close_timeout=timedelta(hours=2), + + retry_parameters=dataclasses.replace( + DEFAULT_RETRY_PARAMETERS, + + # Wait for a minute before trying again (GCS might be down) + initial_interval=timedelta(minutes=1), + + # Hope for GCS to resurrect in a day + maximum_interval=timedelta(days=1), + + # Limit attempts because transcoding itself might be broken, and we don't want to be fetching huge objects + # from GCS periodically + maximum_attempts=20, + ), + ) + async def fetch_transcode_store_episode(self, stories_id: int) -> MediaFileInfoAudioStreamDict: + """ + Fetch episode from GCS, transcode it if needed and store it to GCS again in a separate bucket. + + Now that the raw episode file is safely located in GCS, we can try transcoding it. + + :param stories_id: Story ID the episode of which should be transcoded. + :return: Metadata of the best audio stream determined as part of the transcoding (as dict). + """ + raise NotImplementedError + + @activity_method( + task_queue=TASK_QUEUE, + + # Give a bit more time as the implementation is likely to do some non-Temporal retries on weird Speech API + # errors + start_to_close_timeout=timedelta(minutes=5), + + retry_parameters=dataclasses.replace( + DEFAULT_RETRY_PARAMETERS, + + # Given that the thing is costly, wait a whole hour before retrying anything + initial_interval=timedelta(hours=1), + + # Hope for the Speech API to resurrect in a week + maximum_interval=timedelta(weeks=1), + + # Don't retry too much as each try is potentially very costly + maximum_attempts=10, + ), + ) + async def submit_transcribe_operation(self, + stories_id: int, + episode_metadata: MediaFileInfoAudioStreamDict, + bcp47_language_code: str) -> str: + """ + Submit a long-running transcription operation to the Speech API. + + :param stories_id: Story ID of the episode which should be submitted for transcribing. + :param episode_metadata: Metadata of transcoded episode (as dict). + :param bcp47_language_code: BCP 47 language code of the story. + :return: Speech API operation ID for the transcription operation. + """ + raise NotImplementedError + + @activity_method( + task_queue=TASK_QUEUE, + start_to_close_timeout=timedelta(seconds=60), + retry_parameters=DEFAULT_RETRY_PARAMETERS, + ) + async def fetch_store_raw_transcript_json(self, stories_id: int, speech_operation_id: str) -> None: + """ + Fetch a finished transcription and store the raw JSON of it into a GCS bucket. + + Raises an exception if the transcription operation is not finished yet. + + :param stories_id: Story ID the episode of which should be submitted for transcribing. + :param speech_operation_id: Speech API operation ID. + """ + raise NotImplementedError + + @activity_method( + task_queue=TASK_QUEUE, + start_to_close_timeout=timedelta(seconds=60), + retry_parameters=DEFAULT_RETRY_PARAMETERS, + ) + async def fetch_store_transcript(self, stories_id: int) -> None: + """ + Fetch a raw JSON transcript from a GCS bucket, store it to "download_texts". + + :param stories_id: Story ID the transcript of which should be stored into the database. + """ + raise NotImplementedError + + @activity_method( + task_queue=TASK_QUEUE, + start_to_close_timeout=timedelta(minutes=2), + retry_parameters=DEFAULT_RETRY_PARAMETERS, + ) + async def add_to_extraction_queue(self, stories_id: int) -> None: + """ + Add a story to the extraction queue. 
+ + :param stories_id: Story ID to be added to the extraction queue. + """ + raise NotImplementedError + + +class FeedsMergeWorkflow(object): + """Workflow interface.""" + + @workflow_method(task_queue=TASK_QUEUE) + async def merge_feeds(self, child_feeds_id: int, parent_feeds_id: int) -> None: + raise NotImplementedError diff --git a/apps/merge-media-sources/tests/python/__init__.py b/apps/merge-media-sources/tests/python/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py b/apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py new file mode 100644 index 0000000000..cf408a7e98 --- /dev/null +++ b/apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py @@ -0,0 +1,104 @@ +import os +from datetime import timedelta +from typing import Union + +# noinspection PyPackageRequirements +import pytest +# noinspection PyPackageRequirements +from temporal.workerfactory import WorkerFactory +# noinspection PyPackageRequirements +from temporal.workflow import WorkflowOptions + +from mediawords.db import connect_to_db +from mediawords.dbi.downloads.store import fetch_content +from mediawords.test.db.create import create_test_feed, create_test_medium, create_test_story, create_download_for_feed +from mediawords.test.hash_server import HashServer +from mediawords.util.log import create_logger +from mediawords.util.network import random_unused_port +from mediawords.workflow.client import workflow_client +from mediawords.workflow.worker import stop_worker_faster + + +log = create_logger(__name__) + + +@pytest.mark.asyncio +async def test_feeds_workflow(): + db = connect_to_db() + + test_medium = create_test_medium(db=db, label='test') + test_parent_feed = create_test_feed(db=db, label='parent', medium=test_medium) + test_child_feeds = [] + for i in range(1,4): + test_child_feeds.append(create_test_feed(db=db, label=F'child_feed_{str(i)}', medium=test_medium)) + + for feed in test_child_feeds: + create_download_for_feed(db=db, feed=feed) + create_test_story(db=db, label=F"story for {feed['label']}", feed=feed) + db.insert(table='scraped_feeds', insert_hash={ + 'feeds_id': int(feed['feeds_id']), + 'url': feed['url'], + 'scrape_date': 'NOW()', + 'import_module': 'mediawords' + }) + db.insert(table='feeds_from_yesterday', insert_hash={ + 'feeds_id': int(feed['feeds_id']), + 'media_id': int(test_medium['media_id']), + 'name': F"feed_from_yesterday_{feed['name']}", + 'url': feed['url'], + 'type': 'test', + 'active': True + }) + db.insert(table='feeds_tags_map', insert_hash={ + 'feeds_id': int(feed['feeds_id']), + 'tags_id': int(test_medium['media_id']), + }) + + + client = workflow_client() + + # Start worker + factory = WorkerFactory(client=client, namespace=client.namespace) + worker = factory.new_worker(task_queue=TASK_QUEUE) + + activities = FeedsMergeActivities() + + worker.register_activities_implementation( + activities_instance=activities, + activities_cls_name=FeedsMergeActivities.__name__, + ) + worker.register_workflow_implementation_type(impl_cls=FeedsMergeWorkflowImpl) + factory.start() + + # Initialize workflow instance + workflow: FeedsMergeWorkflow = client.new_workflow_stub( + cls=FeedsMergeWorkflow, + workflow_options=WorkflowOptions( + workflow_id=str(stories_id), + + # By default, if individual activities of the workflow fail, they will get restarted pretty much + # indefinitely, and so this test might run for days (or rather just timeout on the CI). 
So we cap the + # workflow so that if it doesn't manage to complete in X minutes, we consider it as failed. + workflow_run_timeout=timedelta(minutes=5), + + ), + ) + + # Wait for the workflow to complete + await workflow.merge_feeds(feeds_id=feed[], parent_feeds_id=test_parent_feed['feeds_id']) + + downloads = db.select(table='downloads', what_to_select='*').hashes() + assert len(downloads) == 1 + first_download = downloads[0] + assert first_download['stories_id'] == stories_id + assert first_download['type'] == 'content' + assert first_download['state'] == 'success' + + + # Initiate the worker shutdown in the background while we do the GCS cleanup so that the stop_workers_faster() + # doesn't have to wait that long + await worker.stop(background=True) + + log.info("Stopping workers...") + await stop_worker_faster(worker) + log.info("Stopped workers") diff --git a/apps/merge-media-sources/tests/python/test_media_merge_workflow.py b/apps/merge-media-sources/tests/python/test_media_merge_workflow.py new file mode 100644 index 0000000000..e69de29bb2 From 0b6dc772d82459be8b5bd42cb0c57760fa22dc11 Mon Sep 17 00:00:00 2001 From: jtotoole Date: Fri, 12 Nov 2021 20:48:31 -0700 Subject: [PATCH 2/2] load feeds to temporal + refactor activities and test --- apps/merge-media-sources/Dockerfile | 10 +- .../bin/feeds_rabbitmq_worker.py | 42 -- .../bin/feeds_to_merge.csv | 480 ++++++++++++++++++ .../bin/feeds_workflow_worker.py | 12 +- .../bin/load_feeds_workflows.py | 38 ++ .../src/python/merge_media_sources/config.py | 6 - .../merge_media_sources/feeds_workflow.py | 129 +++-- .../feeds_workflow_interface.py | 182 +------ .../tests/python/test_feeds_merge_workflow.py | 100 ++-- 9 files changed, 685 insertions(+), 314 deletions(-) delete mode 100755 apps/merge-media-sources/bin/feeds_rabbitmq_worker.py create mode 100644 apps/merge-media-sources/bin/feeds_to_merge.csv create mode 100644 apps/merge-media-sources/bin/load_feeds_workflows.py delete mode 100644 apps/merge-media-sources/src/python/merge_media_sources/config.py diff --git a/apps/merge-media-sources/Dockerfile b/apps/merge-media-sources/Dockerfile index 771be001bf..426000071d 100644 --- a/apps/merge-media-sources/Dockerfile +++ b/apps/merge-media-sources/Dockerfile @@ -10,16 +10,12 @@ COPY src/ /opt/mediacloud/src/merge-media-sources/ ENV PERL5LIB="/opt/mediacloud/src/merge-media-sources/perl:${PERL5LIB}" \ PYTHONPATH="/opt/mediacloud/src/merge-media-sources/python:${PYTHONPATH}" -# Copy worker script +# Copy worker scripts COPY bin /opt/mediacloud/bin USER mediacloud -# Set a failing CMD because we'll be using the same image to run: -# -# * "rabbitmq_worker.py" - processes Celery jobs, starts Temporal workflows for those; -# * "workflow_worker.py" - runs Temporal workflows. -# +# Set a failing CMD because we'll be using the same image to run feeds merge + media merge, # so the user is expected to set "command" in docker-compose.yml to run a specific worker. 
-# + CMD ["SET_CONTAINER_COMMAND_TO_ONE_OF_THE_WORKERS"] diff --git a/apps/merge-media-sources/bin/feeds_rabbitmq_worker.py b/apps/merge-media-sources/bin/feeds_rabbitmq_worker.py deleted file mode 100755 index 79460e6e84..0000000000 --- a/apps/merge-media-sources/bin/feeds_rabbitmq_worker.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio - -from mediawords.job import JobBroker -from mediawords.util.log import create_logger -from mediawords.util.perl import decode_object_from_bytes_if_needed -from mediawords.workflow.client import workflow_client - -# noinspection PyPackageRequirements -from temporal.workflow import WorkflowClient, WorkflowOptions - -from merge_media_sources.workflow_interface import FeedsMergeWorkflow - -log = create_logger(__name__) - - -async def _start_workflow(parent_feeds_id: int, child_feeds_id: int) -> None: - log.info(f"Starting a workflow to merge feed {child_feeds_id} into {parent_feeds_id}...") - - client = workflow_client() - workflow: FeedsMergeWorkflow = client.new_workflow_stub( - cls=FeedsMergeWorkflow, - workflow_options=WorkflowOptions(workflow_id=str(child_feeds_id)), - ) - - # Fire and forget as the workflow will do everything (including adding a extraction job) itself - await WorkflowClient.start(workflow.merge_feeds, child_feeds_id, parent_feeds_id) - - log.info(f"Started a workflow to merge feed {child_feeds_id} into {parent_feeds_id}...") - - - def run_merge_feeds(parent_feeds_id: int, child_feeds_id: int) -> None: - # todo: some stuff - - - asyncio.run(_start_workflow(feeds_id=child_feeds_id, parent_feeds_id=parent_feeds_id)) - - -if __name__ == '__main__': - app = JobBroker(queue_name='MediaWords::Job::Feeds::MergeFeeds') - app.start_worker(handler=run_merge_feeds) diff --git a/apps/merge-media-sources/bin/feeds_to_merge.csv b/apps/merge-media-sources/bin/feeds_to_merge.csv new file mode 100644 index 0000000000..aeaa28d413 --- /dev/null +++ b/apps/merge-media-sources/bin/feeds_to_merge.csv @@ -0,0 +1,480 @@ +feed_id,parent_feed_id +18541,347569 +55406,865683 +58299,58354 +105195,1980261 +107128,176026 +107300,357899 +109082,118879 +109195,113506 +109925,107729 +110990,286407 +111237,43458 +112900,123734 +113085,153952 +113606,286407 +114041,877303 +114248,6697 +115643,106985 +117790,113435 +117860,118879 +118929,109971 +119899,6697 +120158,124153 +120373,173701 +120806,113309 +120939,109641 +123589,113056 +123619,107998 +123770,424573 +130033,81595 +130587,585863 +130626,873639 +132132,528155 +174779,348081 +174855,313933 +175080,347400 +175198,17986 +176024,107126 +176596,880526 +176876,504717 +177002,400534 +240986,379546 +314789,123443 +315209,315436 +315360,504145 +315469,315436 +348199,348190 +348293,177238 +348925,893363 +349026,348966 +349042,410833 +355629,176769 +355664,176769 +356679,356962 +358669,883977 +361018,400406 +361829,89720 +362786,810172 +362787,804684 +364984,69673 +366067,6697 +366191,429047 +367188,254373 +368816,379546 +369101,362079 +369718,379546 +374031,429320 +374396,374994 +374851,113056 +376910,857959 +380136,504100 +383036,402668 +383074,419887 +387001,118650 +392929,406284 +393012,414974 +393100,379546 +393616,520585 +393898,358638 +395809,514941 +396085,109948 +396182,396290 +399323,28546 +404955,429623 +406046,174650 +406303,502596 +416090,441469 +417558,406284 +423295,418169 +425512,535154 +427290,78177 +429637,880365 +438914,609853 +439628,905644 +462818,406284 +470172,494956 +502769,398131 +503201,1455534 +503698,2328935 +504133,448117 +504540,405795 +507787,609853 +509358,509376 
+509366,509376 +514870,514869 +514925,514924 +535334,867266 +540465,442211 +553866,884015 +553867,427071 +558097,367781 +577416,842077 +583719,1771240 +626989,515582 +635639,525610 +695537,746931 +695757,903083 +695763,804555 +695899,811445 +696002,118650 +696170,811339 +725070,37 +738460,781361 +744967,710706 +782181,782180 +782218,782362 +782329,782362 +810326,132461 +842185,1516409 +848641,538809 +855639,725111 +855851,1201871 +860400,860787 +861032,861033 +861896,866211 +865089,348081 +865977,535154 +867228,866211 +867248,865733 +869831,1209649 +873677,1753176 +877135,439660 +882235,855434 +884056,403004 +884331,884415 +884711,884882 +886641,886642 +892835,892930 +895089,894937 +895191,894937 +895437,1201871 +895461,896504 +895806,895850 +895846,895850 +896482,895439 +898442,898479 +898471,898434 +898617,889774 +898623,889774 +898719,889774 +898928,889774 +899334,898954 +900986,1535268 +906798,583735 +912257,881149 +932480,875688 +946914,695690 +960028,725558 +961478,961477 +979456,979457 +1030511,1032213 +1031490,1015443 +1036937,881079 +1040504,1040496 +1040517,1040496 +1040705,1634140 +1040742,1634140 +1041374,1654294 +1043439,502594 +1055036,1055048 +1055044,1055048 +1055816,904106 +1056090,897261 +1061616,1061613 +1081063,695937 +1094176,970915 +1114943,1114932 +1115364,78114 +1139908,503397 +1151957,373697 +1153916,946645 +1162565,1452649 +1166245,1166343 +1166342,1166244 +1170658,1166244 +1171238,1166343 +1172666,78454 +1172836,172182 +1173492,1166244 +1173518,1650515 +1173551,1092182 +1173692,1455039 +1174165,1166244 +1175496,1175497 +1177289,1166343 +1177363,1166343 +1177625,1166343 +1177751,1173973 +1178658,129982 +1178861,78454 +1179046,1166343 +1179053,1166244 +1179067,78454 +1179268,7567 +1179353,1189129 +1180137,1182564 +1180272,1173867 +1180679,78454 +1180701,1178836 +1185354,399462 +1185388,399462 +1185527,1178874 +1185544,7683 +1185617,78454 +1185684,899976 +1186100,1189129 +1189096,1208405 +1204545,1187557 +1208142,524207 +1208240,1031285 +1208337,1208361 +1208413,1208345 +1208503,1208345 +1209789,2293489 +1210109,1210208 +1217945,2309770 +1223693,972200 +1227620,132750 +1227633,1185526 +1227901,1185526 +1227906,505592 +1227921,129982 +1227932,1719971 +1227944,698220 +1228907,1456650 +1229630,2283308 +1229862,1456600 +1262756,1185526 +1277071,1031367 +1308670,875584 +1372276,640836 +1372352,1372353 +1417817,1262381 +1421453,2125493 +1452657,2161108 +1454243,106462 +1454516,28546 +1455569,503220 +1455924,860721 +1456654,1729512 +1456759,1189129 +1456871,1032213 +1457205,124422 +1457223,1457215 +1457326,1459165 +1457566,1166343 +1457585,1167049 +1457632,1166343 +1457994,1178874 +1458356,1166244 +1458430,1166343 +1458585,1229045 +1458804,1703176 +1458970,1166244 +1459040,1166343 +1459209,884521 +1459683,1021426 +1459764,1081082 +1459767,129985 +1459777,129981 +1459800,129982 +1459817,1081082 +1459823,129982 +1460353,1701174 +1460774,1708647 +1460963,1704198 +1461715,1189129 +1461882,75252 +1472956,509348 +1488496,364128 +1489416,1225231 +1489605,794874 +1513852,1513932 +1513877,124422 +1513935,1513822 +1513967,1185526 +1514051,124422 +1514187,1567287 +1514343,609853 +1516124,1014349 +1516171,804540 +1516543,1185526 +1540074,1648401 +1549925,794874 +1577692,521172 +1583917,1262381 +1591908,1066831 +1591917,1066831 +1604948,1608304 +1608771,2013571 +1614000,1093722 +1618802,362188 +1619433,1616668 +1619601,1616667 +1619650,1093722 +1620087,1209654 +1620092,1567287 +1624141,1706328 +1633998,1633985 +1634269,912217 +1634818,1081082 +1643621,1185526 +1648391,1648394 
+1648928,1092182 +1650316,1721675 +1651697,1653000 +1653182,1653131 +1653235,863097 +1670870,1719941 +1686997,1262381 +1688630,2123407 +1693605,1704851 +1700920,1208208 +1701120,1703030 +1702617,1172806 +1702639,1702900 +1702668,902832 +1702770,1172657 +1702892,1702631 +1702940,1719941 +1703031,1703030 +1703033,1703030 +1703175,1703030 +1703679,892617 +1704587,1700624 +1704903,1703539 +1706534,1703030 +1706979,1703030 +1707344,1745227 +1707353,1702627 +1707567,1172806 +1707710,811412 +1708202,1185526 +1708316,1703534 +1708722,1700629 +1709947,1697103 +1709953,1703030 +1710066,1458604 +1710405,1697039 +1710666,1184969 +1710700,1719941 +1710731,1719941 +1711426,1697613 +1711554,1719941 +1711748,1719941 +1712522,1719941 +1712533,1719941 +1712708,1712604 +1712710,1703030 +1712713,1703030 +1712730,1703030 +1712933,1721355 +1719605,1719569 +1719861,2125607 +1720068,1696959 +1721643,1743831 +1721750,1710083 +1722052,1721498 +1723622,1185526 +1723795,1743831 +1727283,1706464 +1728464,1227260 +1730464,2125500 +1730484,994791 +1737106,1178874 +1740709,1456900 +1740934,1604680 +1741067,860615 +1741108,1805734 +1741226,1741225 +1742692,2091704 +1743650,875584 +1743832,1721644 +1744772,1743831 +1762119,1762110 +1762158,1460202 +1801315,1951679 +1805782,875258 +1812605,873408 +1838969,1690444 +1841804,1840208 +1841826,2123324 +1848209,1152262 +1930894,356621 +1945930,2092688 +1945935,1744876 +1946321,1804134 +1946525,2326411 +1946956,1744876 +1946966,1744876 +1947123,1744876 +1947201,1744876 +1951678,1801314 +1974931,2153840 +1975818,1744876 +1976607,1800245 +1979940,1636131 +1980122,1461248 +1982216,379546 +2019859,877236 +2028140,1262381 +2034695,379546 +2035032,1634841 +2070887,971212 +2073610,2073588 +2073612,2073588 +2089720,2134537 +2091486,712212 +2091712,1742700 +2091919,1980233 +2093765,2067549 +2099414,736099 +2121411,2122600 +2123346,2123348 +2125204,2089518 +2125205,2089518 +2125392,1208089 +2125406,1208176 +2125455,1208089 +2125632,1979787 +2131628,2131625 +2134006,1209654 +2134534,122110 +2135029,1787254 +2135104,2135165 +2135392,2125524 +2141690,488131 +2161628,1228115 +2161629,1228115 +2161632,1228115 +2211112,2259916 +2211137,7683 +2211212,2211356 +2220068,2220067 +2225790,1605408 +2246063,2125603 +2288505,1809385 +2289396,736099 +2293008,1578794 +2299252,122108 +2299509,1178874 +2315110,808947 +2315606,2244905 +2316483,379546 +2321272,2321275 +2321274,2321275 +2321276,2321275 +2321280,2321129 +2321281,2321283 +2325617,1945550 +2326172,2326244 +2327393,1842987 +2327861,1801314 +2328757,1968840 +2329535,2210932 +2329656,395033 +2330284,2330256 +2331025,2331026 +2332463,1653000 +2336962,1744876 diff --git a/apps/merge-media-sources/bin/feeds_workflow_worker.py b/apps/merge-media-sources/bin/feeds_workflow_worker.py index 79f7f2d15d..547bcf098a 100755 --- a/apps/merge-media-sources/bin/feeds_workflow_worker.py +++ b/apps/merge-media-sources/bin/feeds_workflow_worker.py @@ -8,8 +8,9 @@ from mediawords.util.log import create_logger from mediawords.workflow.client import workflow_client -from merge_media_sources.workflow import MergeMediaWorkflowImpl, MergeMediaActivitiesImpl -from merge_media_sources.workflow_interface import TASK_QUEUE, MergeMediaActivities +from merge_media_sources.feeds_workflow import FeedsMergeWorkflowImpl, FeedsMergeActivitiesImpl +from merge_media_sources.feeds_workflow_interface import TASK_QUEUE, FeedsMergeActivities +from load_feeds_workflows import submit_feed_workflows log = create_logger(__name__) @@ -19,14 +20,15 @@ async def _start_worker(): factory = 
WorkerFactory(client=client, namespace=client.namespace)
     worker = factory.new_worker(task_queue=TASK_QUEUE)
     worker.register_activities_implementation(
-        activities_instance=MergeMediaActivitiesImpl(),
-        activities_cls_name=MergeMediaActivities.__name__,
+        activities_instance=FeedsMergeActivitiesImpl(),
+        activities_cls_name=FeedsMergeActivities.__name__,
     )
-    worker.register_workflow_implementation_type(impl_cls=MergeMediaWorkflowImpl)
+    worker.register_workflow_implementation_type(impl_cls=FeedsMergeWorkflowImpl)
     factory.start()


 if __name__ == '__main__':
     loop = asyncio.get_event_loop()
     asyncio.ensure_future(_start_worker())
+    submit_feed_workflows()
     loop.run_forever()
diff --git a/apps/merge-media-sources/bin/load_feeds_workflows.py b/apps/merge-media-sources/bin/load_feeds_workflows.py
new file mode 100644
index 0000000000..cee0dac55a
--- /dev/null
+++ b/apps/merge-media-sources/bin/load_feeds_workflows.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python3
+
+import asyncio
+import csv
+
+from mediawords.util.log import create_logger
+from mediawords.workflow.client import workflow_client
+
+# noinspection PyPackageRequirements
+from temporal.workflow import WorkflowClient, WorkflowOptions
+
+from merge_media_sources.feeds_workflow_interface import FeedsMergeWorkflow
+
+log = create_logger(__name__)
+
+
+async def _start_workflow(client: WorkflowClient, child_feed_id: int, parent_feed_id: int) -> None:
+
+    log.info(f"Starting a workflow to merge feed {child_feed_id} into {parent_feed_id}...")
+
+    workflow: FeedsMergeWorkflow = client.new_workflow_stub(
+        cls=FeedsMergeWorkflow,
+        workflow_options=WorkflowOptions(workflow_id=str(child_feed_id) + '_to_' + str(parent_feed_id)),
+    )
+
+    # Fire and forget: the workflow itself does all of the merging work
+    await WorkflowClient.start(workflow.merge_feeds, child_feed_id, parent_feed_id)
+
+    log.info(f"Started a workflow to merge feed {child_feed_id} into {parent_feed_id}.")
+
+
+def submit_feed_workflows():
+    client = workflow_client()
+    with open('feeds_to_merge.csv') as f:
+        feeds_to_merge = [{k: int(v) for k, v in row.items()} for row in csv.DictReader(f, skipinitialspace=True)]
+    for feed_pair in feeds_to_merge:
+        child_feed = feed_pair['feed_id']
+        parent_feed = feed_pair['parent_feed_id']
+        # _start_workflow() is a coroutine, so run it to completion on the current event loop
+        asyncio.get_event_loop().run_until_complete(_start_workflow(client, child_feed, parent_feed))
diff --git a/apps/merge-media-sources/src/python/merge_media_sources/config.py b/apps/merge-media-sources/src/python/merge_media_sources/config.py
deleted file mode 100644
index c946212fb5..0000000000
--- a/apps/merge-media-sources/src/python/merge_media_sources/config.py
+++ /dev/null
@@ -1,6 +0,0 @@
-import abc
-
-from mediawords.util.config import env_value, file_with_env_value
-
-
-class MergeMediaConfig(object, metaclass=abc.ABCMeta):
diff --git a/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py
index 7e1deee21b..b2c4e33a36 100644
--- a/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py
+++ b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py
@@ -1,14 +1,8 @@
-import os
-import tempfile
-from typing import Optional
-
 # noinspection PyPackageRequirements
 from temporal.workflow import Workflow

 from mediawords.db import connect_to_db_or_raise
-from mediawords.job import JobBroker
-from mediawords.util.parse_json import encode_json, decode_json
-from mediawords.util.config.common import RabbitMQConfig
+from mediawords.db.handler import DatabaseHandler
 from mediawords.util.log import create_logger
 from mediawords.workflow.exceptions import McProgrammingError, McTransientError, McPermanentError
@@ -18,37 +12,114 @@ class FeedsMergeActivitiesImpl(FeedsMergeActivities):
-    """Activities implementation."""
-    async def
+    # TODO: when implementing media merge, consider breaking helper functions below into separate module
+
+    @staticmethod
+    def chunk_results(results: list) -> list:
+        """Break results of a query into chunks of 1000 (returns a list of lists)."""
+        return [results[i:i + 1000] for i in range(0, len(results), 1000)]
+
+    async def get_child_feed_entries(self, table: str, table_id_field: str, child_feed_id: int) -> list:
+        log.info(f"Getting entries in {table} table associated with feed {child_feed_id}")
+
+        db = connect_to_db_or_raise()
+
+        child_feed = db.find_by_id(table='feeds', object_id=child_feed_id)
+        if not child_feed:
+            raise McPermanentError(f"Feed {child_feed_id} was not found.")
+
+        get_child_feed_entries_query = f"""
+            SELECT {table_id_field}
+            FROM {table}
+            WHERE feeds_id = {child_feed_id};
+        """
+
+        child_feed_entries = db.query(get_child_feed_entries_query).flat()
+
+        log.info(f"Got all entries in {table} table for feed {child_feed_id}")
-
-    async def identify_story_bcp47_language_code(self, stories_id: int) -> Optional[str]:
-        log.info(f"Identifying story language for story {stories_id}...")
+
+        return child_feed_entries
+
+    async def migrate_child_entries(self, table: str, table_id_field: str, id_list: list, child_feed_id: int,
+                                    parent_feed_id: int) -> None:
+        if not id_list:
+            log.info(f"No entries to migrate in {table} for feed {child_feed_id}")
+            return
+
+        log.info(f"Updating {table} table to migrate {len(id_list)} entries associated with {child_feed_id} to "
+                 f"parent {parent_feed_id}")

         db = connect_to_db_or_raise()
-        story = db.find_by_id(table='stories', object_id=stories_id)
-        if not story:
-            raise McPermanentError(f"Story {stories_id} was not found.")
+        update_query = f"""
+            UPDATE {table}
+            SET feeds_id = {parent_feed_id}
+            WHERE {table_id_field} IN ({', '.join(str(entry_id) for entry_id in id_list)});
+        """
+
+        db.query(update_query)
+
+        log.info(f"Migrated {len(id_list)} entries in {table} for feed {child_feed_id} to parent {parent_feed_id}")
+
+    async def delete_child_entries(self, child_feed_id: int, table: str) -> None:
+        log.info(f"Deleting entries in {table} table associated with feed {child_feed_id}")
+
+        db = connect_to_db_or_raise()
+
+        delete_query = f"""
+            DELETE FROM {table}
+            WHERE feeds_id = {child_feed_id};
+        """
+
+        db.query(delete_query)
+
+        log.info(f"Deleted entries in {table} table associated with feed {child_feed_id}")
+
+
+class FeedsMergeWorkflowImpl(FeedsMergeWorkflow):
+    """Workflow implementation."""
+
+    def __init__(self):
+        self.activities: FeedsMergeActivities = Workflow.new_activity_stub(
+            activities_cls=FeedsMergeActivities,
+            # No retry_parameters here as they get set individually in @activity_method()
+        )
+
+    async def merge_feeds(self, child_feed_id: int, parent_feed_id: int) -> None:
+
+        child_feed_downloads = await self.activities.get_child_feed_entries('downloads', 'downloads_id',
+                                                                            child_feed_id)
+
+        # Chunk in the workflow itself so that every chunked UPDATE runs as its own activity
+        for chunk in FeedsMergeActivitiesImpl.chunk_results(child_feed_downloads):
+            await self.activities.migrate_child_entries('downloads', 'downloads_id', chunk, child_feed_id,
+                                                        parent_feed_id)
+
+        child_feed_stories_map = await self.activities.get_child_feed_entries('feeds_stories_map_p',
+                                                                              'feeds_stories_map_p_id',
+                                                                              child_feed_id)
+
+        for chunk in FeedsMergeActivitiesImpl.chunk_results(child_feed_stories_map):
+            await self.activities.migrate_child_entries('feeds_stories_map_p', 'feeds_stories_map_p_id', chunk,
+                                                        child_feed_id, parent_feed_id)
-
-        # Podcast episodes typically come with title and description set so try guessing from that
-        story_title = story['title']
-        story_description = html_strip(story['description'])
-        sample_text = f"{story_title}\n{story_description}"
+
+        child_scraped_feeds = await self.activities.get_child_feed_entries('scraped_feeds', 'scraped_feeds_id',
+                                                                           child_feed_id)
+
+        await self.activities.migrate_child_entries('scraped_feeds', 'scraped_feeds_id', child_scraped_feeds,
+                                                    child_feed_id, parent_feed_id)
-
-        bcp_47_language_code = None
-        if identification_would_be_reliable(text=sample_text):
-            iso_639_1_language_code = language_code_for_text(text=sample_text)
+
+        child_feeds_from_yesterday = await self.activities.get_child_feed_entries('feeds_from_yesterday', 'feeds_id',
+                                                                                  child_feed_id)
+
+        await self.activities.migrate_child_entries('feeds_from_yesterday', 'feeds_id', child_feeds_from_yesterday,
+                                                    child_feed_id, parent_feed_id)
-
-            # Convert to BCP 47 identifier
-            bcp_47_language_code = iso_639_1_code_to_bcp_47_identifier(
-                iso_639_1_code=iso_639_1_language_code,
-                url_hint=story['url'],
-            )
+
+        child_feeds_tags_map = await self.activities.get_child_feed_entries('feeds_tags_map', 'feeds_tags_map_id',
+                                                                            child_feed_id)
-
-        log.info(f"Language code for story {stories_id} is {bcp_47_language_code}")
+
+        await self.activities.migrate_child_entries('feeds_tags_map', 'feeds_tags_map_id', child_feeds_tags_map,
+                                                    child_feed_id, parent_feed_id)
-
-        return bcp_47_language_code
+
+        await self.activities.delete_child_entries(child_feed_id, 'downloads')
+
+        await self.activities.delete_child_entries(child_feed_id, 'feeds_stories_map_p')
+
+        await self.activities.delete_child_entries(child_feed_id, 'scraped_feeds')
+
+        await self.activities.delete_child_entries(child_feed_id, 'feeds_from_yesterday')
+
+        await self.activities.delete_child_entries(child_feed_id, 'feeds_tags_map')
-
-    await self.activities.add_to_extraction_queue(stories_id)
+
+        await self.activities.delete_child_entries(child_feed_id, 'feeds')
diff --git a/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py
index 5634db917b..d4ad85526c 100644
--- a/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py
+++ b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py
@@ -1,6 +1,4 @@
-import dataclasses
 from datetime import timedelta
-from typing import Optional

 # noinspection PyPackageRequirements
 from temporal.activity_method import activity_method, RetryParameters
 # noinspection PyPackageRequirements
 from temporal.workflow import workflow_method

 from mediawords.workflow.exceptions import McPermanentError

-from .config import MergeMediaConfig
-# TODO: create config file?

-TASK_QUEUE = "merge-media-sources"
+TASK_QUEUE = "merge-feeds"
 """Temporal task queue."""

 DEFAULT_RETRY_PARAMETERS = RetryParameters(
     initial_interval=timedelta(seconds=1),
     backoff_coefficient=2,
     maximum_interval=timedelta(hours=2),
     maximum_attempts=1000,
     non_retryable_error_types=[
         McPermanentError.__name__,
     ],
 )


-class MergeMediaActivities(object):
-
-    @classmethod
-    def _create_config(cls) -> MergeMediaConfig:
-        """
-        Create and return configuration instance to be used for running the workflow.
-
-        Might get overridden in case some configuration changes have to be made while running the tests.
- """ - return MergeMediaConfig() - - def __init__(self): - super().__init__() - self.config = self._create_config() - - @activity_method( - task_queue=TASK_QUEUE, - start_to_close_timeout=timedelta(seconds=60), - retry_parameters=DEFAULT_RETRY_PARAMETERS, - ) - async def identify_story_bcp47_language_code(self, stories_id: int) -> Optional[str]: - """ - Guess BCP 47 language code of a story. - - https://cloud.google.com/speech-to-text/docs/languages - - :param stories_id: Story to guess the language code for. - :return: BCP 47 language code (e.g. 'en-US') or None if the language code could not be determined. - """ - raise NotImplementedError - - @activity_method( - task_queue=TASK_QUEUE, - start_to_close_timeout=timedelta(seconds=60), - retry_parameters=DEFAULT_RETRY_PARAMETERS, - ) - async def determine_best_enclosure(self, stories_id: int) -> Optional[StoryEnclosureDict]: - """ - Fetch a list of story enclosures, determine which one looks like a podcast episode the most. - - Uses or similar tag. - - :param stories_id: Story to fetch the enclosures for. - :return: Best enclosure metadata object (as dict), or None if no best enclosure could be determined. - """ - raise NotImplementedError - - @activity_method( - task_queue=TASK_QUEUE, - # With a super-slow server, it's probably reasonable to expect that it might take a few hours to fetch a single - # episode - start_to_close_timeout=timedelta(hours=2), - retry_parameters=dataclasses.replace( - DEFAULT_RETRY_PARAMETERS, - - # Wait for a minute before trying again - initial_interval=timedelta(minutes=1), - - # Hope for the server to resurrect in a week - maximum_interval=timedelta(weeks=1), - - # Don't kill ourselves trying to hit a permanently dead server - maximum_attempts=50, - ), - ) - async def fetch_enclosure_to_gcs(self, stories_id: int, enclosure: StoryEnclosureDict) -> None: - """ - Fetch enclosure and store it to GCS as an episode. - - Doesn't do transcoding or anything because transcoding or any subsequent steps might fail, and if they do, we - want to have the raw episode fetched and safely stored somewhere. - - :param stories_id: Story to fetch the enclosure for. - :param enclosure: Enclosure to fetch (as dict). - """ - raise NotImplementedError - - @activity_method( - task_queue=TASK_QUEUE, - - # Let's expect super long episodes or super slow servers - start_to_close_timeout=timedelta(hours=2), - - retry_parameters=dataclasses.replace( - DEFAULT_RETRY_PARAMETERS, - - # Wait for a minute before trying again (GCS might be down) - initial_interval=timedelta(minutes=1), - - # Hope for GCS to resurrect in a day - maximum_interval=timedelta(days=1), - - # Limit attempts because transcoding itself might be broken, and we don't want to be fetching huge objects - # from GCS periodically - maximum_attempts=20, - ), - ) - async def fetch_transcode_store_episode(self, stories_id: int) -> MediaFileInfoAudioStreamDict: - """ - Fetch episode from GCS, transcode it if needed and store it to GCS again in a separate bucket. - - Now that the raw episode file is safely located in GCS, we can try transcoding it. - - :param stories_id: Story ID the episode of which should be transcoded. - :return: Metadata of the best audio stream determined as part of the transcoding (as dict). 
- """ - raise NotImplementedError - - @activity_method( - task_queue=TASK_QUEUE, - - # Give a bit more time as the implementation is likely to do some non-Temporal retries on weird Speech API - # errors - start_to_close_timeout=timedelta(minutes=5), - - retry_parameters=dataclasses.replace( - DEFAULT_RETRY_PARAMETERS, - - # Given that the thing is costly, wait a whole hour before retrying anything - initial_interval=timedelta(hours=1), - - # Hope for the Speech API to resurrect in a week - maximum_interval=timedelta(weeks=1), - - # Don't retry too much as each try is potentially very costly - maximum_attempts=10, - ), - ) - async def submit_transcribe_operation(self, - stories_id: int, - episode_metadata: MediaFileInfoAudioStreamDict, - bcp47_language_code: str) -> str: - """ - Submit a long-running transcription operation to the Speech API. - - :param stories_id: Story ID of the episode which should be submitted for transcribing. - :param episode_metadata: Metadata of transcoded episode (as dict). - :param bcp47_language_code: BCP 47 language code of the story. - :return: Speech API operation ID for the transcription operation. - """ - raise NotImplementedError +class FeedsMergeActivities(object): + """Activities interface.""" @activity_method( task_queue=TASK_QUEUE, start_to_close_timeout=timedelta(seconds=60), retry_parameters=DEFAULT_RETRY_PARAMETERS, ) - async def fetch_store_raw_transcript_json(self, stories_id: int, speech_operation_id: str) -> None: - """ - Fetch a finished transcription and store the raw JSON of it into a GCS bucket. - - Raises an exception if the transcription operation is not finished yet. - - :param stories_id: Story ID the episode of which should be submitted for transcribing. - :param speech_operation_id: Speech API operation ID. - """ + async def migrate_child_entries(self, table: str, table_id_field: str, id_list: list, child_feed_id: int, + parent_feed_id: int) -> None: raise NotImplementedError @activity_method( @@ -189,25 +39,7 @@ async def fetch_store_raw_transcript_json(self, stories_id: int, speech_operatio start_to_close_timeout=timedelta(seconds=60), retry_parameters=DEFAULT_RETRY_PARAMETERS, ) - async def fetch_store_transcript(self, stories_id: int) -> None: - """ - Fetch a raw JSON transcript from a GCS bucket, store it to "download_texts". - - :param stories_id: Story ID the transcript of which should be stored into the database. - """ - raise NotImplementedError - - @activity_method( - task_queue=TASK_QUEUE, - start_to_close_timeout=timedelta(minutes=2), - retry_parameters=DEFAULT_RETRY_PARAMETERS, - ) - async def add_to_extraction_queue(self, stories_id: int) -> None: - """ - Add a story to the extraction queue. - - :param stories_id: Story ID to be added to the extraction queue. 
- """ + async def delete_child_entries(self, child_feed_id: int, table: str) -> None: raise NotImplementedError @@ -215,5 +47,5 @@ class FeedsMergeWorkflow(object): """Workflow interface.""" @workflow_method(task_queue=TASK_QUEUE) - async def merge_feeds(self, child_feeds_id: int, parent_feeds_id: int) -> None: + async def merge_feeds(self) -> None: raise NotImplementedError diff --git a/apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py b/apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py index cf408a7e98..5936b7528c 100644 --- a/apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py +++ b/apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py @@ -1,6 +1,5 @@ -import os +import csv from datetime import timedelta -from typing import Union # noinspection PyPackageRequirements import pytest @@ -10,56 +9,60 @@ from temporal.workflow import WorkflowOptions from mediawords.db import connect_to_db -from mediawords.dbi.downloads.store import fetch_content +from mediawords.db.handler import DatabaseHandler from mediawords.test.db.create import create_test_feed, create_test_medium, create_test_story, create_download_for_feed -from mediawords.test.hash_server import HashServer from mediawords.util.log import create_logger -from mediawords.util.network import random_unused_port from mediawords.workflow.client import workflow_client from mediawords.workflow.worker import stop_worker_faster +from merge_media_sources import FeedsMergeWorkflow, FeedsMergeWorkflowImpl, FeedsMergeActivities, FeedsMergeActivitiesImpl log = create_logger(__name__) +def check_successful_feed_migration(db: DatabaseHandler, table: str, parent_feed_id: int) -> None: + results = db.select(table=f'{table}', what_to_select='*').hashes() + assert len(results) == 1 + assert results[0]['feeds_id'] == parent_feed_id + + @pytest.mark.asyncio -async def test_feeds_workflow(): +async def test_feeds_merge_workflow() -> None: db = connect_to_db() - test_medium = create_test_medium(db=db, label='test') - test_parent_feed = create_test_feed(db=db, label='parent', medium=test_medium) - test_child_feeds = [] - for i in range(1,4): - test_child_feeds.append(create_test_feed(db=db, label=F'child_feed_{str(i)}', medium=test_medium)) - - for feed in test_child_feeds: - create_download_for_feed(db=db, feed=feed) - create_test_story(db=db, label=F"story for {feed['label']}", feed=feed) - db.insert(table='scraped_feeds', insert_hash={ - 'feeds_id': int(feed['feeds_id']), - 'url': feed['url'], - 'scrape_date': 'NOW()', - 'import_module': 'mediawords' - }) - db.insert(table='feeds_from_yesterday', insert_hash={ - 'feeds_id': int(feed['feeds_id']), - 'media_id': int(test_medium['media_id']), - 'name': F"feed_from_yesterday_{feed['name']}", - 'url': feed['url'], - 'type': 'test', - 'active': True - }) - db.insert(table='feeds_tags_map', insert_hash={ - 'feeds_id': int(feed['feeds_id']), - 'tags_id': int(test_medium['media_id']), - }) - + child_feed = create_test_feed(db=db, label='child_feed', medium=test_medium) + parent_feed = create_test_feed(db=db, label='parent_feed', medium=test_medium) + + create_download_for_feed(db=db, feed=child_feed) + + db.insert(table='feeds_stories_map_p', insert_hash={ + 'feeds_id': child_feed['feeds_id'], + 'stories_id': 1 + }) + db.insert(table='scraped_feeds', insert_hash={ + 'feeds_id': child_feed['feed_id'], + 'url': child_feed['url'], + 'scrape_date': 'NOW()', + 'import_module': 'mediawords' + }) + db.insert(table='feeds_from_yesterday', insert_hash={ + 
'feeds_id': child_feed['feeds_id'], + 'media_id': test_medium['media_id'], + 'name': F"feed_from_yesterday_{child_feed['name']}", + 'url': child_feed['url'], + 'type': 'test', + 'active': True + }) + db.insert(table='feeds_tags_map', insert_hash={ + 'feeds_id': child_feed['feeds_id'], + 'tags_id': test_medium['media_id'], + }) client = workflow_client() # Start worker factory = WorkerFactory(client=client, namespace=client.namespace) - worker = factory.new_worker(task_queue=TASK_QUEUE) + worker = factory.new_worker(task_queue="merge-feeds") activities = FeedsMergeActivities() @@ -71,32 +74,29 @@ async def test_feeds_workflow(): factory.start() # Initialize workflow instance + workflow: FeedsMergeWorkflow = client.new_workflow_stub( cls=FeedsMergeWorkflow, - workflow_options=WorkflowOptions( - workflow_id=str(stories_id), - - # By default, if individual activities of the workflow fail, they will get restarted pretty much - # indefinitely, and so this test might run for days (or rather just timeout on the CI). So we cap the - # workflow so that if it doesn't manage to complete in X minutes, we consider it as failed. - workflow_run_timeout=timedelta(minutes=5), - - ), + workflow_options=WorkflowOptions(workflow_id='test'), ) + # Fire and forget as the workflow will do everything (including adding a extraction job) itself + await client.start(workflow.merge_feeds, child_feed['feeds_id'], parent_feed['feeds_id']) # Wait for the workflow to complete - await workflow.merge_feeds(feeds_id=feed[], parent_feeds_id=test_parent_feed['feeds_id']) + await workflow.merge_feeds( child_feed['feeds_id'], parent_feed['feeds_id']) downloads = db.select(table='downloads', what_to_select='*').hashes() assert len(downloads) == 1 first_download = downloads[0] - assert first_download['stories_id'] == stories_id - assert first_download['type'] == 'content' - assert first_download['state'] == 'success' + assert first_download['feeds_id'] == parent_feed['feeds_id'] + + tables = ['downloads', 'scraped_feeds', 'feeds_from_yesterday', 'feeds_tags_map', 'feeds_stories_map_p'] + for table in tables: + check_successful_feed_migration(db, table, parent_feed['feeds_id']) + results = db.select(table='feeds', what_to_select='*', condition_hash={'id': child_feed['feeds_id']}).hashes() + assert len(results) == 0 - # Initiate the worker shutdown in the background while we do the GCS cleanup so that the stop_workers_faster() - # doesn't have to wait that long await worker.stop(background=True) log.info("Stopping workers...")
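
Note on running the loader by hand: in this patch load_feeds_workflows.py is only imported by feeds_workflow_worker.py, but since it lives in bin/ it could also be invoked as a one-off script. A minimal sketch of a standalone entry point, assuming the module layout above and that feeds_to_merge.csv sits in the working directory; the __main__ guard and log messages are illustrative and not part of the patch:

    #!/usr/bin/env python3
    # Hypothetical standalone runner for the CSV-driven feed merges.
    # submit_feed_workflows() is defined in bin/load_feeds_workflows.py in this patch.

    from mediawords.util.log import create_logger

    from load_feeds_workflows import submit_feed_workflows

    log = create_logger(__name__)

    if __name__ == '__main__':
        log.info("Submitting one merge workflow per row of feeds_to_merge.csv...")
        submit_feed_workflows()
        log.info("Finished submitting feed merge workflows.")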
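The migrate_child_entries() activity interpolates integer IDs straight into its UPDATE statement, which is workable only because every value is an integer read back from the database. A sketch of the same chunked update written with bound parameters instead, assuming DatabaseHandler.query() accepts psycopg2-style named parameters as it does elsewhere in mediawords; the helper name migrate_ids is illustrative:

    from typing import List

    from mediawords.db import connect_to_db_or_raise


    def migrate_ids(table: str, table_id_field: str, id_list: List[int], parent_feed_id: int) -> None:
        """Repoint one chunk of child-feed rows at the parent feed using bound parameters."""
        if not id_list:
            return

        db = connect_to_db_or_raise()

        # psycopg2 expands a tuple bound to %(ids)s into "(1, 2, 3)", so the IN list needs no manual
        # formatting; table and column names still have to be interpolated because identifiers
        # cannot be passed as bound parameters.
        db.query(f"""
            UPDATE {table}
            SET feeds_id = %(parent_feeds_id)s
            WHERE {table_id_field} IN %(ids)s
        """, {
            'parent_feeds_id': parent_feed_id,
            'ids': tuple(id_list),
        })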