Skip to content

Commit

Permalink
PR fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zloyuser committed Aug 15, 2024
1 parent afcfef8 commit 10e44cc
Show file tree
Hide file tree
Showing 33 changed files with 130 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,16 @@

class Task
{
public $task;

public function __construct(string $task)
{
$this->task = $task;
}
public function __construct(
public string $task,
) {}
}

class ResultMessage
{
public $message;

public function __construct(string $message)
{
$this->message = $message;
}
public function __construct(
public string $message,
) {}
}

class TaskHandler
Expand Down Expand Up @@ -61,4 +55,3 @@ public function __invoke(ResultMessage $command, Context $context)
* Task: build report processing end
* Result: success
*/

9 changes: 3 additions & 6 deletions examples/hello.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@

class Hello
{
public $message;

public function __construct(string $message)
{
$this->message = $message;
}
public function __construct(
public string $message,
) {}
}

$dispatcher = (new Builder())
Expand Down
9 changes: 3 additions & 6 deletions examples/invoke.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@

class Hello
{
public $message;

public function __construct(string $message)
{
$this->message = $message;
}
public function __construct(
public string $message,
) {}
}

class HelloHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/../../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transport = Transport::create('amqp://guest:guest@localhost:5672', 'foo');
Expand Down
43 changes: 43 additions & 0 deletions examples/remote/bindings/consumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Context;
use Onliner\CommandBus\Remote\AMQP\Exchange;
use Onliner\CommandBus\Remote\AMQP\Flags;
use Onliner\CommandBus\Remote\AMQP\Packager;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\AMQP\Consumer;
use Onliner\CommandBus\Remote\AMQP\Queue;

/** @var Builder $builder */
$builder = require __DIR__ . '/builder.php';
$builder->handle(SendEmail::class, function (SendEmail $command, Context $context) {
$exchange = $context->get(Packager::OPTION_EXCHANGE);
$routingKey = $context->get(Packager::OPTION_ROUTING_KEY);

echo sprintf('Received message from %s with routing key %s', $exchange, $routingKey), PHP_EOL;

// Throw exception to trigger DLE
if ($exchange === 'foo') {
throw new Exception("Something went wrong...");
}
});

$transport = Transport::create('amqp://guest:guest@localhost:5672');
$transport->declare(Exchange::create(['name' => 'dle']));
$transport->declare(Exchange::create(['name' => 'foo']));

$consumer = $transport->consume();
$consumer->consume(new Queue('my-queue', [
'foo' => '#',
'dle' => 'sendemail',
], Flags::default(), args: [
Queue::DEAD_LETTER => 'dle',
]));

$consumer->run($builder->build(), [
Consumer::OPTION_ATTEMPTS => 10,
Consumer::OPTION_INTERVAL => 100000, // 100 ms
]);
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Onliner\CommandBus\Remote\RemoteExtension;
use Onliner\CommandBus\Remote\Transport\MultiTransport;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/../../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transportFoo = Transport::create('amqp://guest:guest@localhost:5672', 'foo');
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;

require __DIR__ . '/../../vendor/autoload.php';
require __DIR__ . '/../../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transport = Transport::create('amqp://guest:guest@localhost:5672', 'foo', [
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
15 changes: 15 additions & 0 deletions examples/remote/simple/builder.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Remote\AMQP\Transport;
use Onliner\CommandBus\Remote\RemoteExtension;

require __DIR__ . '/../../../vendor/autoload.php';
require __DIR__ . '/messages.php';

$transport = Transport::create('amqp://guest:guest@localhost:5672', 'foo');

return (new Builder())
->use(new RemoteExtension($transport));
File renamed without changes.
12 changes: 12 additions & 0 deletions examples/remote/simple/messages.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

class SendEmail
{
public function __construct(
public string $to,
public string $subject,
public string $content,
) {}
}
11 changes: 11 additions & 0 deletions examples/remote/simple/sender.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

use Onliner\CommandBus\Builder;

/** @var Builder $builder */
$builder = require __DIR__ . '/builder.php';

