diff --git a/.github/workflows/sqaaas.yaml b/.github/workflows/sqaaas.yaml new file mode 100644 index 00000000..9813070f --- /dev/null +++ b/.github/workflows/sqaaas.yaml @@ -0,0 +1,25 @@ +name: SQAaaS OSCAR + +on: + push: + branches: ["sqa"] + tags: + - 'v*' # Push events to matching v*, i.e. v1.0, v20.15.10 +jobs: + + sqaaas_job: + runs-on: ubuntu-latest + steps: + - name: Add tox unit test step definition for a SQAaaS assessment + uses: eosc-synergy/sqaaas-step-action@v1 + id: go_unit_test + with: + name: go_unit_test + container: "golang:1.21.4-alpine3.18" + tool: commands + commands: "go test ./... -v" + + - name: SQAaaS assessment step + uses: eosc-synergy/sqaaas-assessment-action@v2 + with: + qc_uni_steps: go_unit_test diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 3017a1ad..9d0cb12f 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -8,10 +8,10 @@ jobs: runs-on: ubuntu-20.04 steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: '1.21' diff --git a/README.md b/README.md index a3d39b88..730e88f4 100644 --- a/README.md +++ b/README.md @@ -107,7 +107,12 @@ license text. This development is partially funded by the [EGI Strategic and Innovation Fund](https://www.egi.eu/about/egi-council/egi-strategic-and-innovation-fund/). -Partially funded by the project [AI-SPRINT](https://ai-sprint-project.eu) "AI in Secure Privacy-Preserving Computing Continuum" that has received funding from the European Union’s Horizon 2020 Research and Innovation Programme under Grant 101016577. +Partially funded by the projects: + +- [AI-SPRINT](https://ai-sprint-project.eu) "AI in Secure Privacy-Preserving Computing Continuum" that has received funding from the European Union’s Horizon 2020 Research and Innovation Programme under Grant 101016577. +- [interTwin](https://intertwin.eu) "An interdisciplinary Digital Twin Engine for science" that has received funding from the European Union’s Horizon Europe Programme under Grant 101058386. +- [AI4EOSC](https://ai4eosc.eu) "Artificial Intelligence for the European Open Science Cloud" that has received funding from the European Union’s Horizon Europe Research and Innovation Programme under Grant 101058593." +- [iMagine](http://imagine-ai.eu) "AI-based image data analysis tools for aquatic research" that has received funding from the European Union’s Horizon Europe Research and Innovation Programme under Grant 101058625. Also, Grant PDC2021-120844-I00 funded by Ministerio de Ciencia e Innovación/Agencia Estatal de Investigación/ 10.13039/501100011033 and by “European Union NextGenerationEU/PRTR” and Grant PID2020-113126RB-I00 funded by Ministerio de Ciencia e Innovación/Agencia Estatal de Investigación/ 10.13039/501100011033. diff --git a/docs/images/bucket-list.png b/docs/images/bucket-list.png deleted file mode 100644 index dba9aa46..00000000 Binary files a/docs/images/bucket-list.png and /dev/null differ diff --git a/docs/images/minio-ui.png b/docs/images/minio-ui.png deleted file mode 100644 index f90d710a..00000000 Binary files a/docs/images/minio-ui.png and /dev/null differ diff --git a/docs/images/oscar-info.png b/docs/images/oscar-info.png deleted file mode 100644 index 1c9b969e..00000000 Binary files a/docs/images/oscar-info.png and /dev/null differ diff --git a/docs/integration-egi.md b/docs/integration-egi.md index 632897d0..18434035 100644 --- a/docs/integration-egi.md +++ b/docs/integration-egi.md @@ -87,3 +87,18 @@ create a new account configuration for the After that, clusters can be added with the command [`oscar-cli cluster add`](oscar-cli.md#add) specifying the oidc-agent account name with the `--oidc-account-name` flag. + +### Obtaining an Access Token + +Once logged in via EGI Check-In you can obtain an Access Token with one of this approaches: + +* From the command-line, using `oidc-agent` with the following command: + + ```sh + oidc-token + ``` + where `account-short-name` is the name of your account configuration. + +* From the EGI Check-In Token Portal: [https://aai.egi.eu/token](https://aai.egi.eu/token) + +![egi-checkin-token-portal.png](images/oidc/egi-checkin-token-portal.png) diff --git a/docs/minio_usage.md b/docs/minio_usage.md deleted file mode 100644 index a8d41b20..00000000 --- a/docs/minio_usage.md +++ /dev/null @@ -1,48 +0,0 @@ -# Accessing and managing MinIO storage provider - -Each OSCAR cluster includes a deployed MinIO instance, which is used to trigger service executions. When a service is configured to use MinIO as its storage provider, it monitors a specified input folder for new data. Whenever new data is added to this folder, it triggers the associated service to execute. - -## Using graphical interfaces - -These folders can be accessed via both the OSCAR UI and the MinIO console UI. - -- **Using OSCAR-UI**: The following image highlights the section where MinIO buckets are accessible. Users can view a list of available buckets and perform operations such as uploading and deleting files. - -![minio-buckets](images/bucket-list.png) - -- **Using the MinIO Console UI**: Access details for this interface are available in the "Info" tab within the OSCAR UI. This tab provides the MinIO console endpoint and the necessary credentials to log in, where the *Access Key* serves as the username, and the *Secret Key* functions as the password. - -![oscar-info](images/oscar-info.png) - -Finally, the following image provides an overview of the MinIO login panel and the "Object Browser" tab. Once logged in, the "Object Browser" tab allows users to navigate their available buckets, view stored objects, and perform various operations such as uploading, downloading, or deleting files. - -![oscar-info](images/minio-ui.png) - -Further information about the MinIO Console avaliable on [MinIO Console documentation](https://min.io/docs/minio/linux/administration/minio-console.html). - -## Using command-line interfaces - -MinIO buckets can also be managed through [oscar-cli command-line](https://github.com/grycap/oscar-cli) or the official [MinIO client](https://min.io/docs/minio/linux/reference/minio-mc.html). - -- **oscar-cli**: The OSCAR client provides a dedicated set of commands for accessing files within buckets. It is important to note that this interface does not support DELETE or UPDATE operations. Below is a brief overview of the available commands and their functionalities. - - [get-file](https://docs.oscar.grycap.net/oscar-cli/#get-file): Get file from a service's storage provider. - - [list-files](https://docs.oscar.grycap.net/oscar-cli/#list-files): List files from a service's storage provider path. - - [put-file](https://docs.oscar.grycap.net/oscar-cli/#put-file): Upload a file on a service storage provider. - - An example of a put-file operation: - ``` sh - oscar-cli service put-file fish-detector.yaml minio .path/to/your/images ./fish-detector/input/ - ``` - -- **mc**: If a user wants to use the MinIO client it needs to follow some previous steps. - - *Install the client*: Detailed instructions for installing the MinIO client (mc) are available in [the official documentation](https://min.io/docs/minio/linux/reference/minio-mc.html#install-mc). - - *Configure the MinIO instance*: The client requires credentials to connect and interact with the MinIO instance. This configuration can be set with the following command: - - ``` sh - mc alias set myminio https://minio.gracious-varahamihira6.im.grycap.net YOUR-ACCESS-KEY YOUR-SECRET-KEY - ``` - - Once the client is configured, users can perform various operations supported by the MinIO client. For a complete list of available commands and their usage, refer to the [MinIO client reference](https://min.io/docs/minio/linux/reference/minio-mc.html#command-quick-reference). The following example demonstrates a PUT operation, where a file is uploaded to a specific folder within a bucket. - ``` sh - mc cp /path/to/your/images/*.jpg myminio/fish-detector/input/ - ``` diff --git a/docs/oscar-service.md b/docs/oscar-service.md index 1dd0c584..b06e0ec1 100644 --- a/docs/oscar-service.md +++ b/docs/oscar-service.md @@ -37,10 +37,6 @@ The output of synchronous invocations will depend on the application itself: This way users can adapt OSCAR's services to their own needs. -### Container images - -Container images on asynchronous services use the tag `imagePullPolicy: Always`, which means that Kubernetes will check for the image digest on the image registry and download it if it is not present. -So, if you are using an image without a specific tag or with the latest tag, the service will automatically download and use the most recent version of the image on its executions, whenever the image is updated. You can follow one of the [examples](https://github.com/grycap/oscar/tree/master/examples) diff --git a/examples/stable-diffusion/Dockerfile b/examples/stable-diffusion/Dockerfile new file mode 100644 index 00000000..65bdbd95 --- /dev/null +++ b/examples/stable-diffusion/Dockerfile @@ -0,0 +1,21 @@ +FROM nvidia/cuda:11.8.0-cudnn8-runtime-ubuntu22.04 + +RUN apt update && \ + apt install -y --no-install-recommends git wget python3-pip && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +RUN git clone https://github.com/srisco/stable-diffusion-tensorflow.git + +WORKDIR stable-diffusion-tensorflow + +RUN pip install -r requirements.txt && \ + rm -rf /root/.cache/pip/* && \ + rm -rf /tmp/* + +# DOWNLOAD WEIGHTS +RUN mkdir -p /root/.keras/datasets && \ + wget https://huggingface.co/fchollet/stable-diffusion/resolve/main/text_encoder.h5 -O /root/.keras/datasets/text_encoder.h5 && \ + wget https://huggingface.co/fchollet/stable-diffusion/resolve/main/diffusion_model.h5 -O /root/.keras/datasets/diffusion_model.h5 && \ + wget https://huggingface.co/fchollet/stable-diffusion/resolve/main/decoder.h5 -O /root/.keras/datasets/decoder.h5 && \ + wget https://huggingface.co/divamgupta/stable-diffusion-tensorflow/resolve/main/encoder_newW.h5 -O /root/.keras/datasets/encoder_newW.h5 diff --git a/examples/stable-diffusion/README.md b/examples/stable-diffusion/README.md new file mode 100644 index 00000000..3f8b7512 --- /dev/null +++ b/examples/stable-diffusion/README.md @@ -0,0 +1,31 @@ +# Stable Diffusion + +This example is based on a Keras / Tensorflow implementation of Stable Diffusion. The following repositories were used for the creation of the image: + +* [srisco/stable-diffusion-tensorflow](https://github.com/srisco/stable-diffusion-tensorflow) +* [huggingface.co/fchollet/stable-diffusion](https://huggingface.co/fchollet/stable-diffusion) +* [huggingface.co/divamgupta/stable-diffusion-tensorflow](https://huggingface.co/divamgupta/stable-diffusion-tensorflow/) + +The image if pushed to a public github registry [here](ghcr.io/grycap/stable-diffusion-tf:latest) but you can see the Dockerfile that generates it [here](Dockerfile). + +## Deploy an OSCAR cluster +Follow the instructions in the documentation for your desired IaaS cloud provider. +[See Deployment](https://docs.oscar.grycap.net/) + +## Create the OSCAR Service + +The Service can be created using the OSCAR GUI by providing the [FDL](stable-diff.yaml) and the [script.sh](script.sh) file. + +![OSCAR GUI Creation of a service](https://oscar.grycap.net/images/blog/post-20210803-1/create_service_gui.png) + +## Upload the input file to the MinIO bucket + +Once the service is created, you can upload the input file to the bucket. The input file should be a file containing the prompt that you want to process. + +For example, using the following prompt: + +`a chicken making a 360 with a skateboard with background flames` + +The following image is generated: + +![Cool chicken](prompt.txt.png) \ No newline at end of file diff --git a/examples/stable-diffusion/prompt.txt b/examples/stable-diffusion/prompt.txt new file mode 100644 index 00000000..b27d4aad --- /dev/null +++ b/examples/stable-diffusion/prompt.txt @@ -0,0 +1 @@ +a chicken making a 360 with a skateboard with background flames diff --git a/examples/stable-diffusion/prompt.txt.png b/examples/stable-diffusion/prompt.txt.png new file mode 100644 index 00000000..543cffb6 Binary files /dev/null and b/examples/stable-diffusion/prompt.txt.png differ diff --git a/examples/stable-diffusion/script.sh b/examples/stable-diffusion/script.sh new file mode 100644 index 00000000..8bb41054 --- /dev/null +++ b/examples/stable-diffusion/script.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +echo "SCRIPT: Invoked stable diffusion text to image." +FILE_NAME=`basename "$INPUT_FILE_PATH"` +OUTPUT_FILE="$TMP_OUTPUT_DIR/$FILE_NAME.png" + +prompt=`cat "$INPUT_FILE_PATH"` +echo "SCRIPT: Converting input prompt '$INPUT_FILE_PATH' to image :)" +python3 text2image.py --prompt="$prompt" --output=$OUTPUT_FILE \ No newline at end of file diff --git a/examples/stable-diffusion/stable-diff.yaml b/examples/stable-diffusion/stable-diff.yaml new file mode 100644 index 00000000..2ea74461 --- /dev/null +++ b/examples/stable-diffusion/stable-diff.yaml @@ -0,0 +1,17 @@ +functions: + oscar: + - oscar-intertwin: + name: stable-diffusion-tf + memory: 16Gi + cpu: '4' + image: ghcr.io/grycap/stable-diffusion-tf:latest + script: script.sh + log_level: DEBUG + vo: "vo.example.eu" + allowed_users: [] + input: + - storage_provider: minio.default + path: stablediff/input + output: + - storage_provider: minio.default + path: stablediff/output diff --git a/main.go b/main.go index a98e2a65..53bca6d4 100644 --- a/main.go +++ b/main.go @@ -105,12 +105,12 @@ func main() { system.GET("/status", handlers.MakeStatusHandler(kubeClientset, metricsClientset)) // Job path for async invocations - r.POST("/job/:serviceName", auth.GetLoggerMiddleware(), handlers.MakeJobHandler(cfg, kubeClientset, back, resMan)) + r.POST("/job/:serviceName", handlers.MakeJobHandler(cfg, kubeClientset, back, resMan)) // Service path for sync invocations (only if ServerlessBackend is enabled) syncBack, ok := back.(types.SyncBackend) if cfg.ServerlessBackend != "" && ok { - r.POST("/run/:serviceName", auth.GetLoggerMiddleware(), handlers.MakeRunHandler(cfg, syncBack)) + r.POST("/run/:serviceName", handlers.MakeRunHandler(cfg, syncBack)) } // System info path diff --git a/mkdocs.yml b/mkdocs.yml index 6a6457cd..fa259848 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -21,7 +21,6 @@ nav: - oscar-cli.md - usage-ui.md - api.md - - minio-upload.md - Service Execution: - invoking.md diff --git a/pkg/backends/fake.go b/pkg/backends/fake.go index d39b6bba..c9ec6b26 100644 --- a/pkg/backends/fake.go +++ b/pkg/backends/fake.go @@ -81,7 +81,7 @@ func (f *FakeBackend) CreateService(service types.Service) error { // ReadService returns a Service (fake) func (f *FakeBackend) ReadService(name string) (*types.Service, error) { - return &types.Service{Token: "11e387cf727630d899925d57fceb4578f478c44be6cde0ae3fe886d8be513acf"}, f.returnError(getCurrentFuncName()) + return &types.Service{Token: "AbCdEf123456"}, f.returnError(getCurrentFuncName()) } // UpdateService updates an existent service (fake) diff --git a/pkg/backends/k8s.go b/pkg/backends/k8s.go index 0879991d..c41a1f2a 100644 --- a/pkg/backends/k8s.go +++ b/pkg/backends/k8s.go @@ -57,21 +57,23 @@ func (k *KubeBackend) GetInfo() *types.ServerlessBackendInfo { // ListServices returns a slice with all services registered in the provided namespace func (k *KubeBackend) ListServices() ([]*types.Service, error) { - // Get the list with all Knative services - configmaps, err := getAllServicesConfigMaps(k.namespace, k.kubeClientset) + // Get the list with all podTemplates + podTemplates, err := k.kubeClientset.CoreV1().PodTemplates(k.namespace).List(context.TODO(), metav1.ListOptions{}) if err != nil { - log.Printf("WARNING: %v\n", err) return nil, err } - services := []*types.Service{} - for _, cm := range configmaps.Items { - service, err := getServiceFromConfigMap(&cm) + services := []*types.Service{} + for _, podTemplate := range podTemplates.Items { + // Get service from configMap's FDL + svc, err := getServiceFromFDL(podTemplate.Name, k.namespace, k.kubeClientset) if err != nil { - return nil, err + log.Printf("WARNING: %v\n", err) + } else { + services = append(services, svc) } - services = append(services, service) } + return services, nil } @@ -146,14 +148,8 @@ func (k *KubeBackend) ReadService(name string) (*types.Service, error) { return nil, err } - // Get the configMap of the Service - cm, err := k.kubeClientset.CoreV1().ConfigMaps(k.namespace).Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("the service \"%s\" does not have a registered ConfigMap", name) - } - // Get service from configMap's FDL - svc, err := getServiceFromConfigMap(cm) + svc, err := getServiceFromFDL(name, k.namespace, k.kubeClientset) if err != nil { return nil, err } @@ -216,14 +212,6 @@ func (k *KubeBackend) UpdateService(service types.Service) error { } } - //Create deaemonset to cache the service image on all the nodes - if service.ImagePrefetch { - err = imagepuller.CreateDaemonset(k.config, service, k.kubeClientset) - if err != nil { - return err - } - } - return nil } @@ -254,12 +242,17 @@ func (k *KubeBackend) DeleteService(service types.Service) error { return nil } -func getServiceFromConfigMap(cm *v1.ConfigMap) (*types.Service, error) { +func getServiceFromFDL(name string, namespace string, kubeClientset kubernetes.Interface) (*types.Service, error) { + // Get the configMap of the Service + cm, err := kubeClientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("the service \"%s\" does not have a registered ConfigMap", name) + } service := &types.Service{} // Unmarshal the FDL stored in the configMap - if err := yaml.Unmarshal([]byte(cm.Data[types.FDLFileName]), service); err != nil { - return nil, fmt.Errorf("the FDL of the service \"%s\" cannot be read", cm.Name) + if err = yaml.Unmarshal([]byte(cm.Data[types.FDLFileName]), service); err != nil { + return nil, fmt.Errorf("the FDL of the service \"%s\" cannot be read", name) } // Add the script to the service from configmap's script value @@ -372,17 +365,6 @@ func deleteServiceConfigMap(name string, namespace string, kubeClientset kuberne return nil } -func getAllServicesConfigMaps(namespace string, kubeClientset kubernetes.Interface) (*v1.ConfigMapList, error) { - listOpts := metav1.ListOptions{ - LabelSelector: "oscar_service", - } - configMapsList, err := kubeClientset.CoreV1().ConfigMaps(namespace).List(context.TODO(), listOpts) - if err != nil { - return nil, err - } - return configMapsList, nil -} - func deleteServiceJobs(name string, namespace string, kubeClientset kubernetes.Interface) error { // ListOptions to select all the associated jobs with the specified service listOpts := metav1.ListOptions{ diff --git a/pkg/backends/k8s_test.go b/pkg/backends/k8s_test.go index e9e4f30b..1a967bda 100644 --- a/pkg/backends/k8s_test.go +++ b/pkg/backends/k8s_test.go @@ -113,12 +113,29 @@ func TestKubeGetInfo(t *testing.T) { } func TestKubeListServices(t *testing.T) { + validPodTemplateListReactor := func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + podTemplateList := &v1.PodTemplateList{ + Items: []v1.PodTemplate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "testnamespace", + }, + Template: v1.PodTemplateSpec{}, + }, + }, + } + return true, podTemplateList, nil + } t.Run("valid list", func(t *testing.T) { clientset := fake.NewSimpleClientset() back := MakeKubeBackend(clientset, testConfig) + // Return a valid PodTemplateList + back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("list", "podtemplates", validPodTemplateListReactor) + // Return a valid configMap back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("get", "configmaps", validConfigMapReaction) @@ -129,11 +146,29 @@ func TestKubeListServices(t *testing.T) { } }) + t.Run("listing podTemplates throws an error", func(t *testing.T) { + clientset := fake.NewSimpleClientset() + + back := MakeKubeBackend(clientset, testConfig) + + // Return an error listing PodTemplates + back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("list", "podtemplates", errorReaction) + + // Call + _, err := back.ListServices() + if err == nil { + t.Error("expecting error, got: nil") + } + }) + t.Run("getServiceFromFDL throws error getting configMap", func(t *testing.T) { clientset := fake.NewSimpleClientset() back := MakeKubeBackend(clientset, testConfig) + // Return a valid PodTemplateList + back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("list", "podtemplates", validPodTemplateListReactor) + // Return an error getting the configMap back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("get", "configmaps", errorReaction) @@ -163,6 +198,9 @@ func TestKubeListServices(t *testing.T) { return true, validCM, nil } + // Return a valid PodTemplateList + back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("list", "podtemplates", validPodTemplateListReactor) + // Return a valid configMap with invalid FDL back.kubeClientset.(*fake.Clientset).Fake.PrependReactor("get", "configmaps", validConfigMapWithInvalidFDLReactor) diff --git a/pkg/backends/knative.go b/pkg/backends/knative.go index 566352c4..9e1391e9 100644 --- a/pkg/backends/knative.go +++ b/pkg/backends/knative.go @@ -76,20 +76,22 @@ func (kn *KnativeBackend) GetInfo() *types.ServerlessBackendInfo { // ListServices returns a slice with all services registered in the provided namespace func (kn *KnativeBackend) ListServices() ([]*types.Service, error) { // Get the list with all Knative services - configmaps, err := getAllServicesConfigMaps(kn.namespace, kn.kubeClientset) + knSvcs, err := kn.knClientset.ServingV1().Services(kn.namespace).List(context.TODO(), metav1.ListOptions{}) if err != nil { - log.Printf("WARNING: %v\n", err) return nil, err } - services := []*types.Service{} - for _, cm := range configmaps.Items { - service, err := getServiceFromConfigMap(&cm) + services := []*types.Service{} + for _, knSvc := range knSvcs.Items { + // Get service from configMap's FDL + svc, err := getServiceFromFDL(knSvc.Name, kn.namespace, kn.kubeClientset) if err != nil { - return nil, err + log.Printf("WARNING: %v\n", err) + } else { + services = append(services, svc) } - services = append(services, service) } + return services, nil } @@ -149,13 +151,8 @@ func (kn *KnativeBackend) ReadService(name string) (*types.Service, error) { return nil, err } - // Get the configMap of the Service - cm, err := kn.kubeClientset.CoreV1().ConfigMaps(kn.namespace).Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("the service \"%s\" does not have a registered ConfigMap", name) - } // Get service from configMap's FDL - svc, err := getServiceFromConfigMap(cm) + svc, err := getServiceFromFDL(name, kn.namespace, kn.kubeClientset) if err != nil { return nil, err } @@ -226,14 +223,6 @@ func (kn *KnativeBackend) UpdateService(service types.Service) error { } } - //Create deaemonset to cache the service image on all the nodes - if service.ImagePrefetch { - err = imagepuller.CreateDaemonset(kn.config, service, kn.kubeClientset) - if err != nil { - return err - } - } - return nil } diff --git a/pkg/backends/knative_test.go b/pkg/backends/knative_test.go index 064da7af..d21f16b2 100644 --- a/pkg/backends/knative_test.go +++ b/pkg/backends/knative_test.go @@ -144,6 +144,17 @@ func TestKnativeListServices(t *testing.T) { []k8stesting.SimpleReactor{knServiceListReactor}, false, }, + { + "Error listing knative services", + []k8stesting.SimpleReactor{}, + []k8stesting.SimpleReactor{ + { + Verb: "list", + Resource: "services", + Reaction: errorReaction, + }}, + true, + }, { "Error getting the configMap", []k8stesting.SimpleReactor{ diff --git a/pkg/backends/openfaas.go b/pkg/backends/openfaas.go index d03db44c..89ab0402 100644 --- a/pkg/backends/openfaas.go +++ b/pkg/backends/openfaas.go @@ -83,20 +83,21 @@ func (of *OpenfaasBackend) GetInfo() *types.ServerlessBackendInfo { // ListServices returns a slice with all services registered in the provided namespace func (of *OpenfaasBackend) ListServices() ([]*types.Service, error) { - // Get the list with all Knative services - configmaps, err := getAllServicesConfigMaps(of.namespace, of.kubeClientset) + // Get the list with all deployments + deployments, err := of.kubeClientset.AppsV1().Deployments(of.namespace).List(context.TODO(), metav1.ListOptions{}) if err != nil { - log.Printf("WARNING: %v\n", err) return nil, err } - services := []*types.Service{} - for _, cm := range configmaps.Items { - service, err := getServiceFromConfigMap(&cm) + services := []*types.Service{} + for _, deployment := range deployments.Items { + // Get service from configMap's FDL + svc, err := getServiceFromFDL(deployment.Name, of.namespace, of.kubeClientset) if err != nil { - return nil, err + log.Printf("WARNING: %v\n", err) + } else { + services = append(services, svc) } - services = append(services, service) } return services, nil @@ -229,13 +230,8 @@ func (of *OpenfaasBackend) ReadService(name string) (*types.Service, error) { return nil, err } - // Get the configMap of the Service - cm, err := of.kubeClientset.CoreV1().ConfigMaps(of.namespace).Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("the service \"%s\" does not have a registered ConfigMap", name) - } // Get service from configMap's FDL - svc, err := getServiceFromConfigMap(cm) + svc, err := getServiceFromFDL(name, of.namespace, of.kubeClientset) if err != nil { return nil, err } diff --git a/pkg/handlers/create.go b/pkg/handlers/create.go index ca0f0291..9d2bc194 100644 --- a/pkg/handlers/create.go +++ b/pkg/handlers/create.go @@ -40,7 +40,6 @@ const ( defaultMemory = "256Mi" defaultCPU = "0.2" defaultLogLevel = "INFO" - createPath = "/system/services" ) var errInput = errors.New("unrecognized input (valid inputs are MinIO and dCache)") @@ -57,7 +56,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand if len(strings.Split(authHeader, "Bearer")) == 1 { isAdminUser = true service.Owner = "cluster_admin" - createLogger.Printf("Creating service '%s' for user '%s'", service.Name, service.Owner) + createLogger.Printf("Creating service for user: %s", service.Owner) } if err := c.ShouldBindJSON(&service); err != nil { @@ -70,22 +69,25 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand // Check if users in allowed_users have a MinIO associated user minIOAdminClient, _ := utils.MakeMinIOAdminClient(cfg) + // === DEBUG code === + loguid, _ := auth.GetUIDFromContext(c) + createLogger.Printf(">>> uid from context: %s", loguid) + // ============= + // Service is created by an EGI user if !isAdminUser { uid, err := auth.GetUIDFromContext(c) if err != nil { c.String(http.StatusInternalServerError, fmt.Sprintln(err)) - return } // Set UID from owner service.Owner = uid - createLogger.Printf("Creating service '%s' for user '%s'", service.Name, service.Owner) + createLogger.Printf("Creating service for user: %s", service.Owner) mc, err := auth.GetMultitenancyConfigFromContext(c) if err != nil { c.String(http.StatusInternalServerError, fmt.Sprintln(err)) - return } full_uid := auth.FormatUID(uid) @@ -96,7 +98,6 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand err := checkIdentity(&service, cfg, authHeader) if err != nil { c.String(http.StatusBadRequest, fmt.Sprintln(err)) - return } break } @@ -170,11 +171,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand log.Println(err.Error()) } } - uid := service.Owner - if service.Owner == "" { - uid = "nil" - } - createLogger.Printf("%s | %v | %s | %s | %s", "POST", 200, createPath, service.Name, uid) + createLogger.Println("Service created with name: ", service.Name) c.Status(http.StatusCreated) } } diff --git a/pkg/handlers/job.go b/pkg/handlers/job.go index 73668e12..1fa48b5b 100644 --- a/pkg/handlers/job.go +++ b/pkg/handlers/job.go @@ -18,12 +18,11 @@ package handlers import ( "context" - "encoding/json" + "encoding/base64" "fmt" "io" "log" "net/http" - "os" "strconv" "strings" @@ -31,8 +30,6 @@ import ( "github.com/google/uuid" "github.com/grycap/oscar/v3/pkg/resourcemanager" "github.com/grycap/oscar/v3/pkg/types" - "github.com/grycap/oscar/v3/pkg/utils/auth" - genericErrors "github.com/pkg/errors" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -47,8 +44,7 @@ var ( // Don't restart jobs in order to keep logs restartPolicy = v1.RestartPolicyNever // command used for passing the event to faas-supervisor - command = []string{"/bin/sh"} - jobLogger = log.New(os.Stdout, "[JOB-HANDLER] ", log.Flags()) + command = []string{"/bin/sh"} ) const ( @@ -90,45 +86,10 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back c.Status(http.StatusUnauthorized) return } - - // Check if reqToken is the service token - rawToken := strings.TrimSpace(splitToken[1]) - if len(rawToken) == tokenLength { - - if rawToken != service.Token { - c.Status(http.StatusUnauthorized) - return - } - } - - // If isn't service token check if it is an oidc token - var uidFromToken string - if len(rawToken) != tokenLength { - oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups) - - if !oidcManager.IsAuthorised(rawToken) { - c.Status(http.StatusUnauthorized) - return - } - - hasVO, err := oidcManager.UserHasVO(rawToken, service.VO) - - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - - if !hasVO { - c.String(http.StatusUnauthorized, "this user isn't enrrolled on the vo: %v", service.VO) - return - } - - // Get UID from token - var uidErr error - uidFromToken, uidErr = oidcManager.GetUID(rawToken) - if uidErr != nil { - jobLogger.Println("WARNING:", uidErr) - } + reqToken := strings.TrimSpace(splitToken[1]) + if reqToken != service.Token { + c.Status(http.StatusUnauthorized) + return } // Get the event from request body @@ -138,48 +99,37 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back return } - // Check if it has the MinIO event format - uid, sourceIPAddress, err := decodeEventBytes(eventBytes) - if err != nil { - // Check if the request was made with OIDC token to get user UID - if uidFromToken != "" { - c.Set("uidOrigin", uidFromToken) - } else { - // Set as nil string if unable to get an UID - jobLogger.Println("WARNING:", err) - c.Set("uidOrigin", "nil") - } - } else { - c.Set("IPAddress", sourceIPAddress) - c.Set("uidOrigin", uid) - } - - c.Next() - - // Initialize event envVar and args var + // Make event envVar event := v1.EnvVar{} - var args []string + var args string if cfg.InterLinkAvailable && service.InterLinkNodeName != "" { - command, event, args = types.SetInterlinkJob(podSpec, service, cfg, eventBytes) - } else { - - if service.Mount.Provider != "" { - args = []string{"-c", fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath()) + ";echo \"I finish\" > /tmpfolder/finish-file;"} - types.SetMount(podSpec, *service, cfg) - } else { - args = []string{"-c", fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath())} + event = v1.EnvVar{ + Name: types.EventVariable, + Value: base64.StdEncoding.EncodeToString([]byte(eventBytes)), } - + args = fmt.Sprintf("\" wget %s -O %s && chmod 0755 %s && echo \\$%s | base64 -d | %s \"", cfg.SupervisorURL, SupervisorPath, SupervisorPath, types.EventVariable, SupervisorPath) + podSpec.NodeSelector = map[string]string{ + NodeSelectorKey: service.InterLinkNodeName, + } + podSpec.DNSPolicy = InterLinkDNSPolicy + podSpec.RestartPolicy = InterLinkRestartPolicy + podSpec.Tolerations = []v1.Toleration{ + { + Key: InterLinkTolerationKey, + Operator: InterLinkTolerationOperator, + }, + } + } else { event = v1.EnvVar{ Name: types.EventVariable, Value: string(eventBytes), } + args = fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath()) } // Make JOB_UUID envVar jobUUID := uuid.New().String() - jobUUID = service.Name + "-" + jobUUID jobUUIDVar := v1.EnvVar{ Name: types.JobUUIDVariable, Value: jobUUID, @@ -200,12 +150,16 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back for i, c := range podSpec.Containers { if c.Name == types.ContainerName { podSpec.Containers[i].Command = command - podSpec.Containers[i].Args = args + podSpec.Containers[i].Args = []string{"-c", args} podSpec.Containers[i].Env = append(podSpec.Containers[i].Env, event) podSpec.Containers[i].Env = append(podSpec.Containers[i].Env, jobUUIDVar) podSpec.Containers[i].Env = append(podSpec.Containers[i].Env, resourceIDVar) } } + if service.Mount.Provider != "" { + types.SetMount(podSpec, *service, cfg) + podSpec.Containers[0].Args = []string{"-c", args + ";echo \"I finish\" > /tmpfolder/finish-file;"} + } // Delegate job if can't be scheduled and has defined replicas if rm != nil && service.HasReplicas() { @@ -216,7 +170,7 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back c.Status(http.StatusCreated) return } - jobLogger.Printf("unable to delegate job. Error: %v\n", err) + log.Printf("unable to delegate job. Error: %v\n", err) } } @@ -251,6 +205,7 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back } } + // Create job _, err = kubeClientset.BatchV1().Jobs(cfg.ServicesNamespace).Create(context.TODO(), job, metav1.CreateOptions{}) if err != nil { c.String(http.StatusInternalServerError, err.Error()) @@ -259,31 +214,3 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back c.Status(http.StatusCreated) } } - -func decodeEventBytes(eventBytes []byte) (string, string, error) { - - defer func() { - // recover from panic, if one occurs - if r := recover(); r != nil { - jobLogger.Println("Recovered from panic:", r) - } - }() - // Extract user UID from MinIO event - var decoded map[string]interface{} - if err := json.Unmarshal(eventBytes, &decoded); err != nil { - return "", "", err - } - - if records, panicErr := decoded["Records"].([]interface{}); panicErr { - r := records[0].(map[string]interface{}) - - eventInfo := r["requestParameters"].(map[string]interface{}) - uid := eventInfo["principalId"] - sourceIPAddress := eventInfo["sourceIPAddress"] - - return uid.(string), sourceIPAddress.(string), nil - } else { - return "", "", genericErrors.New("Failed to decode records") - } - -} diff --git a/pkg/handlers/list.go b/pkg/handlers/list.go index ba8ae64e..2a5cb130 100644 --- a/pkg/handlers/list.go +++ b/pkg/handlers/list.go @@ -19,7 +19,6 @@ package handlers import ( "fmt" "net/http" - "slices" "strings" "github.com/gin-gonic/gin" @@ -43,13 +42,19 @@ func MakeListHandler(back types.ServerlessBackend) gin.HandlerFunc { uid, err := auth.GetUIDFromContext(c) if err != nil { c.String(http.StatusInternalServerError, fmt.Sprintln(err)) - return } var allowedServicesForUser []*types.Service for _, service := range services { - if len(service.AllowedUsers) == 0 || slices.Contains(service.AllowedUsers, uid) { + if len(service.AllowedUsers) == 0 { allowedServicesForUser = append(allowedServicesForUser, service) + continue + } + for _, id := range service.AllowedUsers { + if uid == id { + allowedServicesForUser = append(allowedServicesForUser, service) + break + } } } diff --git a/pkg/handlers/run.go b/pkg/handlers/run.go index ee7dec56..f289713a 100644 --- a/pkg/handlers/run.go +++ b/pkg/handlers/run.go @@ -23,14 +23,9 @@ import ( "github.com/gin-gonic/gin" "github.com/grycap/oscar/v3/pkg/types" - "github.com/grycap/oscar/v3/pkg/utils/auth" "k8s.io/apimachinery/pkg/api/errors" ) -const ( - tokenLength = 64 -) - // MakeRunHandler makes a handler to manage sync invocations sending them to the gateway of the ServerlessBackend func MakeRunHandler(cfg *types.Config, back types.SyncBackend) gin.HandlerFunc { return func(c *gin.Context) { @@ -52,44 +47,10 @@ func MakeRunHandler(cfg *types.Config, back types.SyncBackend) gin.HandlerFunc { c.Status(http.StatusUnauthorized) return } - - // Check if reqToken is the service token - rawToken := strings.TrimSpace(splitToken[1]) - if len(rawToken) == tokenLength { - - if rawToken != service.Token { - c.Status(http.StatusUnauthorized) - return - } - } else { - oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups) - - if !oidcManager.IsAuthorised(rawToken) { - c.Status(http.StatusUnauthorized) - return - } - - hasVO, err := oidcManager.UserHasVO(rawToken, service.VO) - - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - - if !hasVO { - c.String(http.StatusUnauthorized, "this user isn't enrrolled on the vo: %v", service.VO) - return - } - - ui, err := oidcManager.GetUserInfo(rawToken) - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - uid := ui.Subject - c.Set("uidOrigin", uid) - c.Next() - + reqToken := strings.TrimSpace(splitToken[1]) + if reqToken != service.Token { + c.Status(http.StatusUnauthorized) + return } proxy := &httputil.ReverseProxy{ diff --git a/pkg/handlers/run_test.go b/pkg/handlers/run_test.go index e7c9fe3b..e1b9e626 100644 --- a/pkg/handlers/run_test.go +++ b/pkg/handlers/run_test.go @@ -63,10 +63,10 @@ func TestMakeRunHandler(t *testing.T) { for _, s := range scenarios { t.Run(s.name, func(t *testing.T) { w := httptest.NewRecorder() - serviceName := "testName" + serviceName := "test" req, _ := http.NewRequest("POST", "/run/"+serviceName, nil) - req.Header.Set("Authorization", "Bearer 11e387cf727630d899925d57fceb4578f478c44be6cde0ae3fe886d8be513acf") + req.Header.Set("Authorization", "Bearer AbCdEf123456") if s.returnError { switch s.errType { @@ -78,7 +78,7 @@ func TestMakeRunHandler(t *testing.T) { case "splitErr": req.Header.Set("Authorization", "AbCdEf123456") case "diffErr": - req.Header.Set("Authorization", "Bearer 11e387cf727630d899925d57fceb4578f478c44be6cde0ae3fe886d8be513dfg") + req.Header.Set("Authorization", "Bearer AbC123456") } } diff --git a/pkg/handlers/update.go b/pkg/handlers/update.go index cd97f826..6a5a96f0 100644 --- a/pkg/handlers/update.go +++ b/pkg/handlers/update.go @@ -45,11 +45,7 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand // Check service values and set defaults checkValues(&newService, cfg) - authHeader := c.GetHeader("Authorization") - if len(strings.Split(authHeader, "Bearer")) == 1 { - isAdminUser = true - createLogger.Printf("[*] Updating service as admin user") - } + // Read the current service oldService, err := back.ReadService(newService.Name) @@ -62,34 +58,34 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand } return } - if !isAdminUser { - uid, err := auth.GetUIDFromContext(c) - if err != nil { - c.String(http.StatusInternalServerError, fmt.Sprintln("Couldn't get UID from context")) - } - if oldService.Owner != uid { - c.String(http.StatusForbidden, "User %s doesn't have permision to modify this service", uid) - return - } + uid, err := auth.GetUIDFromContext(c) + if err != nil { + c.String(http.StatusInternalServerError, fmt.Sprintln("Couldn't get UID from context")) + } - // Set the owner on the new service definition - newService.Owner = oldService.Owner + if oldService.Owner != uid { + c.String(http.StatusForbidden, "User %s doesn't have permision to modify this service", uid) + return + } - // If the service has changed VO check permisions again - if newService.VO != "" && newService.VO != oldService.VO { - for _, vo := range cfg.OIDCGroups { - if vo == newService.VO { - authHeader := c.GetHeader("Authorization") - err := checkIdentity(&newService, cfg, authHeader) - if err != nil { - c.String(http.StatusBadRequest, fmt.Sprintln(err)) - } - break + // Set the owner on the new service definition + newService.Owner = oldService.Owner + + // If the service has changed VO check permisions again + if newService.VO != "" && newService.VO != oldService.VO { + for _, vo := range cfg.OIDCGroups { + if vo == newService.VO { + authHeader := c.GetHeader("Authorization") + err := checkIdentity(&newService, cfg, authHeader) + if err != nil { + c.String(http.StatusBadRequest, fmt.Sprintln(err)) } + break } } } + minIOAdminClient, _ := utils.MakeMinIOAdminClient(cfg) // Update the service if err := back.UpdateService(newService); err != nil { diff --git a/pkg/imagepuller/daemonset.go b/pkg/imagepuller/daemonset.go index 7d5a6f9f..037685ca 100644 --- a/pkg/imagepuller/daemonset.go +++ b/pkg/imagepuller/daemonset.go @@ -59,7 +59,7 @@ var stopper chan struct{} // Create daemonset func CreateDaemonset(cfg *types.Config, service types.Service, kubeClientset kubernetes.Interface) error { - DaemonSetLoggerInfo.Println("Creating daemonset for service:", service.Name) + //Set needed variables setWorkingNodes(kubeClientset) podGroup = generatePodGroupName() @@ -73,6 +73,8 @@ func CreateDaemonset(cfg *types.Config, service types.Service, kubeClientset kub if err != nil { DaemonSetLoggerInfo.Println(err) return fmt.Errorf("failed to create daemonset: %s", err.Error()) + } else { + DaemonSetLoggerInfo.Println("Created daemonset for service:", service.Name) } //Set watcher informer diff --git a/pkg/resourcemanager/.ipynb_checkpoints/delegate-checkpoint.go b/pkg/resourcemanager/.ipynb_checkpoints/delegate-checkpoint.go new file mode 100644 index 00000000..1e59702d --- /dev/null +++ b/pkg/resourcemanager/.ipynb_checkpoints/delegate-checkpoint.go @@ -0,0 +1,997 @@ +/* +Copyright (C) GRyCAP - I3M - UPV + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcemanager + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "log" + "math" + "math/rand" + "net/http" + "net/url" + "path" + "sort" + "strconv" + "strings" + + //"sync" + "time" + + "github.com/grycap/oscar/v3/pkg/types" +) + +const ( + oscarReplicaType = "oscar" + endpointReplicaType = "endpoint" + noDelegateCode = 101 +) + +// tokenCache map to store tokens from services and endpoints -> [CLUSTER_ENDPOINT][SERVICE_NAME] +var tokenCache = map[string]map[string]string{} + +//var mutex sync.Mutex + +// DelegatedEvent wraps the original input event by adding the storage provider ID +type DelegatedEvent struct { + StorageProviderID string `json:"storage_provider"` + Event string `json:"event"` +} + +type GeneralInfo struct { + NumberNodes int64 `json:"numberNodes"` + CPUFreeTotal int64 `json:"cpuFreeTotal"` + CPUMaxFree int64 `json:"cpuMaxFree"` + MemoryFreeTotal int64 `json:"memoryFreeTotal"` + MemoryMaxFree int64 `json:"memoryMaxFree"` + DetailsNodes []NodeInfo `json:"detail"` +} + +type NodeInfo struct { + NodeName string `json:"nodeName"` + CPUCapacity string `json:"cpuCapacity"` + CPUUsage string `json:"cpuUsage"` + CPUPercentage string `json:"cpuPercentage"` + MemoryCapacity string `json:"memoryCapacity"` + MemoryUsage string `json:"memoryUsage"` + MemoryPercentage string `json:"memoryPercentage"` +} + +type Alternative struct { + Index int // Número de la alternativa + Preference float64 // Valor de la preferencia +} + +type JobStatus struct { + Status string `json:"status"` + CreationTime string `json:"creation_time"` + StartTime string `json:"start_time"` + FinishTime string `json:"finish_time"` +} +type JobStatuses map[string]JobStatus + +// Function to execute TOPSIS method +// Normalizes a column by dividing each value by the square root of the sum of squares. +func normalizeMatrix(matrix [][]float64) [][]float64 { + rows := len(matrix) + cols := len(matrix[0]) + normalized := make([][]float64, rows) + for i := range normalized { + normalized[i] = make([]float64, cols) + } + + for j := 0; j < cols; j++ { + // Calculate the norm (square root of the sum of squares of the column) + add := 0.0 + for i := 0; i < rows; i++ { + add += matrix[i][j] * matrix[i][j] + } + norm := math.Sqrt(add) + // Normalize the values ​​of the column + for i := 0; i < rows; i++ { + normalized[i][j] = matrix[i][j] / norm + } + } + return normalized +} + +// Multiply the normalized matrix by the weights. +func weightMatrix(matrix [][]float64, weight []float64) [][]float64 { + rows := len(matrix) + cols := len(matrix[0]) + weighted := make([][]float64, rows) + for i := range weighted { + weighted[i] = make([]float64, cols) + for j := 0; j < cols; j++ { + weighted[i][j] = matrix[i][j] * weight[j] + } + } + return weighted +} + +// Calculate the ideal and anti-ideal solutions. +func calculateSolutions(matrix [][]float64) (ideal []float64, antiIdeal []float64) { + rows := len(matrix) + cols := len(matrix[0]) + + ideal = make([]float64, cols) + antiIdeal = make([]float64, cols) + + for j := 0; j < cols; j++ { + // If the criterion is minimization (let's assume that the first criterion is the one we want to minimize) + + if j == 0 || j == 4 || j == 5 { + // For the ideal solution, we select the minimum value (instead of the maximum) + ideal[j] = matrix[0][j] + antiIdeal[j] = matrix[0][j] + for i := 0; i < rows; i++ { + if matrix[i][j] < ideal[j] { + ideal[j] = matrix[i][j] + } + if matrix[i][j] > antiIdeal[j] { + antiIdeal[j] = matrix[i][j] + } + } + } else { + // For maximization criteria, we normally use the maximum and minimum values + ideal[j] = matrix[0][j] + antiIdeal[j] = matrix[0][j] + for i := 0; i < rows; i++ { + if matrix[i][j] > ideal[j] { + ideal[j] = matrix[i][j] + } + if matrix[i][j] < antiIdeal[j] { + antiIdeal[j] = matrix[i][j] + } + } + } + } + return ideal, antiIdeal +} + +// Calculate the Euclidean distance between an alternative and the ideal or anti-ideal solution +func calculateDistance(alternative []float64, solution []float64) float64 { + add := 0.0 + for i := 0; i < len(alternative); i++ { + add += (alternative[i] - solution[i]) * (alternative[i] - solution[i]) + } + return math.Sqrt(add) +} + +// Calculate the preference index for each alternative +func calculatePreferences(matrix [][]float64, ideal []float64, antiIdeal []float64) []float64 { + rows := len(matrix) + preferences := make([]float64, rows) + + for i := 0; i < rows; i++ { + distanceIdeal := calculateDistance(matrix[i], ideal) + distanceAntiIdeal := calculateDistance(matrix[i], antiIdeal) + preferences[i] = distanceAntiIdeal / (distanceIdeal + distanceAntiIdeal) + } + return preferences +} + +// Order the alternatives from best to worst according to the preference index +func sortAlternatives(preferences []float64) []Alternative { + alternatives := make([]Alternative, len(preferences)) + + // Create a list of alternatives with their preference indices + for i := 0; i < len(preferences); i++ { + alternatives[i] = Alternative{ + Index: i + 1, // Alternativa 1, 2, etc. + Preference: preferences[i], + } + } + + // Sort the alternatives in descending order of preference + sort.Slice(alternatives, func(i, j int) bool { + return alternatives[i].Preference > alternatives[j].Preference + }) + + return alternatives +} + +func distancesFromBetter(alternatives []Alternative) []float64 { + distances := make([]float64, len(alternatives)-1) + + // Calculate distances with the first element + for i := 1; i < len(alternatives); i++ { + distances[i-1] = math.Abs(alternatives[0].Preference - alternatives[i].Preference) + } + + return distances +} + +// Function to randomly rearrange elements whose distance from the first is less than a threshold, including the first +func reorganizeIfNearby(alternatives []Alternative, distances []float64, threshold float64) []Alternative { + + // List of nearby elements (with distance less than the threshold, including the first element) + nearby := []Alternative{alternatives[0]} + + // Identify the other nearby elements + for i := 0; i < len(distances); i++ { + if distances[i] < threshold { + nearby = append(nearby, alternatives[i+1]) + } + } + + // Randomly shuffle nearby items + //rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(nearby), func(i, j int) { + nearby[i], nearby[j] = nearby[j], nearby[i] + }) + + // Create a new reorganized alternative list + newAlternatives := []Alternative{} + j := 0 + + // Insert the rearranged or unarranged elements + for i := 0; i < len(alternatives); i++ { + if i == 0 || distances[i-1] < threshold { + newAlternatives = append(newAlternatives, nearby[j]) // Add the rearranged items + j++ + } else { + newAlternatives = append(newAlternatives, alternatives[i]) // Keep non-close elements + } + } + + return newAlternatives +} + +// DelegateJob sends the event to a service's replica +func DelegateJob(service *types.Service, event string, logger *log.Logger) error { + + //Block access before executing the function + //mutex.Lock() + //defer mutex.Unlock() + + //Determine priority level of each replica to delegate + if service.Delegation == "topsis" { + results := [][]float64{} + //ServiceCPU, err := strconv.ParseFloat(service.CPU, 64) + //if err != nil { + // fmt.Println("Error to converter CPU of service to int: ", err) + // continue + //} + for _, cred := range service.Replicas { + + cluster, ok := service.Clusters[cred.ClusterID] + if !ok { + //logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID) + continue + } + + // Get token + token, err := getServiceToken(cred, cluster) + if err != nil { + //logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) + continue + } + + // Parse the cluster's endpoint URL and add the service's path + JobURL, err := url.Parse(cluster.Endpoint) + if err != nil { + //logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err) + continue + } + JobURL.Path = path.Join(JobURL.Path, "/system/logs/", cred.ServiceName) + + // Make request to get service's definition (including token) from cluster + req, err := http.NewRequest("GET", JobURL.String(), nil) + if err != nil { + //logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err) + continue + } + + // Add Headers + for k, v := range cred.Headers { + req.Header.Add(k, v) + } + + // Add service token to the request + req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) + + // Make HTTP client + + var transport http.RoundTripper = &http.Transport{ + // Enable/disable SSL verification + TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, + } + + client := &http.Client{ + Transport: transport, + Timeout: time.Second * 20, + } + + // Send the request + resp, err := client.Do(req) + if err != nil { + //logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) + continue + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) // Utilizar io.ReadAll para leer el cuerpo + if err != nil { + fmt.Printf("Error al leer el cuerpo de la respuesta para %s: %v\n", cred.URL, err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + var jobStatuses JobStatuses + err = json.Unmarshal(body, &jobStatuses) + if err != nil { + fmt.Println("Error decoding the JSON of the response:", err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + + // Show job statuses + + // Count job statuses + averageExecutionTime, pendingCount := countJobs(jobStatuses) + + JobURL, err = url.Parse(cluster.Endpoint) + if err != nil { + //logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err) + continue + } + JobURL.Path = path.Join(JobURL.Path, "/system/status/") + req1, err := http.NewRequest("GET", JobURL.String(), nil) + + if err != nil { + fmt.Printf("Error creating request for %s: %v\n", cred.URL, err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + + // Add Headers + for k, v := range cred.Headers { + req.Header.Add(k, v) + } + + // Add service token to the request + req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) + + // Make the HTTP request + start := time.Now() + resp1, err := client.Do(req1) + duration := time.Since(start) + if err != nil { + //fmt.Printf("Error making request for %s: %v\n", cred.URL, err) + results = append(results, []float64{duration.Seconds(), 0, 0, 0, 1e6, 1e6}) + continue + } + + defer resp1.Body.Close() + var clusterStatus GeneralInfo + err = json.NewDecoder(resp1.Body).Decode(&clusterStatus) + if err != nil { + fmt.Println("Error decoding the JSON of the response:", err) + results = append(results, []float64{duration.Seconds(), 0, 0, 0, 1e6, 1e6}) + continue + } + + serviceCPU, err := strconv.ParseFloat(service.CPU, 64) + + if err != nil { + fmt.Println("Error converting service CPU to float: ", err) + results = append(results, []float64{duration.Seconds(), 0, 0, 0, 1e6, 1e6}) + continue + } + + results = createParameters(results, duration, clusterStatus, serviceCPU, averageExecutionTime, float64(pendingCount)) + + } + // Print results as a matrix + fmt.Println("Results matrix:") + for _, row := range results { + fmt.Println(row) + } + + // Criteria weights () + weight := []float64{1, 8, 18, 65, 2, 6} + preferences := topsisMethod(results, weight) + newAlternatives := sortbyThreshold(preferences, 20) + + // Print reordered alternatives + fmt.Println("\nAlternatives reordered by threshold:") + for _, alt := range newAlternatives { + fmt.Printf("Alternative %d: %f\n", alt.Index, alt.Preference) + service.Replicas[alt.Index].Priority = uint(alt.Preference) + } + fmt.Println("Replicas stable to topsis method: ", service.Replicas) + + //fmt.Println("Priority ", service.Replicas[id].Priority, " with ", service.Delegation, " delegation") + } else { + getClusterStatus(service) + fmt.Println("Replicas: ", service.Replicas) + + // Check if replicas are sorted by priority and sort it if needed + if !sort.IsSorted(service.Replicas) { + sort.Stable(service.Replicas) + fmt.Println("Replicas Stable: ", service.Replicas) + } + } + + fmt.Println("Event : ", event) + + /*delegatedEvent := WrapEvent(service.ClusterID, event) + + eventJSON, err := json.Marshal(delegatedEvent) + if err != nil { + return fmt.Errorf("error marshalling delegated event: %v", err) + }*/ + + storage_provider := service.ClusterID + //Create event depending on delegation level + + eventJSON, provider := eventBuild(event, storage_provider) + fmt.Println("Storage_provider : ", provider) + fmt.Println(string(eventJSON)) + + //if provider != "" { //storatage _provider not empty in the delegation proccess. + + for _, replica := range service.Replicas { + // Manage if replica.Type is "oscar" and have the capacity to receive a service + fmt.Println("Delegation job in ClusterID: ", replica.ClusterID, "with Priority ", replica.Priority) + if strings.ToLower(replica.Type) == oscarReplicaType && replica.Priority < noDelegateCode { + // Check ClusterID is defined in 'Clusters' + fmt.Println("Delegating ...") + cluster, ok := service.Clusters[replica.ClusterID] + if !ok { + logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID) + continue + } + + // Get token + token, err := getServiceToken(replica, cluster) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) + continue + } + + // Parse the cluster's endpoint URL and add the service's path + postJobURL, err := url.Parse(cluster.Endpoint) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err) + continue + } + postJobURL.Path = path.Join(postJobURL.Path, "job", replica.ServiceName) + + // Make request to get service's definition (including token) from cluster + //fmt.Println(string(eventJSON)) + req, err := http.NewRequest(http.MethodPost, postJobURL.String(), bytes.NewBuffer(eventJSON)) + + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err) + continue + } + + // Add Headers + for k, v := range replica.Headers { + req.Header.Add(k, v) + } + + // Add service token to the request + req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) + + // Make HTTP client + + var transport http.RoundTripper = &http.Transport{ + // Enable/disable SSL verification + TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, + } + + client := &http.Client{ + Transport: transport, + Timeout: time.Second * 20, + } + + // Send the request + res, err := client.Do(req) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) + continue + } + + // Check status code + if res.StatusCode == http.StatusCreated { + logger.Printf("Job successfully delegated to cluster \"%s\"\n", replica.ClusterID) + return nil + } else if res.StatusCode == http.StatusUnauthorized { + // Retry updating the token + token, err := updateServiceToken(replica, cluster) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) + continue + } + // Add service token to the request + req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) + + // Send the request + res, err = client.Do(req) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) + continue + } + } + log.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": Status code %d\n", service.Name, replica.ClusterID, res.StatusCode) + } + + // Manage if replica.Type is "endpoint" + if strings.ToLower(replica.Type) == endpointReplicaType { + // Parse the replica URL to check if it's valid + replicaURL, err := url.Parse(replica.URL) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to parse URL: %v\n", service.Name, replica.URL, err) + continue + } + + // Make request to get service's definition (including token) from cluster + req, err := http.NewRequest(http.MethodPost, replicaURL.String(), bytes.NewBuffer(eventJSON)) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to make request: %v\n", service.Name, replica.URL, err) + continue + } + + // Add Headers + for k, v := range replica.Headers { + req.Header.Add(k, v) + } + + // Make HTTP client + var transport http.RoundTripper = &http.Transport{ + // Enable/disable SSL verification + TLSClientConfig: &tls.Config{InsecureSkipVerify: !replica.SSLVerify}, + } + client := &http.Client{ + Transport: transport, + Timeout: time.Second * 20, + } + + // Send the request + res, err := client.Do(req) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to send request: %v\n", service.Name, replica.URL, err) + continue + } + + // Check status code + if res.StatusCode == http.StatusOK { + logger.Printf("Job successfully delegated to endpoint \"%s\"\n", replica.URL) + return nil + } + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": Status code %d\n", service.Name, replica.URL, res.StatusCode) + } + } + //} else { + // fmt.Println("Error by Storage_Provider empty.") + //} + + return fmt.Errorf("unable to delegate job from service \"%s\" to any replica, scheduling in the current cluster", service.Name) +} + +// WrapEvent wraps an event adding the storage_provider field (from the service's cluster_id) +func WrapEvent(providerID string, event string) DelegatedEvent { + return DelegatedEvent{ + StorageProviderID: providerID, + Event: event, + } +} + +func getServiceToken(replica types.Replica, cluster types.Cluster) (string, error) { + endpoint := strings.Trim(cluster.Endpoint, " /") + _, ok := tokenCache[endpoint] + if ok { + token, ok := tokenCache[endpoint][replica.ServiceName] + if ok && token != "" { + return token, nil + } + } + + return updateServiceToken(replica, cluster) +} + +func updateServiceToken(replica types.Replica, cluster types.Cluster) (string, error) { + // Clear tokenCache if there are more than 500 tokens stored + length := 0 + for _, subMap := range tokenCache { + length += len(subMap) + } + if length > 500 { + tokenCache = map[string]map[string]string{} + } + + // Parse the cluster's endpoint URL and add the service's path + getServiceURL, err := url.Parse(cluster.Endpoint) + if err != nil { + return "", fmt.Errorf("unable to parse cluster endpoint \"%s\": %v", cluster.Endpoint, err) + } + getServiceURL.Path = path.Join(getServiceURL.Path, "system", "services", replica.ServiceName) + + // Make request to get service's definition (including token) from cluster + req, err := http.NewRequest(http.MethodGet, getServiceURL.String(), nil) + if err != nil { + return "", fmt.Errorf("unable to make request to cluster endpoint \"%s\": %v", cluster.Endpoint, err) + } + + // Add cluster's basic auth credentials + req.SetBasicAuth(cluster.AuthUser, cluster.AuthPassword) + + // Make HTTP client + + var transport http.RoundTripper = &http.Transport{ + // Enable/disable SSL verification + TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, + } + client := &http.Client{ + Transport: transport, + Timeout: time.Second * 20, + } + + // Send the request + res, err := client.Do(req) + if err != nil { + return "", fmt.Errorf("unable to send request to cluster endpoint \"%s\": %v", cluster.Endpoint, err) + } + + // Check status code (OSCAR always returns 200 if it's OK) + if res.StatusCode != 200 { + return "", fmt.Errorf("error in response from cluster endpoint \"%s\": Status code %d", cluster.Endpoint, res.StatusCode) + } + + // Decode the response body into the info struct + var svc *types.Service + err = json.NewDecoder(res.Body).Decode(&svc) + if err != nil { + return "", fmt.Errorf("error decoding service info from cluster endpoint \"%s\":%v", cluster.Endpoint, err) + } + + // Update (or create) the service's token entry in tokenCache + endpoint := strings.Trim(cluster.Endpoint, " /") + _, ok := tokenCache[endpoint] + if !ok { + // Create empty map if nil + tokenCache[endpoint] = map[string]string{} + } + tokenCache[endpoint][replica.ServiceName] = svc.Token + + return svc.Token, nil +} + +func getClusterStatus(service *types.Service) { + fmt.Println("Process to getClusterStatus function.") + for id, replica := range service.Replicas { + // Manage if replica.Type is "oscar" + if strings.ToLower(replica.Type) == oscarReplicaType { + // Check ClusterID is defined in 'Clusters' + cluster, ok := service.Clusters[replica.ClusterID] + if !ok { + if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + } + fmt.Printf("Error checking to ClusterID \"%s\": Cluster not defined\n", replica.ClusterID) + continue + } + // Parse the cluster's endpoint URL and add the service's path + getStatusURL, err := url.Parse(cluster.Endpoint) + if err != nil { + if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + } + fmt.Printf("Error parsing the cluster's endpoint URL to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", replica.ClusterID, cluster.Endpoint, err) + continue + } + getStatusURL.Path = path.Join(getStatusURL.Path, "system", "status") + + // Make request to get status from cluster + req, err := http.NewRequest(http.MethodGet, getStatusURL.String(), nil) + if err != nil { + if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + } + fmt.Printf("Error making request to ClusterID \"%s\": unable to make request: %v\n", replica.ClusterID, err) + continue + } + // Add cluster's basic auth credentials + req.SetBasicAuth(cluster.AuthUser, cluster.AuthPassword) + + // Make HTTP client + fmt.Println("SSLVerify :", cluster.SSLVerify) + var transport http.RoundTripper = &http.Transport{ + // Enable/disable SSL verification + TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, + } + fmt.Println("Transport : ", transport) + client := &http.Client{ + Transport: transport, + Timeout: time.Second * 20, + } + + // Send the request + res, err := client.Do(req) + fmt.Println("StatusCode : ", res.StatusCode) + if err != nil { + if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + } + fmt.Printf("Error getting cluster status to ClusterID \"%s\": unable to send request: %v\n", replica.ClusterID, err) + continue + } + + // Check status code + if res.StatusCode == http.StatusOK { + fmt.Printf("Successful get of cluster status to ClusterID\"%s\"\n", replica.ClusterID) + + //Convert cluster status response to JSON + var clusterStatus *GeneralInfo + err = json.NewDecoder(res.Body).Decode(&clusterStatus) + if err != nil { + if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + } + fmt.Println("Error decoding the JSON of the response:", err) + continue + } + + // CPU is in miliCPU + // CPU required to deploy the service + serviceCPU, err := strconv.ParseFloat(service.CPU, 64) + if err != nil { + if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + } + fmt.Println("Error to converter CPU of service to int: ", err) + continue + } + fmt.Println("serviceCPU: ", serviceCPU) + maxNodeCPU := float64(clusterStatus.CPUMaxFree) + fmt.Println("maxNodeCPU: ", maxNodeCPU) + //Calculate CPU difference to determine whether to delegate a replica to the cluster + dist := maxNodeCPU - (1000 * serviceCPU) + fmt.Println("CPU difference: ", dist) + + //The priority of delegating the service is set based on the free CPU of the cluster as long as it has free CPU on a node to delegate the service. + if dist >= 0 { + fmt.Println("Resources available in ClusterID", replica.ClusterID) + if service.Delegation == "random" { + randPriority := rand.Intn(noDelegateCode) + service.Replicas[id].Priority = uint(randPriority) + //replica.Priority = uint(randPriority) + fmt.Println("Priority ", service.Replicas[id].Priority, " with ", service.Delegation, " delegation") + } else if service.Delegation == "load-based" { + //Map the totalClusterCPU range to a smaller range (input range 0 to 32 cpu to output range 100 to 0 priority) + totalClusterCPU := clusterStatus.CPUFreeTotal + mappedCPUPriority := mapToRange(totalClusterCPU, 0, 32000, 100, 0) + service.Replicas[id].Priority = uint(mappedCPUPriority) + //replica.Priority = uint(mappedCPUPriority) + fmt.Println("Priority ", service.Replicas[id].Priority, " with ", service.Delegation, " delegation") + + } else if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + fmt.Println("Error when declaring the type of delegation in ClusterID ", replica.ClusterID) + continue + } + + } else { + fmt.Println("No CPU capacity to delegate job in ClusterID ", replica.ClusterID) + if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + } + continue + } + fmt.Println("Status Cluster ", clusterStatus) + fmt.Println("Priority: ", service.Replicas[id].Priority) + + } else { + if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + } + fmt.Printf("Error to get of cluster status to ClusterID\"%s\"\n", replica.ClusterID) + } + + } + } + +} + +func mapToRange(value, minInput, maxInput, maxOutput, minOutput int64) int { + mappedValue := maxOutput - (maxOutput-minOutput)*(value-minInput)/(maxInput-minInput) + mappedInt := int(mappedValue) + if mappedInt > int(maxOutput) { + mappedInt = int(maxOutput) + } + if mappedInt < int(minOutput) { + mappedInt = int(minOutput) + } + + return mappedInt +} + +func topsisMethod(results [][]float64, weight []float64) []float64 { + + // Step 1: Normalize the matrix + matrixNormalized := normalizeMatrix(results) + //fmt.Println("//Normalized matrix: ") + //for _, row := range matrizNormalizada { + // fmt.Println(row) + //} + + // Step 2: Weight the matrix + matrixWeighted := weightMatrix(matrixNormalized, weight) + //fmt.Println("\n//Weighted Matrix:") + //for _, row := range matrizPonderada { + // fmt.Println(row) + //} + + // Step 3: Compute the ideal and anti-ideal solution + ideal, antiIdeal := calculateSolutions(matrixWeighted) + //fmt.Println("\nIdeal Solution:", ideal) + //fmt.Println("Anti-Ideal Solution:", antiIdeal) + + // Step 4: Compute the distances and preference index + preferences := calculatePreferences(matrixWeighted, ideal, antiIdeal) + fmt.Println("\nPreference index:", preferences) + + return preferences + +} + +func sortbyThreshold(preferences []float64, umbral int) []Alternative { + // Step 5: Order alternatives from best to worst + alternativesSort := sortAlternatives(preferences) + + fmt.Println("\nAlternatives ordered from best to worst:") + for _, alt := range alternativesSort { + fmt.Printf("Alternative %d: %f\n", alt.Index, alt.Preference) + + //mapped := mapToRange(int64(alt*100.0), 0, 100, 100, 0) + //fmt.Printf("Original Preference: %.4f -> Mapped %d\n", alt, mapped) + } + + distancesFromBetter := distancesFromBetter(alternativesSort) + + // Threshold to reorder nearby elements + f := float64(umbral) / 100.0 + threshold := alternativesSort[0].Preference * f + fmt.Printf("The threshold is the %d %% of the best value: %f ", umbral, threshold) + + // Randomly reorder elements whose distance is less than the threshold, including the first one + newAlternatives := reorganizeIfNearby(alternativesSort, distancesFromBetter, threshold) + + var priority_map int + for id, alt := range newAlternatives { + //fmt.Printf("Alternative %d: %d\n", alt.Index, int(alt.Preference*100)) + priority_map = mapToRange(int64(alt.Preference*100.0), 0, 100, 100, 0) + newAlternatives[id].Preference = float64(priority_map) + //fmt.Printf("Alternative %d: %f\n", alt.Index, newAlternatives[id].Preference) + } + + return newAlternatives + +} + +func countJobs(jobStatuses map[string]JobStatus) (float64, int) { + totalJobs := 0 + succeededCount := 0 + failedCount := 0 + pendingCount := 0 + totalExecutionTime := 0.0 + for _, status := range jobStatuses { + totalJobs++ + switch status.Status { + case "Succeeded": + succeededCount++ + creationTime, _ := time.Parse(time.RFC3339, status.CreationTime) + finishTime, _ := time.Parse(time.RFC3339, status.FinishTime) + duration := finishTime.Sub(creationTime).Seconds() // Duration in seconds + totalExecutionTime += duration + case "Failed": + failedCount++ + case "Pending": // Pending jobs + pendingCount++ + } + } + + var averageExecutionTime float64 = 1e6 + if succeededCount > 0 { + averageExecutionTime = totalExecutionTime / float64(succeededCount) + } + return averageExecutionTime, pendingCount + +} + +func createParameters(results [][]float64, duration time.Duration, clusterStatus GeneralInfo, serviceCPU float64, averageExecutionTime float64, pendingCount float64) [][]float64 { + maxNodeCPU := float64(clusterStatus.CPUMaxFree) + dist := maxNodeCPU - (1000 * serviceCPU) + + if dist >= 0 { + results = append(results, []float64{ + duration.Seconds(), // Latency (ms) + float64(clusterStatus.NumberNodes), // Number of nodes + float64(clusterStatus.MemoryFreeTotal), // Total Memory Free + float64(clusterStatus.CPUFreeTotal), // Total CPU Free + averageExecutionTime, // averageExecutionTime + pendingCount + 0.1, //pendingCount + // More criteria can be added here if needed + }) + } else { + results = append(results, []float64{duration.Seconds(), 0, 0, 0, 1e6, 1e6}) + } + return results +} + +func eventBuild(event string, storage_provider string) ([]byte, string) { + fmt.Println("eventBuild Function...") + var eventMap map[string]interface{} + var cluster_storage string + err := json.Unmarshal([]byte(event), &eventMap) + if err != nil { + fmt.Println("Error to read the event:", err) + return nil, "" + } + //fmt.Println(len(eventMap)) + var eventJSON []byte + if storage, exists := eventMap["storage_provider"]; exists { + fmt.Println("The 'storage_provider' field exists in the event ") + eventValue := eventMap["event"] + //eventString, _ := json.Marshal(eventValue) + + //delegatedEvent1 := WrapEvent(storage.(string), string(eventString)) + delegatedEvent1 := WrapEvent(storage.(string), eventValue.(string)) + fmt.Println("New DelegatedEvent:", delegatedEvent1) + + k, err1 := json.Marshal(delegatedEvent1) + + //k, err1 := json.Marshal(event) + if err1 != nil { + fmt.Printf("error marshalling delegated event: %v ", err1) + return nil, "" + } + + eventJSON = k + cluster_storage = storage.(string) + + } else { + fmt.Println("The 'storage_provider' field does NOT exist in the event") + + delegatedEvent := WrapEvent(storage_provider, event) + fmt.Println("New DelegatedEvent:", delegatedEvent) + + z, err2 := json.Marshal(delegatedEvent) + if err2 != nil { + fmt.Printf("error marshalling delegated event: %v", err2) + return nil, "" + } + eventJSON = z + cluster_storage = storage_provider + } + + //fmt.Println(string(eventJson)) + return eventJSON, cluster_storage +} diff --git a/pkg/resourcemanager/delegate.go b/pkg/resourcemanager/delegate.go index dfa5a225..cc430aa3 100644 --- a/pkg/resourcemanager/delegate.go +++ b/pkg/resourcemanager/delegate.go @@ -21,7 +21,10 @@ import ( "crypto/tls" "encoding/json" "fmt" + + "io" "log" + "math" "math/rand" "net/http" "net/url" @@ -29,6 +32,8 @@ import ( "sort" "strconv" "strings" + + //"sync" "time" "github.com/grycap/oscar/v3/pkg/types" @@ -43,6 +48,8 @@ const ( // tokenCache map to store tokens from services and endpoints -> [CLUSTER_ENDPOINT][SERVICE_NAME] var tokenCache = map[string]map[string]string{} +//var mutex sync.Mutex + // DelegatedEvent wraps the original input event by adding the storage provider ID type DelegatedEvent struct { StorageProviderID string `json:"storage_provider"` @@ -68,23 +75,385 @@ type NodeInfo struct { MemoryPercentage string `json:"memoryPercentage"` } +type Alternative struct { + Index int // Número de la alternativa + Preference float64 // Valor de la preferencia +} + +type JobStatus struct { + Status string `json:"status"` + CreationTime string `json:"creation_time"` + StartTime string `json:"start_time"` + FinishTime string `json:"finish_time"` +} +type JobStatuses map[string]JobStatus + +// Function to execute TOPSIS method +// Normalizes a column by dividing each value by the square root of the sum of squares. +func normalizeMatrix(matrix [][]float64) [][]float64 { + rows := len(matrix) + cols := len(matrix[0]) + normalized := make([][]float64, rows) + for i := range normalized { + normalized[i] = make([]float64, cols) + } + + for j := 0; j < cols; j++ { + // Calculate the norm (square root of the sum of squares of the column) + add := 0.0 + for i := 0; i < rows; i++ { + add += matrix[i][j] * matrix[i][j] + } + norm := math.Sqrt(add) + // Normalize the values ​​of the column + for i := 0; i < rows; i++ { + normalized[i][j] = matrix[i][j] / norm + } + } + return normalized +} + +// Multiply the normalized matrix by the weights. +func weightMatrix(matrix [][]float64, weight []float64) [][]float64 { + rows := len(matrix) + cols := len(matrix[0]) + weighted := make([][]float64, rows) + for i := range weighted { + weighted[i] = make([]float64, cols) + for j := 0; j < cols; j++ { + weighted[i][j] = matrix[i][j] * weight[j] + } + } + return weighted +} + +// Calculate the ideal and anti-ideal solutions. +func calculateSolutions(matrix [][]float64) (ideal []float64, antiIdeal []float64) { + rows := len(matrix) + cols := len(matrix[0]) + + ideal = make([]float64, cols) + antiIdeal = make([]float64, cols) + + for j := 0; j < cols; j++ { + // If the criterion is minimization (let's assume that the first criterion is the one we want to minimize) + + if j == 0 || j == 4 || j == 5 { + // For the ideal solution, we select the minimum value (instead of the maximum) + ideal[j] = matrix[0][j] + antiIdeal[j] = matrix[0][j] + for i := 0; i < rows; i++ { + if matrix[i][j] < ideal[j] { + ideal[j] = matrix[i][j] + } + if matrix[i][j] > antiIdeal[j] { + antiIdeal[j] = matrix[i][j] + } + } + } else { + // For maximization criteria, we normally use the maximum and minimum values + ideal[j] = matrix[0][j] + antiIdeal[j] = matrix[0][j] + for i := 0; i < rows; i++ { + if matrix[i][j] > ideal[j] { + ideal[j] = matrix[i][j] + } + if matrix[i][j] < antiIdeal[j] { + antiIdeal[j] = matrix[i][j] + } + } + } + } + return ideal, antiIdeal +} + +// Calculate the Euclidean distance between an alternative and the ideal or anti-ideal solution +func calculateDistance(alternative []float64, solution []float64) float64 { + add := 0.0 + for i := 0; i < len(alternative); i++ { + add += (alternative[i] - solution[i]) * (alternative[i] - solution[i]) + } + return math.Sqrt(add) +} + +// Calculate the preference index for each alternative +func calculatePreferences(matrix [][]float64, ideal []float64, antiIdeal []float64) []float64 { + rows := len(matrix) + preferences := make([]float64, rows) + + for i := 0; i < rows; i++ { + distanceIdeal := calculateDistance(matrix[i], ideal) + distanceAntiIdeal := calculateDistance(matrix[i], antiIdeal) + preferences[i] = distanceAntiIdeal / (distanceIdeal + distanceAntiIdeal) + } + return preferences +} + +// Order the alternatives from best to worst according to the preference index +func sortAlternatives(preferences []float64) []Alternative { + alternatives := make([]Alternative, len(preferences)) + + // Create a list of alternatives with their preference indices + for i := 0; i < len(preferences); i++ { + alternatives[i] = Alternative{ + Index: i + 1, // Alternativa 1, 2, etc. + Preference: preferences[i], + } + } + + // Sort the alternatives in descending order of preference + sort.Slice(alternatives, func(i, j int) bool { + return alternatives[i].Preference > alternatives[j].Preference + }) + + return alternatives +} + +func distancesFromBetter(alternatives []Alternative) []float64 { + distances := make([]float64, len(alternatives)-1) + + // Calculate distances with the first element + for i := 1; i < len(alternatives); i++ { + distances[i-1] = math.Abs(alternatives[0].Preference - alternatives[i].Preference) + } + + return distances +} + +// Function to randomly rearrange elements whose distance from the first is less than a threshold, including the first +func reorganizeIfNearby(alternatives []Alternative, distances []float64, threshold float64) []Alternative { + + // List of nearby elements (with distance less than the threshold, including the first element) + nearby := []Alternative{alternatives[0]} + + // Identify the other nearby elements + for i := 0; i < len(distances); i++ { + if distances[i] < threshold { + nearby = append(nearby, alternatives[i+1]) + } + } + + // Randomly shuffle nearby items + //rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(nearby), func(i, j int) { + nearby[i], nearby[j] = nearby[j], nearby[i] + }) + + // Create a new reorganized alternative list + newAlternatives := []Alternative{} + j := 0 + + // Insert the rearranged or unarranged elements + for i := 0; i < len(alternatives); i++ { + if i == 0 || distances[i-1] < threshold { + newAlternatives = append(newAlternatives, nearby[j]) // Add the rearranged items + j++ + } else { + newAlternatives = append(newAlternatives, alternatives[i]) // Keep non-close elements + } + } + + return newAlternatives +} + // DelegateJob sends the event to a service's replica func DelegateJob(service *types.Service, event string, logger *log.Logger) error { + + //Block access before executing the function + //mutex.Lock() + //defer mutex.Unlock() + //Determine priority level of each replica to delegate - getClusterStatus(service) - fmt.Println("Replicas: ", service.Replicas) + if service.Delegation == "topsis" { + results := [][]float64{} + //ServiceCPU, err := strconv.ParseFloat(service.CPU, 64) + //if err != nil { + // fmt.Println("Error to converter CPU of service to int: ", err) + // continue + //} + for _, cred := range service.Replicas { + + cluster, ok := service.Clusters[cred.ClusterID] + if !ok { + //logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } - // Check if replicas are sorted by priority and sort it if needed - if !sort.IsSorted(service.Replicas) { - sort.Stable(service.Replicas) - fmt.Println("Replicas Stable: ", service.Replicas) + // Get token + token, err := getServiceToken(cred, cluster) + if err != nil { + //logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + + // Parse the cluster's endpoint URL and add the service's path + JobURL, err := url.Parse(cluster.Endpoint) + if err != nil { + //logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + + JobURL.Path = path.Join(JobURL.Path, "/system/logs/", cred.ServiceName) + + // Make request to get service's definition (including token) from cluster + req2, err := http.NewRequest("GET", JobURL.String(), nil) + if err != nil { + //logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + + // Add Headers + for k, v := range cred.Headers { + req2.Header.Add(k, v) + } + + // Add service token to the request + req2.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) + + // Make HTTP client + + var transport http.RoundTripper = &http.Transport{ + // Enable/disable SSL verification + TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, + } + + client := &http.Client{ + Transport: transport, + Timeout: time.Second * 20, + } + + // Send the request + resp2, err := client.Do(req2) + if err != nil { + //logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + defer resp2.Body.Close() + body, err := io.ReadAll(resp2.Body) // io.ReadAll-> read body request + if err != nil { + fmt.Printf("Error to read body request to %s: %v\n", cred.URL, err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + var jobStatuses JobStatuses + err = json.Unmarshal(body, &jobStatuses) + if err != nil { + fmt.Println("Error decoding the JSON of the response:", err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + + // Show job statuses + + // Count job statuses + averageExecutionTime, pendingCount := countJobs(jobStatuses) + + JobURL, err = url.Parse(cluster.Endpoint) + if err != nil { + //logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + JobURL.Path = path.Join(JobURL.Path, "/system/status/") + req1, err := http.NewRequest("GET", JobURL.String(), nil) + + if err != nil { + fmt.Printf("Error creating request for %s: %v\n", cred.URL, err) + results = append(results, []float64{20, 0, 0, 0, 1e6, 1e6}) + continue + } + + // Add Headers + for k, v := range cred.Headers { + req1.Header.Add(k, v) + } + + // Add service token to the request + req1.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) + + // Make the HTTP request + start := time.Now() + resp1, err := client.Do(req1) + duration := time.Since(start) + if err != nil { + //fmt.Printf("Error making request for %s: %v\n", cred.URL, err) + results = append(results, []float64{duration.Seconds(), 0, 0, 0, 1e6, 1e6}) + continue + } + + defer resp1.Body.Close() + var clusterStatus GeneralInfo + err = json.NewDecoder(resp1.Body).Decode(&clusterStatus) + if err != nil { + fmt.Println("Error decoding the JSON of the response:", err) + results = append(results, []float64{duration.Seconds(), 0, 0, 0, 1e6, 1e6}) + continue + } + + serviceCPU, err := strconv.ParseFloat(service.CPU, 64) + + if err != nil { + fmt.Println("Error converting service CPU to float: ", err) + results = append(results, []float64{duration.Seconds(), 0, 0, 0, 1e6, 1e6}) + continue + } + //results = createParameters(results, duration, clusterStatus, serviceCPU, 1e6, 1e6) + results = createParameters(results, duration, clusterStatus, serviceCPU, averageExecutionTime, float64(pendingCount)) + + } + // Print results as a matrix + fmt.Println("Results matrix:") + for _, row := range results { + fmt.Println(row) + } + + // Criteria weights () + weight := []float64{1, 8, 18, 65, 2, 6} + preferences := topsisMethod(results, weight) + newAlternatives := sortbyThreshold(preferences, 20) + + // Print reordered alternatives + fmt.Println("\nAlternatives reordered by threshold:") + for _, alt := range newAlternatives { + fmt.Printf("Alternative %d: %f\n", alt.Index, alt.Preference) + service.Replicas[alt.Index].Priority = uint(alt.Preference) + } + fmt.Println("Replicas stable to topsis method: ", service.Replicas) + + //fmt.Println("Priority ", service.Replicas[id].Priority, " with ", service.Delegation, " delegation") + } else { + getClusterStatus(service) + fmt.Println("Replicas: ", service.Replicas) + + // Check if replicas are sorted by priority and sort it if needed + if !sort.IsSorted(service.Replicas) { + sort.Stable(service.Replicas) + fmt.Println("Replicas Stable: ", service.Replicas) + } } - delegatedEvent := WrapEvent(service.ClusterID, event) + fmt.Println("Event : ", event) + + /*delegatedEvent := WrapEvent(service.ClusterID, event) + eventJSON, err := json.Marshal(delegatedEvent) if err != nil { return fmt.Errorf("error marshalling delegated event: %v", err) - } + }*/ + + storage_provider := service.ClusterID + //Create event depending on delegation level + + eventJSON, provider := eventBuild(event, storage_provider) + fmt.Println("Storage_provider : ", provider) + //fmt.Println(string(eventJSON)) + + //if provider != "" { //storatage _provider not empty in the delegation proccess. for _, replica := range service.Replicas { // Manage if replica.Type is "oscar" and have the capacity to receive a service @@ -114,7 +483,9 @@ func DelegateJob(service *types.Service, event string, logger *log.Logger) error postJobURL.Path = path.Join(postJobURL.Path, "job", replica.ServiceName) // Make request to get service's definition (including token) from cluster + //fmt.Println(string(eventJSON)) req, err := http.NewRequest(http.MethodPost, postJobURL.String(), bytes.NewBuffer(eventJSON)) + if err != nil { logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err) continue @@ -129,10 +500,12 @@ func DelegateJob(service *types.Service, event string, logger *log.Logger) error req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) // Make HTTP client + var transport http.RoundTripper = &http.Transport{ // Enable/disable SSL verification TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, } + client := &http.Client{ Transport: transport, Timeout: time.Second * 20, @@ -215,6 +588,9 @@ func DelegateJob(service *types.Service, event string, logger *log.Logger) error logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": Status code %d\n", service.Name, replica.URL, res.StatusCode) } } + //} else { + // fmt.Println("Error by Storage_Provider empty.") + //} return fmt.Errorf("unable to delegate job from service \"%s\" to any replica, scheduling in the current cluster", service.Name) } @@ -267,6 +643,7 @@ func updateServiceToken(replica types.Replica, cluster types.Cluster) (string, e req.SetBasicAuth(cluster.AuthUser, cluster.AuthPassword) // Make HTTP client + var transport http.RoundTripper = &http.Transport{ // Enable/disable SSL verification TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, @@ -307,15 +684,16 @@ func updateServiceToken(replica types.Replica, cluster types.Cluster) (string, e } func getClusterStatus(service *types.Service) { - fmt.Println("Process to getClusterStatus function") - for _, replica := range service.Replicas { + fmt.Println("Process to getClusterStatus function.") + for id, replica := range service.Replicas { // Manage if replica.Type is "oscar" if strings.ToLower(replica.Type) == oscarReplicaType { // Check ClusterID is defined in 'Clusters' cluster, ok := service.Clusters[replica.ClusterID] if !ok { if service.Delegation != "static" { - replica.Priority = noDelegateCode + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode } fmt.Printf("Error checking to ClusterID \"%s\": Cluster not defined\n", replica.ClusterID) continue @@ -323,7 +701,10 @@ func getClusterStatus(service *types.Service) { // Parse the cluster's endpoint URL and add the service's path getStatusURL, err := url.Parse(cluster.Endpoint) if err != nil { - replica.Priority = noDelegateCode + if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + } fmt.Printf("Error parsing the cluster's endpoint URL to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", replica.ClusterID, cluster.Endpoint, err) continue } @@ -333,7 +714,8 @@ func getClusterStatus(service *types.Service) { req, err := http.NewRequest(http.MethodGet, getStatusURL.String(), nil) if err != nil { if service.Delegation != "static" { - replica.Priority = noDelegateCode + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode } fmt.Printf("Error making request to ClusterID \"%s\": unable to make request: %v\n", replica.ClusterID, err) continue @@ -342,10 +724,12 @@ func getClusterStatus(service *types.Service) { req.SetBasicAuth(cluster.AuthUser, cluster.AuthPassword) // Make HTTP client + fmt.Println("SSLVerify :", cluster.SSLVerify) var transport http.RoundTripper = &http.Transport{ // Enable/disable SSL verification TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, } + fmt.Println("Transport : ", transport) client := &http.Client{ Transport: transport, Timeout: time.Second * 20, @@ -353,9 +737,11 @@ func getClusterStatus(service *types.Service) { // Send the request res, err := client.Do(req) + fmt.Println("StatusCode : ", res.StatusCode) if err != nil { if service.Delegation != "static" { - replica.Priority = noDelegateCode + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode } fmt.Printf("Error getting cluster status to ClusterID \"%s\": unable to send request: %v\n", replica.ClusterID, err) continue @@ -370,7 +756,8 @@ func getClusterStatus(service *types.Service) { err = json.NewDecoder(res.Body).Decode(&clusterStatus) if err != nil { if service.Delegation != "static" { - replica.Priority = noDelegateCode + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode } fmt.Println("Error decoding the JSON of the response:", err) continue @@ -380,48 +767,58 @@ func getClusterStatus(service *types.Service) { // CPU required to deploy the service serviceCPU, err := strconv.ParseFloat(service.CPU, 64) if err != nil { - replica.Priority = noDelegateCode + if service.Delegation != "static" { + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode + } fmt.Println("Error to converter CPU of service to int: ", err) continue } - fmt.Println("serviceCPU :", serviceCPU) + fmt.Println("serviceCPU: ", serviceCPU) maxNodeCPU := float64(clusterStatus.CPUMaxFree) - fmt.Println("maxNodeCPU", maxNodeCPU) + fmt.Println("maxNodeCPU: ", maxNodeCPU) //Calculate CPU difference to determine whether to delegate a replica to the cluster dist := maxNodeCPU - (1000 * serviceCPU) - fmt.Println("CPU difference ", dist) + fmt.Println("CPU difference: ", dist) //The priority of delegating the service is set based on the free CPU of the cluster as long as it has free CPU on a node to delegate the service. if dist >= 0 { fmt.Println("Resources available in ClusterID", replica.ClusterID) if service.Delegation == "random" { randPriority := rand.Intn(noDelegateCode) - replica.Priority = uint(randPriority) - fmt.Println("Priority ", replica.Priority, " with ", service.Delegation, " delegation") + service.Replicas[id].Priority = uint(randPriority) + //replica.Priority = uint(randPriority) + fmt.Println("Priority ", service.Replicas[id].Priority, " with ", service.Delegation, " delegation") } else if service.Delegation == "load-based" { //Map the totalClusterCPU range to a smaller range (input range 0 to 32 cpu to output range 100 to 0 priority) totalClusterCPU := clusterStatus.CPUFreeTotal mappedCPUPriority := mapToRange(totalClusterCPU, 0, 32000, 100, 0) - replica.Priority = uint(mappedCPUPriority) - fmt.Println("Priority ", replica.Priority, " with ", service.Delegation, " delegation") + service.Replicas[id].Priority = uint(mappedCPUPriority) + //replica.Priority = uint(mappedCPUPriority) + fmt.Println("Priority ", service.Replicas[id].Priority, " with ", service.Delegation, " delegation") + } else if service.Delegation != "static" { - replica.Priority = noDelegateCode + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode fmt.Println("Error when declaring the type of delegation in ClusterID ", replica.ClusterID) continue } + } else { fmt.Println("No CPU capacity to delegate job in ClusterID ", replica.ClusterID) if service.Delegation != "static" { - replica.Priority = noDelegateCode + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode } continue } fmt.Println("Status Cluster ", clusterStatus) - fmt.Println("Priority ", replica.Priority) + fmt.Println("Priority: ", service.Replicas[id].Priority) } else { if service.Delegation != "static" { - replica.Priority = noDelegateCode + service.Replicas[id].Priority = noDelegateCode + //replica.Priority = noDelegateCode } fmt.Printf("Error to get of cluster status to ClusterID\"%s\"\n", replica.ClusterID) } @@ -443,3 +840,166 @@ func mapToRange(value, minInput, maxInput, maxOutput, minOutput int64) int { return mappedInt } + +func topsisMethod(results [][]float64, weight []float64) []float64 { + + // Step 1: Normalize the matrix + matrixNormalized := normalizeMatrix(results) + //fmt.Println("//Normalized matrix: ") + //for _, row := range matrizNormalizada { + // fmt.Println(row) + //} + + // Step 2: Weight the matrix + matrixWeighted := weightMatrix(matrixNormalized, weight) + //fmt.Println("\n//Weighted Matrix:") + //for _, row := range matrizPonderada { + // fmt.Println(row) + //} + + // Step 3: Compute the ideal and anti-ideal solution + ideal, antiIdeal := calculateSolutions(matrixWeighted) + //fmt.Println("\nIdeal Solution:", ideal) + //fmt.Println("Anti-Ideal Solution:", antiIdeal) + + // Step 4: Compute the distances and preference index + preferences := calculatePreferences(matrixWeighted, ideal, antiIdeal) + fmt.Println("\nPreference index:", preferences) + + return preferences + +} + +func sortbyThreshold(preferences []float64, umbral int) []Alternative { + // Step 5: Order alternatives from best to worst + alternativesSort := sortAlternatives(preferences) + + fmt.Println("\nAlternatives ordered from best to worst:") + for _, alt := range alternativesSort { + fmt.Printf("Alternative %d: %f\n", alt.Index, alt.Preference) + + //mapped := mapToRange(int64(alt*100.0), 0, 100, 100, 0) + //fmt.Printf("Original Preference: %.4f -> Mapped %d\n", alt, mapped) + } + + distancesFromBetter := distancesFromBetter(alternativesSort) + + // Threshold to reorder nearby elements + f := float64(umbral) / 100.0 + threshold := alternativesSort[0].Preference * f + fmt.Printf("The threshold is the %d %% of the best value: %f ", umbral, threshold) + + // Randomly reorder elements whose distance is less than the threshold, including the first one + newAlternatives := reorganizeIfNearby(alternativesSort, distancesFromBetter, threshold) + + var priority_map int + for id, alt := range newAlternatives { + //fmt.Printf("Alternative %d: %d\n", alt.Index, int(alt.Preference*100)) + priority_map = mapToRange(int64(alt.Preference*100.0), 0, 100, 100, 0) + newAlternatives[id].Preference = float64(priority_map) + //fmt.Printf("Alternative %d: %f\n", alt.Index, newAlternatives[id].Preference) + } + + return newAlternatives + +} + +func countJobs(jobStatuses map[string]JobStatus) (float64, int) { + totalJobs := 0 + succeededCount := 0 + failedCount := 0 + pendingCount := 0 + totalExecutionTime := 0.0 + for _, status := range jobStatuses { + totalJobs++ + switch status.Status { + case "Succeeded": + succeededCount++ + creationTime, _ := time.Parse(time.RFC3339, status.CreationTime) + finishTime, _ := time.Parse(time.RFC3339, status.FinishTime) + duration := finishTime.Sub(creationTime).Seconds() // Duration in seconds + totalExecutionTime += duration + case "Failed": + failedCount++ + case "Pending": // Pending jobs + pendingCount++ + } + } + + var averageExecutionTime float64 = 1e6 + if succeededCount > 0 { + averageExecutionTime = totalExecutionTime / float64(succeededCount) + } + return averageExecutionTime, pendingCount + +} + +func createParameters(results [][]float64, duration time.Duration, clusterStatus GeneralInfo, serviceCPU float64, averageExecutionTime float64, pendingCount float64) [][]float64 { + maxNodeCPU := float64(clusterStatus.CPUMaxFree) + dist := maxNodeCPU - (1000 * serviceCPU) + + if dist >= 0 { + results = append(results, []float64{ + duration.Seconds(), // Latency (ms) + float64(clusterStatus.NumberNodes), // Number of nodes + float64(clusterStatus.MemoryFreeTotal), // Total Memory Free + float64(clusterStatus.CPUFreeTotal), // Total CPU Free + averageExecutionTime, // averageExecutionTime + pendingCount + 0.1, //pendingCount + // More criteria can be added here if needed + }) + } else { + results = append(results, []float64{duration.Seconds(), 0, 0, 0, 1e6, 1e6}) + } + return results +} + +func eventBuild(event string, storage_provider string) ([]byte, string) { + fmt.Println("eventBuild Function...") + var eventMap map[string]interface{} + var cluster_storage string + err := json.Unmarshal([]byte(event), &eventMap) + if err != nil { + fmt.Println("Error to read the event:", err) + return nil, "" + } + //fmt.Println(len(eventMap)) + var eventJSON []byte + if storage, exists := eventMap["storage_provider"]; exists { + fmt.Println("The 'storage_provider' field exists in the event ") + eventValue := eventMap["event"] + //eventString, _ := json.Marshal(eventValue) + + //delegatedEvent1 := WrapEvent(storage.(string), string(eventString)) + delegatedEvent1 := WrapEvent(storage.(string), eventValue.(string)) + fmt.Println("New DelegatedEvent:", delegatedEvent1) + + k, err1 := json.Marshal(delegatedEvent1) + + //k, err1 := json.Marshal(event) + if err1 != nil { + fmt.Printf("error marshalling delegated event: %v ", err1) + return nil, "" + } + + eventJSON = k + cluster_storage = storage.(string) + + } else { + fmt.Println("The 'storage_provider' field does NOT exist in the event") + + delegatedEvent := WrapEvent(storage_provider, event) + fmt.Println("New DelegatedEvent:", delegatedEvent) + + z, err2 := json.Marshal(delegatedEvent) + if err2 != nil { + fmt.Printf("error marshalling delegated event: %v", err2) + return nil, "" + } + eventJSON = z + cluster_storage = storage_provider + } + + //fmt.Println(string(eventJson)) + return eventJSON, cluster_storage +} diff --git a/pkg/types/config.go b/pkg/types/config.go index b871ba86..3777791e 100644 --- a/pkg/types/config.go +++ b/pkg/types/config.go @@ -189,7 +189,7 @@ type Config struct { IngressHost string `json:"-"` // Github path of FaaS Supervisor (needed for Interlink config) - SupervisorKitImage string `json:"-"` + SupervisorURL string `json:"-"` //Path to additional OSCAR configuration setted by users AdditionalConfigPath string `json:"-"` @@ -238,7 +238,7 @@ var configVars = []configVar{ {"OIDCSubject", "OIDC_SUBJECT", false, stringType, ""}, {"OIDCGroups", "OIDC_GROUPS", false, stringSliceType, ""}, {"IngressHost", "INGRESS_HOST", false, stringType, ""}, - {"SupervisorKitImage", "SUPERVISOR_KIT_IMAGE", false, stringType, ""}, + {"SupervisorURL", "SUPERVISOR_URL", false, stringType, "https://github.com/grycap/faas-supervisor/releases/download/1.5.8/supervisor"}, {"AdditionalConfigPath", "ADDITIONAL_CONFIG_PATH", false, stringType, "config.yaml"}, } diff --git a/pkg/types/interlink.go b/pkg/types/interlink.go deleted file mode 100644 index e80cbbea..00000000 --- a/pkg/types/interlink.go +++ /dev/null @@ -1,101 +0,0 @@ -/* -Copyright (C) GRyCAP - I3M - UPV - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package types - -import ( - "encoding/base64" - - v1 "k8s.io/api/core/v1" -) - -const ( - ContainerSupervisorName = "supervisor-container" - SupervisorMountPath = "/data" - SupervisorArg = "cp -r /supervisor/* " + SupervisorMountPath - NameSupervisorVolume = "supervisor-share-data" - NodeSelectorKey = "kubernetes.io/hostname" - - // Annotations for InterLink nodes - InterLinkDNSPolicy = "ClusterFirst" - InterLinkRestartPolicy = "OnFailure" - InterLinkTolerationKey = "virtual-node.interlink/no-schedule" - InterLinkTolerationOperator = "Exists" -) - -var SupervisorCommand = []string{"/bin/sh", "-c"} -var OscarContainerCommand = []string{"echo $EVENT | base64 -d | " + SupervisorMountPath + "/supervisor"} - -// SetInterlinkJob Return interlink configuration for kubernetes job and add Interlink variables to podSpec -func SetInterlinkJob(podSpec *v1.PodSpec, service *Service, cfg *Config, eventBytes []byte) ([]string, v1.EnvVar, []string) { - command := SupervisorCommand - event := v1.EnvVar{ - Name: EventVariable, - Value: base64.StdEncoding.EncodeToString([]byte(eventBytes)), - } - args := OscarContainerCommand - podSpec.NodeSelector = map[string]string{ - NodeSelectorKey: service.InterLinkNodeName, - } - podSpec.DNSPolicy = InterLinkDNSPolicy - podSpec.RestartPolicy = InterLinkRestartPolicy - podSpec.Tolerations = []v1.Toleration{ - { - Key: InterLinkTolerationKey, - Operator: InterLinkTolerationOperator, - }, - } - - addInitContainer(podSpec, cfg) - return command, event, args -} - -// SetInterlinkService Add InterLink configuration to podSpec -func SetInterlinkService(podSpec *v1.PodSpec) { - podSpec.Containers[0].ImagePullPolicy = "Always" - shareDataVolumeMount := v1.VolumeMount{ - Name: NameSupervisorVolume, - MountPath: SupervisorMountPath, - } - - podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, shareDataVolumeMount) - - shareDataVolume := v1.Volume{ - Name: NameSupervisorVolume, - VolumeSource: v1.VolumeSource{ - EmptyDir: &v1.EmptyDirVolumeSource{}, - }, - } - podSpec.Volumes = append(podSpec.Volumes, shareDataVolume) - -} - -func addInitContainer(podSpec *v1.PodSpec, cfg *Config) { - initContainer := v1.Container{ - Name: ContainerSupervisorName, - Command: SupervisorCommand, - Args: []string{SupervisorArg}, - Image: cfg.SupervisorKitImage, - ImagePullPolicy: v1.PullIfNotPresent, - VolumeMounts: []v1.VolumeMount{ - { - Name: NameSupervisorVolume, - MountPath: SupervisorMountPath, - }, - }, - } - podSpec.InitContainers = []v1.Container{initContainer} -} diff --git a/pkg/types/mount.go b/pkg/types/mount.go index 71915fe2..4c2e5621 100644 --- a/pkg/types/mount.go +++ b/pkg/types/mount.go @@ -49,6 +49,8 @@ done` // SetMount Creates the sidecar container that mounts the source volume onto the pod volume func SetMount(podSpec *v1.PodSpec, service Service, cfg *Config) { podSpec.Containers = append(podSpec.Containers, sidecarPodSpec(service)) + termination := int64(5) + podSpec.TerminationGracePeriodSeconds = &termination addVolume(podSpec) } diff --git a/pkg/types/service.go b/pkg/types/service.go index 1ed5c5d3..9645b41e 100644 --- a/pkg/types/service.go +++ b/pkg/types/service.go @@ -307,7 +307,7 @@ func (service *Service) ToPodSpec(cfg *Config) (*v1.PodSpec, error) { } if cfg.InterLinkAvailable && service.InterLinkNodeName != "" { // Add specs of InterLink - SetInterlinkService(podSpec) + podSpec.Containers[0].ImagePullPolicy = "Always" } else { // Add specs volumeMount := v1.VolumeMount{ diff --git a/pkg/utils/auth/auth.go b/pkg/utils/auth/auth.go index 559c6aad..40883f7c 100644 --- a/pkg/utils/auth/auth.go +++ b/pkg/utils/auth/auth.go @@ -18,9 +18,7 @@ package auth import ( "fmt" - "log" "strings" - "time" "github.com/gin-gonic/gin" "github.com/grycap/oscar/v3/pkg/types" @@ -64,49 +62,6 @@ func CustomAuth(cfg *types.Config, kubeClientset *kubernetes.Clientset) gin.Hand } } -// GetLoggerMiddleware returns a gin handler as middleware to log custom info about sync/async executions -func GetLoggerMiddleware() gin.HandlerFunc { - return func(c *gin.Context) { - - // Disable default printf timestamp to avoid inconsistencies on logs - log.SetFlags(0) - - startTime := time.Now() - - // Process request - c.Next() - - endTime := time.Now() - - // Log custom information after the request is processed - logTime := endTime.Format("2006/01/02 - 15:04:05") - latency := time.Since(startTime) - status := c.Writer.Status() - clientIP := c.ClientIP() - method := c.Request.Method - path := c.Request.URL.Path - - // Get EGI UID from context (if OIDC auth is used) - uid, uidExists := c.Get("uidOrigin") - var user string - if uidExists { - user, _ = uid.(string) - } else { - // Set OSCAR as default user when no UID is found - user = "oscar" - } - - // Get source IP from context for jobs triggered through MinIO events - IPAddress, AddressExists := c.Get("IPAddress") - if AddressExists { - clientIP, _ = IPAddress.(string) - } - - log.Printf("[GIN-EXECUTIONS-LOGGER] %s | %3d | %13v | %s | %-7s %s | %s", - logTime, status, latency, clientIP, method, path, user) - } -} - func GetUIDFromContext(c *gin.Context) (string, error) { uidOrigin, uidExists := c.Get("uidOrigin") if !uidExists { diff --git a/pkg/utils/auth/oidc.go b/pkg/utils/auth/oidc.go index fb60575b..9fb2a21f 100644 --- a/pkg/utils/auth/oidc.go +++ b/pkg/utils/auth/oidc.go @@ -51,8 +51,8 @@ type oidcManager struct { // userInfo custom struct to store essential fields from UserInfo type userInfo struct { - Subject string - Groups []string + subject string + groups []string } // newOIDCManager returns a new oidcManager or error if the oidc.Provider can't be created @@ -96,17 +96,17 @@ func getOIDCMiddleware(kubeClientset *kubernetes.Clientset, minIOAdminClient *ut rawToken := strings.TrimPrefix(authHeader, "Bearer ") // Check the token - if !oidcManager.IsAuthorised(rawToken) { + if !oidcManager.isAuthorised(rawToken) { c.AbortWithStatus(http.StatusUnauthorized) return } - ui, err := oidcManager.GetUserInfo(rawToken) + ui, err := oidcManager.getUserInfo(rawToken) if err != nil { c.String(http.StatusInternalServerError, fmt.Sprintf("%v", err)) return } - uid := ui.Subject + uid := ui.subject // Check if exist MinIO user in cached users list minioUserExists := mc.UserExists(uid) @@ -142,8 +142,8 @@ func (om *oidcManager) clearExpired() { } } -// GetUserInfo obtains UserInfo from the issuer -func (om *oidcManager) GetUserInfo(rawToken string) (*userInfo, error) { +// getUserInfo obtains UserInfo from the issuer +func (om *oidcManager) getUserInfo(rawToken string) (*userInfo, error) { ot := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: rawToken}) // Get OIDC UserInfo @@ -160,8 +160,8 @@ func (om *oidcManager) GetUserInfo(rawToken string) (*userInfo, error) { // Create "userInfo" struct and add the groups return &userInfo{ - Subject: ui.Subject, - Groups: getGroups(claims.EdupersonEntitlement), + subject: ui.Subject, + groups: getGroups(claims.EdupersonEntitlement), }, nil } @@ -184,11 +184,11 @@ func getGroups(urns []string) []string { // UserHasVO checks if the user contained on the request token is enrolled on a specific VO func (om *oidcManager) UserHasVO(rawToken string, vo string) (bool, error) { - ui, err := om.GetUserInfo(rawToken) + ui, err := om.getUserInfo(rawToken) if err != nil { return false, err } - for _, gr := range ui.Groups { + for _, gr := range ui.groups { if vo == gr { return true, nil } @@ -197,15 +197,16 @@ func (om *oidcManager) UserHasVO(rawToken string, vo string) (bool, error) { } func (om *oidcManager) GetUID(rawToken string) (string, error) { - ui, err := om.GetUserInfo(rawToken) + ui, err := om.getUserInfo(rawToken) + oidcLogger.Println("received uid: ", ui.subject) if err != nil { - return "", err + return ui.subject, nil } - return ui.Subject, nil + return "", err } -// IsAuthorised checks if a token is authorised to access the API -func (om *oidcManager) IsAuthorised(rawToken string) bool { +// isAuthorised checks if a token is authorised to access the API +func (om *oidcManager) isAuthorised(rawToken string) bool { // Check if the token is valid _, err := om.provider.Verifier(om.config).Verify(context.TODO(), rawToken) if err != nil { @@ -216,7 +217,7 @@ func (om *oidcManager) IsAuthorised(rawToken string) bool { ui, found := om.tokenCache[rawToken] if !found { // Get userInfo from the issuer - ui, err = om.GetUserInfo(rawToken) + ui, err = om.getUserInfo(rawToken) if err != nil { return false } @@ -230,12 +231,12 @@ func (om *oidcManager) IsAuthorised(rawToken string) bool { // Check if is authorised // Same subject - if ui.Subject == om.subject { + if ui.subject == om.subject { return true } // Groups - for _, tokenGroup := range ui.Groups { + for _, tokenGroup := range ui.groups { for _, authGroup := range om.groups { if tokenGroup == authGroup { return true diff --git a/ui b/ui index 3ce8470b..98de936c 160000 --- a/ui +++ b/ui @@ -1 +1 @@ -Subproject commit 3ce8470b994e73789ec086e612af66b9721f2aff +Subproject commit 98de936c5923d3b3ccaef500c1a92f35009bbf53