From afcfef8f18e291f6d397b17d11ab3c6773b01b4e Mon Sep 17 00:00:00 2001 From: Anton Shabouta Date: Tue, 13 Aug 2024 16:37:04 +0200 Subject: [PATCH] Added publish handler for raw messages (#31) --- src/Remote/AMQP/Publish.php | 22 ++++++++++++++++++++++ src/Remote/AMQP/Transport.php | 21 +++++++++++++++++++-- src/Remote/RemoteExtension.php | 4 ++++ 3 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 src/Remote/AMQP/Publish.php diff --git a/src/Remote/AMQP/Publish.php b/src/Remote/AMQP/Publish.php new file mode 100644 index 0000000..b195940 --- /dev/null +++ b/src/Remote/AMQP/Publish.php @@ -0,0 +1,22 @@ + $payload + */ + public static function create(string $exchange, string $queue, array $payload): self + { + return new self($exchange, $queue, json_encode($payload, JSON_THROW_ON_ERROR)); + } +} diff --git a/src/Remote/AMQP/Transport.php b/src/Remote/AMQP/Transport.php index c31e881..5f94e9d 100644 --- a/src/Remote/AMQP/Transport.php +++ b/src/Remote/AMQP/Transport.php @@ -4,11 +4,14 @@ namespace Onliner\CommandBus\Remote\AMQP; +use Onliner\CommandBus\Builder; +use Onliner\CommandBus\Context; +use Onliner\CommandBus\Extension; use Onliner\CommandBus\Remote\Envelope; use Onliner\CommandBus\Remote\Transport as TransportContract; use Psr\Log\LoggerInterface; -final class Transport implements TransportContract +final class Transport implements TransportContract, Extension { public function __construct( private Connector $connector, @@ -28,9 +31,13 @@ public static function create(string $dsn, string $exchange = '', array $routes } public function send(Envelope $envelope): void + { + $this->publish($envelope, $this->router->match($envelope)); + } + + public function publish(Envelope $envelope, Route $route): void { $message = $this->packager->pack($envelope); - $route = $this->router->match($envelope); $channel = $this->connector->connect(); $channel->basic_publish( @@ -57,4 +64,14 @@ public function declare(Exchange|array $exchange): void $exchange->declare($this->connector->connect()); } + + public function setup(Builder $builder): void + { + $builder->handle(Publish::class, function (Publish $message, Context $context) { + $this->publish( + new Envelope(Publish::class, $message->payload, $context->all()), + new Route($message->exchange, $message->queue), + ); + }); + } } diff --git a/src/Remote/RemoteExtension.php b/src/Remote/RemoteExtension.php index a25c197..db4859d 100644 --- a/src/Remote/RemoteExtension.php +++ b/src/Remote/RemoteExtension.php @@ -32,6 +32,10 @@ public function local(string ...$local): void public function setup(Builder $builder): void { + if ($this->transport instanceof Extension) { + $this->transport->setup($builder); + } + $gateway = new Gateway($this->transport, $this->serializer); $builder->middleware(new RemoteMiddleware($gateway, $this->local));