Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
zloyuser committed Jan 23, 2024
1 parent 0772d37 commit 7e0a12f
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 39 deletions.
2 changes: 1 addition & 1 deletion examples/multi/consumer_bar.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
/** @var AMQPConsumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
$consumer->run([$dispatcher, 'dispatch']);
2 changes: 1 addition & 1 deletion examples/multi/consumer_foo.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
/** @var AMQPConsumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
$consumer->run([$dispatcher, 'dispatch']);
2 changes: 1 addition & 1 deletion examples/remote/consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
]));
}

$consumer->run($dispatcher, [
$consumer->run([$dispatcher, 'dispatch'], [
AMQPConsumer::OPTION_ATTEMPTS => 10,
AMQPConsumer::OPTION_INTERVAL => 100000, // 100 ms
]);
2 changes: 1 addition & 1 deletion examples/router/consumer_bar.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
/** @var AMQPConsumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
$consumer->run([$dispatcher, 'dispatch']);
2 changes: 1 addition & 1 deletion examples/router/consumer_foo.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
/** @var AMQPConsumer $consumer */
$consumer = $transport->consume();
$consumer->listen('#');
$consumer->run($dispatcher);
$consumer->run([$dispatcher, 'dispatch']);
2 changes: 1 addition & 1 deletion src/Context.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

final class Context
{
private const OPTION_LOCAL = 'local';
public const OPTION_LOCAL = 'local';

/**
* @param Dispatcher $dispatcher
Expand Down
13 changes: 13 additions & 0 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,17 @@ public function dispatch(object $message, array $options = []): void
$this->dispatch($deferred->message, $deferred->options);
}
}

/**
* @param object $message
* @param array<string, mixed> $options
*
* @return void
*/
public function execute(object $message, array $options = []): void
{
$this->dispatch($message, array_replace($options, [
Context::OPTION_LOCAL => true,
]));
}
}
43 changes: 21 additions & 22 deletions src/Remote/AMQP/AMQPConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Onliner\CommandBus\Remote\AMQP;

use Onliner\CommandBus\Dispatcher;
use Onliner\CommandBus\Remote\Consumer;
use Onliner\CommandBus\Remote\Envelope;
use PhpAmqpLib\Channel\AMQPChannel;
Expand Down Expand Up @@ -51,13 +50,14 @@ public function __construct(
}

/**
* @param string $pattern
* @param string $name
* @param string|null $pattern
*
* @return void
*/
public function listen(string $pattern): void
public function listen(string $name, string $pattern = null): void
{
$this->consume(new Queue($pattern, $pattern, $this->exchange->flags()));
$this->consume(new Queue($name, $pattern, $this->exchange->flags()));
}

/**
Expand All @@ -71,11 +71,11 @@ public function consume(Queue $queue): void
/**
* {@inheritDoc}
*/
public function run(Dispatcher $dispatcher, array $options = []): void
public function run(callable $handler, array $options = []): void
{
$this->running = true;

$channel = $this->channel($dispatcher, $options);
$channel = $this->channel($handler, $options);

/** @phpstan-ignore-next-line */
while ($this->running && $channel->is_consuming()) {
Expand All @@ -87,7 +87,7 @@ public function run(Dispatcher $dispatcher, array $options = []): void
throw $error;
}

$channel = $this->channel($dispatcher, $options);
$channel = $this->channel($handler, $options);
} catch (Throwable $error) {
$this->logger->error((string) $error);
}
Expand All @@ -105,29 +105,28 @@ public function stop(): void
}

/**
* @param Dispatcher $dispatcher
* @param callable $handler
* @param array<string, mixed> $options
*
* @return AMQPChannel
* @throws AMQPIOException
*/
private function channel(Dispatcher $dispatcher, array $options): AMQPChannel
private function channel(callable $handler, array $options): AMQPChannel
{
$channel = $this->connect($options);
$handler = function (AMQPMessage $message) use ($dispatcher) {
try {
$this->handle($message, $dispatcher);
} catch (Throwable $error) {
$this->logger->error((string) $error);
} finally {
$message->ack();
}
};

$this->exchange->declare($channel);

foreach ($this->queues as $queue) {
$queue->consume($channel, $this->exchange, $handler);
$queue->consume($channel, $this->exchange, function (AMQPMessage $message) use ($handler) {
try {
$this->handle($message, $handler);
} catch (Throwable $error) {
$this->logger->error((string) $error);
} finally {
$message->ack();
}
});
}

return $channel;
Expand Down Expand Up @@ -172,11 +171,11 @@ private function connect(array $options): AMQPChannel

/**
* @param AMQPMessage $message
* @param Dispatcher $dispatcher
* @param callable $handler
*
* @return void
*/
private function handle(AMQPMessage $message, Dispatcher $dispatcher): void
private function handle(AMQPMessage $message, callable $handler): void
{
$headers = $message->get('application_headers');

Expand All @@ -203,6 +202,6 @@ private function handle(AMQPMessage $message, Dispatcher $dispatcher): void
/** @var class-string $class */
$class = $headers[Exchange::HEADER_MESSAGE_TYPE];

$dispatcher->dispatch(new Envelope($class, $message->getBody(), $headers));
$handler(new Envelope($class, $message->getBody(), $headers));
}
}
6 changes: 2 additions & 4 deletions src/Remote/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@

