Skip to content

Commit

Permalink
Merge pull request #186 from conductor-sdk/fix-worker-annotation
Browse files Browse the repository at this point in the history
Fixed worker annotation discovery
  • Loading branch information
gardusig authored Jun 5, 2023
2 parents f0e8c89 + 7a20f32 commit d0f1cd2
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 8 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ install_requires =
six >= 1.10
requests >= 2.28.1
typing-extensions >= 4.2.0
astor >= 0.8.1

[options.packages.find]
where = src
19 changes: 13 additions & 6 deletions src/conductor/client/automator/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from multiprocessing import Process, freeze_support
from typing import List
import ast
import astor
import inspect
import logging
import os
Expand Down Expand Up @@ -37,7 +38,7 @@ def __init__(
):
if not isinstance(workers, list):
workers = [workers]
if scan_for_annotated_workers is not False:
if scan_for_annotated_workers is True:
for worker in get_annotated_workers():
workers.append(worker)
self.__create_task_runner_processes(
Expand Down Expand Up @@ -174,9 +175,9 @@ def __get_annotated_workers_from_subtree(pkg):
if not isinstance(node, ast.FunctionDef):
continue
for decorator in node.decorator_list:
decorator_type, params = __extract_decorator_info(
params = __extract_decorator_info(
decorator)
if decorator_type != 'WorkerTask':
if params is None:
continue
try:
worker = __create_worker_from_ast_node(
Expand All @@ -194,18 +195,24 @@ def __extract_decorator_info(decorator):
if not isinstance(decorator, ast.Call):
return None, None
decorator_type = None
decorator_params = {}
decorator_func = decorator.func
if isinstance(decorator_func, ast.Attribute):
decorator_type = decorator_func.attr
elif isinstance(decorator_func, ast.Name):
decorator_type = decorator_func.id
if decorator_type != 'WorkerTask':
return None
decorator_params = {}
if decorator.args:
for arg in decorator.args:
arg_value = astor.to_source(arg).strip()
decorator_params[arg_value] = ast.literal_eval(arg)
if decorator.keywords:
for keyword in decorator.keywords:
param_name = keyword.arg
param_value = keyword.value.value
param_value = ast.literal_eval(keyword.value)
decorator_params[param_name] = param_value
return decorator_type, decorator_params
return decorator_params


def __create_worker_from_ast_node(node, params):
Expand Down
7 changes: 6 additions & 1 deletion tests/integration/resources/worker/python/python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ def get_domain(self) -> str:


@WorkerTask(task_definition_name='test_python_decorated_worker')
def decorated_worker(input) -> object:
def simple_decorated_worker(input) -> object:
return {'message': 'python is so cool :)'}


@WorkerTask('test_python_decorated_worker', poll_interval_seconds=0.050)
def decorated_worker_with_poll_interval() -> object:
return {'message': 'python is so cool :)'}


Expand Down
3 changes: 2 additions & 1 deletion tests/integration/workflow/test_workflow_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def run_workflow_execution_tests(configuration: Configuration, workflow_executor
generate_worker(worker_with_task_input_and_generic_output),
generate_worker(worker_with_task_input_and_task_result_output),
],
configuration=configuration
configuration=configuration,
scan_for_annotated_workers=True,
)
task_handler.start_processes()
try:
Expand Down

0 comments on commit d0f1cd2

Please sign in to comment.