Skip to content

Commit

Permalink
Merge pull request #15 from utopia-php/integrate-workers
Browse files Browse the repository at this point in the history
Integrate workers
  • Loading branch information
christyjacob4 authored Oct 16, 2023
2 parents 6e3d6db + 6c0a1b1 commit 229a7b1
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 17 deletions.
28 changes: 14 additions & 14 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/Platform/Action.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ abstract class Action

public const TYPE_OPTIONS = 'Options';

public const TYPE_WORKER_START = 'WorkerStart';

protected ?string $desc = null;

protected array $groups = [];
Expand Down
95 changes: 92 additions & 3 deletions src/Platform/Platform.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use Exception;
use Utopia\App;
use Utopia\CLI\CLI;
use Utopia\Queue\Adapter\Swoole;
use Utopia\Queue\Server;
use Utopia\Route;

abstract class Platform
Expand All @@ -14,16 +16,19 @@ abstract class Platform
Service::TYPE_CLI => [],
Service::TYPE_HTTP => [],
Service::TYPE_GRAPHQL => [],
Service::TYPE_WORKER => [],
];

protected CLI $cli;

protected Server $worker;

/**
* Initialize Application
*
* @return void
*/
public function init(string $type): void
public function init(string $type, array $params = []): void
{
switch ($type) {
case Service::TYPE_HTTP:
Expand All @@ -35,6 +40,9 @@ public function init(string $type): void
case Service::TYPE_GRAPHQL:
$this->initGraphQL();
break;
case Service::TYPE_WORKER:
$this->initWorker($params);
break;
default:
throw new Exception('Please provide which type of initialization you want to carry out.');
}
Expand Down Expand Up @@ -151,6 +159,69 @@ protected function initCLI(): void
}
}

/**
* Init worker Services
*
* @param array $params
* @return void
*/
protected function initWorker(array $params): void
{
$connection = $params['connection'] ?? null;
$workersNum = $params['workersNum'] ?? 0;
$workerName = $params['workerName'] ?? null;
$queueName = $params['queueName'] ?? 'v1-'.$workerName;
$adapter = new Swoole($connection, $workersNum, $queueName);
$this->worker ??= new Server($adapter);
foreach ($this->services[Service::TYPE_WORKER] as $service) {
foreach ($service->getActions() as $key => $action) {
if (! str_contains(strtolower($key), $workerName)) {
continue;
}

switch ($action->getType()) {
case Action::TYPE_INIT:
$hook = $this->worker->init();
break;
case Action::TYPE_ERROR:
$hook = $this->worker->error();
break;
case Action::TYPE_SHUTDOWN:
$hook = $this->worker->shutdown();
break;
case Action::TYPE_WORKER_START:
$hook = $this->worker->workerStart();
break;
case Action::TYPE_DEFAULT:
default:
$hook = $this->worker->job();
break;
}
$hook
->groups($action->getGroups())
->desc($action->getDesc() ?? '');

foreach ($action->getOptions() as $key => $option) {
switch ($option['type']) {
case 'param':
$key = substr($key, stripos($key, ':') + 1);
$hook->param($key, $option['default'], $option['validator'], $option['description'], $option['optional'], $option['injections']);
break;
case 'injection':
$hook->inject($option['name']);
break;
}
}

foreach ($action->getLabels() as $key => $label) {
$hook->label($key, $label);
}

$hook->action($action->getCallback());
}
}
}

/**
* Initialize GraphQL Services
*
Expand Down Expand Up @@ -192,9 +263,9 @@ public function removeService(string $key): Platform
* Get Service
*
* @param string $key
* @return Service
* @return Service|null
*/
public function getService(string $key): Service
public function getService(string $key): ?Service
{
if (empty($this->services['all'][$key])) {
throw new Exception('Service '.$key.' not found');
Expand Down Expand Up @@ -230,4 +301,22 @@ public function setCli(CLI $cli): self

return $this;
}

/**
* Get the value of worker
*/
public function getWorker(): Server
{
return $this->worker;
}

/**
* Set the value of worker
*/
public function setWorker(Server $worker): self
{
$this->worker = $worker;

return $this;
}
}
2 changes: 2 additions & 0 deletions src/Platform/Service.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ abstract class Service

public const TYPE_CLI = 'CLI';

public const TYPE_WORKER = 'Worker';

protected array $actions;

protected string $type;
Expand Down

0 comments on commit 229a7b1

Please sign in to comment.