namespace Onliner\CommandBus\Remote;

use Onliner\CommandBus\Dispatcher;

interface Consumer
{
/**
* @param Dispatcher $dispatcher
* @param callable $handler
* @param array<string, mixed> $options
*
* @return void
*/
public function run(Dispatcher $dispatcher, array $options = []): void;
public function run(callable $handler, array $options = []): void;

/**
* @return void
Expand Down
15 changes: 8 additions & 7 deletions src/Remote/Transport/MemoryTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Onliner\CommandBus\Remote\Transport;

use Onliner\CommandBus\Dispatcher;
use Onliner\CommandBus\Remote\Consumer;
use Onliner\CommandBus\Remote\Envelope;
use Onliner\CommandBus\Remote\Transport;
Expand Down Expand Up @@ -41,7 +40,7 @@ public function consume(): Consumer
/**
* {@inheritDoc}
*/
public function run(Dispatcher $dispatcher, array $options = []): void
public function run(callable $handler, array $options = []): void
{
if ($this->running) {
return;
Expand All @@ -53,8 +52,8 @@ public function run(Dispatcher $dispatcher, array $options = []): void
foreach ($this->envelopes as $type => $envelopes) {
foreach ($envelopes as $i => $envelope) {
try {
$dispatcher->dispatch($envelope);
} catch (Throwable $error) {
$handler($envelope);
} catch (Throwable) {
unset($this->envelopes[$type][$i]);
}
}
Expand Down Expand Up @@ -87,13 +86,15 @@ public function empty(): bool
}

/**
* @param string $type
* @param string|null $type
*
* @return array<Envelope>
*/
public function receive(string $type): array
public function receive(string $type = null): array
{
return $this->envelopes[$type] ?? [];
return $type !== null

Check failure on line 95 in src/Remote/Transport/MemoryTransport.php

View workflow job for this annotation

GitHub Actions / PHP 8.0 Test on ubuntu-latest

Method Onliner\CommandBus\Remote\Transport\MemoryTransport::receive() should return array<Onliner\CommandBus\Remote\Envelope> but returns array<array<Onliner\CommandBus\Remote\Envelope>|Onliner\CommandBus\Remote\Envelope>.

Check failure on line 95 in src/Remote/Transport/MemoryTransport.php

View workflow job for this annotation

GitHub Actions / PHP 8.1 Test on ubuntu-latest

Method Onliner\CommandBus\Remote\Transport\MemoryTransport::receive() should return array<Onliner\CommandBus\Remote\Envelope> but returns array<array<Onliner\CommandBus\Remote\Envelope>|Onliner\CommandBus\Remote\Envelope>.

Check failure on line 95 in src/Remote/Transport/MemoryTransport.php

View workflow job for this annotation

GitHub Actions / PHP 8.2 Test on ubuntu-latest

Method Onliner\CommandBus\Remote\Transport\MemoryTransport::receive() should return array<Onliner\CommandBus\Remote\Envelope> but returns array<array<Onliner\CommandBus\Remote\Envelope>|Onliner\CommandBus\Remote\Envelope>.

Check failure on line 95 in src/Remote/Transport/MemoryTransport.php

View workflow job for this annotation

GitHub Actions / PHP 8.3 Test on ubuntu-latest

Method Onliner\CommandBus\Remote\Transport\MemoryTransport::receive() should return array<Onliner\CommandBus\Remote\Envelope> but returns array<array<Onliner\CommandBus\Remote\Envelope>|Onliner\CommandBus\Remote\Envelope>.
? $this->envelopes[$type] ?? []
: $this->envelopes;
}

/**
Expand Down

0 comments on commit 7e0a12f

Please sign in to comment.