Skip to content

Commit

Permalink
Added mandatory support; Simplify AMQP transport; Allow queue multipl…
Browse files Browse the repository at this point in the history
…e bindings
  • Loading branch information
zloyuser committed Aug 13, 2024
1 parent 23e661a commit bc0eb9b
Show file tree
Hide file tree
Showing 21 changed files with 221 additions and 174 deletions.
10 changes: 3 additions & 7 deletions examples/multi/builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,15 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;
use Onliner\CommandBus\Remote\Transport;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transportFoo = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);
$transportBar = AMQPTransport::create('amqp://guest:guest@localhost:5673', [
'exchange' => 'bar',
]);
$transportFoo = Transport::create('amqp://guest:guest@localhost:5672', 'foo');
$transportBar = Transport::create('amqp://guest:guest@localhost:5673', 'bar');

$transport = new Transport\MultiTransport($transportFoo);
$transport->add('Bar\*', $transportBar);
Expand Down
8 changes: 4 additions & 4 deletions examples/multi/consumer_bar.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

/** @var Builder $builder */
$builder = require __DIR__ . '/builder.php';
Expand All @@ -14,11 +14,11 @@

$dispatcher = $builder->build();

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5673', [
$transport = Transport::create('amqp://guest:guest@localhost:5673', [
'exchange' => 'bar',
]);

/** @var AMQPConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
8 changes: 4 additions & 4 deletions examples/multi/consumer_foo.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

/** @var Builder $builder */
$builder = require __DIR__ . '/builder.php';
Expand All @@ -14,11 +14,11 @@

$dispatcher = $builder->build();

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);

/** @var AMQPConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
4 changes: 2 additions & 2 deletions examples/remote/builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);

Expand Down
12 changes: 6 additions & 6 deletions examples/remote/consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;
use Onliner\CommandBus\Remote\AMQP\Queue;

/** @var Builder $builder */
Expand All @@ -17,11 +17,11 @@

$dispatcher = $builder->build();

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);

/** @var AMQPConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $transport->consume();

$pattern = $argv[1] ?? '#';
Expand All @@ -39,6 +39,6 @@
}

$consumer->run($dispatcher, [
AMQPConsumer::OPTION_ATTEMPTS => 10,
AMQPConsumer::OPTION_INTERVAL => 100000, // 100 ms
Consumer::OPTION_ATTEMPTS => 10,
Consumer::OPTION_INTERVAL => 100000, // 100 ms
]);
4 changes: 2 additions & 2 deletions examples/router/builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;
use Onliner\CommandBus\Remote\Transport;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
'routes' => [
'Bar\*' => 'bar',
Expand Down
8 changes: 4 additions & 4 deletions examples/router/consumer_bar.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

/** @var Builder $builder */
$builder = require __DIR__ . '/builder.php';
Expand All @@ -14,11 +14,11 @@

$dispatcher = $builder->build();

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'bar',
]);

/** @var AMQPConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
8 changes: 4 additions & 4 deletions examples/router/consumer_foo.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\AMQPTransport;
use Onliner\CommandBus\Remote\AMQP\AMQPConsumer;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

/** @var Builder $builder */
$builder = require __DIR__ . '/builder.php';
Expand All @@ -14,11 +14,11 @@

$dispatcher = $builder->build();

$transport = AMQPTransport::create('amqp://guest:guest@localhost:5672', [
$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);

/** @var AMQPConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
65 changes: 0 additions & 65 deletions src/Remote/AMQP/AMQPTransport.php

This file was deleted.

46 changes: 13 additions & 33 deletions src/Remote/AMQP/AMQPConsumer.php → src/Remote/AMQP/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@
namespace Onliner\CommandBus\Remote\AMQP;

use Onliner\CommandBus\Dispatcher;
use Onliner\CommandBus\Remote\Consumer;
use Onliner\CommandBus\Remote\Envelope;
use Onliner\CommandBus\Remote\Consumer as ConsumerContract;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPAbstractCollection;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Throwable;

final class AMQPConsumer implements Consumer
final class Consumer implements ConsumerContract
{
public const
OPTION_ATTEMPTS = 'attempts',
Expand All @@ -39,15 +37,18 @@ final class AMQPConsumer implements Consumer

public function __construct(
private Connector $connector,
private Exchange $exchange,
private Packager $packager,
LoggerInterface $logger = null,
) {
$this->logger = $logger ?? new NullLogger();
}

public function listen(string $pattern): void
/**
* @param string|array<string> $bindings
*/
public function listen(string $name, array|string $bindings = [], Flags $flags = null): void
{
$this->consume(new Queue($pattern, $pattern, $this->exchange->flags));
$this->consume(new Queue($name, $name, (array) $bindings, $flags ?? Flags::default()));
}

public function consume(Queue $queue): void
Expand Down Expand Up @@ -101,10 +102,8 @@ private function channel(Dispatcher $dispatcher, array $options): AMQPChannel
}
};

