Skip to content

Commit

Permalink
Push redelivered messages to queue end; Examples fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
zloyuser committed Aug 13, 2024
1 parent 3ab416a commit d32b53d
Show file tree
Hide file tree
Showing 15 changed files with 63 additions and 136 deletions.
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
phpstan:
#!make

help: ## Show this help
@printf "\033[33m%s:\033[0m\n" 'Run: make <target> where <target> is one of the following'
@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf " \033[32m%-18s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST)

phpstan: ## Run PHPStan
vendor/bin/phpstan analyse --level max src tests

test: phpstan
test: phpstan ## Run PHPUnit
vendor/bin/phpunit --verbose
7 changes: 3 additions & 4 deletions examples/multi/builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;
use Onliner\CommandBus\Remote\Transport;
use Onliner\CommandBus\Remote\Transport\MultiTransport;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transportFoo = Transport::create('amqp://guest:guest@localhost:5672', 'foo');
$transportBar = Transport::create('amqp://guest:guest@localhost:5673', 'bar');

$transport = new Transport\MultiTransport($transportFoo);
$transport = new MultiTransport($transportFoo);
$transport->add('Bar\*', $transportBar);

return (new Builder())
->use(new RemoteExtension($transport))
;
->use(new RemoteExtension($transport));
12 changes: 3 additions & 9 deletions examples/multi/consumer_bar.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,7 @@
echo sprintf('Hello %s from bar!', $command->name), PHP_EOL;
});

$dispatcher = $builder->build();

$transport = Transport::create('amqp://guest:guest@localhost:5673', [
'exchange' => 'bar',
]);

/** @var Consumer $consumer */
$transport = Transport::create('amqp://guest:guest@localhost:5673');
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
$consumer->listen('#', 'bar');
$consumer->run($builder->build());
12 changes: 3 additions & 9 deletions examples/multi/consumer_foo.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,7 @@
echo sprintf('Hello %s from foo!', $command->name), PHP_EOL;
});

$dispatcher = $builder->build();

$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);

/** @var Consumer $consumer */
$transport = Transport::create('amqp://guest:guest@localhost:5672');
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
$consumer->listen('#', 'foo');
$consumer->run($builder->build());
30 changes: 6 additions & 24 deletions examples/multi/messages.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,17 @@
namespace Foo {
class Hello
{
/**
* @var string
*/
public $name;

/**
* @param string $name
*/
public function __construct(string $name)
{
$this->name = $name;
}
public function __construct(
public string $name,
) {}
}
}

namespace Bar {
class Hello
{
/**
* @var string
*/
public $name;

/**
* @param string $name
*/
public function __construct(string $name)
{
$this->name = $name;
}
public function __construct(
public string $name,
) {}
}
}
7 changes: 2 additions & 5 deletions examples/remote/builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);
$transport = Transport::create('amqp://guest:guest@localhost:5672', 'foo');

return (new Builder())
->use(new RemoteExtension($transport))
;
->use(new RemoteExtension($transport));
13 changes: 4 additions & 9 deletions examples/remote/consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,25 @@
echo 'CONTENT: ', $command->content, \PHP_EOL;
});

$dispatcher = $builder->build();

$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);

/** @var Consumer $consumer */
$transport = Transport::create('amqp://guest:guest@localhost:5672');
$consumer = $transport->consume();

$pattern = $argv[1] ?? '#';
$priority = isset($argv[2]) ? (int) $argv[2] : 0;

if ($priority === 0) {
$consumer->listen($pattern);
$consumer->listen($pattern, 'foo');
} else {
$consumer->consume(Queue::create([
'pattern' => $pattern,
'bindings' => ['foo'],
'args' => [
Queue::MAX_PRIORITY => $priority,
],
]));
}

$consumer->run($dispatcher, [
$consumer->run($builder->build(), [
Consumer::OPTION_ATTEMPTS => 10,
Consumer::OPTION_INTERVAL => 100000, // 100 ms
]);
31 changes: 5 additions & 26 deletions examples/remote/messages.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,9 @@

class SendEmail
{
/**
* @var string
*/
public $to;

/**
* @var string
*/
public $subject;

/**
* @var string
*/
public $content;

/**
* @param string $to
* @param string $subject
* @param string $content
*/
public function __construct(string $to, string $subject, string $content)
{
$this->to = $to;
$this->subject = $subject;
$this->content = $content;
}
public function __construct(
public string $to,
public string $subject,
public string $content,
) {}
}
11 changes: 3 additions & 8 deletions examples/router/builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@
use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;
use Onliner\CommandBus\Remote\Transport;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
'routes' => [
'Bar\*' => 'bar',
],
$transport = Transport::create('amqp://guest:guest@localhost:5672', 'foo', [
'Bar\*' => 'bar',
]);

return (new Builder())
->use(new RemoteExtension($transport))
;
->use(new RemoteExtension($transport));
7 changes: 2 additions & 5 deletions examples/router/consumer_bar.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@
});

$dispatcher = $builder->build();

$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'bar',
]);
$transport = Transport::create('amqp://guest:guest@localhost:5672');

/** @var Consumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->listen('#', 'bar');
$consumer->run($dispatcher);
6 changes: 2 additions & 4 deletions examples/router/consumer_foo.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@

$dispatcher = $builder->build();

$transport = Transport::create('amqp://guest:guest@localhost:5672', [
'exchange' => 'foo',
]);
$transport = Transport::create('amqp://guest:guest@localhost:5672');

/** @var Consumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->listen('#', 'foo');
$consumer->run($dispatcher);
30 changes: 6 additions & 24 deletions examples/router/messages.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,17 @@
namespace Foo {
class Hello
{
/**
* @var string
*/
public $name;

/**
* @param string $name
*/
public function __construct(string $name)
{
$this->name = $name;
}
public function __construct(
public string $name,
) {}
}
}

namespace Bar {
class Hello
{
/**
* @var string
*/
public $name;

/**
* @param string $name
*/
public function __construct(string $name)
{
$this->name = $name;
}
public function __construct(
public string $name,
) {}
}
}
14 changes: 10 additions & 4 deletions src/Remote/AMQP/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public function stop(): void
private function channel(Dispatcher $dispatcher, array $options): AMQPChannel
{
$channel = $this->connect($options);
$handler = function (AMQPMessage $message) use ($dispatcher) {
$handler = function (AMQPMessage $message) use ($channel, $dispatcher) {
try {
$this->handle($message, $dispatcher);
$this->handle($message, $channel, $dispatcher);
} catch (Throwable $error) {
$this->logger->error((string) $error);
} finally {
Expand Down Expand Up @@ -143,10 +143,16 @@ private function connect(array $options): AMQPChannel
throw new AMQPIOException();
}

private function handle(AMQPMessage $message, Dispatcher $dispatcher): void
private function handle(AMQPMessage $message, AMQPChannel $channel, Dispatcher $dispatcher): void
{
if ($message->isRedelivered()) {
throw new AMQPIOException('Message redelivered');
$channel->basic_publish(
new AMQPMessage($message->body, $message->get_properties()),
(string) $message->getExchange(),
(string) $message->getRoutingKey(),
);

return;
}

$envelope = $this->packager->unpack($message);
Expand Down
5 changes: 4 additions & 1 deletion src/Remote/Gateway.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Onliner\CommandBus\Remote;

use Onliner\CommandBus\Context;
use Onliner\CommandBus\Remote\AMQP\Packager;

final class Gateway
{
Expand All @@ -24,6 +25,8 @@ public function receive(Envelope $envelope, Context $context): void
{
$message = $this->serializer->unserialize($envelope);

$context->execute($message, $envelope->headers);
$context->dispatch($message, array_replace($envelope->headers, [
Packager::OPTION_LOCAL => true,
]));
}
}
4 changes: 2 additions & 2 deletions tests/Remote/AMQP/TransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public function testSend(): void
$channel
->expects(self::exactly(2))
->method('basic_publish')
->with($message, 'amqp.topic', strtolower(str_replace('\\', '.', $envelope->class)), false, false)
->with($message, 'foo', strtolower(str_replace('\\', '.', $envelope->class)), false, false)
;

$connector = self::createMock(Connector::class);
Expand All @@ -67,7 +67,7 @@ public function testSend(): void
->willReturn($channel)
;

$transport = new Transport($connector, new Packager(), new SimpleRouter());
$transport = new Transport($connector, new Packager(), new SimpleRouter('foo'));
$transport->send($envelope);
$transport->send($envelope);
}
Expand Down

0 comments on commit d32b53d

Please sign in to comment.