
Make the execution controller restartable on kafka in k8s #1007

Closed
godber opened this issue Feb 6, 2019 · 15 comments · Fixed by #3740
Labels
enhancement, k8s (Applies to Teraslice in kubernetes cluster mode only.), pkg/teraslice, priority:future

Comments


godber commented Feb 6, 2019

I think it should be possible to make the execution controller restartable in the case of kafka readers and in k8s. I guess there's the question of persisting the in memory state in the execution controller, but it can't be that much in the case of kafka.

@peterdemartini maybe you can point me in the right direction. Or maybe this should be for you to do.

godber added the enhancement, k8s (Applies to Teraslice in kubernetes cluster mode only.), and pkg/teraslice labels Feb 6, 2019

godber commented Feb 6, 2019

I think the question of "How do workers find the restarted execution controller?" is answered by the use of k8s services. The workers already access the execution controller through a service, and this service will route traffic to any execution controller pod that matches specific criteria.
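For reference, here is a minimal sketch of what such a Service could look like, expressed as a TypeScript object in the style of the resource templates the k8s backend builds. The label keys, the exId value, and the port are illustrative assumptions, not necessarily what Teraslice actually uses.

// Illustrative only: a Service that routes worker traffic to whichever pod
// currently carries the execution controller labels, so a restarted controller
// pod is picked up automatically without the workers changing anything.
const executionControllerService = {
    apiVersion: 'v1',
    kind: 'Service',
    metadata: {
        name: 'ts-exc-example-job'
    },
    spec: {
        selector: {
            // Assumed label scheme, for illustration only.
            'app.kubernetes.io/component': 'execution_controller',
            'example.teraslice/exId': 'abc123'
        },
        ports: [
            { port: 45680, targetPort: 45680 } // illustrative port
        ]
    }
};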


godber commented Feb 6, 2019

Also, this doesn't have to be discussed or resolved now, I just wanted to get the issue posted, since it would be a huge win in kubernetes.

@peterdemartini
Contributor

There are a number of things that would need to change in the execution controller, particularly around the execution state and what happens when it restarts. The execution should update the state to “paused,” a new “suspended” state, or something similar. The slicer would have to be able to specify that it stores no critical state and can be restarted.

The workers will need to be notified by the execution controller that it is “suspended” so that they don’t shut down when the execution controller shuts down.


kstaken commented Aug 6, 2019

This basically requires HA support for execution controllers.


godber commented Aug 20, 2024

At this point we're experimenting with how things work and will come up with a proposed solution. After Joseph does some experimenting and code reading, we should all talk with Jared since he has a broader/deeper understanding of the existing implementations.


sotojn commented Aug 29, 2024

After looking into this and testing some jobs using the kubernetesV2 backend, I feel I have an understanding of what would be lost in memory and the effects of restarting an execution pod.

For context, the kubernetesV2 backend allows the execution controller to come back up in the event the pod is killed unexpectedly. This means that the pod died while the execution record was still in a "running" state. This happens when the execution controller doesn't have an opportunity to run the shutdown code. (NOTE: The shutdown code will change the execution record status)

Here is a list of data in memory that the execution controller will lose when restarted:

  • Several state counters that include the slicer analytics
    • Inside the top level ExecutionController class
      • pendingDispatches
      • pendingSlices: starts at 0 and is used to check for pending slices
      • startTime: a Date.now() timestamp set when the execution starts running
    • Inside the Scheduler class
      • _creating: I believe this is the number of slices being created to be put into the queue
      • queue: a Queue class instance that holds pending slices
    • Inside the ExecutionAnalytics class
      • The executionAnalytics object that contains the following:
        • workers_available starting value is 0
        • workers_active starting value is 0
        • workers_joined starting value is 0
        • workers_reconnected starting value is 0
        • workers_disconnected starting value is 0
        • job_duration starting value is 0
        • failed starting value is 0
        • subslices starting value is 0
        • queued starting value is 0
        • slice_range_expansion starting value is 0
        • processed starting value is 0
        • slicers starting value is 0
        • subslice_by_key starting value is 0
        • started starting value is undefined
        • queuing_complete starting value is undefined
      • The pushedAnalytics object that contains the following:
        • processed starting value is 0
        • failed starting value is 0
        • queued starting value is 0
        • job_duration starting value is 0
        • workers_joined starting value is 0
        • workers_disconnected starting value is 0
        • workers_reconnected starting value is 0
    • Inside the ExController.Server class
      • queue: starts with a new Queue()
      • _activeWorkers: starts as an empty object
    • Inside the Metrics class
      • gcStats: starts as the gc-stats default
      • eventLoopStats: starts as the imported event-loop-stats module
      • _intervals: starts as an empty Array
      • _typesCollectedAt: starts as an empty object

I next want to list all the slicers and what will be lost in memory. From what I tested, it seems that in the case of an execution restart, it will boot up as if it were its first time booting up and start from the beginning. I made a once-lifecycle data-gen-to-kafka job that would generate 1 million records and write them into kafka. I killed the ex controller about 300k records in and then let it run to completion. This resulted in about 1.3 million records in kafka, which is expected.

I'm also looking into how recovery works within the execution controller and how we might be able to leverage that code to figure out where the execution previously left off. We might be able to trigger recovery automatically when the ex controller comes up and notices that its execution record is already in a running state during the initializing step.
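A minimal sketch of that idea, assuming hypothetical names like stateStorage.getStatus() and recoverFromLastState() rather than the actual Teraslice API:

// Hypothetical sketch: during execution controller initialization, detect that the
// execution record is still "running" (i.e. the previous pod died without a clean
// shutdown) and trigger recovery instead of starting from scratch. All names here
// are assumptions for illustration.
async function initializeExecution(
    exId: string,
    stateStorage: { getStatus: (exId: string) => Promise<string> },
    controller: {
        recoverFromLastState: () => Promise<void>;
        startFresh: () => Promise<void>;
    }
): Promise<void> {
    const status = await stateStorage.getStatus(exId);

    if (status === 'running') {
        // A "running" status at boot implies an unclean restart, so gather the
        // uncompleted slices left behind by the previous controller pod.
        await controller.recoverFromLastState();
    } else {
        await controller.startFresh();
    }
}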


sotojn commented Sep 3, 2024

Just to confirm and document the behavior of restarting an execution controller, I created a kafka job and forced its execution controller to die and restart. For context, there is a kafka topic named test-topic-1 with exactly 500000 messages on it.

Job file:

{
    "name": "kafka-to-es-ex-shutdown",
    "lifecycle": "persistent",
    "workers": 1,
    "assets": [
        "kafka",
        "elasticsearch"
    ],
    "operations": [
        {
            "_op": "kafka_reader",
            "topic": "test-topic-1",
            "group": "test-group-ex-shutdown",
            "size": 25000
        },
        {
            "_op": "elasticsearch_bulk",
            "size": 25000,
            "index": "es-test-ex-shutdown"
        }
    ]
}

Midway through running the execution I force shut it down. The worker logs show this when the ex_controller pod dies:

[2024-09-03T19:53:54.687Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: execution: 761b8589-2f80-4742-b71d-76dfc4550929 initialized worker (assignment=worker, module=worker, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:53:58.734Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: got new assignments partition: 0 (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:53:59.438Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: Resolving with 25000 results, took 4.65sec (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:01.098Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: analytics for slice: slice_id: "a9a448d4-9776-4329-85b3-fac3fd597586", slicer_id: 0, slicer_order: 1, _created: "2024-09-03T19:53:46.120Z", time: [4646, 1649], memory: [62987896, -4534080], size: [25000, 25000] (assignment=worker, module=slice, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, slice_id=a9a448d4-9776-4329-85b3-fac3fd597586)
[2024-09-03T19:54:01.100Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: slice a9a448d4-9776-4329-85b3-fac3fd597586 completed (assignment=worker, module=worker, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:01.120Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: partition 0 for topic test-topic-1 is behind 1 commits (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:01.491Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: Resolving with 25000 results, took 371ms (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:02.596Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: analytics for slice: slice_id: "63084bab-67f5-412f-84ce-ea2fddf081d5", slicer_id: 0, slicer_order: 2, _created: "2024-09-03T19:53:54.783Z", time: [372, 1094], memory: [-32556544, 65140240], size: [25000, 25000] (assignment=worker, module=slice, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, slice_id=63084bab-67f5-412f-84ce-ea2fddf081d5)
[2024-09-03T19:54:02.635Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: slice 63084bab-67f5-412f-84ce-ea2fddf081d5 completed (assignment=worker, module=worker, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:02.699Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: partition 0 for topic test-topic-1 is behind 1 commits (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:03.037Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: Resolving with 25000 results, took 337ms (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:04.170Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: analytics for slice: slice_id: "7737b62a-1f9e-47d1-955f-5aeb76a78a8c", slicer_id: 0, slicer_order: 3, _created: "2024-09-03T19:53:54.788Z", time: [338, 1056], memory: [77386688, -77747800], size: [25000, 25000] (assignment=worker, module=slice, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, slice_id=7737b62a-1f9e-47d1-955f-5aeb76a78a8c)
[2024-09-03T19:54:04.170Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: slice 7737b62a-1f9e-47d1-955f-5aeb76a78a8c completed (assignment=worker, module=worker, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:04.183Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: partition 0 for topic test-topic-1 is behind 1 commits (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:04.635Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: Resolving with 25000 results, took 452ms (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:05.273Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: client 10.244.0.13__3QYqY6X4 disconnected { reason: 'transport close' } (assignment=worker, module=messaging:client, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:05.595Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: flushed 6 records to index ts-dev1__analytics* (assignment=worker, module=analytics_storage, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:05.596Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: analytics for slice: slice_id: "6436b7ea-9a54-4b1c-bdcb-77bf549019bd", slicer_id: 0, slicer_order: 4, _created: "2024-09-03T19:54:01.121Z", time: [453, 953], memory: [-20268584, 63360832], size: [25000, 25000] (assignment=worker, module=slice, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, slice_id=6436b7ea-9a54-4b1c-bdcb-77bf549019bd)
[2024-09-03T19:54:05.669Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: slice 6436b7ea-9a54-4b1c-bdcb-77bf549019bd completed (assignment=worker, module=worker, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:05.671Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: server is not ready and not-connected, waiting for the ready event (assignment=worker, module=messaging:client, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:06.125Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: client 10.244.0.13__3QYqY6X4 is reconnecting... (assignment=worker, module=messaging:client, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:08.900Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: client 10.244.0.13__3QYqY6X4 is reconnecting... (assignment=worker, module=messaging:client, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:12.873Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: client 10.244.0.13__3QYqY6X4 is reconnecting... (assignment=worker, module=messaging:client, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:15.485Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: flushed 2 records to index ts-dev1__analytics* (assignment=worker, module=analytics_storage, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:17.879Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: client 10.244.0.13__3QYqY6X4 is reconnecting... (assignment=worker, module=messaging:client, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:22.884Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: client 10.244.0.13__3QYqY6X4 is reconnecting... (assignment=worker, module=messaging:client, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:27.892Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: client 10.244.0.13__3QYqY6X4 is reconnecting... (assignment=worker, module=messaging:client, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:27.904Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: client 10.244.0.13__3QYqY6X4 reconnected (assignment=worker, module=messaging:client, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:27.911Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: client 10.244.0.13__3QYqY6X4 connected (assignment=worker, module=messaging:client, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:27.932Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: partition 0 for topic test-topic-1 is behind 1 commits (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:28.481Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: Resolving with 25000 results, took 549ms (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:29.395Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: analytics for slice: slice_id: "c1436e6f-9e74-4903-b7d3-92d507f5859e", slicer_id: 0, slicer_order: 1, _created: "2024-09-03T19:54:23.966Z", time: [549, 909], memory: [-22310976, 36781928], size: [25000, 25000] (assignment=worker, module=slice, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, slice_id=c1436e6f-9e74-4903-b7d3-92d507f5859e)
[2024-09-03T19:54:29.397Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: slice c1436e6f-9e74-4903-b7d3-92d507f5859e completed (assignment=worker, module=worker, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:29.414Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: partition 0 for topic test-topic-1 is behind 1 commits (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:29.881Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: Resolving with 25000 results, took 467ms (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:31.389Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: analytics for slice: slice_id: "f6c94e73-2a99-4edb-a56e-d1b50311cbaf", slicer_id: 0, slicer_order: 2, _created: "2024-09-03T19:54:27.927Z", time: [467, 1497], memory: [-37931904, 59981856], size: [25000, 25000] (assignment=worker, module=slice, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, slice_id=f6c94e73-2a99-4edb-a56e-d1b50311cbaf)
[2024-09-03T19:54:31.389Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: slice f6c94e73-2a99-4edb-a56e-d1b50311cbaf completed (assignment=worker, module=worker, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:31.478Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: partition 0 for topic test-topic-1 is behind 1 commits (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:31.879Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: Resolving with 25000 results, took 401ms (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:32.956Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: analytics for slice: slice_id: "15458bf5-5c33-44d9-9f40-1c484b17a5de", slicer_id: 0, slicer_order: 3, _created: "2024-09-03T19:54:28.021Z", time: [401, 1066], memory: [-20560096, 65265576], size: [25000, 25000] (assignment=worker, module=slice, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, slice_id=15458bf5-5c33-44d9-9f40-1c484b17a5de)
[2024-09-03T19:54:32.958Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: slice 15458bf5-5c33-44d9-9f40-1c484b17a5de completed (assignment=worker, module=worker, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c)
[2024-09-03T19:54:33.041Z] DEBUG: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: partition 0 for topic test-topic-1 is behind 1 commits (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)
[2024-09-03T19:54:33.445Z]  INFO: teraslice/50 on ts-wkr-kafka-to-es-ex-shutdown-8be608de-db2b-65cb44fcdf-5rnhl: Resolving with 25000 results, took 404ms (assignment=worker, module=operation-api, worker_id=3QYqY6X4, ex_id=761b8589-2f80-4742-b71d-76dfc4550929, job_id=8be608de-db2b-4576-b2b1-2b09b9c11a9c, apiName=kafka_reader_api:kafka_reader-0)

It will briefly disconnect, reconnect, and continue. I've verified that the consumer group completes its offsets and finishes before stopping the job. It seems it takes elasticsearch a good couple of minutes to update the records in the index, but eventually all 500000 records show up in the index, which is expected.

health status index                      uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   ts-dev1__ex                27ejA2uhSn2s62J-s1Yqjg   5   1          2            0     93.3kb         93.3kb
yellow open   ts-dev1__jobs              L1wmabIFSfihcxOXBlisZQ   5   1          2            0       10kb           10kb
yellow open   ts-dev1__assets            9pNrQlUGQ3-5JrWkGKEP8g   5   1          3            0      4.2mb          4.2mb
yellow open   ts-dev1__state-2024.09     oCPjgyLcQ5ydc0HyBVv6wQ   5   1         48           50    336.5kb        336.5kb
yellow open   es-test-ex-shutdown        gWTuZoi3Rgu60XZEC3IsTQ   1   1     500000            0    333.6mb        333.6mb
yellow open   ts-dev1__analytics-2024.09 meVoAQwoRiaNT8ykfhQiPA   5   1         80            0    176.8kb        176.8kb

Note that I killed the ex_controller in a way that would not allow the shutdown code to run. I did this by making a temporary endpoint on the ex_controller that runs process.exit(1). There is still a lot to think about regarding how the execution controller should shut down in each scenario. All scenarios that cause the shutdown code to run will result in the deletion of the kubernetes job resource related to the execution id. We need to consider how the execution shuts down in the event you want to migrate pods from one cluster node to another.


godber commented Sep 3, 2024

There is still a lot to think about regarding how the execution controller should shut down in each scenario. All scenarios that cause the shutdown code to run will result in the deletion of the kubernetes job resource related to the execution id. We need to consider how the execution shuts down in the event you want to migrate pods from one cluster node to another.

Yeah, that's an important point.


sotojn commented Sep 3, 2024

What I need to do is modify the execution shutdown code to check whether the execution status has been changed to stopping at the time shutdown is called. If it has, that means we are trying to stop the job, so we should run the entire shutdown process and close out. If not, it means we aren't stopping the job and should prepare to restart without signaling all the workers to shut down.


godber commented Sep 3, 2024

One of the main goals here is to allow kubectl drain node1 (where node1 has a running execution controller) and kubectl delete pod ts-exc-foo to work without stopping a job. The work will be temporarily suspended while the execution controller moves or restarts, since the workers will not get slices, but the job should not stop.

We have two scenarios we want to support, when an execution controller:

  1. shuts down cleanly (like shown above)
  2. disappears without shutting down cleanly, like when a host dies

Let's concentrate on solving those "process shutdown" problems without confusing them with "job shutdown", and do it for Kafka only since that's the easy case. Then we can deal with slicers that may need changing.

An open question is, in case 1 above, when the execution controller shuts down cleanly, should it change the job state to something like "paused"? It might be nice to have the state change so that if we look at the status of the job we know why lag is happening. I doubt we want to reuse pause, since that indicates a user-initiated action, whereas this is a system-initiated action.


sotojn commented Sep 4, 2024

I have made changes to the execution controller so that it will shut down in one of two ways in the event of a SIGTERM. When the SIGTERM event is triggered, the execution controller will refresh the state storage and check the current status of the execution. A rough sketch of this check follows the list below.

  • If the status is in a running state: The execution controller will log that it's Skipping shutdown to allow restart... then proceed to close out without notifying workers to shut down and without changing state.
  • If the status is in a stopping state: The execution will proceed with its normal shutdown process, which includes flushing all sorts of slices, analytics, etc. and notifying workers to shut down.
  • NOTE: We use the stopping state as an indicator to shut down and not restart, because the cluster master sets the execution to the stopping state when the api calls stop on a job/execution.
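Here is that sketch of the SIGTERM decision, with illustrative names (refreshExecutionStatus, shutdownWorkersAndCleanup, and exitWithoutTouchingState are stand-ins, not the real functions):

// Illustrative sketch of the SIGTERM handling described above. All identifiers are
// assumptions for illustration; the real execution controller code differs.
type ExecutionStatus = 'running' | 'stopping' | string;

async function onSigterm(deps: {
    refreshExecutionStatus: () => Promise<ExecutionStatus>;
    shutdownWorkersAndCleanup: () => Promise<void>;
    exitWithoutTouchingState: () => Promise<void>;
}): Promise<void> {
    // Re-read the execution record so we act on the latest status, not a cached one.
    const status = await deps.refreshExecutionStatus();

    if (status === 'stopping') {
        // The cluster master sets "stopping" when stop is called on the job/execution,
        // so run the full shutdown: flush slices and analytics, notify workers, clean up.
        await deps.shutdownWorkersAndCleanup();
    } else {
        // Any other status (e.g. "running") means this SIGTERM is a pod-level event
        // such as a drain or pod delete: exit quietly, leaving workers and execution
        // state alone so a new controller pod can pick up where this one left off.
        await deps.exitWithoutTouchingState();
    }
}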

I did some tests with this new logic on a persistent job that read from kafka and wrote into elasticsearch. I ran kubectl delete pod <ex-pod-id> several times randomly on the execution pod. After coming back up each time, it reconnected to the workers and continued. I then called stop on the execution, and it properly shut down and cleaned up all k8s resources.


sotojn commented Sep 5, 2024

I've gone through the four primary assets that we maintain and listed all of their slicers:

elasticsearch-assets:

Slicer name            Has recovery
id_reader              yes, but only on once jobs
spaces_reader          yes
elasticsearch_reader   yes

file-assets:

Slicer name   Has recovery
hdfs_reader   yes, but only if autorecover is enabled
s3_reader     yes, but only if autorecover is enabled
file_reader   yes, but only if autorecover is enabled

I want to look into why autorecover has to be enabled for this to work

standard-assets:

Slicer name      Has recovery
data_generator   yes

kafka-assets:

Slicer name    Has recovery
kafka_reader   yes


godber commented Sep 5, 2024

I had a realization that we're kind of confusing recovery with restartable. I think we need both. Recovery may be useful when it comes to restartable, but let's make sure we consider recovery to be, roughly speaking, gathering uncompleted slices, either in the manually restarted job case or in the automatically restarted execution controller case.

I think we will introduce an isRestartable in the slicers first. I feel like we have most of the pieces here; we need to propose a concrete solution and share it with others to get feedback, then try to implement it. To complicate things, I really don't want to introduce this in the kubernetes backend ... which you are now testing against because of problems with the kubernetesv2 backend. We'll have to talk about how exactly to proceed. Basically we've destabilized kubernetesv2 enough that it's no longer useful as a relatively safe alternative implementation until we work out the bugs.
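To make the isRestartable idea concrete, here is a hedged sketch of how it might look on the slicer classes. The class and method shapes are simplified assumptions, not the actual job-components API.

// Sketch only: the base slicer opts out of restartability by default, and a slicer
// that keeps no critical in-memory state (like a kafka reader, where offsets live in
// the consumer group) opts in. Names and shapes are illustrative assumptions.
abstract class BaseSlicerSketch {
    // Default: assume the slicer holds state that would be lost on an unclean restart.
    isRestartable(): boolean {
        return false;
    }

    abstract slice(): Promise<unknown | null>;
}

class KafkaSlicerSketch extends BaseSlicerSketch {
    // Kafka offsets are tracked by the broker/consumer group, so restarting the
    // execution controller does not lose critical slicer state.
    isRestartable(): boolean {
        return true;
    }

    async slice(): Promise<unknown | null> {
        // Placeholder: the real kafka slicer logic is not shown here.
        return null;
    }
}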


sotojn commented Sep 5, 2024

I had a realization that we're kind of confusing recovery with restartable. I think we need both. Recovery may be useful when it comes to restartable, but let's make sure we consider recovery to be, roughly speaking, gathering uncompleted slices, either in the manually restarted job case or in the automatically restarted execution controller case.

I think we will introduce an isRestartable in the slicers first. I feel like we have most of the pieces here; we need to propose a concrete solution and share it with others to get feedback, then try to implement it. To complicate things, I really don't want to introduce this in the kubernetes backend ... which you are now testing against because of problems with the kubernetesv2 backend. We'll have to talk about how exactly to proceed. Basically we've destabilized kubernetesv2 enough that it's no longer useful as a relatively safe alternative implementation until we work out the bugs.

I have resolved most of the issues related to V2 and have all of this logic applied only to V2. It should not change how V1 operates.


godber commented Sep 5, 2024

I've just realized that you're going to have to make changes outside of the kubernetes backend, so sticking to kubernetesv2 isn't going to protect us.

godber pushed a commit that referenced this issue Sep 11, 2024
…3740)

This PR makes the following changes:

## New features
- Teraslice running in `KubernetesV2` will have the ability to restart in a new pod automatically under certain conditions
- These conditions include receiving a `SIGTERM` signal not associated with a job shutdown, and the slicer being restartable.
- Added new function `isRestartable()` to the base slicer
- `isRestartable()` will return a `boolean` to tell whether the slicer is compatible with the restartable feature or not. This allows for an initial implementation for `kafka` slicers without having to worry about the complexity of other slicers.

refs: #1007