diff --git a/examples/dispatch_deferred_after_handle.php b/examples/deferred.php similarity index 80% rename from examples/dispatch_deferred_after_handle.php rename to examples/deferred.php index 09ff2f4..a35a8c2 100644 --- a/examples/dispatch_deferred_after_handle.php +++ b/examples/deferred.php @@ -9,22 +9,16 @@ class Task { - public $task; - - public function __construct(string $task) - { - $this->task = $task; - } + public function __construct( + public string $task, + ) {} } class ResultMessage { - public $message; - - public function __construct(string $message) - { - $this->message = $message; - } + public function __construct( + public string $message, + ) {} } class TaskHandler @@ -61,4 +55,3 @@ public function __invoke(ResultMessage $command, Context $context) * Task: build report processing end * Result: success */ - diff --git a/examples/hello.php b/examples/hello.php index 7525059..ef432bb 100644 --- a/examples/hello.php +++ b/examples/hello.php @@ -8,12 +8,9 @@ class Hello { - public $message; - - public function __construct(string $message) - { - $this->message = $message; - } + public function __construct( + public string $message, + ) {} } $dispatcher = (new Builder()) diff --git a/examples/invoke.php b/examples/invoke.php index a63d6f5..eab2317 100644 --- a/examples/invoke.php +++ b/examples/invoke.php @@ -8,12 +8,9 @@ class Hello { - public $message; - - public function __construct(string $message) - { - $this->message = $message; - } + public function __construct( + public string $message, + ) {} } class HelloHandler diff --git a/examples/remote/builder.php b/examples/remote/bindings/builder.php similarity index 86% rename from examples/remote/builder.php rename to examples/remote/bindings/builder.php index 703fa1d..e4f10cf 100644 --- a/examples/remote/builder.php +++ b/examples/remote/bindings/builder.php @@ -6,7 +6,7 @@ use Onliner\CommandBus\Remote\AMQP\Transport; use Onliner\CommandBus\Remote\RemoteExtension; -require __DIR__ . '/../../vendor/autoload.php'; +require __DIR__ . '/../../../vendor/autoload.php'; require __DIR__ . '/messages.php'; $transport = Transport::create('amqp://guest:guest@localhost:5672', 'foo'); diff --git a/examples/remote/bindings/consumer.php b/examples/remote/bindings/consumer.php new file mode 100644 index 0000000..430eff1 --- /dev/null +++ b/examples/remote/bindings/consumer.php @@ -0,0 +1,43 @@ +handle(SendEmail::class, function (SendEmail $command, Context $context) { + $exchange = $context->get(Packager::OPTION_EXCHANGE); + $routingKey = $context->get(Packager::OPTION_ROUTING_KEY); + + echo sprintf('Received message from %s with routing key %s', $exchange, $routingKey), PHP_EOL; + + // Throw exception to trigger DLE + if ($exchange === 'foo') { + throw new Exception("Something went wrong..."); + } +}); + +$transport = Transport::create('amqp://guest:guest@localhost:5672'); +$transport->declare(Exchange::create(['name' => 'dle'])); +$transport->declare(Exchange::create(['name' => 'foo'])); + +$consumer = $transport->consume(); +$consumer->consume(new Queue('my-queue', [ + 'foo' => '#', + 'dle' => 'sendemail', +], Flags::default(), args: [ + Queue::DEAD_LETTER => 'dle', +])); + +$consumer->run($builder->build(), [ + Consumer::OPTION_ATTEMPTS => 10, + Consumer::OPTION_INTERVAL => 100000, // 100 ms +]); diff --git a/examples/remote/messages.php b/examples/remote/bindings/messages.php similarity index 100% rename from examples/remote/messages.php rename to examples/remote/bindings/messages.php diff --git a/examples/remote/sender.php b/examples/remote/bindings/sender.php similarity index 100% rename from examples/remote/sender.php rename to examples/remote/bindings/sender.php diff --git a/examples/multi/builder.php b/examples/remote/multi/builder.php similarity index 91% rename from examples/multi/builder.php rename to examples/remote/multi/builder.php index bff1776..1ba7acd 100644 --- a/examples/multi/builder.php +++ b/examples/remote/multi/builder.php @@ -7,7 +7,7 @@ use Onliner\CommandBus\Remote\RemoteExtension; use Onliner\CommandBus\Remote\Transport\MultiTransport; -require __DIR__ . '/../../vendor/autoload.php'; +require __DIR__ . '/../../../vendor/autoload.php'; require __DIR__ . '/messages.php'; $transportFoo = Transport::create('amqp://guest:guest@localhost:5672', 'foo'); diff --git a/examples/multi/consumer_bar.php b/examples/remote/multi/consumer_bar.php similarity index 100% rename from examples/multi/consumer_bar.php rename to examples/remote/multi/consumer_bar.php diff --git a/examples/multi/consumer_foo.php b/examples/remote/multi/consumer_foo.php similarity index 100% rename from examples/multi/consumer_foo.php rename to examples/remote/multi/consumer_foo.php diff --git a/examples/multi/messages.php b/examples/remote/multi/messages.php similarity index 100% rename from examples/multi/messages.php rename to examples/remote/multi/messages.php diff --git a/examples/multi/sender.php b/examples/remote/multi/sender.php similarity index 100% rename from examples/multi/sender.php rename to examples/remote/multi/sender.php diff --git a/examples/router/builder.php b/examples/remote/router/builder.php similarity index 87% rename from examples/router/builder.php rename to examples/remote/router/builder.php index 1c87192..13c99e2 100644 --- a/examples/router/builder.php +++ b/examples/remote/router/builder.php @@ -6,7 +6,7 @@ use Onliner\CommandBus\Remote\AMQP\Transport; use Onliner\CommandBus\Remote\RemoteExtension; -require __DIR__ . '/../../vendor/autoload.php'; +require __DIR__ . '/../../../vendor/autoload.php'; require __DIR__ . '/messages.php'; $transport = Transport::create('amqp://guest:guest@localhost:5672', 'foo', [ diff --git a/examples/router/consumer_bar.php b/examples/remote/router/consumer_bar.php similarity index 100% rename from examples/router/consumer_bar.php rename to examples/remote/router/consumer_bar.php diff --git a/examples/router/consumer_foo.php b/examples/remote/router/consumer_foo.php similarity index 100% rename from examples/router/consumer_foo.php rename to examples/remote/router/consumer_foo.php diff --git a/examples/router/messages.php b/examples/remote/router/messages.php similarity index 100% rename from examples/router/messages.php rename to examples/remote/router/messages.php diff --git a/examples/router/sender.php b/examples/remote/router/sender.php similarity index 100% rename from examples/router/sender.php rename to examples/remote/router/sender.php diff --git a/examples/remote/simple/builder.php b/examples/remote/simple/builder.php new file mode 100644 index 0000000..e4f10cf --- /dev/null +++ b/examples/remote/simple/builder.php @@ -0,0 +1,15 @@ +use(new RemoteExtension($transport)); diff --git a/examples/remote/consumer.php b/examples/remote/simple/consumer.php similarity index 100% rename from examples/remote/consumer.php rename to examples/remote/simple/consumer.php diff --git a/examples/remote/simple/messages.php b/examples/remote/simple/messages.php new file mode 100644 index 0000000..98a3dde --- /dev/null +++ b/examples/remote/simple/messages.php @@ -0,0 +1,12 @@ +build(); +$dispatcher->dispatch(new SendEmail('example@mail.com', 'Onliner', 'Hello world!')); diff --git a/src/Middleware/LoggerMiddleware.php b/src/Middleware/LoggerMiddleware.php index d404297..84e8c44 100644 --- a/src/Middleware/LoggerMiddleware.php +++ b/src/Middleware/LoggerMiddleware.php @@ -22,7 +22,10 @@ public function call(object $message, Context $context, callable $next): void try { $next($message, $context); } catch (Throwable $error) { - $this->logger->log($this->level, $error->getMessage()); + $this->logger->log($this->level, $error->getMessage(), [ + 'file' => $error->getFile(), + 'line' => $error->getLine(), + ]); throw $error; } diff --git a/src/Remote/AMQP/Connector.php b/src/Remote/AMQP/Connector.php index 2fa2af7..3852bb3 100644 --- a/src/Remote/AMQP/Connector.php +++ b/src/Remote/AMQP/Connector.php @@ -9,13 +9,13 @@ use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender; -class Connector +final class Connector { private ?AMQPChannel $channel = null; private ?PCNTLHeartbeatSender $heartbeats = null; /** - * @param array> $hosts + * @param array> $hosts * @param array $options */ public function __construct( diff --git a/src/Remote/AMQP/Consumer.php b/src/Remote/AMQP/Consumer.php index 0cc9b42..cb52c60 100644 --- a/src/Remote/AMQP/Consumer.php +++ b/src/Remote/AMQP/Consumer.php @@ -38,7 +38,7 @@ final class Consumer implements ConsumerContract public function __construct( private Connector $connector, private Packager $packager, - LoggerInterface $logger = null, + ?LoggerInterface $logger = null, ) { $this->logger = $logger ?? new NullLogger(); } @@ -46,9 +46,9 @@ public function __construct( /** * @param string|array $bindings */ - public function listen(string $name, array|string $bindings = [], Flags $flags = null): void + public function listen(string $name, array|string $bindings = [], ?Flags $flags = null): void { - $this->consume(new Queue($name, $name, (array) $bindings, $flags ?? Flags::default())); + $this->consume(new Queue($name, (array) $bindings, $flags ?? Flags::default())); } public function consume(Queue $queue): void @@ -95,10 +95,12 @@ private function channel(Dispatcher $dispatcher, array $options): AMQPChannel $handler = function (AMQPMessage $message) use ($channel, $dispatcher) { try { $this->handle($message, $channel, $dispatcher); + + $message->ack(); } catch (Throwable $error) { $this->logger->error((string) $error); - } finally { - $message->ack(); + + $message->nack(); } }; @@ -155,8 +157,6 @@ private function handle(AMQPMessage $message, AMQPChannel $channel, Dispatcher $ return; } - $envelope = $this->packager->unpack($message); - - $dispatcher->dispatch($envelope); + $dispatcher->dispatch($this->packager->unpack($message)); } } diff --git a/src/Remote/AMQP/Flags.php b/src/Remote/AMQP/Flags.php index 45c3223..ebb5b6c 100644 --- a/src/Remote/AMQP/Flags.php +++ b/src/Remote/AMQP/Flags.php @@ -4,7 +4,7 @@ namespace Onliner\CommandBus\Remote\AMQP; -class Flags +final class Flags { public const PASSIVE = 1, diff --git a/src/Remote/AMQP/Headers.php b/src/Remote/AMQP/Headers.php index c2b55c8..0fe0084 100644 --- a/src/Remote/AMQP/Headers.php +++ b/src/Remote/AMQP/Headers.php @@ -4,7 +4,7 @@ namespace Onliner\CommandBus\Remote\AMQP; -class Headers +final class Headers { public const DELAY = 'x-delay', diff --git a/src/Remote/AMQP/Packager.php b/src/Remote/AMQP/Packager.php index a092520..d943a67 100644 --- a/src/Remote/AMQP/Packager.php +++ b/src/Remote/AMQP/Packager.php @@ -10,7 +10,7 @@ use PhpAmqpLib\Wire\AMQPAbstractCollection; use PhpAmqpLib\Wire\AMQPTable; -class Packager +final class Packager { /** * @deprecated diff --git a/src/Remote/AMQP/Publish.php b/src/Remote/AMQP/Publish.php index b195940..144f2d0 100644 --- a/src/Remote/AMQP/Publish.php +++ b/src/Remote/AMQP/Publish.php @@ -8,15 +8,15 @@ final class Publish { public function __construct( public string $exchange, - public string $queue, + public string $route, public string $payload, ) {} /** * @param array $payload */ - public static function create(string $exchange, string $queue, array $payload): self + public static function create(string $exchange, string $route, array $payload): self { - return new self($exchange, $queue, json_encode($payload, JSON_THROW_ON_ERROR)); + return new self($exchange, $route, json_encode($payload, JSON_THROW_ON_ERROR)); } } diff --git a/src/Remote/AMQP/Queue.php b/src/Remote/AMQP/Queue.php index 3a96843..4e47c0f 100644 --- a/src/Remote/AMQP/Queue.php +++ b/src/Remote/AMQP/Queue.php @@ -23,7 +23,6 @@ final class Queue */ public function __construct( public string $name, - public string $pattern, private array $bindings, public Flags $flags, public array $args = [], @@ -35,7 +34,7 @@ public function __construct( public static function create(array $options): self { $pattern = $options['pattern'] ?? '#'; - $name = $options['queue'] ?? $pattern; + $name = $options['name'] ?? $pattern; $bindings = $options['bindings'] ?? []; $args = $options['args'] ?? []; @@ -59,7 +58,11 @@ public static function create(array $options): self throw new InvalidArgumentException('Queue arguments must be an array'); } - return new self($name, $pattern, $bindings, Flags::compute($options), $args); + if (array_is_list($bindings)) { + $bindings = array_fill_keys($bindings, $pattern); + } + + return new self($name, $bindings, Flags::compute($options), $args); } public function is(int $flag): bool @@ -76,11 +79,11 @@ public function consume(AMQPChannel $channel, callable $handler): void $this->is(Flags::EXCLUSIVE), $this->is(Flags::DELETE), $this->is(Flags::NO_WAIT), - new AMQPTable($this->args) + new AMQPTable($this->args), ); - foreach ($this->bindings as $binding) { - $channel->queue_bind($this->name, $binding, $this->pattern); + foreach ($this->bindings as $exchange => $pattern) { + $channel->queue_bind($this->name, $exchange, $pattern); } $channel->basic_consume( diff --git a/src/Remote/AMQP/Transport.php b/src/Remote/AMQP/Transport.php index 5f94e9d..732ed27 100644 --- a/src/Remote/AMQP/Transport.php +++ b/src/Remote/AMQP/Transport.php @@ -53,15 +53,8 @@ public function consume(): Consumer return new Consumer($this->connector, $this->packager, $this->logger); } - /** - * @param Exchange|array $exchange - */ - public function declare(Exchange|array $exchange): void + public function declare(Exchange $exchange): void { - if (is_array($exchange)) { - $exchange = Exchange::create($exchange); - } - $exchange->declare($this->connector->connect()); } @@ -70,7 +63,7 @@ public function setup(Builder $builder): void $builder->handle(Publish::class, function (Publish $message, Context $context) { $this->publish( new Envelope(Publish::class, $message->payload, $context->all()), - new Route($message->exchange, $message->queue), + new Route($message->exchange, $message->route), ); }); } diff --git a/src/Remote/RemoteExtension.php b/src/Remote/RemoteExtension.php index db4859d..bf62e9b 100644 --- a/src/Remote/RemoteExtension.php +++ b/src/Remote/RemoteExtension.php @@ -19,7 +19,7 @@ final class RemoteExtension implements Extension Envelope::class, ]; - public function __construct(Transport $transport = null, Serializer $serializer = null) + public function __construct(?Transport $transport = null, ?Serializer $serializer = null) { $this->transport = $transport ?? new Transport\MemoryTransport(); $this->serializer = $serializer ?? new Serializer\NativeSerializer(); diff --git a/src/Remote/Transport/MultiTransport.php b/src/Remote/Transport/MultiTransport.php index 089ab1f..54309cb 100644 --- a/src/Remote/Transport/MultiTransport.php +++ b/src/Remote/Transport/MultiTransport.php @@ -8,7 +8,7 @@ use Onliner\CommandBus\Remote\Envelope; use Onliner\CommandBus\Remote\Transport; -class MultiTransport implements Transport +final class MultiTransport implements Transport { /** * @var array diff --git a/src/Retry/RetryExtension.php b/src/Retry/RetryExtension.php index 8ddca38..aabb601 100644 --- a/src/Retry/RetryExtension.php +++ b/src/Retry/RetryExtension.php @@ -16,7 +16,7 @@ final class RetryExtension implements Extension */ private array $policies = []; - public function __construct(Policy $default = null) + public function __construct(?Policy $default = null) { $this->default = $default ?? new Policy\ThrowPolicy(); }