From e8ed833a1464f4f14f99ac0bf88cbdce0dc40f9b Mon Sep 17 00:00:00 2001 From: gardusig Date: Sun, 4 Jun 2023 22:46:05 -0300 Subject: [PATCH 1/3] Fixed worker annotation discovery --- src/conductor/client/automator/task_handler.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/conductor/client/automator/task_handler.py b/src/conductor/client/automator/task_handler.py index 5bf3eae5..1088f632 100644 --- a/src/conductor/client/automator/task_handler.py +++ b/src/conductor/client/automator/task_handler.py @@ -174,9 +174,9 @@ def __get_annotated_workers_from_subtree(pkg): if not isinstance(node, ast.FunctionDef): continue for decorator in node.decorator_list: - decorator_type, params = __extract_decorator_info( + params = __extract_decorator_info( decorator) - if decorator_type != 'WorkerTask': + if params is None: continue try: worker = __create_worker_from_ast_node( @@ -194,18 +194,20 @@ def __extract_decorator_info(decorator): if not isinstance(decorator, ast.Call): return None, None decorator_type = None - decorator_params = {} decorator_func = decorator.func if isinstance(decorator_func, ast.Attribute): decorator_type = decorator_func.attr elif isinstance(decorator_func, ast.Name): decorator_type = decorator_func.id + if decorator_type != 'WorkerTask': + return None + decorator_params = {} if decorator.keywords: for keyword in decorator.keywords: param_name = keyword.arg param_value = keyword.value.value decorator_params[param_name] = param_value - return decorator_type, decorator_params + return decorator_params def __create_worker_from_ast_node(node, params): From 572b2d1bb18f2aca0abd2b7627174af3c5eae432 Mon Sep 17 00:00:00 2001 From: gardusig Date: Mon, 5 Jun 2023 11:34:15 -0300 Subject: [PATCH 2/3] Fixed worker annotation discovery --- setup.cfg | 1 + src/conductor/client/automator/task_handler.py | 9 +++++++-- .../integration/resources/worker/python/python_worker.py | 7 ++++++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/setup.cfg b/setup.cfg index 0541f947..84e2db7a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,6 +28,7 @@ install_requires = six >= 1.10 requests >= 2.28.1 typing-extensions >= 4.2.0 + astor >= 0.8.1 [options.packages.find] where = src diff --git a/src/conductor/client/automator/task_handler.py b/src/conductor/client/automator/task_handler.py index 1088f632..48aa8ced 100644 --- a/src/conductor/client/automator/task_handler.py +++ b/src/conductor/client/automator/task_handler.py @@ -8,6 +8,7 @@ from multiprocessing import Process, freeze_support from typing import List import ast +import astor import inspect import logging import os @@ -37,7 +38,7 @@ def __init__( ): if not isinstance(workers, list): workers = [workers] - if scan_for_annotated_workers is not False: + if scan_for_annotated_workers is True: for worker in get_annotated_workers(): workers.append(worker) self.__create_task_runner_processes( @@ -202,10 +203,14 @@ def __extract_decorator_info(decorator): if decorator_type != 'WorkerTask': return None decorator_params = {} + if decorator.args: + for arg in decorator.args: + arg_value = astor.to_source(arg).strip() + decorator_params[arg_value] = ast.literal_eval(arg) if decorator.keywords: for keyword in decorator.keywords: param_name = keyword.arg - param_value = keyword.value.value + param_value = ast.literal_eval(keyword.value) decorator_params[param_name] = param_value return decorator_params diff --git a/tests/integration/resources/worker/python/python_worker.py b/tests/integration/resources/worker/python/python_worker.py index b928f1cd..fd320833 100644 --- a/tests/integration/resources/worker/python/python_worker.py +++ b/tests/integration/resources/worker/python/python_worker.py @@ -42,7 +42,12 @@ def get_domain(self) -> str: @WorkerTask(task_definition_name='test_python_decorated_worker') -def decorated_worker(input) -> object: +def simple_decorated_worker(input) -> object: + return {'message': 'python is so cool :)'} + + +@WorkerTask('test_python_decorated_worker', poll_interval_seconds=0.050) +def decorated_worker_with_poll_interval() -> object: return {'message': 'python is so cool :)'} From 7a20f32a6570be93e70387f5ef1b068b7d5bbd5e Mon Sep 17 00:00:00 2001 From: gardusig Date: Mon, 5 Jun 2023 11:47:07 -0300 Subject: [PATCH 3/3] Enable worker discovery at tests --- tests/integration/workflow/test_workflow_execution.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/workflow/test_workflow_execution.py b/tests/integration/workflow/test_workflow_execution.py index 310f687c..0b978514 100644 --- a/tests/integration/workflow/test_workflow_execution.py +++ b/tests/integration/workflow/test_workflow_execution.py @@ -40,7 +40,8 @@ def run_workflow_execution_tests(configuration: Configuration, workflow_executor generate_worker(worker_with_task_input_and_generic_output), generate_worker(worker_with_task_input_and_task_result_output), ], - configuration=configuration + configuration=configuration, + scan_for_annotated_workers=True, ) task_handler.start_processes() try: