Skip to content

Commit

Permalink
Added mandatory support; Simplify AMQP transport; Allow queue multipl…
Browse files Browse the repository at this point in the history
…e bindings
  • Loading branch information
zloyuser committed Aug 13, 2024
1 parent 23e661a commit 3ab416a
Show file tree
Hide file tree
Showing 23 changed files with 223 additions and 197 deletions.
10 changes: 3 additions & 7 deletions examples/multi/builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,15 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;
use Onliner\CommandBus\Remote\Transport;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transportFoo = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);
$transportBar = AMQPTransport::create('amqp://guest:guest@localhost:5673', [
'exchange' => 'bar',
]);
$transportFoo = Transport::create('amqp://guest:guest@localhost:5672', 'foo');
$transportBar = Transport::create('amqp://guest:guest@localhost:5673', 'bar');

$transport = new Transport\MultiTransport($transportFoo);
$transport->add('Bar\*', $transportBar);
Expand Down
8 changes: 4 additions & 4 deletions examples/multi/consumer_bar.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

/** @var Builder $builder */
$builder = require __DIR__ . '/builder.php';
Expand All @@ -14,11 +14,11 @@

$dispatcher = $builder->build();

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5673', [
$transport = Transport::create('amqp://guest:guest@localhost:5673', [
'exchange' => 'bar',
]);

/** @var AMQPConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
8 changes: 4 additions & 4 deletions examples/multi/consumer_foo.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

/** @var Builder $builder */
$builder = require __DIR__ . '/builder.php';
Expand All @@ -14,11 +14,11 @@

$dispatcher = $builder->build();

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);

/** @var AMQPConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
4 changes: 2 additions & 2 deletions examples/remote/builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);

Expand Down
12 changes: 6 additions & 6 deletions examples/remote/consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;
use Onliner\CommandBus\Remote\AMQP\Queue;

/** @var Builder $builder */
Expand All @@ -17,11 +17,11 @@

$dispatcher = $builder->build();

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);

/** @var AMQPConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $transport->consume();

$pattern = $argv[1] ?? '#';
Expand All @@ -39,6 +39,6 @@
}

$consumer->run($dispatcher, [
AMQPConsumer::OPTION_ATTEMPTS => 10,
AMQPConsumer::OPTION_INTERVAL => 100000, // 100 ms
Consumer::OPTION_ATTEMPTS => 10,
Consumer::OPTION_INTERVAL => 100000, // 100 ms
]);
4 changes: 2 additions & 2 deletions examples/router/builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;
use Onliner\CommandBus\Remote\Transport;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
'routes' => [
'Bar\*' => 'bar',
Expand Down
8 changes: 4 additions & 4 deletions examples/router/consumer_bar.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

/** @var Builder $builder */
$builder = require __DIR__ . '/builder.php';
Expand All @@ -14,11 +14,11 @@

$dispatcher = $builder->build();

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'bar',
]);

/** @var AMQPConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
8 changes: 4 additions & 4 deletions examples/router/consumer_foo.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

/** @var Builder $builder */
$builder = require __DIR__ . '/builder.php';
Expand All @@ -14,11 +14,11 @@

$dispatcher = $builder->build();

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);

/** @var AMQPConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
17 changes: 0 additions & 17 deletions src/Context.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

final class Context
{
private const OPTION_LOCAL = 'local';

/**
* @param array<string, mixed> $options
*/
Expand All @@ -27,16 +25,6 @@ public function dispatch(object $message, array $options = []): void
$this->dispatcher->dispatch($message, $options);
}

/**
* @param array<string, mixed> $options
*/
public function execute(object $message, array $options = []): void
{
$this->dispatcher->dispatch($message, array_replace($options, [
self::OPTION_LOCAL => true,
]));
}

/**
* @param array<string, mixed> $options
*/
Expand Down Expand Up @@ -78,9 +66,4 @@ public function del(string $option): self

return $this;
}

public function isLocal(): bool
{
return $this->has(self::OPTION_LOCAL);
}
}
65 changes: 0 additions & 65 deletions src/Remote/AMQP/AMQPTransport.php

This file was deleted.

46 changes: 13 additions & 33 deletions src/Remote/AMQP/AMQPConsumer.php → src/Remote/AMQP/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@
namespace Onliner\CommandBus\Remote\AMQP;

use Onliner\CommandBus\Dispatcher;
use Onliner\CommandBus\Remote\Consumer;
use Onliner\CommandBus\Remote\Envelope;
use Onliner\CommandBus\Remote\Consumer as ConsumerContract;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPAbstractCollection;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Throwable;

final class AMQPConsumer implements Consumer
final class Consumer implements ConsumerContract
{
public const
OPTION_ATTEMPTS = 'attempts',
Expand All @@ -39,15 +37,18 @@ final class AMQPConsumer implements Consumer

public function __construct(
private Connector $connector,
private Exchange $exchange,
private Packager $packager,
LoggerInterface $logger = null,
) {
$this->logger = $logger ?? new NullLogger();
}

public function listen(string $pattern): void
/**
* @param string|array<string> $bindings
*/
public function listen(string $name, array|string $bindings = [], Flags $flags = null): void
{
$this->consume(new Queue($pattern, $pattern, $this->exchange->flags));
$this->consume(new Queue($name, $name, (array) $bindings, $flags ?? Flags::default()));
}

public function consume(Queue $queue): void
Expand Down Expand Up @@ -101,10 +102,8 @@ private function channel(Dispatcher $dispatcher, array $options): AMQPChannel
}
};

$this->exchange->declare($channel);

foreach ($this->queues as $queue) {
$queue->consume($channel, $this->exchange, $handler);
$queue->consume($channel, $handler);
}

return $channel;
Expand Down Expand Up @@ -146,31 +145,12 @@ private function connect(array $options): AMQPChannel

private function handle(AMQPMessage $message, Dispatcher $dispatcher): void
{
$headers = $message->get('application_headers');

if (!$headers instanceof AMQPAbstractCollection) {
$this->logger->warning('Message headers not found.');

return;
}

$headers = array_replace($headers->getNativeData(), [
Exchange::HEADER_EXCHANGE => $message->getExchange(),
Exchange::HEADER_REDELIVERED => $message->isRedelivered(),
Exchange::HEADER_ROUTING_KEY => $message->getRoutingKey(),
Exchange::HEADER_CONSUMER_TAG => $message->getConsumerTag(),
Exchange::HEADER_DELIVERY_TAG => $message->getDeliveryTag(),
]);

if (!isset($headers[Exchange::HEADER_MESSAGE_TYPE])) {
$this->logger->warning(sprintf('Header "%s" not found in message.', Exchange::HEADER_MESSAGE_TYPE));

return;
if ($message->isRedelivered()) {
throw new AMQPIOException('Message redelivered');
}

/** @var class-string $class */
$class = $headers[Exchange::HEADER_MESSAGE_TYPE];
$envelope = $this->packager->unpack($message);

$dispatcher->dispatch(new Envelope($class, $message->getBody(), $headers));
$dispatcher->dispatch($envelope);
}
}
Loading

0 comments on commit 3ab416a

Please sign in to comment.