From 3ab416aee96ac0c15a195559c31b751e39aa79b2 Mon Sep 17 00:00:00 2001 From: Anton Shabouta Date: Tue, 13 Aug 2024 14:29:54 +0300 Subject: [PATCH] Added mandatory support; Simplify AMQP transport; Allow queue multiple bindings --- examples/multi/builder.php | 10 +-- examples/multi/consumer_bar.php | 8 +-- examples/multi/consumer_foo.php | 8 +-- examples/remote/builder.php | 4 +- examples/remote/consumer.php | 12 ++-- examples/router/builder.php | 4 +- examples/router/consumer_bar.php | 8 +-- examples/router/consumer_foo.php | 8 +-- src/Context.php | 17 ----- src/Remote/AMQP/AMQPTransport.php | 65 ----------------- .../AMQP/{AMQPConsumer.php => Consumer.php} | 46 ++++-------- src/Remote/AMQP/Exchange.php | 23 ++---- src/Remote/AMQP/{AMQPFlags.php => Flags.php} | 2 +- src/Remote/AMQP/Headers.php | 3 +- src/Remote/AMQP/Packager.php | 70 +++++++++++++++++++ src/Remote/AMQP/Queue.php | 39 +++++++---- src/Remote/AMQP/Route.php | 1 + src/Remote/AMQP/Router.php | 2 +- src/Remote/AMQP/SimpleRouter.php | 13 ++-- src/Remote/AMQP/Transport.php | 48 +++++++++++++ src/Remote/RemoteException.php | 9 +++ src/Remote/RemoteMiddleware.php | 8 +-- ...MQPTransportTest.php => TransportTest.php} | 12 ++-- 23 files changed, 223 insertions(+), 197 deletions(-) delete mode 100644 src/Remote/AMQP/AMQPTransport.php rename src/Remote/AMQP/{AMQPConsumer.php => Consumer.php} (71%) rename src/Remote/AMQP/{AMQPFlags.php => Flags.php} (98%) create mode 100644 src/Remote/AMQP/Packager.php create mode 100644 src/Remote/AMQP/Transport.php create mode 100644 src/Remote/RemoteException.php rename tests/Remote/AMQP/{AMQPTransportTest.php => TransportTest.php} (83%) diff --git a/examples/multi/builder.php b/examples/multi/builder.php index 752ae1d..2fcd667 100644 --- a/examples/multi/builder.php +++ b/examples/multi/builder.php @@ -3,19 +3,15 @@ declare(strict_types=1); use Onliner\CommandBus\Builder; -use Onliner\CommandBus\Remote\AMQP\AMQPTransport; +use Onliner\CommandBus\Remote\AMQP\Transport; use Onliner\CommandBus\Remote\RemoteExtension; use Onliner\CommandBus\Remote\Transport; require __DIR__ . '/../../vendor/autoload.php'; require __DIR__ . '/messages.php'; -$transportFoo = AMQPTransport::create('amqp://guest:guest@localhost:5672', [ - 'exchange' => 'foo', -]); -$transportBar = AMQPTransport::create('amqp://guest:guest@localhost:5673', [ - 'exchange' => 'bar', -]); +$transportFoo = Transport::create('amqp://guest:guest@localhost:5672', 'foo'); +$transportBar = Transport::create('amqp://guest:guest@localhost:5673', 'bar'); $transport = new Transport\MultiTransport($transportFoo); $transport->add('Bar\*', $transportBar); diff --git a/examples/multi/consumer_bar.php b/examples/multi/consumer_bar.php index f96838d..d0b15b6 100644 --- a/examples/multi/consumer_bar.php +++ b/examples/multi/consumer_bar.php @@ -3,8 +3,8 @@ declare(strict_types=1); use Onliner\CommandBus\Builder; -use Onliner\CommandBus\Remote\AMQP\AMQPTransport; -use Onliner\CommandBus\Remote\AMQP\AMQPConsumer; +use Onliner\CommandBus\Remote\AMQP\Transport; +use Onliner\CommandBus\Remote\AMQP\Consumer; /** @var Builder $builder */ $builder = require __DIR__ . '/builder.php'; @@ -14,11 +14,11 @@ $dispatcher = $builder->build(); -$transport = AMQPTransport::create('amqp://guest:guest@localhost:5673', [ +$transport = Transport::create('amqp://guest:guest@localhost:5673', [ 'exchange' => 'bar', ]); -/** @var AMQPConsumer $consumer */ +/** @var Consumer $consumer */ $consumer = $transport->consume(); $consumer->listen('#'); $consumer->run($dispatcher); diff --git a/examples/multi/consumer_foo.php b/examples/multi/consumer_foo.php index 938c9fa..baffaa7 100644 --- a/examples/multi/consumer_foo.php +++ b/examples/multi/consumer_foo.php @@ -3,8 +3,8 @@ declare(strict_types=1); use Onliner\CommandBus\Builder; -use Onliner\CommandBus\Remote\AMQP\AMQPTransport; -use Onliner\CommandBus\Remote\AMQP\AMQPConsumer; +use Onliner\CommandBus\Remote\AMQP\Transport; +use Onliner\CommandBus\Remote\AMQP\Consumer; /** @var Builder $builder */ $builder = require __DIR__ . '/builder.php'; @@ -14,11 +14,11 @@ $dispatcher = $builder->build(); -$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [ +$transport = Transport::create('amqp://guest:guest@localhost:5672', [ 'exchange' => 'foo', ]); -/** @var AMQPConsumer $consumer */ +/** @var Consumer $consumer */ $consumer = $transport->consume(); $consumer->listen('#'); $consumer->run($dispatcher); diff --git a/examples/remote/builder.php b/examples/remote/builder.php index d64bdde..7db93b4 100644 --- a/examples/remote/builder.php +++ b/examples/remote/builder.php @@ -3,13 +3,13 @@ declare(strict_types=1); use Onliner\CommandBus\Builder; -use Onliner\CommandBus\Remote\AMQP\AMQPTransport; +use Onliner\CommandBus\Remote\AMQP\Transport; use Onliner\CommandBus\Remote\RemoteExtension; require __DIR__ . '/../../vendor/autoload.php'; require __DIR__ . '/messages.php'; -$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [ +$transport = Transport::create('amqp://guest:guest@localhost:5672', [ 'exchange' => 'foo', ]); diff --git a/examples/remote/consumer.php b/examples/remote/consumer.php index 18357d2..1080c96 100644 --- a/examples/remote/consumer.php +++ b/examples/remote/consumer.php @@ -3,8 +3,8 @@ declare(strict_types=1); use Onliner\CommandBus\Builder; -use Onliner\CommandBus\Remote\AMQP\AMQPTransport; -use Onliner\CommandBus\Remote\AMQP\AMQPConsumer; +use Onliner\CommandBus\Remote\AMQP\Transport; +use Onliner\CommandBus\Remote\AMQP\Consumer; use Onliner\CommandBus\Remote\AMQP\Queue; /** @var Builder $builder */ @@ -17,11 +17,11 @@ $dispatcher = $builder->build(); -$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [ +$transport = Transport::create('amqp://guest:guest@localhost:5672', [ 'exchange' => 'foo', ]); -/** @var AMQPConsumer $consumer */ +/** @var Consumer $consumer */ $consumer = $transport->consume(); $pattern = $argv[1] ?? '#'; @@ -39,6 +39,6 @@ } $consumer->run($dispatcher, [ - AMQPConsumer::OPTION_ATTEMPTS => 10, - AMQPConsumer::OPTION_INTERVAL => 100000, // 100 ms + Consumer::OPTION_ATTEMPTS => 10, + Consumer::OPTION_INTERVAL => 100000, // 100 ms ]); diff --git a/examples/router/builder.php b/examples/router/builder.php index 4a027a4..43f9912 100644 --- a/examples/router/builder.php +++ b/examples/router/builder.php @@ -3,14 +3,14 @@ declare(strict_types=1); use Onliner\CommandBus\Builder; -use Onliner\CommandBus\Remote\AMQP\AMQPTransport; +use Onliner\CommandBus\Remote\AMQP\Transport; use Onliner\CommandBus\Remote\RemoteExtension; use Onliner\CommandBus\Remote\Transport; require __DIR__ . '/../../vendor/autoload.php'; require __DIR__ . '/messages.php'; -$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [ +$transport = Transport::create('amqp://guest:guest@localhost:5672', [ 'exchange' => 'foo', 'routes' => [ 'Bar\*' => 'bar', diff --git a/examples/router/consumer_bar.php b/examples/router/consumer_bar.php index 72d6bb1..9be671f 100644 --- a/examples/router/consumer_bar.php +++ b/examples/router/consumer_bar.php @@ -3,8 +3,8 @@ declare(strict_types=1); use Onliner\CommandBus\Builder; -use Onliner\CommandBus\Remote\AMQP\AMQPTransport; -use Onliner\CommandBus\Remote\AMQP\AMQPConsumer; +use Onliner\CommandBus\Remote\AMQP\Transport; +use Onliner\CommandBus\Remote\AMQP\Consumer; /** @var Builder $builder */ $builder = require __DIR__ . '/builder.php'; @@ -14,11 +14,11 @@ $dispatcher = $builder->build(); -$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [ +$transport = Transport::create('amqp://guest:guest@localhost:5672', [ 'exchange' => 'bar', ]); -/** @var AMQPConsumer $consumer */ +/** @var Consumer $consumer */ $consumer = $transport->consume(); $consumer->listen('#'); $consumer->run($dispatcher); diff --git a/examples/router/consumer_foo.php b/examples/router/consumer_foo.php index 938c9fa..baffaa7 100644 --- a/examples/router/consumer_foo.php +++ b/examples/router/consumer_foo.php @@ -3,8 +3,8 @@ declare(strict_types=1); use Onliner\CommandBus\Builder; -use Onliner\CommandBus\Remote\AMQP\AMQPTransport; -use Onliner\CommandBus\Remote\AMQP\AMQPConsumer; +use Onliner\CommandBus\Remote\AMQP\Transport; +use Onliner\CommandBus\Remote\AMQP\Consumer; /** @var Builder $builder */ $builder = require __DIR__ . '/builder.php'; @@ -14,11 +14,11 @@ $dispatcher = $builder->build(); -$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [ +$transport = Transport::create('amqp://guest:guest@localhost:5672', [ 'exchange' => 'foo', ]); -/** @var AMQPConsumer $consumer */ +/** @var Consumer $consumer */ $consumer = $transport->consume(); $consumer->listen('#'); $consumer->run($dispatcher); diff --git a/src/Context.php b/src/Context.php index fbda65e..7821271 100644 --- a/src/Context.php +++ b/src/Context.php @@ -8,8 +8,6 @@ final class Context { - private const OPTION_LOCAL = 'local'; - /** * @param array $options */ @@ -27,16 +25,6 @@ public function dispatch(object $message, array $options = []): void $this->dispatcher->dispatch($message, $options); } - /** - * @param array $options - */ - public function execute(object $message, array $options = []): void - { - $this->dispatcher->dispatch($message, array_replace($options, [ - self::OPTION_LOCAL => true, - ])); - } - /** * @param array $options */ @@ -78,9 +66,4 @@ public function del(string $option): self return $this; } - - public function isLocal(): bool - { - return $this->has(self::OPTION_LOCAL); - } } diff --git a/src/Remote/AMQP/AMQPTransport.php b/src/Remote/AMQP/AMQPTransport.php deleted file mode 100644 index c3a75fa..0000000 --- a/src/Remote/AMQP/AMQPTransport.php +++ /dev/null @@ -1,65 +0,0 @@ - AMQPMessage::DELIVERY_MODE_PERSISTENT, - ]; - - public function __construct( - private Connector $connector, - private Exchange $exchange, - private Router $router, - private ?LoggerInterface $logger = null, - ) {} - - /** - * @param array $options - */ - public static function create(string $dsn, array $options = []): self - { - if (!is_array($routes = $options['routes'] ?? false)) { - $routes = []; - } - - $router = new SimpleRouter(array_filter($routes, 'is_string')); - - return new self(Connector::create($dsn), Exchange::create($options), $router); - } - - public function send(Envelope $envelope): void - { - $headers = $envelope->headers + [ - Exchange::HEADER_MESSAGE_TYPE => $envelope->class, - ]; - - $message = new AMQPMessage($envelope->payload, self::MESSAGE_PROPERTIES); - $message->set('application_headers', new AMQPTable($headers)); - - $route = $this->router->match($envelope, $this->exchange); - - // TODO: add support for `mandatory` and `immediate` options - $channel = $this->connector->connect(); - $channel->basic_publish( - $message, - $route->exchange, - $route->name, - ); - } - - public function consume(): Consumer - { - return new AMQPConsumer($this->connector, $this->exchange, $this->logger); - } -} diff --git a/src/Remote/AMQP/AMQPConsumer.php b/src/Remote/AMQP/Consumer.php similarity index 71% rename from src/Remote/AMQP/AMQPConsumer.php rename to src/Remote/AMQP/Consumer.php index 873619c..825147f 100644 --- a/src/Remote/AMQP/AMQPConsumer.php +++ b/src/Remote/AMQP/Consumer.php @@ -5,18 +5,16 @@ namespace Onliner\CommandBus\Remote\AMQP; use Onliner\CommandBus\Dispatcher; -use Onliner\CommandBus\Remote\Consumer; -use Onliner\CommandBus\Remote\Envelope; +use Onliner\CommandBus\Remote\Consumer as ConsumerContract; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPIOException; use PhpAmqpLib\Message\AMQPMessage; -use PhpAmqpLib\Wire\AMQPAbstractCollection; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Throwable; -final class AMQPConsumer implements Consumer +final class Consumer implements ConsumerContract { public const OPTION_ATTEMPTS = 'attempts', @@ -39,15 +37,18 @@ final class AMQPConsumer implements Consumer public function __construct( private Connector $connector, - private Exchange $exchange, + private Packager $packager, LoggerInterface $logger = null, ) { $this->logger = $logger ?? new NullLogger(); } - public function listen(string $pattern): void + /** + * @param string|array $bindings + */ + public function listen(string $name, array|string $bindings = [], Flags $flags = null): void { - $this->consume(new Queue($pattern, $pattern, $this->exchange->flags)); + $this->consume(new Queue($name, $name, (array) $bindings, $flags ?? Flags::default())); } public function consume(Queue $queue): void @@ -101,10 +102,8 @@ private function channel(Dispatcher $dispatcher, array $options): AMQPChannel } }; - $this->exchange->declare($channel); - foreach ($this->queues as $queue) { - $queue->consume($channel, $this->exchange, $handler); + $queue->consume($channel, $handler); } return $channel; @@ -146,31 +145,12 @@ private function connect(array $options): AMQPChannel private function handle(AMQPMessage $message, Dispatcher $dispatcher): void { - $headers = $message->get('application_headers'); - - if (!$headers instanceof AMQPAbstractCollection) { - $this->logger->warning('Message headers not found.'); - - return; - } - - $headers = array_replace($headers->getNativeData(), [ - Exchange::HEADER_EXCHANGE => $message->getExchange(), - Exchange::HEADER_REDELIVERED => $message->isRedelivered(), - Exchange::HEADER_ROUTING_KEY => $message->getRoutingKey(), - Exchange::HEADER_CONSUMER_TAG => $message->getConsumerTag(), - Exchange::HEADER_DELIVERY_TAG => $message->getDeliveryTag(), - ]); - - if (!isset($headers[Exchange::HEADER_MESSAGE_TYPE])) { - $this->logger->warning(sprintf('Header "%s" not found in message.', Exchange::HEADER_MESSAGE_TYPE)); - - return; + if ($message->isRedelivered()) { + throw new AMQPIOException('Message redelivered'); } - /** @var class-string $class */ - $class = $headers[Exchange::HEADER_MESSAGE_TYPE]; + $envelope = $this->packager->unpack($message); - $dispatcher->dispatch(new Envelope($class, $message->getBody(), $headers)); + $dispatcher->dispatch($envelope); } } diff --git a/src/Remote/AMQP/Exchange.php b/src/Remote/AMQP/Exchange.php index 24cd14f..7f135d2 100644 --- a/src/Remote/AMQP/Exchange.php +++ b/src/Remote/AMQP/Exchange.php @@ -18,22 +18,13 @@ final class Exchange TYPE_DELAYED = 'x-delayed-message' ; - public const - HEADER_EXCHANGE = 'exchange', - HEADER_REDELIVERED = 'redelivered', - HEADER_ROUTING_KEY = 'routing_key', - HEADER_CONSUMER_TAG = 'consumer_tag', - HEADER_DELIVERY_TAG = 'delivery_tag', - HEADER_MESSAGE_TYPE = 'x-message-type' - ; - /** * @param array $args */ public function __construct( public string $name, public string $type, - public AMQPFlags $flags, + public Flags $flags, public array $args = [], ) {} @@ -63,7 +54,7 @@ public static function create(array $options): self $args['x-delayed-type'] = self::TYPE_TOPIC; } - return new self($name, $type, AMQPFlags::compute($options), $args); + return new self($name, $type, Flags::compute($options), $args); } public function is(int $flag): bool @@ -76,11 +67,11 @@ public function declare(AMQPChannel $channel): void $channel->exchange_declare( $this->name, $this->type, - $this->flags->is(AMQPFlags::PASSIVE), - $this->flags->is(AMQPFlags::DURABLE), - $this->flags->is(AMQPFlags::DELETE), - $this->flags->is(AMQPFlags::INTERNAL), - $this->flags->is(AMQPFlags::NO_WAIT), + $this->flags->is(Flags::PASSIVE), + $this->flags->is(Flags::DURABLE), + $this->flags->is(Flags::DELETE), + $this->flags->is(Flags::INTERNAL), + $this->flags->is(Flags::NO_WAIT), new AMQPTable($this->args) ); } diff --git a/src/Remote/AMQP/AMQPFlags.php b/src/Remote/AMQP/Flags.php similarity index 98% rename from src/Remote/AMQP/AMQPFlags.php rename to src/Remote/AMQP/Flags.php index 4d500be..45c3223 100644 --- a/src/Remote/AMQP/AMQPFlags.php +++ b/src/Remote/AMQP/Flags.php @@ -4,7 +4,7 @@ namespace Onliner\CommandBus\Remote\AMQP; -class AMQPFlags +class Flags { public const PASSIVE = 1, diff --git a/src/Remote/AMQP/Headers.php b/src/Remote/AMQP/Headers.php index d05aa3e..c2b55c8 100644 --- a/src/Remote/AMQP/Headers.php +++ b/src/Remote/AMQP/Headers.php @@ -8,6 +8,7 @@ class Headers { public const DELAY = 'x-delay', - PRIORITY = 'x-priority' + PRIORITY = 'x-priority', + MESSAGE_TYPE = 'x-message-type' ; } diff --git a/src/Remote/AMQP/Packager.php b/src/Remote/AMQP/Packager.php new file mode 100644 index 0000000..8eb17ee --- /dev/null +++ b/src/Remote/AMQP/Packager.php @@ -0,0 +1,70 @@ +headers, [ + Headers::MESSAGE_TYPE => $envelope->class, + ]); + + return new AMQPMessage($envelope->payload, [ + 'delivery_mode' => $this->deliveryMode, + 'application_headers' => new AMQPTable($headers), + ]); + } + + public function unpack(AMQPMessage $message): Envelope + { + $headers = $message->get('application_headers'); + + if (!$headers instanceof AMQPAbstractCollection) { + throw new RemoteException('Message headers not found.'); + } + + $headers = $headers->getNativeData(); + + if (!isset($headers[Headers::MESSAGE_TYPE])) { + throw new RemoteException('Message type not found.'); + } + + /** @var class-string $class */ + $class = $headers[Headers::MESSAGE_TYPE]; + + unset($headers[Headers::MESSAGE_TYPE]); + + return new Envelope($class, $message->getBody(), array_replace($headers, [ + self::OPTION_LOCAL => true, + self::OPTION_EXCHANGE => $message->getExchange(), + self::OPTION_ROUTING_KEY => $message->getRoutingKey(), + self::OPTION_CONSUMER_TAG => $message->getConsumerTag(), + self::OPTION_DELIVERY_TAG => $message->getDeliveryTag(), + ])); + } +} diff --git a/src/Remote/AMQP/Queue.php b/src/Remote/AMQP/Queue.php index 53f9325..3a96843 100644 --- a/src/Remote/AMQP/Queue.php +++ b/src/Remote/AMQP/Queue.php @@ -18,12 +18,14 @@ final class Queue ; /** + * @param array $bindings * @param array $args */ public function __construct( public string $name, public string $pattern, - public AMQPFlags $flags, + private array $bindings, + public Flags $flags, public array $args = [], ) {} @@ -34,8 +36,13 @@ public static function create(array $options): self { $pattern = $options['pattern'] ?? '#'; $name = $options['queue'] ?? $pattern; + $bindings = $options['bindings'] ?? []; $args = $options['args'] ?? []; + if (is_string($bindings)) { + $bindings = [$bindings]; + } + if (!is_string($name)) { throw new InvalidArgumentException('Queue name must be a string'); } @@ -44,11 +51,15 @@ public static function create(array $options): self throw new InvalidArgumentException('Queue pattern must be a string or null'); } + if (!is_array($bindings)) { + throw new InvalidArgumentException('Queue binding must be an array'); + } + if (!is_array($args)) { throw new InvalidArgumentException('Queue arguments must be an array'); } - return new self($name, $pattern, AMQPFlags::compute($options), $args); + return new self($name, $pattern, $bindings, Flags::compute($options), $args); } public function is(int $flag): bool @@ -56,27 +67,29 @@ public function is(int $flag): bool return $this->flags->is($flag); } - public function consume(AMQPChannel $channel, Exchange $exchange, callable $handler): void + public function consume(AMQPChannel $channel, callable $handler): void { $channel->queue_declare( $this->name, - $this->is(AMQPFlags::PASSIVE), - $this->is(AMQPFlags::DURABLE), - $this->is(AMQPFlags::EXCLUSIVE), - $this->is(AMQPFlags::DELETE), - $this->is(AMQPFlags::NO_WAIT), + $this->is(Flags::PASSIVE), + $this->is(Flags::DURABLE), + $this->is(Flags::EXCLUSIVE), + $this->is(Flags::DELETE), + $this->is(Flags::NO_WAIT), new AMQPTable($this->args) ); - $channel->queue_bind($this->name, $exchange->name, $this->pattern); + foreach ($this->bindings as $binding) { + $channel->queue_bind($this->name, $binding, $this->pattern); + } $channel->basic_consume( $this->name, '', - $this->is(AMQPFlags::NO_LOCAL), - $this->is(AMQPFlags::NO_ACK), - $this->is(AMQPFlags::EXCLUSIVE), - $this->is(AMQPFlags::NO_WAIT), + $this->is(Flags::NO_LOCAL), + $this->is(Flags::NO_ACK), + $this->is(Flags::EXCLUSIVE), + $this->is(Flags::NO_WAIT), $handler ); } diff --git a/src/Remote/AMQP/Route.php b/src/Remote/AMQP/Route.php index c170b96..3c523f9 100644 --- a/src/Remote/AMQP/Route.php +++ b/src/Remote/AMQP/Route.php @@ -9,5 +9,6 @@ final class Route public function __construct( public string $exchange, public string $name, + public bool $mandatory = false, ) {} } diff --git a/src/Remote/AMQP/Router.php b/src/Remote/AMQP/Router.php index 9f85980..05cc40e 100644 --- a/src/Remote/AMQP/Router.php +++ b/src/Remote/AMQP/Router.php @@ -8,5 +8,5 @@ interface Router { - public function match(Envelope $envelope, Exchange $exchange): Route; + public function match(Envelope $envelope): Route; } diff --git a/src/Remote/AMQP/SimpleRouter.php b/src/Remote/AMQP/SimpleRouter.php index 89a9aec..40916bd 100644 --- a/src/Remote/AMQP/SimpleRouter.php +++ b/src/Remote/AMQP/SimpleRouter.php @@ -10,20 +10,23 @@ final class SimpleRouter implements Router { /** * @param array $routes + * @param array $mandatory */ public function __construct( + private string $exchange = '', private array $routes = [], + private array $mandatory = [], ) {} - public function match(Envelope $envelope, Exchange $exchange): Route + public function match(Envelope $envelope): Route { - $target = $this->exchange($envelope->class, $exchange->name); + $target = $this->exchange($envelope->class); $name = strtolower(str_replace('\\', '.', $envelope->class)); - return new Route($target, $name); + return new Route($target, $name, in_array($envelope->class, $this->mandatory)); } - private function exchange(string $type, string $default): string + private function exchange(string $type): string { foreach ($this->routes as $pattern => $exchange) { if (!fnmatch($pattern, $type, FNM_NOESCAPE)) { @@ -33,6 +36,6 @@ private function exchange(string $type, string $default): string return $exchange; } - return $default; + return $this->exchange; } } diff --git a/src/Remote/AMQP/Transport.php b/src/Remote/AMQP/Transport.php new file mode 100644 index 0000000..17c93e2 --- /dev/null +++ b/src/Remote/AMQP/Transport.php @@ -0,0 +1,48 @@ + $routes + */ + public static function create(string $dsn, string $exchange = '', array $routes = []): self + { + $router = new SimpleRouter($exchange, array_filter($routes, 'is_string')); + + return new self(Connector::create($dsn), new Packager(), $router); + } + + public function send(Envelope $envelope): void + { + $message = $this->packager->pack($envelope); + $route = $this->router->match($envelope); + + $channel = $this->connector->connect(); + $channel->basic_publish( + $message, + $route->exchange, + $route->name, + $route->mandatory, + ); + } + + public function consume(): Consumer + { + return new Consumer($this->connector, $this->packager, $this->logger); + } +} diff --git a/src/Remote/RemoteException.php b/src/Remote/RemoteException.php new file mode 100644 index 0000000..34ffe6c --- /dev/null +++ b/src/Remote/RemoteException.php @@ -0,0 +1,9 @@ +isLocal(get_class($message), $context)) { + if ($context->has(Packager::OPTION_LOCAL) || in_array(get_class($message), $this->local)) { $next($message, $context); } else { $this->gateway->send($message, $context); } } - - private function isLocal(string $class, Context $context): bool - { - return $context->isLocal() || in_array($class, $this->local); - } } diff --git a/tests/Remote/AMQP/AMQPTransportTest.php b/tests/Remote/AMQP/TransportTest.php similarity index 83% rename from tests/Remote/AMQP/AMQPTransportTest.php rename to tests/Remote/AMQP/TransportTest.php index b78aaca..dfc17d1 100644 --- a/tests/Remote/AMQP/AMQPTransportTest.php +++ b/tests/Remote/AMQP/TransportTest.php @@ -5,10 +5,10 @@ namespace Onliner\CommandBus\Tests\Remote\AMQP; use InvalidArgumentException; -use Onliner\CommandBus\Remote\AMQP\AMQPTransport; use Onliner\CommandBus\Remote\AMQP\Connector; -use Onliner\CommandBus\Remote\AMQP\Exchange; +use Onliner\CommandBus\Remote\AMQP\Packager; use Onliner\CommandBus\Remote\AMQP\SimpleRouter; +use Onliner\CommandBus\Remote\AMQP\Transport; use Onliner\CommandBus\Remote\Envelope; use Onliner\CommandBus\Tests\Command\Hello; use PhpAmqpLib\Channel\AMQPChannel; @@ -16,14 +16,14 @@ use PhpAmqpLib\Wire\AMQPTable; use PHPUnit\Framework\TestCase; -class AMQPTransportTest extends TestCase +class TransportTest extends TestCase { public function testCreate(): void { $error = null; try { - AMQPTransport::create('amqp://guest:guest@localhost/vhost?timeout=1&foo=bar'); + Transport::create('amqp://guest:guest@localhost/vhost?timeout=1&foo=bar'); } catch (InvalidArgumentException $error) { } @@ -35,7 +35,7 @@ public function testCreateWithMalformedUrl(): void self::expectException(InvalidArgumentException::class); self::expectExceptionMessage('Invalid transport DSN'); - AMQPTransport::create('//'); + Transport::create('//'); } public function testSend(): void @@ -67,7 +67,7 @@ public function testSend(): void ->willReturn($channel) ; - $transport = new AMQPTransport($connector, Exchange::create([]), new SimpleRouter()); + $transport = new Transport($connector, new Packager(), new SimpleRouter()); $transport->send($envelope); $transport->send($envelope); }