Skip to content

Commit

Permalink
Added exchange declare helper
Browse files Browse the repository at this point in the history
  • Loading branch information
zloyuser committed Aug 13, 2024
1 parent d32b53d commit e13bd25
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 13 deletions.
3 changes: 3 additions & 0 deletions examples/multi/consumer_bar.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\Exchange;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

Expand All @@ -13,6 +14,8 @@
});

$transport = Transport::create('amqp://guest:guest@localhost:5673');
$transport->declare(Exchange::create(['name' => 'bar']));

$consumer = $transport->consume();
$consumer->listen('#', 'bar');
$consumer->run($builder->build());
3 changes: 3 additions & 0 deletions examples/multi/consumer_foo.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\Exchange;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

Expand All @@ -13,6 +14,8 @@
});

$transport = Transport::create('amqp://guest:guest@localhost:5672');
$transport->declare(Exchange::create(['name' => 'foo']));

$consumer = $transport->consume();
$consumer->listen('#', 'foo');
$consumer->run($builder->build());
3 changes: 3 additions & 0 deletions examples/remote/consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\Exchange;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;
use Onliner\CommandBus\Remote\AMQP\Queue;
Expand All @@ -16,6 +17,8 @@
});

$transport = Transport::create('amqp://guest:guest@localhost:5672');
$transport->declare(Exchange::create(['name' => 'foo']));

$consumer = $transport->consume();

$pattern = $argv[1] ?? '#';
Expand Down
2 changes: 2 additions & 0 deletions examples/router/consumer_bar.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\Exchange;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

Expand All @@ -14,6 +15,7 @@

$dispatcher = $builder->build();
$transport = Transport::create('amqp://guest:guest@localhost:5672');
$transport->declare(Exchange::create(['name' => 'bar']));

/** @var Consumer $consumer */
$consumer = $transport->consume();
Expand Down
2 changes: 2 additions & 0 deletions examples/router/consumer_foo.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\Exchange;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;

Expand All @@ -15,6 +16,7 @@
$dispatcher = $builder->build();

$transport = Transport::create('amqp://guest:guest@localhost:5672');
$transport->declare(Exchange::create(['name' => 'foo']));

/** @var Consumer $consumer */
$consumer = $transport->consume();
Expand Down
2 changes: 1 addition & 1 deletion src/Remote/AMQP/Exchange.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static function create(array $options): self
throw new InvalidArgumentException('Exchange type must be a string');
}

$name = $options['exchange'] ?? sprintf('amqp.%s', $type);
$name = $options['name'] ?? sprintf('amqp.%s', $type);
$args = $options['args'] ?? [];

if (!is_string($name)) {
Expand Down
7 changes: 3 additions & 4 deletions src/Remote/AMQP/Packager.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ public function __construct(

public function pack(Envelope $envelope): AMQPMessage
{
$headers = array_replace($envelope->headers, [
Headers::MESSAGE_TYPE => $envelope->class,
]);
$headers = new AMQPTable($envelope->headers);
$headers->set(Headers::MESSAGE_TYPE, $envelope->class);

return new AMQPMessage($envelope->payload, [
'delivery_mode' => $this->deliveryMode,
'application_headers' => new AMQPTable($headers),
'application_headers' => $headers,
]);
}

Expand Down
9 changes: 9 additions & 0 deletions src/Remote/AMQP/Transport.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,13 @@ public function consume(): Consumer
{
return new Consumer($this->connector, $this->packager, $this->logger);
}

public function declare(Exchange|array $exchange): void

Check failure on line 49 in src/Remote/AMQP/Transport.php

View workflow job for this annotation

GitHub Actions / PHP 8.1 Test on ubuntu-latest

Method Onliner\CommandBus\Remote\AMQP\Transport::declare() has parameter $exchange with no value type specified in iterable type array.

Check failure on line 49 in src/Remote/AMQP/Transport.php

View workflow job for this annotation

GitHub Actions / PHP 8.2 Test on ubuntu-latest

Method Onliner\CommandBus\Remote\AMQP\Transport::declare() has parameter $exchange with no value type specified in iterable type array.
{
if (is_array($exchange)) {
$exchange = Exchange::create($exchange);
}

$exchange->declare($this->connector->connect());
}
}
14 changes: 6 additions & 8 deletions tests/Remote/AMQP/TransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use InvalidArgumentException;
use Onliner\CommandBus\Remote\AMQP\Connector;
use Onliner\CommandBus\Remote\AMQP\Headers;
use Onliner\CommandBus\Remote\AMQP\Packager;
use Onliner\CommandBus\Remote\AMQP\SimpleRouter;
use Onliner\CommandBus\Remote\AMQP\Transport;
Expand Down Expand Up @@ -44,28 +45,25 @@ public function testSend(): void
'foo' => 'bar',
]);

$headers = $envelope->headers + [
'x-message-type' => $envelope->class,
];
$headers = new AMQPTable($envelope->headers);
$headers->set(Headers::MESSAGE_TYPE, $envelope->class);

$message = new AMQPMessage($envelope->payload, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => $headers,
]);
$message->set('application_headers', new AMQPTable($headers));

$channel = self::createMock(AMQPChannel::class);
$channel
->expects(self::exactly(2))
->method('basic_publish')
->with($message, 'foo', strtolower(str_replace('\\', '.', $envelope->class)), false, false)
;
->with($message, 'foo', strtolower(str_replace('\\', '.', $envelope->class)), false, false);

$connector = self::createMock(Connector::class);
$connector
->expects(self::exactly(2))
->method('connect')
->willReturn($channel)
;
->willReturn($channel);

$transport = new Transport($connector, new Packager(), new SimpleRouter('foo'));
$transport->send($envelope);
Expand Down

0 comments on commit e13bd25

Please sign in to comment.