Skip to content

Commit

Permalink
Added publish handler for raw messages
Browse files Browse the repository at this point in the history
  • Loading branch information
zloyuser committed Aug 13, 2024
1 parent 1ef1fa8 commit 57998e8
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 2 deletions.
22 changes: 22 additions & 0 deletions src/Remote/AMQP/Publish.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

declare(strict_types=1);

namespace Onliner\CommandBus\Remote\AMQP;

final class Publish
{
public function __construct(
public string $exchange,
public string $queue,
public string $payload,
) {}

/**
* @param array<string, mixed> $payload
*/
public static function create(string $exchange, string $queue, array $payload): self
{
return new self($exchange, $queue, json_encode($payload, JSON_THROW_ON_ERROR));
}
}
21 changes: 19 additions & 2 deletions src/Remote/AMQP/Transport.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@

namespace Onliner\CommandBus\Remote\AMQP;

use Onliner\CommandBus\Builder;
use Onliner\CommandBus\Context;
use Onliner\CommandBus\Extension;
use Onliner\CommandBus\Remote\Envelope;
use Onliner\CommandBus\Remote\Transport as TransportContract;
use Psr\Log\LoggerInterface;

final class Transport implements TransportContract
final class Transport implements TransportContract, Extension
{
public function __construct(
private Connector $connector,
Expand All @@ -28,9 +31,13 @@ public static function create(string $dsn, string $exchange = '', array $routes
}

public function send(Envelope $envelope): void
{
$this->publish($envelope, $this->router->match($envelope));
}

public function publish(Envelope $envelope, Route $route): void
{
$message = $this->packager->pack($envelope);
$route = $this->router->match($envelope);

$channel = $this->connector->connect();
$channel->basic_publish(
Expand All @@ -57,4 +64,14 @@ public function declare(Exchange|array $exchange): void

$exchange->declare($this->connector->connect());
}

public function setup(Builder $builder): void
{
$builder->handle(Publish::class, function (Publish $message, Context $context) {
$this->publish(
new Envelope(Publish::class, $message->payload, $context->all()),
new Route($message->exchange, $message->queue),
);
});
}
}
4 changes: 4 additions & 0 deletions src/Remote/RemoteExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public function local(string ...$local): void

public function setup(Builder $builder): void
{
if ($this->transport instanceof Extension) {
$this->transport->setup($builder);
}

$gateway = new Gateway($this->transport, $this->serializer);

$builder->middleware(new RemoteMiddleware($gateway, $this->local));
Expand Down

0 comments on commit 57998e8

Please sign in to comment.