From 383300c5c57de95191609364af4d5b0c982b51cc Mon Sep 17 00:00:00 2001 From: Anton Shabouta Date: Wed, 20 Dec 2023 11:43:26 +0100 Subject: [PATCH 1/2] Bump min PHP version to 8.0 --- .github/workflows/test.yml | 3 +- composer.json | 4 +- src/Builder.php | 6 +-- src/Context.php | 49 ++++++----------- src/Dispatcher.php | 24 +++------ src/Exception/InvalidHandlerException.php | 13 +++++ src/Exception/InvalidMessageException.php | 13 +++++ src/Exception/UnknownHandlerException.php | 4 +- src/Message/Deferred.php | 18 +++++++ ...ssageIterator.php => DeferredIterator.php} | 18 +++---- src/Middleware/LoggerMiddleware.php | 14 +---- src/Remote/AMQP/AMQPConsumer.php | 53 ++++++++----------- src/Remote/AMQP/AMQPFlags.php | 8 +-- src/Remote/AMQP/AMQPTransport.php | 47 +++++----------- src/Remote/AMQP/Connector.php | 38 +++---------- src/Remote/AMQP/Exchange.php | 50 ++++++++--------- src/Remote/AMQP/Headers.php | 11 ++++ src/Remote/AMQP/Queue.php | 49 +++++++++-------- src/Remote/AMQP/Route.php | 14 +---- src/Remote/AMQP/Router/SimpleRouter.php | 16 ++---- src/Remote/Envelope.php | 22 +------- src/Remote/Gateway.php | 14 +---- src/Remote/RemoteExtension.php | 13 ++--- src/Remote/RemoteMiddleware.php | 14 +---- src/Remote/Serializer/NativeSerializer.php | 9 +++- src/Remote/Transport/MemoryTransport.php | 16 ++++-- src/Remote/Transport/MultiTransport.php | 12 ++--- src/Resolver/CallableResolver.php | 6 +-- src/Resolver/ContainerResolver.php | 18 +++---- src/Resolver/MiddlewareResolver.php | 10 +--- src/Retry/Policy/SimplePolicy.php | 14 +---- src/Retry/RetryExtension.php | 7 +-- src/Retry/RetryMiddleware.php | 18 ++----- tests/ContextTest.php | 23 ++++---- tests/Remote/AMQP/AMQPTransportTest.php | 10 ++-- tests/Remote/EnvelopeTest.php | 5 +- tests/Remote/GatewayTest.php | 6 +-- .../Remote/Transport/MemoryTransportTest.php | 6 ++- tests/Resolver/MiddlewareResolverTest.php | 4 +- 39 files changed, 269 insertions(+), 410 deletions(-) create mode 100644 src/Exception/InvalidHandlerException.php create mode 100644 src/Exception/InvalidMessageException.php create mode 100644 src/Message/Deferred.php rename src/Message/{MessageIterator.php => DeferredIterator.php} (50%) create mode 100644 src/Remote/AMQP/Headers.php diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 91fba22..e68daf1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,7 +8,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - php: ['7.2', '7.3', '7.4', '8.0'] + php: ['8.0', '8.1', '8.2', '8.3'] name: PHP ${{ matrix.php }} Test on ${{ matrix.os }} steps: - name: Checkout @@ -18,7 +18,6 @@ jobs: uses: shivammathur/setup-php@v2 with: php-version: ${{ matrix.php }} - extensions: curl coverage: none - name: Install dependencies diff --git a/composer.json b/composer.json index a9b2ece..1ee0a4e 100644 --- a/composer.json +++ b/composer.json @@ -5,14 +5,14 @@ "keywords": ["command-bus", "command", "service-bus"], "license": "MIT", "require": { - "php": "^7.2 || ^8.0", + "php": "^8.0", "psr/container": "^1.0 || ^2.0", "psr/log": "^1.1 || ^2.0", "php-amqplib/php-amqplib": "^2.12 || ^3.0" }, "require-dev": { "phpunit/phpunit": "^8.5|^9.0", - "phpstan/phpstan": "^0.12.14" + "phpstan/phpstan": "^1.10" }, "autoload": { "psr-4": { diff --git a/src/Builder.php b/src/Builder.php index c3d2e42..2e85812 100644 --- a/src/Builder.php +++ b/src/Builder.php @@ -12,17 +12,17 @@ final class Builder /** * @var array */ - private $handlers = []; + private array $handlers = []; /** * @var array */ - private $middleware = []; + private array $middleware = []; /** * @var array */ - private $extensions = []; + private array $extensions = []; /** * @param string $command diff --git a/src/Context.php b/src/Context.php index 944aa8e..d1f429f 100644 --- a/src/Context.php +++ b/src/Context.php @@ -4,42 +4,25 @@ namespace Onliner\CommandBus; -use Onliner\CommandBus\Message\MessageIterator; +use Onliner\CommandBus\Message\DeferredIterator; final class Context { /** - * @var Dispatcher + * @param Dispatcher $dispatcher + * @param DeferredIterator $deferred + * @param array $options */ - private $dispatcher; - - /** - * @var MessageIterator - */ - private $deferred; - - /** - * @var array - */ - private $options; - - /** - * @internal - * - * @param Dispatcher $dispatcher - * @param MessageIterator $deferred - * @param array $options - */ - public function __construct(Dispatcher $dispatcher, MessageIterator $deferred, array $options = []) - { - $this->dispatcher = $dispatcher; - $this->deferred = $deferred; - $this->options = $options; + public function __construct( + private Dispatcher $dispatcher, + private DeferredIterator $deferred, + private array $options = [] + ) { } /** - * @param object $message - * @param array $options + * @param object $message + * @param array $options * * @return void */ @@ -49,8 +32,8 @@ public function dispatch(object $message, array $options = []): void } /** - * @param object $message - * @param array $options + * @param object $message + * @param array $options * * @return self */ @@ -62,7 +45,7 @@ public function defer(object $message, array $options = []): self } /** - * @return array + * @return array */ public function all(): array { @@ -85,7 +68,7 @@ public function has(string $option): bool * * @return mixed */ - public function get(string $option, $default = null) + public function get(string $option, mixed $default = null): mixed { return $this->options[$option] ?? $default; } @@ -96,7 +79,7 @@ public function get(string $option, $default = null) * * @return self */ - public function set(string $option, $value): self + public function set(string $option, mixed $value): self { $this->options[$option] = $value; diff --git a/src/Dispatcher.php b/src/Dispatcher.php index c494d32..38c05a3 100644 --- a/src/Dispatcher.php +++ b/src/Dispatcher.php @@ -4,41 +4,33 @@ namespace Onliner\CommandBus; -use Onliner\CommandBus\Message\MessageIterator; +use Onliner\CommandBus\Message\DeferredIterator; final class Dispatcher { - /** - * @var Resolver - */ - private $resolver; - /** * @param Resolver $resolver */ - public function __construct(Resolver $resolver) + public function __construct(private Resolver $resolver) { - $this->resolver = $resolver; } /** - * @param object $message - * @param array $options + * @param object $message + * @param array $options * * @return void */ public function dispatch(object $message, array $options = []): void { $handler = $this->resolver->resolve($message); - $deferred = new MessageIterator(); - $context = new Context($this, $deferred, $options); + $iterator = new DeferredIterator(); + $context = new Context($this, $iterator, $options); $handler($message, $context); - foreach ($deferred as $item) { - [$deferredMessage, $deferredOptions] = $item; - - $this->dispatch($deferredMessage, $deferredOptions); + foreach ($iterator as $deferred) { + $this->dispatch($deferred->message, $deferred->options); } } } diff --git a/src/Exception/InvalidHandlerException.php b/src/Exception/InvalidHandlerException.php new file mode 100644 index 0000000..661fa29 --- /dev/null +++ b/src/Exception/InvalidHandlerException.php @@ -0,0 +1,13 @@ + $options + */ + public function __construct( + public object $message, + public array $options, + ) { + } +} diff --git a/src/Message/MessageIterator.php b/src/Message/DeferredIterator.php similarity index 50% rename from src/Message/MessageIterator.php rename to src/Message/DeferredIterator.php index 1fa67da..cde7371 100644 --- a/src/Message/MessageIterator.php +++ b/src/Message/DeferredIterator.php @@ -8,30 +8,30 @@ use IteratorAggregate; /** - * @implements IteratorAggregate + * @implements IteratorAggregate */ -class MessageIterator implements IteratorAggregate +class DeferredIterator implements IteratorAggregate { /** - * @var array + * @var array */ - private $messages = []; + private array $messages = []; /** - * @param object $message - * @param array $options + * @param object $message + * @param array $options * - * @return self + * @return self */ public function append(object $message, array $options): self { - $this->messages[] = [$message, $options]; + $this->messages[] = new Deferred($message, $options); return $this; } /** - * @return Generator + * @return Generator */ public function getIterator(): Generator { diff --git a/src/Middleware/LoggerMiddleware.php b/src/Middleware/LoggerMiddleware.php index 7e85ee7..e4c4fec 100644 --- a/src/Middleware/LoggerMiddleware.php +++ b/src/Middleware/LoggerMiddleware.php @@ -12,24 +12,12 @@ final class LoggerMiddleware implements Middleware { - /** - * @var LoggerInterface - */ - private $logger; - - /** - * @var string - */ - private $level; - /** * @param LoggerInterface $logger * @param string $level */ - public function __construct(LoggerInterface $logger, string $level = LogLevel::ERROR) + public function __construct(private LoggerInterface $logger, private string $level = LogLevel::ERROR) { - $this->logger = $logger; - $this->level = $level; } /** diff --git a/src/Remote/AMQP/AMQPConsumer.php b/src/Remote/AMQP/AMQPConsumer.php index 37e5336..870b8a5 100644 --- a/src/Remote/AMQP/AMQPConsumer.php +++ b/src/Remote/AMQP/AMQPConsumer.php @@ -11,6 +11,7 @@ use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPIOException; use PhpAmqpLib\Message\AMQPMessage; +use PhpAmqpLib\Wire\AMQPAbstractCollection; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Throwable; @@ -28,41 +29,25 @@ final class AMQPConsumer implements Consumer DEFAULT_INTERVAL = 1000000 ; - /** - * @var Connector - */ - private $connector; - - /** - * @var Exchange - */ - private $exchange; + private LoggerInterface $logger; + private bool $running = false; /** - * @var LoggerInterface + * @var Queue[] */ - private $logger; - - /** - * @var array - */ - private $queues = []; - - /** - * @var bool - */ - private $running = false; + private array $queues = []; /** * @param Connector $connector * @param Exchange $exchange * @param LoggerInterface|null $logger */ - public function __construct(Connector $connector, Exchange $exchange, LoggerInterface $logger = null) - { - $this->connector = $connector; - $this->exchange = $exchange; - $this->logger = $logger ?? new NullLogger(); + public function __construct( + private Connector $connector, + private Exchange $exchange, + LoggerInterface $logger = null + ) { + $this->logger = $logger ?? new NullLogger(); } /** @@ -193,8 +178,15 @@ private function connect(array $options): AMQPChannel */ private function handle(AMQPMessage $message, Dispatcher $dispatcher): void { - $headers = $message->get('application_headers')->getNativeData(); - $headers = array_merge($headers, [ + $headers = $message->get('application_headers'); + + if (!$headers instanceof AMQPAbstractCollection) { + $this->logger->warning('Message headers not found.'); + + return; + } + + $headers = array_replace($headers->getNativeData(), [ Exchange::HEADER_EXCHANGE => $message->getExchange(), Exchange::HEADER_ROUTING_KEY => $message->getRoutingKey(), Exchange::HEADER_CONSUMER_TAG => $message->getConsumerTag(), @@ -208,8 +200,9 @@ private function handle(AMQPMessage $message, Dispatcher $dispatcher): void return; } - $type = $headers[Exchange::HEADER_MESSAGE_TYPE]; + /** @var class-string $class */ + $class = $headers[Exchange::HEADER_MESSAGE_TYPE]; - $dispatcher->dispatch(new Envelope($type, $message->getBody(), $headers)); + $dispatcher->dispatch(new Envelope($class, $message->getBody(), $headers)); } } diff --git a/src/Remote/AMQP/AMQPFlags.php b/src/Remote/AMQP/AMQPFlags.php index 0f744bb..615c588 100644 --- a/src/Remote/AMQP/AMQPFlags.php +++ b/src/Remote/AMQP/AMQPFlags.php @@ -28,17 +28,11 @@ class AMQPFlags 'no_ack' => self::NO_ACK, // Used only for consume ]; - /** - * @var int - */ - private $value; - /** * @param int $value */ - public function __construct(int $value) + public function __construct(private int $value) { - $this->value = $value; } /** diff --git a/src/Remote/AMQP/AMQPTransport.php b/src/Remote/AMQP/AMQPTransport.php index b2fde78..a6ef838 100644 --- a/src/Remote/AMQP/AMQPTransport.php +++ b/src/Remote/AMQP/AMQPTransport.php @@ -11,7 +11,6 @@ use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; final class AMQPTransport implements Transport { @@ -19,42 +18,18 @@ final class AMQPTransport implements Transport 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]; - /** - * @var Connector - */ - private $connector; - - /** - * @var Exchange - */ - private $exchange; - - /** - * @var Router - */ - private $router; - - /** - * @var LoggerInterface - */ - private $logger; - /** * @param Connector $connector * @param Exchange $exchange - * @param Router|null $router + * @param Router $router * @param LoggerInterface|null $logger */ public function __construct( - Connector $connector, - Exchange $exchange, - Router $router = null, - LoggerInterface $logger = null + private Connector $connector, + private Exchange $exchange, + private Router $router, + private ?LoggerInterface $logger = null ) { - $this->connector = $connector; - $this->exchange = $exchange; - $this->router = $router ?? new SimpleRouter(); - $this->logger = $logger ?? new NullLogger(); } /** @@ -65,9 +40,13 @@ public function __construct( */ public static function create(string $dsn, array $options = []): self { - $resolver = new SimpleRouter($options['routes'] ?? []); + if (!is_array($routes = $options['routes'] ?? false)) { + $routes = []; + } + + $router = new SimpleRouter(array_filter($routes, 'is_string')); - return new self(Connector::create($dsn), Exchange::create($options), $resolver); + return new self(Connector::create($dsn), Exchange::create($options), $router); } /** @@ -76,7 +55,7 @@ public static function create(string $dsn, array $options = []): self public function send(Envelope $envelope): void { $headers = $envelope->headers + [ - Exchange::HEADER_MESSAGE_TYPE => $envelope->type, + Exchange::HEADER_MESSAGE_TYPE => $envelope->class, ]; $message = new AMQPMessage($envelope->payload, self::MESSAGE_PROPERTIES); @@ -90,8 +69,6 @@ public function send(Envelope $envelope): void $message, $route->exchange(), $route->name(), - false, - false ); } diff --git a/src/Remote/AMQP/Connector.php b/src/Remote/AMQP/Connector.php index 544599b..39fe260 100644 --- a/src/Remote/AMQP/Connector.php +++ b/src/Remote/AMQP/Connector.php @@ -12,34 +12,15 @@ class Connector { - /** - * @var array> - */ - private $hosts; - - /** - * @var array - */ - private $options; - - /** - * @var AMQPChannel|null - */ - private $channel; - - /** - * @var PCNTLHeartbeatSender|null - */ - private $heartbeats; + private ?AMQPChannel $channel = null; + private ?PCNTLHeartbeatSender $heartbeats = null; /** - * @param array> $hosts - * @param array $options + * @param array> $hosts + * @param array $options */ - public function __construct(array $hosts, array $options) + public function __construct(private array $hosts, private array $options) { - $this->hosts = $hosts; - $this->options = $options; } /** @@ -100,12 +81,7 @@ public function connect(): AMQPChannel public function __destruct() { - if ($this->heartbeats !== null) { - $this->heartbeats->unregister(); - } - - if ($this->channel !== null) { - $this->channel->close(); - } + $this->heartbeats?->unregister(); + $this->channel?->close(); } } diff --git a/src/Remote/AMQP/Exchange.php b/src/Remote/AMQP/Exchange.php index 8cc5d72..6eafc80 100644 --- a/src/Remote/AMQP/Exchange.php +++ b/src/Remote/AMQP/Exchange.php @@ -4,6 +4,7 @@ namespace Onliner\CommandBus\Remote\AMQP; +use InvalidArgumentException; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Wire\AMQPTable; @@ -26,25 +27,7 @@ final class Exchange HEADER_MESSAGE_TYPE = 'x-message-type' ; - /** - * @var string - */ - private $name; - - /** - * @var string - */ - private $type; - - /** - * @var AMQPFlags - */ - private $flags; - - /** - * @var array - */ - private $args; + private AMQPFlags $flags; /** * @param string $name @@ -53,15 +36,12 @@ final class Exchange * @param array $args */ public function __construct( - string $name, - string $type = self::TYPE_TOPIC, + private string $name, + private string $type = self::TYPE_TOPIC, AMQPFlags $flags = null, - array $args = [] + private array $args = [] ) { - $this->name = $name; - $this->type = $type; $this->flags = $flags ?? AMQPFlags::default(); - $this->args = $args; } /** @@ -72,15 +52,27 @@ public function __construct( public static function create(array $options): self { $type = $options['type'] ?? self::TYPE_TOPIC; - $name = $options['exchange'] ?? sprintf('amqp.%s', $type); - $flags = AMQPFlags::compute($options); - $args = $options['args'] ?? []; + + if (!is_string($type)) { + throw new InvalidArgumentException('Exchange type must be a string'); + } + + $name = $options['exchange'] ?? sprintf('amqp.%s', $type); + $args = $options['args'] ?? []; + + if (!is_string($name)) { + throw new InvalidArgumentException('Exchange name must be a string'); + } + + if (!is_array($args)) { + throw new InvalidArgumentException('Exchange arguments must be an array'); + } if ($type === self::TYPE_DELAYED && !isset($args['x-delayed-type'])) { $args['x-delayed-type'] = self::TYPE_TOPIC; } - return new self($name, $type, $flags, $args); + return new self($name, $type, AMQPFlags::compute($options), $args); } /** diff --git a/src/Remote/AMQP/Headers.php b/src/Remote/AMQP/Headers.php new file mode 100644 index 0000000..e5d185f --- /dev/null +++ b/src/Remote/AMQP/Headers.php @@ -0,0 +1,11 @@ + - */ - private $args; + private string $pattern; + private AMQPFlags $flags; /** * @param string $name @@ -42,12 +26,14 @@ final class Queue * @param AMQPFlags|null $flags * @param array $args */ - public function __construct(string $name, string $pattern = null, AMQPFlags $flags = null, array $args = []) - { - $this->name = $name; + public function __construct( + private string $name, + string $pattern = null, + AMQPFlags $flags = null, + private array $args = [] + ) { $this->pattern = $pattern ?? $name; $this->flags = $flags ?? AMQPFlags::default(); - $this->args = $args; } /** @@ -58,9 +44,22 @@ public function __construct(string $name, string $pattern = null, AMQPFlags $fla public static function create(array $options): self { $pattern = $options['pattern'] ?? '#'; - $name = $options['queue'] ?? $pattern; + $name = $options['queue'] ?? $pattern; + $args = $options['args'] ?? []; + + if (!is_string($name)) { + throw new InvalidArgumentException('Queue name must be a string'); + } + + if ($pattern !== null && !is_string($pattern)) { + throw new InvalidArgumentException('Queue pattern must be a string or null'); + } + + if (!is_array($args)) { + throw new InvalidArgumentException('Queue arguments must be an array'); + } - return new self($name, $pattern, AMQPFlags::compute($options), $options['args'] ?? []); + return new self($name, $pattern, AMQPFlags::compute($options), $args); } /** diff --git a/src/Remote/AMQP/Route.php b/src/Remote/AMQP/Route.php index a4938af..11aabd0 100644 --- a/src/Remote/AMQP/Route.php +++ b/src/Remote/AMQP/Route.php @@ -6,24 +6,12 @@ final class Route { - /** - * @var string - */ - private $exchange; - - /** - * @var string - */ - private $name; - /** * @param string $exchange * @param string $name */ - public function __construct(string $exchange, string $name) + public function __construct(private string $exchange, private string $name) { - $this->exchange = $exchange; - $this->name = $name; } /** diff --git a/src/Remote/AMQP/Router/SimpleRouter.php b/src/Remote/AMQP/Router/SimpleRouter.php index 730665d..5719f16 100644 --- a/src/Remote/AMQP/Router/SimpleRouter.php +++ b/src/Remote/AMQP/Router/SimpleRouter.php @@ -5,23 +5,17 @@ namespace Onliner\CommandBus\Remote\AMQP\Router; use Onliner\CommandBus\Remote\AMQP\Exchange; -use Onliner\CommandBus\Remote\AMQP\Router; use Onliner\CommandBus\Remote\AMQP\Route; +use Onliner\CommandBus\Remote\AMQP\Router; use Onliner\CommandBus\Remote\Envelope; final class SimpleRouter implements Router { /** - * @var array - */ - private $routes; - - /** - * @param array $routes + * @param array $routes */ - public function __construct(array $routes = []) + public function __construct(private array $routes = []) { - $this->routes = $routes; } /** @@ -29,8 +23,8 @@ public function __construct(array $routes = []) */ public function match(Envelope $envelope, Exchange $exchange): Route { - $target = $this->exchange($envelope->type, $exchange->name()); - $name = strtolower(str_replace('\\', '.', $envelope->type)); + $target = $this->exchange($envelope->class, $exchange->name()); + $name = strtolower(str_replace('\\', '.', $envelope->class)); return new Route($target, $name); } diff --git a/src/Remote/Envelope.php b/src/Remote/Envelope.php index 485237f..136cc01 100644 --- a/src/Remote/Envelope.php +++ b/src/Remote/Envelope.php @@ -7,29 +7,11 @@ final class Envelope { /** - * @var string - */ - public $type; - - /** - * @var string - */ - public $payload; - - /** - * @var array - */ - public $headers; - - /** - * @param string $type + * @param class-string $class * @param string $payload * @param array $headers */ - public function __construct(string $type, string $payload, array $headers = []) + public function __construct(public string $class, public string $payload, public array $headers = []) { - $this->type = $type; - $this->payload = $payload; - $this->headers = $headers; } } diff --git a/src/Remote/Gateway.php b/src/Remote/Gateway.php index ff4507b..d933ab4 100644 --- a/src/Remote/Gateway.php +++ b/src/Remote/Gateway.php @@ -10,24 +10,12 @@ final class Gateway { public const OPTION_LOCAL = 'local'; - /** - * @var Transport - */ - private $transport; - - /** - * @var Serializer - */ - private $serializer; - /** * @param Transport $transport * @param Serializer $serializer */ - public function __construct(Transport $transport, Serializer $serializer) + public function __construct(private Transport $transport, private Serializer $serializer) { - $this->transport = $transport; - $this->serializer = $serializer; } /** diff --git a/src/Remote/RemoteExtension.php b/src/Remote/RemoteExtension.php index f138804..8adcddd 100644 --- a/src/Remote/RemoteExtension.php +++ b/src/Remote/RemoteExtension.php @@ -9,20 +9,13 @@ final class RemoteExtension implements Extension { - /** - * @var Transport - */ - private $transport; - - /** - * @var Serializer - */ - private $serializer; + private Transport $transport; + private Serializer $serializer; /** * @var array */ - private $local = [ + private array $local = [ Envelope::class, ]; diff --git a/src/Remote/RemoteMiddleware.php b/src/Remote/RemoteMiddleware.php index 17a2b03..b274812 100644 --- a/src/Remote/RemoteMiddleware.php +++ b/src/Remote/RemoteMiddleware.php @@ -9,24 +9,12 @@ final class RemoteMiddleware implements Middleware { - /** - * @var Gateway - */ - private $gateway; - - /** - * @var array - */ - private $local; - /** * @param Gateway $gateway * @param array $local */ - public function __construct(Gateway $gateway, array $local = []) + public function __construct(private Gateway $gateway, private array $local = []) { - $this->gateway = $gateway; - $this->local = $local; } /** diff --git a/src/Remote/Serializer/NativeSerializer.php b/src/Remote/Serializer/NativeSerializer.php index af0bbdc..ebb97cf 100644 --- a/src/Remote/Serializer/NativeSerializer.php +++ b/src/Remote/Serializer/NativeSerializer.php @@ -4,6 +4,7 @@ namespace Onliner\CommandBus\Remote\Serializer; +use Onliner\CommandBus\Exception; use Onliner\CommandBus\Remote\Envelope; use Onliner\CommandBus\Remote\Serializer; @@ -22,6 +23,12 @@ public function serialize(object $command, array $headers = []): Envelope */ public function unserialize(Envelope $envelope): object { - return unserialize($envelope->payload); + $message = unserialize($envelope->payload); + + if (!$message instanceof $envelope->class) { + throw new Exception\InvalidMessageException($envelope->class); + } + + return $message; } } diff --git a/src/Remote/Transport/MemoryTransport.php b/src/Remote/Transport/MemoryTransport.php index ca2aa30..e553836 100644 --- a/src/Remote/Transport/MemoryTransport.php +++ b/src/Remote/Transport/MemoryTransport.php @@ -15,19 +15,19 @@ final class MemoryTransport implements Transport, Consumer /** * @var array> */ - private $envelopes = []; + private array $envelopes = []; /** * @var bool */ - private $running = false; + private bool $running = false; /** * {@inheritDoc} */ public function send(Envelope $envelope): void { - $this->envelopes[$envelope->type][] = $envelope; + $this->envelopes[$envelope->class][] = $envelope; } /** @@ -59,7 +59,7 @@ public function run(Dispatcher $dispatcher, array $options = []): void } } } - } while ($this->running); + } while ($this->isRunning()); } /** @@ -95,4 +95,12 @@ public function receive(string $type): array { return $this->envelopes[$type] ?? []; } + + /** + * @return bool + */ + public function isRunning(): bool + { + return $this->running; + } } diff --git a/src/Remote/Transport/MultiTransport.php b/src/Remote/Transport/MultiTransport.php index 2721fe0..8c676d5 100644 --- a/src/Remote/Transport/MultiTransport.php +++ b/src/Remote/Transport/MultiTransport.php @@ -10,22 +10,16 @@ class MultiTransport implements Transport { - /** - * @var Transport - */ - private $default; - /** * @var array */ - private $transports = []; + private array $transports = []; /** * @param Transport $default */ - public function __construct(Transport $default) + public function __construct(private Transport $default) { - $this->default = $default; } /** @@ -44,7 +38,7 @@ public function add(string $pattern, Transport $transport): void */ public function send(Envelope $envelope): void { - $this->match($envelope->type)->send($envelope); + $this->match($envelope->class)->send($envelope); } /** diff --git a/src/Resolver/CallableResolver.php b/src/Resolver/CallableResolver.php index 933a81d..70e3dca 100644 --- a/src/Resolver/CallableResolver.php +++ b/src/Resolver/CallableResolver.php @@ -12,7 +12,7 @@ final class CallableResolver implements Resolver /** * @var array */ - private $handlers = []; + private array $handlers = []; /** * @param string $class @@ -36,8 +36,6 @@ public function resolve(object $command): callable } } while ($class = get_parent_class($class)); - return static function () use ($command) { - throw Exception\UnknownHandlerException::forCommand($command); - }; + return fn () => throw new Exception\UnknownHandlerException(get_class($command)); } } diff --git a/src/Resolver/ContainerResolver.php b/src/Resolver/ContainerResolver.php index 7a7fce8..bed8d2b 100644 --- a/src/Resolver/ContainerResolver.php +++ b/src/Resolver/ContainerResolver.php @@ -4,28 +4,23 @@ namespace Onliner\CommandBus\Resolver; +use Onliner\CommandBus\Context; use Onliner\CommandBus\Exception; use Onliner\CommandBus\Resolver; use Psr\Container\ContainerInterface; final class ContainerResolver implements Resolver { - /** - * @var ContainerInterface - */ - private $container; - /** * @var array */ - private $handlers = []; + private array $handlers = []; /** * @param ContainerInterface $container */ - public function __construct(ContainerInterface $container) + public function __construct(private ContainerInterface $container) { - $this->container = $container; } /** @@ -42,7 +37,7 @@ public function register(string $class, string $handler): void */ public function resolve(object $command): callable { - return function ($command, $context) { + return function (object $command, Context $context) { $class = get_class($command); if (!isset($this->handlers[$class])) { @@ -50,6 +45,11 @@ public function resolve(object $command): callable } $handler = $this->container->get($this->handlers[$class]); + + if (!is_callable($handler)) { + throw new Exception\InvalidHandlerException($class); + } + $handler($command, $context); }; } diff --git a/src/Resolver/MiddlewareResolver.php b/src/Resolver/MiddlewareResolver.php index 16a5496..b6d8ab2 100644 --- a/src/Resolver/MiddlewareResolver.php +++ b/src/Resolver/MiddlewareResolver.php @@ -10,22 +10,16 @@ final class MiddlewareResolver implements Resolver { - /** - * @var Resolver - */ - private $resolver; - /** * @var array */ - private $stack = []; + private array $stack = []; /** * @param Resolver $resolver */ - public function __construct(Resolver $resolver) + public function __construct(private Resolver $resolver) { - $this->resolver = $resolver; } /** diff --git a/src/Retry/Policy/SimplePolicy.php b/src/Retry/Policy/SimplePolicy.php index 090ad8d..ac1d88a 100644 --- a/src/Retry/Policy/SimplePolicy.php +++ b/src/Retry/Policy/SimplePolicy.php @@ -12,24 +12,12 @@ final class SimplePolicy implements Policy { private const OPTION_ATTEMPT = 'attempt'; - /** - * @var int - */ - private $retries; - - /** - * @var int - */ - private $delay; - /** * @param int $retries * @param int $delay */ - public function __construct(int $retries, int $delay = 0) + public function __construct(private int $retries, private int $delay = 0) { - $this->retries = $retries; - $this->delay = $delay; } /** diff --git a/src/Retry/RetryExtension.php b/src/Retry/RetryExtension.php index a7da661..c57b043 100644 --- a/src/Retry/RetryExtension.php +++ b/src/Retry/RetryExtension.php @@ -9,15 +9,12 @@ final class RetryExtension implements Extension { - /** - * @var Policy - */ - private $default; + private Policy $default; /** * @var array */ - private $policies = []; + private array $policies = []; /** * @param Policy|null $default diff --git a/src/Retry/RetryMiddleware.php b/src/Retry/RetryMiddleware.php index 7dab2f9..fb0794c 100644 --- a/src/Retry/RetryMiddleware.php +++ b/src/Retry/RetryMiddleware.php @@ -11,23 +11,11 @@ final class RetryMiddleware implements Middleware { /** - * @var Policy + * @param Policy $default + * @param array $policies */ - private $default; - - /** - * @var array - */ - private $policies; - - /** - * @param Policy $default - * @param Policy[] $policies - */ - public function __construct(Policy $default, array $policies) + public function __construct(private Policy $default, private array $policies) { - $this->default = $default; - $this->policies = $policies; } /** diff --git a/tests/ContextTest.php b/tests/ContextTest.php index 1efc6d7..fab7c8f 100644 --- a/tests/ContextTest.php +++ b/tests/ContextTest.php @@ -6,7 +6,8 @@ use Onliner\CommandBus\Builder; use Onliner\CommandBus\Context; -use Onliner\CommandBus\Message\MessageIterator; +use Onliner\CommandBus\Message\Deferred; +use Onliner\CommandBus\Message\DeferredIterator; use Onliner\CommandBus\Tests\Command; use PHPUnit\Framework\TestCase; @@ -19,14 +20,14 @@ public function testValues(): void 'baz' => 1, ]; - $context = new Context((new Builder())->build(), new MessageIterator(), $options); + $context = new Context((new Builder())->build(), new DeferredIterator(), $options); self::assertEquals($options, $context->all()); } public function testOptions(): void { - $context = new Context((new Builder())->build(), new MessageIterator()); + $context = new Context((new Builder())->build(), new DeferredIterator()); self::assertEmpty($context->all()); @@ -71,17 +72,15 @@ public function testDispatch(): void public function testDefer(): void { - $actualDeferred = new MessageIterator(); - $context = new Context((new Builder())->build(), $actualDeferred); + $iterator = new DeferredIterator(); + $context = new Context((new Builder())->build(), $iterator); $context->defer(new Command\Hello('bar')); - foreach ($actualDeferred as $actualDeferredMessage) { - self::assertIsArray($actualDeferredMessage); - - [$message, $options] = $actualDeferredMessage; - self::assertInstanceOf(Command\Hello::class, $message); - self::assertSame('bar', $message->name); - self::assertIsArray($options); + foreach ($iterator as $deferred) { + self::assertInstanceOf(Deferred::class, $deferred); + self::assertInstanceOf(Command\Hello::class, $deferred->message); + self::assertSame('bar', $deferred->message->name); + self::assertIsArray($deferred->options); } } } diff --git a/tests/Remote/AMQP/AMQPTransportTest.php b/tests/Remote/AMQP/AMQPTransportTest.php index fa9e823..98dd144 100644 --- a/tests/Remote/AMQP/AMQPTransportTest.php +++ b/tests/Remote/AMQP/AMQPTransportTest.php @@ -8,7 +8,9 @@ use Onliner\CommandBus\Remote\AMQP\AMQPTransport; use Onliner\CommandBus\Remote\AMQP\Connector; use Onliner\CommandBus\Remote\AMQP\Exchange; +use Onliner\CommandBus\Remote\AMQP\Router\SimpleRouter; use Onliner\CommandBus\Remote\Envelope; +use Onliner\CommandBus\Tests\Command\Hello; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -38,12 +40,12 @@ public function testCreateWithMalformedUrl(): void public function testSend(): void { - $envelope = new Envelope('target', 'payload', [ + $envelope = new Envelope(Hello::class, 'payload', [ 'foo' => 'bar', ]); $headers = $envelope->headers + [ - 'x-message-type' => $envelope->type, + 'x-message-type' => $envelope->class, ]; $message = new AMQPMessage($envelope->payload, [ @@ -55,7 +57,7 @@ public function testSend(): void $channel ->expects(self::exactly(2)) ->method('basic_publish') - ->with($message, 'amqp.topic', $envelope->type, false, false) + ->with($message, 'amqp.topic', strtolower(str_replace('\\', '.', $envelope->class)), false, false) ; $connector = self::createMock(Connector::class); @@ -65,7 +67,7 @@ public function testSend(): void ->willReturn($channel) ; - $transport = new AMQPTransport($connector, Exchange::create([])); + $transport = new AMQPTransport($connector, Exchange::create([]), new SimpleRouter()); $transport->send($envelope); $transport->send($envelope); } diff --git a/tests/Remote/EnvelopeTest.php b/tests/Remote/EnvelopeTest.php index e84bc58..1ff10e3 100644 --- a/tests/Remote/EnvelopeTest.php +++ b/tests/Remote/EnvelopeTest.php @@ -5,13 +5,14 @@ namespace Onliner\CommandBus\Tests\Remote; use Onliner\CommandBus\Remote\Envelope; +use Onliner\CommandBus\Tests\Command\Hello; use PHPUnit\Framework\TestCase; class EnvelopeTest extends TestCase { public function testSerializeUnserialize(): void { - $target = 'target'; + $target = Hello::class; $payload = 'payload'; $headers = [ 'foo' => 'bar' @@ -19,7 +20,7 @@ public function testSerializeUnserialize(): void $envelope = new Envelope($target, $payload, $headers); - self::assertSame($target, $envelope->type); + self::assertSame($target, $envelope->class); self::assertSame($payload, $envelope->payload); self::assertSame($headers, $envelope->headers); } diff --git a/tests/Remote/GatewayTest.php b/tests/Remote/GatewayTest.php index be312a7..eafd8d1 100644 --- a/tests/Remote/GatewayTest.php +++ b/tests/Remote/GatewayTest.php @@ -6,7 +6,7 @@ use Onliner\CommandBus\Context; use Onliner\CommandBus\Dispatcher; -use Onliner\CommandBus\Message\MessageIterator; +use Onliner\CommandBus\Message\DeferredIterator; use Onliner\CommandBus\Remote\Envelope; use Onliner\CommandBus\Remote\Gateway; use Onliner\CommandBus\Remote\Serializer; @@ -28,7 +28,7 @@ public function testSend(): void ]; $dispatcher = new Dispatcher(new CallableResolver()); - $context = new Context($dispatcher, new MessageIterator(), $headers); + $context = new Context($dispatcher, new DeferredIterator(), $headers); $gateway = new Gateway($transport, $serializer); $gateway->send($command, $context); @@ -41,7 +41,7 @@ public function testSend(): void $envelope = reset($queue); self::assertInstanceOf(Envelope::class, $envelope); - self::assertSame(Hello::class, $envelope->type); + self::assertSame(Hello::class, $envelope->class); self::assertSame(serialize($command), $envelope->payload); self::assertSame($headers, $envelope->headers); } diff --git a/tests/Remote/Transport/MemoryTransportTest.php b/tests/Remote/Transport/MemoryTransportTest.php index 545a673..78e1295 100644 --- a/tests/Remote/Transport/MemoryTransportTest.php +++ b/tests/Remote/Transport/MemoryTransportTest.php @@ -6,20 +6,22 @@ use Onliner\CommandBus\Remote\Envelope; use Onliner\CommandBus\Remote\Transport\MemoryTransport; +use Onliner\CommandBus\Tests\Command\Hello; use PHPUnit\Framework\TestCase; class MemoryTransportTest extends TestCase { public function testSendReceive(): void { + $class = Hello::class; $transport = new MemoryTransport(); - $envelope = new Envelope('target', 'payload', [ + $envelope = new Envelope($class, 'payload', [ 'foo' => 'bar', ]); $transport->send($envelope); self::assertEquals([], $transport->receive('unknown')); - self::assertEquals([$envelope], $transport->receive('target')); + self::assertEquals([$envelope], $transport->receive($class)); } } diff --git a/tests/Resolver/MiddlewareResolverTest.php b/tests/Resolver/MiddlewareResolverTest.php index 6f96f36..207d2e2 100644 --- a/tests/Resolver/MiddlewareResolverTest.php +++ b/tests/Resolver/MiddlewareResolverTest.php @@ -6,7 +6,7 @@ use Onliner\CommandBus\Context; use Onliner\CommandBus\Dispatcher; -use Onliner\CommandBus\Message\MessageIterator; +use Onliner\CommandBus\Message\DeferredIterator; use Onliner\CommandBus\Middleware; use Onliner\CommandBus\Resolver; use Onliner\CommandBus\Tests\Command; @@ -45,7 +45,7 @@ public function testMiddleware(): void ->willReturn($handler) ; - $context = new Context(new Dispatcher($parent), new MessageIterator()); + $context = new Context(new Dispatcher($parent), new DeferredIterator()); $middleware = self::createMock(Middleware::class); $middleware From fc6f6599c05ed5a7535c2f245f837fd479d22e50 Mon Sep 17 00:00:00 2001 From: Anton Shabouta Date: Wed, 20 Dec 2023 11:50:25 +0100 Subject: [PATCH 2/2] Proxy income envelope headers to local context --- src/Context.php | 22 ++++++++++++++++++++++ src/Remote/Gateway.php | 6 +----- src/Remote/RemoteMiddleware.php | 2 +- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/src/Context.php b/src/Context.php index d1f429f..a004ac3 100644 --- a/src/Context.php +++ b/src/Context.php @@ -8,6 +8,8 @@ final class Context { + private const OPTION_LOCAL = 'local'; + /** * @param Dispatcher $dispatcher * @param DeferredIterator $deferred @@ -31,6 +33,18 @@ public function dispatch(object $message, array $options = []): void $this->dispatcher->dispatch($message, $options); } + /** + * @param object $message + * @param array $options + * @return void + */ + public function execute(object $message, array $options = []): void + { + $this->dispatcher->dispatch($message, array_replace($options, [ + self::OPTION_LOCAL => true, + ])); + } + /** * @param object $message * @param array $options @@ -97,4 +111,12 @@ public function del(string $option): self return $this; } + + /** + * @return bool + */ + public function isLocal(): bool + { + return $this->has(self::OPTION_LOCAL); + } } diff --git a/src/Remote/Gateway.php b/src/Remote/Gateway.php index d933ab4..0e13321 100644 --- a/src/Remote/Gateway.php +++ b/src/Remote/Gateway.php @@ -8,8 +8,6 @@ final class Gateway { - public const OPTION_LOCAL = 'local'; - /** * @param Transport $transport * @param Serializer $serializer @@ -41,8 +39,6 @@ public function receive(Envelope $envelope, Context $context): void { $message = $this->serializer->unserialize($envelope); - $context->dispatch($message, [ - self::OPTION_LOCAL => true, - ]); + $context->execute($message, $envelope->headers); } } diff --git a/src/Remote/RemoteMiddleware.php b/src/Remote/RemoteMiddleware.php index b274812..04d458d 100644 --- a/src/Remote/RemoteMiddleware.php +++ b/src/Remote/RemoteMiddleware.php @@ -37,6 +37,6 @@ public function call(object $message, Context $context, callable $next): void */ private function isLocal(string $class, Context $context): bool { - return $context->has(Gateway::OPTION_LOCAL) || in_array($class, $this->local); + return $context->isLocal() || in_array($class, $this->local); } }