diff --git a/setup.cfg b/setup.cfg index 0541f947..84e2db7a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,6 +28,7 @@ install_requires = six >= 1.10 requests >= 2.28.1 typing-extensions >= 4.2.0 + astor >= 0.8.1 [options.packages.find] where = src diff --git a/src/conductor/client/automator/task_handler.py b/src/conductor/client/automator/task_handler.py index 5bf3eae5..48aa8ced 100644 --- a/src/conductor/client/automator/task_handler.py +++ b/src/conductor/client/automator/task_handler.py @@ -8,6 +8,7 @@ from multiprocessing import Process, freeze_support from typing import List import ast +import astor import inspect import logging import os @@ -37,7 +38,7 @@ def __init__( ): if not isinstance(workers, list): workers = [workers] - if scan_for_annotated_workers is not False: + if scan_for_annotated_workers is True: for worker in get_annotated_workers(): workers.append(worker) self.__create_task_runner_processes( @@ -174,9 +175,9 @@ def __get_annotated_workers_from_subtree(pkg): if not isinstance(node, ast.FunctionDef): continue for decorator in node.decorator_list: - decorator_type, params = __extract_decorator_info( + params = __extract_decorator_info( decorator) - if decorator_type != 'WorkerTask': + if params is None: continue try: worker = __create_worker_from_ast_node( @@ -194,18 +195,24 @@ def __extract_decorator_info(decorator): if not isinstance(decorator, ast.Call): return None, None decorator_type = None - decorator_params = {} decorator_func = decorator.func if isinstance(decorator_func, ast.Attribute): decorator_type = decorator_func.attr elif isinstance(decorator_func, ast.Name): decorator_type = decorator_func.id + if decorator_type != 'WorkerTask': + return None + decorator_params = {} + if decorator.args: + for arg in decorator.args: + arg_value = astor.to_source(arg).strip() + decorator_params[arg_value] = ast.literal_eval(arg) if decorator.keywords: for keyword in decorator.keywords: param_name = keyword.arg - param_value = keyword.value.value + param_value = ast.literal_eval(keyword.value) decorator_params[param_name] = param_value - return decorator_type, decorator_params + return decorator_params def __create_worker_from_ast_node(node, params): diff --git a/tests/integration/resources/worker/python/python_worker.py b/tests/integration/resources/worker/python/python_worker.py index b928f1cd..fd320833 100644 --- a/tests/integration/resources/worker/python/python_worker.py +++ b/tests/integration/resources/worker/python/python_worker.py @@ -42,7 +42,12 @@ def get_domain(self) -> str: @WorkerTask(task_definition_name='test_python_decorated_worker') -def decorated_worker(input) -> object: +def simple_decorated_worker(input) -> object: + return {'message': 'python is so cool :)'} + + +@WorkerTask('test_python_decorated_worker', poll_interval_seconds=0.050) +def decorated_worker_with_poll_interval() -> object: return {'message': 'python is so cool :)'} diff --git a/tests/integration/workflow/test_workflow_execution.py b/tests/integration/workflow/test_workflow_execution.py index 310f687c..0b978514 100644 --- a/tests/integration/workflow/test_workflow_execution.py +++ b/tests/integration/workflow/test_workflow_execution.py @@ -40,7 +40,8 @@ def run_workflow_execution_tests(configuration: Configuration, workflow_executor generate_worker(worker_with_task_input_and_generic_output), generate_worker(worker_with_task_input_and_task_result_output), ], - configuration=configuration + configuration=configuration, + scan_for_annotated_workers=True, ) task_handler.start_processes() try: