From 786f737ca47ab13454457636f805baef2adfd9a0 Mon Sep 17 00:00:00 2001 From: Anton Shabouta Date: Tue, 13 Aug 2024 14:43:00 +0300 Subject: [PATCH] Push redelivered messages to queue end; Examples fixes --- examples/multi/builder.php | 7 +++---- examples/multi/consumer_bar.php | 12 +++--------- examples/multi/consumer_foo.php | 12 +++--------- examples/multi/messages.php | 30 ++++++------------------------ examples/remote/builder.php | 7 ++----- examples/remote/consumer.php | 13 ++++--------- examples/remote/messages.php | 31 +++++-------------------------- examples/router/builder.php | 11 +++-------- examples/router/consumer_bar.php | 7 ++----- examples/router/consumer_foo.php | 6 ++---- examples/router/messages.php | 30 ++++++------------------------ src/Remote/AMQP/Consumer.php | 14 ++++++++++---- src/Remote/Gateway.php | 5 ++++- 13 files changed, 53 insertions(+), 132 deletions(-) diff --git a/examples/multi/builder.php b/examples/multi/builder.php index 2fcd667..bff1776 100644 --- a/examples/multi/builder.php +++ b/examples/multi/builder.php @@ -5,7 +5,7 @@ use Onliner\CommandBus\Builder; use Onliner\CommandBus\Remote\AMQP\Transport; use Onliner\CommandBus\Remote\RemoteExtension; -use Onliner\CommandBus\Remote\Transport; +use Onliner\CommandBus\Remote\Transport\MultiTransport; require __DIR__ . '/../../vendor/autoload.php'; require __DIR__ . '/messages.php'; @@ -13,9 +13,8 @@ $transportFoo = Transport::create('amqp://guest:guest@localhost:5672', 'foo'); $transportBar = Transport::create('amqp://guest:guest@localhost:5673', 'bar'); -$transport = new Transport\MultiTransport($transportFoo); +$transport = new MultiTransport($transportFoo); $transport->add('Bar\*', $transportBar); return (new Builder()) - ->use(new RemoteExtension($transport)) -; + ->use(new RemoteExtension($transport)); diff --git a/examples/multi/consumer_bar.php b/examples/multi/consumer_bar.php index d0b15b6..75cc8e2 100644 --- a/examples/multi/consumer_bar.php +++ b/examples/multi/consumer_bar.php @@ -12,13 +12,7 @@ echo sprintf('Hello %s from bar!', $command->name), PHP_EOL; }); -$dispatcher = $builder->build(); - -$transport = Transport::create('amqp://guest:guest@localhost:5673', [ - 'exchange' => 'bar', -]); - -/** @var Consumer $consumer */ +$transport = Transport::create('amqp://guest:guest@localhost:5673'); $consumer = $transport->consume(); -$consumer->listen('#'); -$consumer->run($dispatcher); +$consumer->listen('#', 'bar'); +$consumer->run($builder->build()); diff --git a/examples/multi/consumer_foo.php b/examples/multi/consumer_foo.php index baffaa7..bd35d0f 100644 --- a/examples/multi/consumer_foo.php +++ b/examples/multi/consumer_foo.php @@ -12,13 +12,7 @@ echo sprintf('Hello %s from foo!', $command->name), PHP_EOL; }); -$dispatcher = $builder->build(); - -$transport = Transport::create('amqp://guest:guest@localhost:5672', [ - 'exchange' => 'foo', -]); - -/** @var Consumer $consumer */ +$transport = Transport::create('amqp://guest:guest@localhost:5672'); $consumer = $transport->consume(); -$consumer->listen('#'); -$consumer->run($dispatcher); +$consumer->listen('#', 'foo'); +$consumer->run($builder->build()); diff --git a/examples/multi/messages.php b/examples/multi/messages.php index 5ec60ad..5066b9c 100644 --- a/examples/multi/messages.php +++ b/examples/multi/messages.php @@ -5,35 +5,17 @@ namespace Foo { class Hello { - /** - * @var string - */ - public $name; - - /** - * @param string $name - */ - public function __construct(string $name) - { - $this->name = $name; - } + public function __construct( + public string $name, + ) {} } } namespace Bar { class Hello { - /** - * @var string - */ - public $name; - - /** - * @param string $name - */ - public function __construct(string $name) - { - $this->name = $name; - } + public function __construct( + public string $name, + ) {} } } diff --git a/examples/remote/builder.php b/examples/remote/builder.php index 7db93b4..703fa1d 100644 --- a/examples/remote/builder.php +++ b/examples/remote/builder.php @@ -9,10 +9,7 @@ require __DIR__ . '/../../vendor/autoload.php'; require __DIR__ . '/messages.php'; -$transport = Transport::create('amqp://guest:guest@localhost:5672', [ - 'exchange' => 'foo', -]); +$transport = Transport::create('amqp://guest:guest@localhost:5672', 'foo'); return (new Builder()) - ->use(new RemoteExtension($transport)) -; + ->use(new RemoteExtension($transport)); diff --git a/examples/remote/consumer.php b/examples/remote/consumer.php index 1080c96..54eb273 100644 --- a/examples/remote/consumer.php +++ b/examples/remote/consumer.php @@ -15,30 +15,25 @@ echo 'CONTENT: ', $command->content, \PHP_EOL; }); -$dispatcher = $builder->build(); - -$transport = Transport::create('amqp://guest:guest@localhost:5672', [ - 'exchange' => 'foo', -]); - -/** @var Consumer $consumer */ +$transport = Transport::create('amqp://guest:guest@localhost:5672'); $consumer = $transport->consume(); $pattern = $argv[1] ?? '#'; $priority = isset($argv[2]) ? (int) $argv[2] : 0; if ($priority === 0) { - $consumer->listen($pattern); + $consumer->listen($pattern, 'foo'); } else { $consumer->consume(Queue::create([ 'pattern' => $pattern, + 'bindings' => ['foo'], 'args' => [ Queue::MAX_PRIORITY => $priority, ], ])); } -$consumer->run($dispatcher, [ +$consumer->run($builder->build(), [ Consumer::OPTION_ATTEMPTS => 10, Consumer::OPTION_INTERVAL => 100000, // 100 ms ]); diff --git a/examples/remote/messages.php b/examples/remote/messages.php index e1d3a59..98a3dde 100644 --- a/examples/remote/messages.php +++ b/examples/remote/messages.php @@ -4,30 +4,9 @@ class SendEmail { - /** - * @var string - */ - public $to; - - /** - * @var string - */ - public $subject; - - /** - * @var string - */ - public $content; - - /** - * @param string $to - * @param string $subject - * @param string $content - */ - public function __construct(string $to, string $subject, string $content) - { - $this->to = $to; - $this->subject = $subject; - $this->content = $content; - } + public function __construct( + public string $to, + public string $subject, + public string $content, + ) {} } diff --git a/examples/router/builder.php b/examples/router/builder.php index 43f9912..1c87192 100644 --- a/examples/router/builder.php +++ b/examples/router/builder.php @@ -5,18 +5,13 @@ use Onliner\CommandBus\Builder; use Onliner\CommandBus\Remote\AMQP\Transport; use Onliner\CommandBus\Remote\RemoteExtension; -use Onliner\CommandBus\Remote\Transport; require __DIR__ . '/../../vendor/autoload.php'; require __DIR__ . '/messages.php'; -$transport = Transport::create('amqp://guest:guest@localhost:5672', [ - 'exchange' => 'foo', - 'routes' => [ - 'Bar\*' => 'bar', - ], +$transport = Transport::create('amqp://guest:guest@localhost:5672', 'foo', [ + 'Bar\*' => 'bar', ]); return (new Builder()) - ->use(new RemoteExtension($transport)) -; + ->use(new RemoteExtension($transport)); diff --git a/examples/router/consumer_bar.php b/examples/router/consumer_bar.php index 9be671f..6449bed 100644 --- a/examples/router/consumer_bar.php +++ b/examples/router/consumer_bar.php @@ -13,12 +13,9 @@ }); $dispatcher = $builder->build(); - -$transport = Transport::create('amqp://guest:guest@localhost:5672', [ - 'exchange' => 'bar', -]); +$transport = Transport::create('amqp://guest:guest@localhost:5672'); /** @var Consumer $consumer */ $consumer = $transport->consume(); -$consumer->listen('#'); +$consumer->listen('#', 'bar'); $consumer->run($dispatcher); diff --git a/examples/router/consumer_foo.php b/examples/router/consumer_foo.php index baffaa7..38d3534 100644 --- a/examples/router/consumer_foo.php +++ b/examples/router/consumer_foo.php @@ -14,11 +14,9 @@ $dispatcher = $builder->build(); -$transport = Transport::create('amqp://guest:guest@localhost:5672', [ - 'exchange' => 'foo', -]); +$transport = Transport::create('amqp://guest:guest@localhost:5672'); /** @var Consumer $consumer */ $consumer = $transport->consume(); -$consumer->listen('#'); +$consumer->listen('#', 'foo'); $consumer->run($dispatcher); diff --git a/examples/router/messages.php b/examples/router/messages.php index 5ec60ad..5066b9c 100644 --- a/examples/router/messages.php +++ b/examples/router/messages.php @@ -5,35 +5,17 @@ namespace Foo { class Hello { - /** - * @var string - */ - public $name; - - /** - * @param string $name - */ - public function __construct(string $name) - { - $this->name = $name; - } + public function __construct( + public string $name, + ) {} } } namespace Bar { class Hello { - /** - * @var string - */ - public $name; - - /** - * @param string $name - */ - public function __construct(string $name) - { - $this->name = $name; - } + public function __construct( + public string $name, + ) {} } } diff --git a/src/Remote/AMQP/Consumer.php b/src/Remote/AMQP/Consumer.php index 825147f..0cc9b42 100644 --- a/src/Remote/AMQP/Consumer.php +++ b/src/Remote/AMQP/Consumer.php @@ -92,9 +92,9 @@ public function stop(): void private function channel(Dispatcher $dispatcher, array $options): AMQPChannel { $channel = $this->connect($options); - $handler = function (AMQPMessage $message) use ($dispatcher) { + $handler = function (AMQPMessage $message) use ($channel, $dispatcher) { try { - $this->handle($message, $dispatcher); + $this->handle($message, $channel, $dispatcher); } catch (Throwable $error) { $this->logger->error((string) $error); } finally { @@ -143,10 +143,16 @@ private function connect(array $options): AMQPChannel throw new AMQPIOException(); } - private function handle(AMQPMessage $message, Dispatcher $dispatcher): void + private function handle(AMQPMessage $message, AMQPChannel $channel, Dispatcher $dispatcher): void { if ($message->isRedelivered()) { - throw new AMQPIOException('Message redelivered'); + $channel->basic_publish( + new AMQPMessage($message->body, $message->get_properties()), + (string) $message->getExchange(), + (string) $message->getRoutingKey(), + ); + + return; } $envelope = $this->packager->unpack($message); diff --git a/src/Remote/Gateway.php b/src/Remote/Gateway.php index 419a270..a23d856 100644 --- a/src/Remote/Gateway.php +++ b/src/Remote/Gateway.php @@ -5,6 +5,7 @@ namespace Onliner\CommandBus\Remote; use Onliner\CommandBus\Context; +use Onliner\CommandBus\Remote\AMQP\Packager; final class Gateway { @@ -24,6 +25,8 @@ public function receive(Envelope $envelope, Context $context): void { $message = $this->serializer->unserialize($envelope); - $context->execute($message, $envelope->headers); + $context->dispatch($message, array_replace($envelope->headers, [ + Packager::OPTION_LOCAL => true, + ])); } }