Skip to content

Commit

Permalink
Merge pull request #624 from DalgoT4D/615-run-workspace-source-connec…
Browse files Browse the repository at this point in the history
…tor-addition-via-celery

615 run workspace source connector addition via celery
  • Loading branch information
fatchat authored May 25, 2024
2 parents 2202628 + d284622 commit b3cc806
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 24 deletions.
19 changes: 19 additions & 0 deletions ddpui/celeryworkers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ddpui.utils.helpers import runcmd, runcmd_with_output, subprocess
from ddpui.utils import secretsmanager
from ddpui.utils.taskprogress import TaskProgress
from ddpui.ddpairbyte import airbyte_service
from ddpui.ddpprefect.prefect_service import (
update_dbt_core_block_schema,
get_dbt_cli_profile_block,
Expand Down Expand Up @@ -504,6 +505,24 @@ def sync_flow_runs_of_deployments(self, deployment_ids: list[str] = None):
continue


@app.task(bind=True)
def add_custom_connectors_to_workspace(self, workspace_id, custom_sources: list[dict]):
"""
This function will add custom sources to a workspace
"""
for source in custom_sources:
airbyte_service.create_custom_source_definition(
workspace_id=workspace_id,
name=source["name"],
docker_repository=source["docker_repository"],
docker_image_tag=source["docker_image_tag"],
documentation_url=source["documentation_url"],
)
logger.info(
f"added custom source {source['name']} [{source['docker_repository']}:{source['docker_image_tag']}]"
)


@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
"""check for old locks every minute"""
Expand Down
3 changes: 2 additions & 1 deletion ddpui/core/orgfunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ def create_organization(payload: OrgSchema):

try:
airbytehelpers.setup_airbyte_workspace_v1(org.slug, org)
except Exception:
except Exception as err:
logger.error("could not create airbyte workspace: " + str(err))
# delete the org or we won't be able to create it once airbyte comes back up
org.delete()
return None, "could not create airbyte workspace"
Expand Down
19 changes: 6 additions & 13 deletions ddpui/ddpairbyte/airbytehelpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from ddpui.assets.whitelist import DEMO_WHITELIST_SOURCES
from ddpui.core.pipelinefunctions import setup_airbyte_sync_task_config
from ddpui.core.orgtaskfunctions import fetch_orgtask_lock
from ddpui.celeryworkers.tasks import add_custom_connectors_to_workspace

logger = CustomLogger("airbyte")

Expand Down Expand Up @@ -119,19 +120,6 @@ def setup_airbyte_workspace_v1(wsname: str, org: Org) -> AirbyteWorkspace:
org.airbyte_workspace_id = workspace["workspaceId"]
org.save()

try:
for custom_source_info in settings.AIRBYTE_CUSTOM_SOURCES.values():
add_custom_airbyte_connector(
workspace["workspaceId"],
custom_source_info["name"],
custom_source_info["docker_repository"],
custom_source_info["docker_image_tag"],
custom_source_info["documentation_url"],
)
except Exception as error:
logger.error("Error creating custom source definitions: %s", str(error))
raise error

# Airbyte server block details. prefect doesn't know the workspace id
block_name = f"{org.slug}-{slugify(AIRBYTESERVER)}"

Expand Down Expand Up @@ -165,6 +153,11 @@ def setup_airbyte_workspace_v1(wsname: str, org: Org) -> AirbyteWorkspace:
"could not create orgprefectblock for airbyte-server"
) from error

# add custom sources to this workspace
add_custom_connectors_to_workspace.delay(
workspace["workspaceId"], list(settings.AIRBYTE_CUSTOM_SOURCES.values())
)

return AirbyteWorkspace(
name=workspace["name"],
workspaceId=workspace["workspaceId"],
Expand Down
29 changes: 19 additions & 10 deletions ddpui/tests/helper/test_airbytehelpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,34 @@ def test_upgrade_custom_sources_add(
mock_create_workspace=Mock(),
)
@patch(
"ddpui.ddpairbyte.airbytehelpers.add_custom_airbyte_connector",
mock_add_custom_airbyte_connector=Mock(),
"ddpui.ddpairbyte.airbytehelpers.prefect_service.get_airbyte_server_block_id",
mock_get_airbyte_server_block_id=Mock(),
)
@patch(
"ddpui.ddpairbyte.airbytehelpers.prefect_service.delete_airbyte_server_block",
mock_delete_airbyte_server=Mock(),
)
def test_setup_airbyte_workspace_v1_fail(
mock_add_custom_airbyte_connector: Mock, create_workspace: Mock
mock_delete_airbyte_server: Mock,
mock_get_airbyte_server_block_id: Mock,
create_workspace: Mock,
):
"""failing test"""
org_save = Mock()
org = Mock(save=org_save)
org = Org.objects.create(name="org", slug="org")
create_workspace.return_value = {
"workspaceId": "wsid",
"initialSetupComplete": False,
"name": "sda",
}
mock_add_custom_airbyte_connector.side_effect = Exception("error")
with pytest.raises(Exception):
setup_airbyte_workspace_v1("workspace_name", org)
org_save.assert_called_once()
mock_get_airbyte_server_block_id.return_value = "blockid"
with patch(
"ddpui.celeryworkers.tasks.add_custom_connectors_to_workspace.delay",
side_effect=Exception("error"),
) as delay:
with pytest.raises(Exception):
setup_airbyte_workspace_v1("workspace_name", org)
delay.assert_called_once()
assert org.airbyte_workspace_id == "wsid"
mock_add_custom_airbyte_connector.assert_called_once()


@patch(
Expand Down

0 comments on commit b3cc806

Please sign in to comment.