Worker #21

Open
5 tasks
pihme opened this issue Aug 10, 2022 · 0 comments

pihme commented Aug 10, 2022

Feel free to break this down into smaller issues, as this is a big task.

  • Implement JobClient #37
  • Implement convenience wrapper for ActivatedRecord
  • Register and stop worker
  • Implement Job Handler pool
  • Activate jobs and forward them to job handler

Terminology

  • A job handler is a function that executes one job. It can complete or fail the job, set variables of the job, update the retries, or throw errors.
  • A worker is a component that manages a pool of job handler instances. It activates jobs on the Zeebe side, assigns those jobs to job handler instances, and makes sure there is a steady flow of jobs.

Job Handler

Has an interface like this (pseudo code):

##Job Handler

handle(client: JobClient, job: ActivatedJob): Result<_, Error>  // error will be picked up by worker which then fails the job
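
To make the handler contract a bit more concrete, here is a minimal Python sketch. It assumes that raising an exception plays the role of the `Error` branch of `Result<_, Error>`. `RecordingJobClient`, `add_handler` and `run_handler` are hypothetical names used only for illustration, and the `ActivatedJob` class here is a simplified stand-in, not the real wrapper.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class ActivatedJob:
    """Simplified stand-in for the activated job passed to a handler."""
    key: int
    retries: int = 3
    variables: Dict[str, Any] = field(default_factory=dict)


class RecordingJobClient:
    """Hypothetical client that only records calls; a real one would talk to Zeebe."""

    def __init__(self) -> None:
        self.completed: Dict[int, Dict[str, Any]] = {}

    def complete_job(self, job_key: int, variables: Dict[str, Any]) -> None:
        self.completed[job_key] = variables


# A job handler is any callable with this shape. Raising an exception
# signals an error that the worker picks up to fail the job.
JobHandler = Callable[[RecordingJobClient, ActivatedJob], None]


def add_handler(client: RecordingJobClient, job: ActivatedJob) -> None:
    """Example handler: sums two variables and completes the job."""
    if "a" not in job.variables or "b" not in job.variables:
        raise ValueError("variables 'a' and 'b' are required")
    client.complete_job(job.key, {"sum": job.variables["a"] + job.variables["b"]})


def run_handler(
    handler: JobHandler, client: RecordingJobClient, job: ActivatedJob
) -> Optional[Exception]:
    """Runs a handler and returns the error it raised, or None on success."""
    try:
        handler(client, job)
        return None
    except Exception as exc:  # the worker would fail the job with this error
        return exc
```

The worker would call something like `run_handler` for each activated job and only talk to Zeebe itself when an error comes back.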

Job Client

### Minimal
completeJob(jobKey: int, variables: map<String, Object>) 
failJob(jobKey: int, retries: int, errorMessage: Optional<String>, retryBackOff: Optional<int>)

### Full Featured
setVariables(jobKey: int, variables: map<String, Object>) 
updateRetries(jobKey: int, retries: int)
throwError(jobKey: int, errorCode: String, errorMessage: Optional<String>) 
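
One possible way to express the minimal / full-featured split is with two structural interfaces. The sketch below uses Python's `typing.Protocol` and mirrors the method names above in snake_case. `InMemoryJobClient` is a hypothetical test double that only records commands; it is not part of any existing client and makes no calls to Zeebe.

```python
from typing import Any, Dict, List, Optional, Protocol, Tuple


class MinimalJobClient(Protocol):
    """The two commands every job handler needs."""

    def complete_job(self, job_key: int, variables: Dict[str, Any]) -> None: ...

    def fail_job(
        self,
        job_key: int,
        retries: int,
        error_message: Optional[str] = None,
        retry_back_off: Optional[int] = None,
    ) -> None: ...


class FullJobClient(MinimalJobClient, Protocol):
    """Adds the full-featured commands on top of the minimal set."""

    def set_variables(self, job_key: int, variables: Dict[str, Any]) -> None: ...

    def update_retries(self, job_key: int, retries: int) -> None: ...

    def throw_error(
        self, job_key: int, error_code: str, error_message: Optional[str] = None
    ) -> None: ...


class InMemoryJobClient:
    """Hypothetical test double: stores each command as (name, job_key, arguments)."""

    def __init__(self) -> None:
        self.commands: List[Tuple[str, int, Dict[str, Any]]] = []

    def complete_job(self, job_key, variables):
        self.commands.append(("complete", job_key, {"variables": variables}))

    def fail_job(self, job_key, retries, error_message=None, retry_back_off=None):
        args = {
            "retries": retries,
            "error_message": error_message,
            "retry_back_off": retry_back_off,
        }
        self.commands.append(("fail", job_key, args))

    def set_variables(self, job_key, variables):
        self.commands.append(("set_variables", job_key, {"variables": variables}))

    def update_retries(self, job_key, retries):
        self.commands.append(("update_retries", job_key, {"retries": retries}))

    def throw_error(self, job_key, error_code, error_message=None):
        args = {"error_code": error_code, "error_message": error_message}
        self.commands.append(("throw_error", job_key, args))
```

Handlers that only need to complete or fail a job can be typed against `MinimalJobClient`, so the full-featured methods can be added later without touching them.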

ActivatedJob

Wrapper around proto message ActivatedJob with some convenience methods for variable handling

### Minimal
getJobKey() : Int
getVariables(): Map<String, Object>

Full featured: https://github.com/camunda/zeebe/blob/main/clients/java/src/main/java/io/camunda/zeebe/client/api/response/ActivatedJob.java

(Basically, just a lot more metadata about the job.)
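
A rough sketch of the minimal wrapper, assuming the proto message carries the job key as an integer and the variables as a JSON document string. `ActivatedJobMessage` is a hypothetical stand-in for the generated proto class, and `get_variable` is an extra convenience method added for illustration.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class ActivatedJobMessage:
    """Hypothetical stand-in for the generated proto message."""
    key: int
    variables: str = "{}"  # assumed to be a JSON object encoded as a string


class ActivatedJob:
    """Wraps the message and parses the variables lazily, at most once."""

    def __init__(self, message: ActivatedJobMessage) -> None:
        self._message = message
        self._variables: Optional[Dict[str, Any]] = None

    def get_job_key(self) -> int:
        return self._message.key

    def get_variables(self) -> Dict[str, Any]:
        if self._variables is None:
            parsed = json.loads(self._message.variables or "{}")
            if not isinstance(parsed, dict):
                raise ValueError("job variables must be a JSON object")
            self._variables = parsed
        # Return a shallow copy so handlers cannot alter the cached document.
        return dict(self._variables)

    def get_variable(self, name: str, default: Any = None) -> Any:
        return self.get_variables().get(name, default)
```

Usage would look like `ActivatedJob(message).get_variable("orderId")`, where `orderId` is just an example variable name.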

Worker

Lifecycle of a worker:

  worker = zeebeClient.registerWorker( MyJobHandler::handleJob, "my-job-type", 10 /* Max Jobs to Activate */ )

  //on shutdown
  worker.close()

Optional registration options:

  • name of the worker
  • poll interval
  • request timeout
  • backOff supplier
  • variables to fetch; if set, only these variables are fetched instead of all variables

See JavaDoc here: https://github.com/camunda/zeebe/blob/main/clients/java/src/main/java/io/camunda/zeebe/client/api/worker/JobWorkerBuilderStep1.java
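
The registration parameters could be grouped into one options object, for example as below. This is only a sketch: the field names, the default values and the `doubling_back_off` helper are placeholders chosen for illustration, not the defaults of the Java client.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Sequence


def doubling_back_off(current_delay_s: float) -> float:
    """Hypothetical backoff supplier: doubles the delay, capped at 5 seconds."""
    return min(max(current_delay_s, 0.05) * 2, 5.0)


@dataclass(frozen=True)
class WorkerOptions:
    # Required parameters from the registerWorker call
    job_type: str
    max_jobs_to_activate: int
    # Optional parameters; all defaults are placeholders
    name: str = "default-worker"
    poll_interval_s: float = 0.1
    request_timeout_s: float = 10.0
    back_off_supplier: Callable[[float], float] = doubling_back_off
    fetch_variables: Optional[Sequence[str]] = None  # None means fetch all variables

    def __post_init__(self) -> None:
        if not self.job_type:
            raise ValueError("job_type must not be empty")
        if self.max_jobs_to_activate < 1:
            raise ValueError("max_jobs_to_activate must be at least 1")
        if self.poll_interval_s <= 0:
            raise ValueError("poll_interval_s must be positive")
```

A call such as `WorkerOptions("my-job-type", 10, fetch_variables=["orderId"])` would then carry everything the worker needs apart from the handler itself.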

Inner workings of a worker:

  jobHandlerPool = createPool(maxJobsToActivate);

  schedule(pollInterval, PollTask::poll);
...

PollTask {
  maxJobsToActivate: int
  currentRunningJobs: int

  poll() {
    capacity = maxJobsToActivate - currentRunningJobs
    if capacity <= 0 then return  // all job handlers are busy, skip this poll

    activatedJobs = zeebeClient.activateJobs(jobType, capacity)

    for job in activatedJobs do async {
      currentRunningJobs++
      jobHandler = jobHandlerPool.take()

      result = jobHandler.handle(jobClient, job)
      match result {
        case Error e : zeebeClient.failJob(job.key, job.retries - 1, toErrorMessage(e))
        case _ : // do nothing
      }

      jobHandlerPool.return(jobHandler)
      currentRunningJobs--
    }
  }
}
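
The poll loop above can be sketched in runnable form as follows. This is an illustration under assumptions, not a proposed implementation: a thread pool stands in for the job handler pool, `FakeGateway` is a hypothetical in-memory replacement for the Zeebe client, and scheduling `poll()` on the poll interval is left out.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class Job:
    key: int
    retries: int = 3


class FakeGateway:
    """Hypothetical in-memory stand-in for the Zeebe client."""

    def __init__(self, jobs: List[Job]) -> None:
        self._pending = list(jobs)
        self._lock = threading.Lock()
        self.failed: List[Tuple[int, int, str]] = []

    def activate_jobs(self, job_type: str, max_jobs: int) -> List[Job]:
        with self._lock:
            batch = self._pending[:max_jobs]
            self._pending = self._pending[max_jobs:]
            return batch

    def fail_job(self, job_key: int, retries: int, error_message: str) -> None:
        with self._lock:
            self.failed.append((job_key, retries, error_message))


class PollTask:
    def __init__(self, gateway: FakeGateway, job_type: str,
                 handler: Callable[[FakeGateway, Job], None],
                 max_jobs_to_activate: int) -> None:
        self._gateway = gateway
        self._job_type = job_type
        self._handler = handler
        self._max = max_jobs_to_activate
        self._running = 0
        self._lock = threading.Lock()
        # The thread pool plays the role of the job handler pool.
        self._pool = ThreadPoolExecutor(max_workers=max_jobs_to_activate)

    def poll(self) -> List[Future]:
        with self._lock:
            capacity = self._max - self._running
        if capacity <= 0:
            return []  # every handler slot is busy, skip this poll
        jobs = self._gateway.activate_jobs(self._job_type, capacity)
        with self._lock:
            # Count jobs before handing them off so the next poll sees them.
            self._running += len(jobs)
        return [self._pool.submit(self._run, job) for job in jobs]

    def _run(self, job: Job) -> None:
        try:
            self._handler(self._gateway, job)
        except Exception as exc:
            # An error from the handler fails the job with one retry less.
            self._gateway.fail_job(job.key, max(job.retries - 1, 0), str(exc))
        finally:
            with self._lock:
                self._running -= 1

    def close(self) -> None:
        self._pool.shutdown(wait=True)
```

One design choice worth noting: the running counter is incremented in `poll()` before the jobs are submitted, not inside the async part, so a second poll that starts immediately cannot over-activate.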
@pihme pihme self-assigned this Aug 11, 2022
@pihme pihme removed their assignment Apr 6, 2023