$this->exchange->declare($channel);

foreach ($this->queues as $queue) {
$queue->consume($channel, $this->exchange, $handler);
$queue->consume($channel, $handler);
}

return $channel;
Expand Down Expand Up @@ -146,31 +145,12 @@ private function connect(array $options): AMQPChannel

private function handle(AMQPMessage $message, Dispatcher $dispatcher): void
{
$headers = $message->get('application_headers');

if (!$headers instanceof AMQPAbstractCollection) {
$this->logger->warning('Message headers not found.');

return;
}

$headers = array_replace($headers->getNativeData(), [
Exchange::HEADER_EXCHANGE => $message->getExchange(),
Exchange::HEADER_REDELIVERED => $message->isRedelivered(),
Exchange::HEADER_ROUTING_KEY => $message->getRoutingKey(),
Exchange::HEADER_CONSUMER_TAG => $message->getConsumerTag(),
Exchange::HEADER_DELIVERY_TAG => $message->getDeliveryTag(),
]);

if (!isset($headers[Exchange::HEADER_MESSAGE_TYPE])) {
$this->logger->warning(sprintf('Header "%s" not found in message.', Exchange::HEADER_MESSAGE_TYPE));

return;
if ($message->isRedelivered()) {
throw new AMQPIOException('Message redelivered');
}

/** @var class-string $class */
$class = $headers[Exchange::HEADER_MESSAGE_TYPE];
$envelope = $this->packager->unpack($message);

$dispatcher->dispatch(new Envelope($class, $message->getBody(), $headers));
$dispatcher->dispatch($envelope);
}
}
23 changes: 7 additions & 16 deletions src/Remote/AMQP/Exchange.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,13 @@ final class Exchange
TYPE_DELAYED = 'x-delayed-message'
;

public const
HEADER_EXCHANGE = 'exchange',
HEADER_REDELIVERED = 'redelivered',
HEADER_ROUTING_KEY = 'routing_key',
HEADER_CONSUMER_TAG = 'consumer_tag',
HEADER_DELIVERY_TAG = 'delivery_tag',
HEADER_MESSAGE_TYPE = 'x-message-type'
;

/**
* @param array<string, string> $args
*/
public function __construct(
public string $name,
public string $type,
public AMQPFlags $flags,
public Flags $flags,
public array $args = [],
) {}

Expand Down Expand Up @@ -63,7 +54,7 @@ public static function create(array $options): self
$args['x-delayed-type'] = self::TYPE_TOPIC;
}

return new self($name, $type, AMQPFlags::compute($options), $args);
return new self($name, $type, Flags::compute($options), $args);
}

public function is(int $flag): bool
Expand All @@ -76,11 +67,11 @@ public function declare(AMQPChannel $channel): void
$channel->exchange_declare(
$this->name,
$this->type,
$this->flags->is(AMQPFlags::PASSIVE),
$this->flags->is(AMQPFlags::DURABLE),
$this->flags->is(AMQPFlags::DELETE),
$this->flags->is(AMQPFlags::INTERNAL),
$this->flags->is(AMQPFlags::NO_WAIT),
$this->flags->is(Flags::PASSIVE),
$this->flags->is(Flags::DURABLE),
$this->flags->is(Flags::DELETE),
$this->flags->is(Flags::INTERNAL),
$this->flags->is(Flags::NO_WAIT),
new AMQPTable($this->args)
);
}
Expand Down
Loading

0 comments on commit bc0eb9b

Please sign in to comment.