Skip to content

Commit

Permalink
Split _cwl_insar from _cwl_demo #1034
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Feb 6, 2025
1 parent 87b45ce commit d6b3224
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 8 deletions.
90 changes: 83 additions & 7 deletions openeogeotrellis/deploy/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
Script to start a production server on Kubernetes. This script can serve as the mainApplicationFile for the SparkApplication custom resource of the spark-operator.
"""

import base64
import json
import logging
import os
import re
import textwrap
from pathlib import Path

from kubernetes.config.incluster_config import SERVICE_TOKEN_FILENAME

Expand Down Expand Up @@ -152,17 +154,13 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv):
launcher = CalrissianJobLauncher.from_context()

cwl_content = textwrap.dedent(
f"""
"""
cwlVersion: v1.0
class: CommandLineTool
baseCommand: echo
requirements:
DockerRequirement:
dockerPull: registry.stag.warsaw.openeo.dataspace.copernicus.eu/rand/openeo_insar:latest
EnvVarRequirement:
envDef:
AWS_ACCESS_KEY_ID: {json.dumps(os.environ.get("AWS_ACCESS_KEY_ID", ""))}
AWS_SECRET_ACCESS_KEY: {json.dumps(os.environ.get("AWS_SECRET_ACCESS_KEY", ""))}
- class: DockerRequirement
dockerPull: debian:stretch-slim
inputs:
message:
type: string
Expand Down Expand Up @@ -192,6 +190,84 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv):
return results["output.txt"].read(encoding="utf8")


@non_standard_process(
    ProcessSpec(id="_cwl_insar", description="Proof-of-concept process to run CWL based inSAR.")
    # NOTE(review): "dict" is not a JSON Schema type name ("object" is) -- confirm intent before changing.
    .param(name="spatial_extent", description="Spatial extent.", schema={"type": "dict"}, required=False)
    .param(name="temporal_extent", description="Temporal extent.", schema={"type": "dict"}, required=False)
    .returns(description="the data as a data cube", schema={})
)
def _cwl_insar(args: ProcessArgs, env: EvalEnv):
    """
    Proof of concept openEO process to run CWL based inSAR processing.

    The complete ``args`` mapping (including ``spatial_extent`` and
    ``temporal_extent`` when given) is JSON-serialized, base64-encoded and
    handed to the CWL tool through its ``input_base64_json`` input.

    :param args: process arguments; forwarded as a whole to the CWL tool.
    :param env: evaluation environment; only consulted for the dry-run tracer.
    :return: the string ``"dummy"`` during a dry run, otherwise the UTF-8
        decoded content of the ``output.txt`` entry of the workflow results.
    """
    # Nothing is launched during the dry-run phase.
    if env.get(ENV_DRY_RUN_TRACER):
        return "dummy"

    # TODO: move these imports to top-level?
    import kubernetes.config

    from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher

    # TODO: better place to load this config?
    # Use the in-cluster configuration when a service account token file exists,
    # otherwise fall back to the local kube config.
    if os.path.exists(SERVICE_TOKEN_FILENAME):
        kubernetes.config.load_incluster_config()
    else:
        kubernetes.config.load_kube_config()

    launcher = CalrissianJobLauncher.from_context()

    # NOTE: the AWS credentials from this process' environment are embedded in the
    # CWL document. `json.dumps` turns each value into a quoted, escaped scalar.
    cwl_content = textwrap.dedent(
        f"""
        cwlVersion: v1.0
        class: CommandLineTool
        baseCommand: insar.py
        requirements:
          DockerRequirement:
            dockerPull: registry.stag.warsaw.openeo.dataspace.copernicus.eu/rand/openeo_insar:latest
          EnvVarRequirement:
            envDef:
              AWS_ACCESS_KEY_ID: {json.dumps(os.environ.get("AWS_ACCESS_KEY_ID", ""))}
              AWS_SECRET_ACCESS_KEY: {json.dumps(os.environ.get("AWS_SECRET_ACCESS_KEY", ""))}
        inputs:
          input_base64_json:
            type: string
            inputBinding:
              position: 1
        outputs:
          output_file:
            type:
              type: array
              items: File
            outputBinding:
              glob: "*.*"
        """
    )

    # Base64 keeps the JSON payload free of characters that need quoting on a command line.
    input_base64_json = base64.b64encode(json.dumps(args).encode("utf8")).decode("ascii")
    cwl_arguments = ["--input_base64_json", input_base64_json]

    # TODO: Load the results as datacube with load_stac.
    # NOTE(review): the CWL document above does not declare an `output.txt` output;
    # this path looks carried over from `_cwl_demo` -- confirm that insar.py writes it.
    results = launcher.run_cwl_workflow(
        cwl_content=cwl_content,
        cwl_arguments=cwl_arguments,
        output_paths=["output.txt"],
    )

    return results["output.txt"].read(encoding="utf8")



# Run main() only when this module is executed as a script (not on import).
if __name__ == "__main__":
    main()
3 changes: 2 additions & 1 deletion openeogeotrellis/integrations/calrissian.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def launch_job_and_wait(
manifest: kubernetes.client.V1Job,
*,
sleep: float = 5,
timeout: float = 60,
timeout: float = 900,
) -> kubernetes.client.V1Job:
"""
Launch a k8s job and wait (with active polling) for it to finish.
Expand Down Expand Up @@ -326,6 +326,7 @@ def run_cwl_workflow(
# Input staging
input_staging_manifest, cwl_path = self.create_input_staging_job_manifest(cwl_content=cwl_content)
input_staging_job = self.launch_job_and_wait(manifest=input_staging_manifest)
assert input_staging_job.status.succeeded

# CWL job
cwl_manifest, relative_output_dir = self.create_cwl_job_manifest(
Expand Down

0 comments on commit d6b3224

Please sign in to comment.