$dispatcher = $builder->build();
$dispatcher->dispatch(new SendEmail('[email protected]', 'Onliner', 'Hello world!'));
5 changes: 4 additions & 1 deletion src/Middleware/LoggerMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ public function call(object $message, Context $context, callable $next): void
try {
$next($message, $context);
} catch (Throwable $error) {
$this->logger->log($this->level, $error->getMessage());
$this->logger->log($this->level, $error->getMessage(), [
'file' => $error->getFile(),
'line' => $error->getLine(),
]);

throw $error;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Remote/AMQP/Connector.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender;

class Connector
final class Connector
{
private ?AMQPChannel $channel = null;
private ?PCNTLHeartbeatSender $heartbeats = null;

/**
* @param array<array<mixed>> $hosts
* @param array<array<mixed>> $hosts
* @param array<string|int, mixed> $options
*/
public function __construct(
Expand Down
16 changes: 8 additions & 8 deletions src/Remote/AMQP/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ final class Consumer implements ConsumerContract
public function __construct(
private Connector $connector,
private Packager $packager,
LoggerInterface $logger = null,
?LoggerInterface $logger = null,
) {
$this->logger = $logger ?? new NullLogger();
}

/**
* @param string|array<string> $bindings
*/
public function listen(string $name, array|string $bindings = [], Flags $flags = null): void
public function listen(string $name, array|string $bindings = [], ?Flags $flags = null): void
{
$this->consume(new Queue($name, $name, (array) $bindings, $flags ?? Flags::default()));
$this->consume(new Queue($name, (array) $bindings, $flags ?? Flags::default()));
}

public function consume(Queue $queue): void
Expand Down Expand Up @@ -95,10 +95,12 @@ private function channel(Dispatcher $dispatcher, array $options): AMQPChannel
$handler = function (AMQPMessage $message) use ($channel, $dispatcher) {
try {
$this->handle($message, $channel, $dispatcher);

$message->ack();
} catch (Throwable $error) {
$this->logger->error((string) $error);
} finally {
$message->ack();

$message->nack();
}
};

Expand Down Expand Up @@ -155,8 +157,6 @@ private function handle(AMQPMessage $message, AMQPChannel $channel, Dispatcher $
return;
}

$envelope = $this->packager->unpack($message);

$dispatcher->dispatch($envelope);
$dispatcher->dispatch($this->packager->unpack($message));
}
}
2 changes: 1 addition & 1 deletion src/Remote/AMQP/Flags.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Onliner\CommandBus\Remote\AMQP;

class Flags
final class Flags
{
public const
PASSIVE = 1,
Expand Down
2 changes: 1 addition & 1 deletion src/Remote/AMQP/Headers.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Onliner\CommandBus\Remote\AMQP;

class Headers
final class Headers
{
public const
DELAY = 'x-delay',
Expand Down
2 changes: 1 addition & 1 deletion src/Remote/AMQP/Packager.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use PhpAmqpLib\Wire\AMQPAbstractCollection;
use PhpAmqpLib\Wire\AMQPTable;

class Packager
final class Packager
{
/**
* @deprecated
Expand Down
6 changes: 3 additions & 3 deletions src/Remote/AMQP/Publish.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ final class Publish
{
public function __construct(
public string $exchange,
public string $queue,
public string $route,
public string $payload,
) {}

/**
* @param array<string, mixed> $payload
*/
public static function create(string $exchange, string $queue, array $payload): self
public static function create(string $exchange, string $route, array $payload): self
{
return new self($exchange, $queue, json_encode($payload, JSON_THROW_ON_ERROR));
return new self($exchange, $route, json_encode($payload, JSON_THROW_ON_ERROR));
}
}
15 changes: 9 additions & 6 deletions src/Remote/AMQP/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ final class Queue
*/
public function __construct(
public string $name,
public string $pattern,
private array $bindings,
public Flags $flags,
public array $args = [],
Expand All @@ -35,7 +34,7 @@ public function __construct(
public static function create(array $options): self
{
$pattern = $options['pattern'] ?? '#';
$name = $options['queue'] ?? $pattern;
$name = $options['name'] ?? $pattern;
$bindings = $options['bindings'] ?? [];
$args = $options['args'] ?? [];

Expand All @@ -59,7 +58,11 @@ public static function create(array $options): self
throw new InvalidArgumentException('Queue arguments must be an array');
}

return new self($name, $pattern, $bindings, Flags::compute($options), $args);
if (array_is_list($bindings)) {
$bindings = array_fill_keys($bindings, $pattern);
}

return new self($name, $bindings, Flags::compute($options), $args);
}

public function is(int $flag): bool
Expand All @@ -76,11 +79,11 @@ public function consume(AMQPChannel $channel, callable $handler): void
$this->is(Flags::EXCLUSIVE),
$this->is(Flags::DELETE),
$this->is(Flags::NO_WAIT),
new AMQPTable($this->args)
new AMQPTable($this->args),
);

foreach ($this->bindings as $binding) {
$channel->queue_bind($this->name, $binding, $this->pattern);
foreach ($this->bindings as $exchange => $pattern) {
$channel->queue_bind($this->name, $exchange, $pattern);
}

$channel->basic_consume(
Expand Down
11 changes: 2 additions & 9 deletions src/Remote/AMQP/Transport.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,8 @@ public function consume(): Consumer
return new Consumer($this->connector, $this->packager, $this->logger);
}

/**
* @param Exchange|array<string, mixed> $exchange
*/
public function declare(Exchange|array $exchange): void
public function declare(Exchange $exchange): void
{
if (is_array($exchange)) {
$exchange = Exchange::create($exchange);
}

$exchange->declare($this->connector->connect());
}

Expand All @@ -70,7 +63,7 @@ public function setup(Builder $builder): void
$builder->handle(Publish::class, function (Publish $message, Context $context) {
$this->publish(
new Envelope(Publish::class, $message->payload, $context->all()),
new Route($message->exchange, $message->queue),
new Route($message->exchange, $message->route),
);
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/Remote/RemoteExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ final class RemoteExtension implements Extension
Envelope::class,
];

public function __construct(Transport $transport = null, Serializer $serializer = null)
public function __construct(?Transport $transport = null, ?Serializer $serializer = null)
{
$this->transport = $transport ?? new Transport\MemoryTransport();
$this->serializer = $serializer ?? new Serializer\NativeSerializer();
Expand Down
2 changes: 1 addition & 1 deletion src/Remote/Transport/MultiTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Onliner\CommandBus\Remote\Envelope;
use Onliner\CommandBus\Remote\Transport;

class MultiTransport implements Transport
final class MultiTransport implements Transport
{
/**
* @var array<string, Transport>
Expand Down
2 changes: 1 addition & 1 deletion src/Retry/RetryExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ final class RetryExtension implements Extension
*/
private array $policies = [];

public function __construct(Policy $default = null)
public function __construct(?Policy $default = null)
{
$this->default = $default ?? new Policy\ThrowPolicy();
}
Expand Down

0 comments on commit 10e44cc

Please sign in to comment.