Skip to content

Commit

Permalink
_cwl_insar and varia. #1034
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Feb 6, 2025
1 parent 0d5f3e0 commit e7f38b2
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 2 deletions.
87 changes: 86 additions & 1 deletion openeogeotrellis/deploy/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
Script to start a production server on Kubernetes. This script can serve as the mainApplicationFile for the SparkApplication custom resource of the spark-operator
"""

import base64
import json
import logging
import os
import re
import textwrap

from kubernetes.config.incluster_config import SERVICE_TOKEN_FILENAME

from openeo_driver.processes import ProcessArgs
from openeo_driver.ProcessGraphDeserializer import (
ENV_DRY_RUN_TRACER,
Expand Down Expand Up @@ -141,7 +145,10 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv):
from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher

# TODO: better place to load this config?
kubernetes.config.load_incluster_config()
if os.path.exists(SERVICE_TOKEN_FILENAME):
kubernetes.config.load_incluster_config()
else:
kubernetes.config.load_kube_config()

launcher = CalrissianJobLauncher.from_context()

Expand Down Expand Up @@ -183,5 +190,83 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv):



@non_standard_process(
ProcessSpec(id="_cwl_insar", description="Proof-of-concept process to run CWL based inSAR.")
.param(name="spatial_extent", description="Spatial extent.", schema={"type": "dict"}, required=False)
.param(name="temporal_extent", description="Temporal extent.", schema={"type": "dict"}, required=False)
.returns(description="the data as a data cube", schema={})
)
def _cwl_insar(args: ProcessArgs, env: EvalEnv):
"""Proof of concept openEO process to run CWL based processing"""
spatial_extent = args.get_optional(
"spatial_extent",
default=None,
)
temporal_extent = args.get_optional(
"temporal_extent",
default=None,
)

if env.get(ENV_DRY_RUN_TRACER):
return "dummy"

from openeo_driver import dry_run

# source_id = dry_run.DataSource.load_disk_data(**kwargs).get_source_id()
# load_params = _extract_load_parameters(env, source_id=source_id)

# TODO: move this imports to top-level?
import kubernetes.config

from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher

# TODO: better place to load this config?
if os.path.exists(SERVICE_TOKEN_FILENAME):
kubernetes.config.load_incluster_config()
else:
kubernetes.config.load_kube_config()

launcher = CalrissianJobLauncher.from_context()

cwl_content = textwrap.dedent(
f"""
cwlVersion: v1.0
class: CommandLineTool
baseCommand: insar.py
requirements:
DockerRequirement:
dockerPull: registry.stag.warsaw.openeo.dataspace.copernicus.eu/rand/openeo_insar:latest
EnvVarRequirement:
envDef:
AWS_ACCESS_KEY_ID: {json.dumps(os.environ.get("AWS_ACCESS_KEY_ID", ""))}
AWS_SECRET_ACCESS_KEY: {json.dumps(os.environ.get("AWS_SECRET_ACCESS_KEY", ""))}
inputs:
input_base64_json:
type: string
inputBinding:
position: 1
outputs:
output_file:
type:
type: array
items: File
outputBinding:
glob: "*.*"
"""
)
# correlation_id = get_job_id(default=None) or get_request_id(default=None)
input_base64_json = base64.b64encode(json.dumps(args).encode("utf8")).decode("ascii")
cwl_arguments = ["--input_base64_json", input_base64_json]

# TODO: Load the results as datacube with load_stac.
results = launcher.run_cwl_workflow(
cwl_content=cwl_content,
cwl_arguments=cwl_arguments,
output_paths=["output.txt"],
)

return results["output.txt"].read(encoding="utf8")


if __name__ == '__main__':
main()
1 change: 1 addition & 0 deletions openeogeotrellis/deploy/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from openeo_driver.utils import smart_bool
from openeo_driver.views import build_app
from openeogeotrellis.config import get_backend_config
from . import kube

_log = logging.getLogger(__name__)

Expand Down
6 changes: 5 additions & 1 deletion openeogeotrellis/integrations/calrissian.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import kubernetes.client
import time

import yaml

from openeo.util import ContextTimer
from openeo_driver.utils import generate_unique_id
from openeogeotrellis.config import get_backend_config
Expand Down Expand Up @@ -106,6 +108,8 @@ def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernete
"""
name = self._build_unique_name(infix="cal-inp")
_log.info(f"Creating input staging job manifest: {name=}")
yaml_parsed = list(yaml.safe_load_all(cwl_content))
assert len(yaml_parsed) >= 1

# Serialize CWL content to string that is safe to pass as command line argument
cwl_serialized = base64.b64encode(cwl_content.encode("utf8")).decode("ascii")
Expand Down Expand Up @@ -253,7 +257,7 @@ def launch_job_and_wait(
manifest: kubernetes.client.V1Job,
*,
sleep: float = 5,
timeout: float = 60,
timeout: float = 900,
) -> kubernetes.client.V1Job:
"""
Launch a k8s job and wait (with active polling) for it to finish.
Expand Down

0 comments on commit e7f38b2

Please sign in to comment.