Skip to content

Commit

Permalink
Update command-bus (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
zloyuser authored Aug 23, 2024
1 parent 29b6bc1 commit c6670d7
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 125 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
"ext-pcntl": "*",
"illuminate/support": ">=6.0",
"illuminate/console": ">=6.0",
"onliner/command-bus": "^1.0"
"illuminate/redis": ">=6.0",
"onliner/command-bus": "^1.1"
},
"autoload": {
"psr-4": {
Expand Down
18 changes: 15 additions & 3 deletions config/commandbus.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,18 @@
// 'key' => 'value',
],
],
// 'amqp' => [
// 'url' => 'amqp://guest:guest@localhost:5672/vhost',
// 'options' => [
// 'exchange' => 'commands',
// 'durable' => true,
// 'type' => 'topic',
// ],
// ],
],
'routes' => [
'*' => 'memory',
],
// 'routes' => [
// '*' => 'memory',
// ],
],
'consumer' => [
'options' => [
Expand All @@ -35,7 +43,11 @@
],
'queues' => [
// 'pattern' => [
// 'name' => 'my-queue',
// 'durable' => true,
// 'bindings' => [
// 'exchange' => 'pattern',
// ],
// 'args' => [
// Queue::MAX_PRIORITY => 3,
// ],
Expand Down
31 changes: 18 additions & 13 deletions src/Console/CommandBusProcessCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Config;
use Onliner\CommandBus\Dispatcher;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Queue;
use Onliner\CommandBus\Remote\Consumer;
use Onliner\CommandBus\Remote\Transport;
Expand Down Expand Up @@ -38,15 +37,10 @@ protected function getOptions(): array
{
return [
['user', 'u', InputOption::VALUE_OPTIONAL, 'User for which run workers'],
['exchange', 'e', InputOption::VALUE_OPTIONAL, 'Exchange to bind consumer queue'],
];
}

/**
* @param Dispatcher $dispatcher
* @param Transport $transport
*
* @return int
*/
public function handle(Dispatcher $dispatcher, Transport $transport): int
{
$this->setupUser();
Expand All @@ -62,16 +56,17 @@ public function handle(Dispatcher $dispatcher, Transport $transport): int

$options['pattern'] = $pattern;

if (!isset($options['bindings'])) {
$options['bindings'] = $this->option('exchange') ?: $this->getDefaultExchange();
}

$this->consumer = $transport->consume();
$this->consumer->consume(Queue::create($options));
$this->consumer->run($dispatcher, $config['options'] ?? []);

return 0;
}

/**
* @return void
*/
private function setupUser(): void
{
$user = $this->option('user');
Expand All @@ -86,9 +81,6 @@ private function setupUser(): void
posix_setuid($data['uid']);
}

/**
* @return void
*/
private function subscribeSignals(): void
{
pcntl_async_signals(true);
Expand All @@ -99,4 +91,17 @@ private function subscribeSignals(): void
});
}
}

private function getDefaultExchange(): string
{
$config = Config::get('commandbus.remote.transport');
$default = $config['default'] ?? null;
$connections = $config['connections'] ?? [];

if (empty($default) || !isset($connections[$default])) {
$default = array_key_first($connections);
}

return $connections[$default]['options']['exchange'];
}
}
6 changes: 0 additions & 6 deletions src/Factory/SerializerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ class SerializerFactory
{
public const DEFAULT = 'native';

/**
* @param string $type
* @param array $options
*
* @return Serializer
*/
public static function create(string $type, array $options = []): Serializer
{
return match ($type) {
Expand Down
43 changes: 20 additions & 23 deletions src/Factory/TransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,23 @@

use Illuminate\Contracts\Container\Container;
use InvalidArgumentException;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP;
use Onliner\CommandBus\Remote\Transport;
use Onliner\Laravel\CommandBus\Exception;

class TransportFactory
{
public const DEFAULT = 'memory://memory';

/**
* @param Container $container
*/
public function __construct(private Container $container)
{
}
public function __construct(
private Container $container,
) {}

/**
* @return Transport
*/
public function default(): Transport
{
return $this->createFromUrl(self::DEFAULT);
}

/**
* @param string $key
* @param string|array $config
*
* @return Transport
*/
public function create(string $key, string|array $config): Transport
{
if (is_array($config) && array_key_exists('url', $config)) {
Expand All @@ -58,18 +46,27 @@ public function create(string $key, string|array $config): Transport
throw new InvalidArgumentException(sprintf('Invalid transport "%s" configuration.', $key));
}

/**
* @param string $url
* @param array $options
*
* @return Transport
*/
private function createFromUrl(string $url, array $options = []): Transport
{
return match (parse_url($url, PHP_URL_SCHEME)) {
'amqp' => AMQPTransport::create($url, $options),
'amqp' => $this->createAmqpTransport($url, $options),
'memory' => new Transport\MemoryTransport(),
default => throw new Exception\BadTransportException($url),
};
}

private function createAmqpTransport(string $url, array $options): AMQP\Transport
{
$query = [];

parse_str((string) parse_url($url, PHP_URL_QUERY), $query);

$options = array_replace($query, $options);

if (!isset($options['exchange'])) {
throw new InvalidArgumentException('AMQP exchange is not specified');
}

return AMQP\Transport::create($url, $options['exchange'], $options['routes'] ?? []);
}
}
40 changes: 40 additions & 0 deletions src/Middlewares/RedisReconnectMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace Onliner\Laravel\CommandBus\Middlewares;

use Exception;
use Illuminate\Redis\Connections\Connection;
use Illuminate\Redis\RedisManager;
use Onliner\CommandBus\Context;
use Onliner\CommandBus\Middleware;

class RedisReconnectMiddleware implements Middleware
{
public function __construct(
private RedisManager $redis,
) {}

public function call(object $message, Context $context, callable $next): void
{
$connections = $this->redis->connections() ?: [];

foreach ($connections as $name => $connection) {
if (!$this->ping($connection)) {
$this->redis->purge($name);
}
}

$next($message, $context);
}

private function ping(Connection $connection, string $msg = 'ok'): bool
{
try {
return $connection->ping($msg) === $msg;
} catch (Exception) {
return false;
}
}
}
Loading

0 comments on commit c6670d7

Please sign in to comment.