Skip to content

Commit

Permalink
gateway scheduler refactor prototype
Browse files Browse the repository at this point in the history
Signed-off-by: Akihiko Kuroda <[email protected]>
  • Loading branch information
akihikokuroda committed Nov 11, 2023
1 parent 0449a5b commit 22558ef
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 136 deletions.
130 changes: 5 additions & 125 deletions charts/quantum-serverless/charts/gateway/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ spec:
- name: gateway-pv-storage
persistentVolumeClaim:
claimName: {{ .Values.cos.claimName }}
- name: ray-cluster-template
configMap:
name: rayclustertemplate
containers:
- name: {{ .Chart.Name }}
securityContext:
Expand All @@ -81,6 +84,8 @@ spec:
volumeMounts:
- mountPath: "/usr/src/app/media/"
name: gateway-pv-storage
- mountPath: "/tmp/templates/"
name: ray-cluster-template
resources:
{{- toYaml .Values.resources | nindent 12 }}
env:
Expand Down Expand Up @@ -169,119 +174,6 @@ spec:
secretKeyRef:
name: {{ .Values.secrets.servicePsql.name }}
key: {{ .Values.secrets.servicePsql.key.databasePassword }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: scheduler
labels:
{{- include "scheduler.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: 1
{{- end }}
selector:
matchLabels:
{{- include "scheduler.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "scheduler.selectorLabels" . | nindent 8 }}
spec:
volumes:
- name: gateway-pv-storage
persistentVolumeClaim:
claimName: {{ .Values.cos.claimName }}
- name: ray-cluster-template
configMap:
name: rayclustertemplate
serviceAccountName: {{ include "gateway.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
initContainers:
- name: waitpostresql
image: actions/pg_isready
command: ['sh', '-c', 'until pg_isready -U ${DATABASE_USER} -d "dbname=${DATABASE_NAME}" -h ${DATABASE_HOST} -p ${DATABASE_PORT}; do echo waiting for myservice; sleep 2; done']
env:
- name: DATABASE_HOST
valueFrom:
secretKeyRef:
name: {{ .Values.secrets.servicePsql.name }}
key: {{ .Values.secrets.servicePsql.key.host }}
- name: DATABASE_PORT
valueFrom:
secretKeyRef:
name: {{ .Values.secrets.servicePsql.name }}
key: {{ .Values.secrets.servicePsql.key.port }}
- name: DATABASE_NAME
valueFrom:
secretKeyRef:
name: {{ .Values.secrets.servicePsql.name }}
key: {{ .Values.secrets.servicePsql.key.databaseName }}
- name: DATABASE_USER
valueFrom:
secretKeyRef:
name: {{ .Values.secrets.servicePsql.name }}
key: {{ .Values.secrets.servicePsql.key.userName }}
containers:
- name: gateway-scheduler
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
command: ["./scripts/scheduler.sh"]
volumeMounts:
- mountPath: "/usr/src/app/media/"
name: gateway-pv-storage
- mountPath: "/tmp/templates/"
name: ray-cluster-template
env:
- name: DEBUG
value: {{ .Values.application.debug | quote }}
- name: DJANGO_SECRET_KEY
valueFrom:
secretKeyRef:
name: {{ .Values.secrets.secretKey.name }}
key: {{ .Values.secrets.secretKey.key }}
- name: DATABASE_HOST
valueFrom:
secretKeyRef:
name: {{ .Values.secrets.servicePsql.name }}
key: {{ .Values.secrets.servicePsql.key.host }}
- name: DATABASE_PORT
valueFrom:
secretKeyRef:
name: {{ .Values.secrets.servicePsql.name }}
key: {{ .Values.secrets.servicePsql.key.port }}
- name: DATABASE_NAME
valueFrom:
secretKeyRef:
name: {{ .Values.secrets.servicePsql.name }}
key: {{ .Values.secrets.servicePsql.key.databaseName }}
- name: DATABASE_USER
valueFrom:
secretKeyRef:
name: {{ .Values.secrets.servicePsql.name }}
key: {{ .Values.secrets.servicePsql.key.userName }}
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: {{ .Values.secrets.servicePsql.name }}
key: {{ .Values.secrets.servicePsql.key.databasePassword }}
- name: RAY_KUBERAY_NAMESPACE
value: {{ .Release.Namespace }}
- name: RAY_NODE_IMAGE
Expand All @@ -294,18 +186,6 @@ spec:
- name: RAY_CLUSTER_NO_DELETE_ON_COMPLETE
value: "True"
{{- end }}
- name: OTEL_ENABLED
value: {{ .Values.application.ray.openTelemetryCollector.enabled | quote }}
- name: OTEL_SERVICE_NAME
value: "Gateway"
- name: OTEL_TRACES_EXPORTER
value: console,otlp
- name: OTEL_METRICS_EXPORTER
value: console
- name: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
value: {{ .Values.application.ray.openTelemetryCollector.host }}:{{ .Values.application.ray.openTelemetryCollector.port }}
- name: OTEL_EXPORTER_OTLP_TRACES_INSECURE
value: {{ .Values.application.ray.openTelemetryCollector.insecure | quote }}
- name: PROGRAM_TIMEOUT
value: {{ .Values.application.limits.programTimeoutDays | quote }}
{{- with .Values.nodeSelector }}
Expand Down
1 change: 1 addition & 0 deletions client/quantum_serverless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
RayProvider,
LocalProvider,
save_result,
set_status,
)
from .quantum_serverless import (
QuantumServerless,
Expand Down
1 change: 1 addition & 0 deletions client/quantum_serverless/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
LocalJobClient,
Job,
save_result,
set_status,
)
from .pattern import (
QiskitPattern,
Expand Down
32 changes: 32 additions & 0 deletions client/quantum_serverless/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,3 +740,35 @@ def _map_status_to_serverless(status: str) -> str:
return status_map[status]
except KeyError:
return status


def set_status(status: str) -> bool:
    """Report the status of the current job to the gateway.

    The gateway host, job id, API version and authorization token are read
    from the environment. The status is sent with a POST request to the
    ``status`` endpoint of the job.

    Args:
        status: status value sent in the request body.

    Returns:
        ``True`` if the gateway answered with a successful response,
        ``False`` if no authorization token is set in the environment or
        the gateway answered with an error response.

    Note:
        Exceptions raised by ``requests.post`` (connection errors, timeouts)
        are not caught here and propagate to the caller.
    """
    version = os.environ.get(ENV_GATEWAY_PROVIDER_VERSION)
    if version is None:
        version = GATEWAY_PROVIDER_VERSION_DEFAULT

    token = os.environ.get(ENV_JOB_GATEWAY_TOKEN)
    if token is None:
        # Without a token the request cannot be authorized, so give up early.
        # Each fragment ends with a space: adjacent string literals are
        # concatenated without any separator.
        logging.warning(
            "Job status will not be updated since "
            "there is no information about the "
            "authorization token in the environment."
        )
        return False

    url = (
        f"{os.environ.get(ENV_JOB_GATEWAY_HOST)}/"
        f"api/{version}/jobs/{os.environ.get(ENV_JOB_ID_GATEWAY)}/status/"
    )
    response = requests.post(
        url,
        data={"status": status},
        headers={"Authorization": f"Bearer {token}"},
        timeout=REQUESTS_TIMEOUT,
    )
    if not response.ok:
        logging.warning("Something went wrong: %s", response.text)

    return response.ok
25 changes: 24 additions & 1 deletion gateway/api/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def submit(self, job: Job) -> Optional[str]:
)
file.extractall(extract_folder)

entrypoint = f"python {program.entrypoint}"
entrypoint = f"python launcher.py {program.entrypoint}"
carrier = {}
TraceContextTextMapPropagator().inject(carrier)
env_w_span = json.loads(job.env_vars)
Expand All @@ -87,6 +87,29 @@ def submit(self, job: Job) -> Optional[str]:
except KeyError:
pass

f = open(extract_folder + "/launcher.py", "w")
f.write(
'''
import subprocess
from subprocess import Popen
import sys
from quantum_serverless import set_status
with Popen(
["python", sys.argv[1]],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
) as pipe:
status = "SUCCEEDED"
if pipe.wait():
status = "FAILED"
output, _ = pipe.communicate()
print(output)
set_status(status)
'''
)
f.close()

ray_job_id = retry_function(
callback=lambda: self.client.submit_job(
entrypoint=entrypoint,
Expand Down
43 changes: 42 additions & 1 deletion gateway/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def run_existing(self, request):
author=request.user,
status=Job.QUEUED,
)
job.save()

carrier = {}
TraceContextTextMapPropagator().inject(carrier)
Expand Down Expand Up @@ -229,6 +228,48 @@ def result(self, request, pk=None): # pylint: disable=invalid-name,unused-argum
serializer = self.get_serializer(job)
return Response(serializer.data)

@action(methods=["POST"], detail=True)
def status(self, request, pk=None):  # pylint: disable=invalid-name,unused-argument
    """Set the status of a job and store its current logs.

    Expects a ``status`` field in the request data. When a job handler is
    available for the job's compute resource, the job logs are fetched from
    it and saved together with the new status.

    Returns:
        The serialized job on success, a 400 response when ``status`` is
        missing from the request, or a 500 response when the record could
        not be saved after 10 attempts.
    """
    tracer = trace.get_tracer("gateway.tracer")
    ctx = TraceContextTextMapPropagator().extract(carrier=request.headers)
    with tracer.start_as_current_span("gateway.job.status", context=ctx):
        org = self.get_object()

        new_status = request.data.get("status")
        if new_status is None:
            # Do not write an empty status into the job record.
            return Response({"error": "Field 'status' is required."}, status=400)

        # Stays None when no handler is available; the stored logs are then
        # left untouched instead of failing on an unassigned variable.
        logs = None
        job_handler = get_job_handler(org.compute_resource.host)
        if job_handler:
            logs = job_handler.logs(org.ray_job_id)

        # Up to 10 attempts; attempts_left counts the retries still available.
        for attempts_left in range(9, -1, -1):
            try:
                # Re-read the job on every attempt to pick up the latest version.
                job = self.get_object()
                job.status = new_status
                if logs is not None:
                    job.logs = logs
                job.save()
                break
            except RecordModifiedError:
                logger.warning(
                    "Job[%s] record has not been updated due to lock. "
                    "Retrying. Attempts left %s",
                    job.id,
                    attempts_left,
                )
                if attempts_left > 0:
                    # Wait before retrying so the concurrent writer can finish.
                    time.sleep(1)
        else:
            # The loop finished without a successful save.
            return Response(
                {"error": "All attempts to save status failed."}, status=500
            )

        logger.info("Job[%s] status set to %s", job.id, new_status)

        serializer = self.get_serializer(job)
        return Response(serializer.data)

@action(methods=["GET"], detail=True)
def logs(self, request, pk=None): # pylint: disable=invalid-name,unused-argument
"""Returns logs from job."""
Expand Down
17 changes: 17 additions & 0 deletions gateway/main/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from django.db.models.signals import post_save
from django.dispatch import receiver
from api.models import Job
from api.management.commands.schedule_queued_jobs import Command as Schedule
from api.management.commands.free_resources import Command as Free


@receiver(post_save, sender=Job)
def save_job(sender, instance, created, **kwargs):
    """Run the free-resources and scheduling commands after a Job is saved.

    Registered as a ``post_save`` receiver for ``Job``. Progress markers and
    the saved instance are printed to stdout. ``sender``, ``created`` and
    ``kwargs`` are accepted but not used.
    """
    # NOTE(review): if either command saves Job rows, this receiver would
    # presumably be triggered again from inside itself — confirm.
    for marker in ("save_job", instance, "Free"):
        print(marker)
    Free().handle()

    print("Schedule")
    Schedule().handle()
1 change: 1 addition & 0 deletions gateway/main/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from django.urls import path, include, re_path
from django.views.generic import TemplateView
from rest_framework import routers
from .signals import save_job

from api.views import KeycloakLogin, KeycloakUsersView
import probes.views
Expand Down
9 changes: 0 additions & 9 deletions gateway/scripts/scheduler.sh

This file was deleted.

0 comments on commit 22558ef

Please sign in to comment.