Skip to content

Commit

Permalink
Allow different modes for AMQP consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
zloyuser committed Aug 19, 2024
1 parent dd83f06 commit ff1ec22
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/Middleware/LoggerMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public function call(object $message, Context $context, callable $next): void
$next($message, $context);
} catch (Throwable $error) {
$this->logger->log($this->level, $error->getMessage(), [
'type' => get_class($error),
'file' => $error->getFile(),
'line' => $error->getLine(),
]);
Expand Down
47 changes: 41 additions & 6 deletions src/Remote/AMQP/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,19 @@

final class Consumer implements ConsumerContract
{
public const
MODE_ACK = 'ack',
MODE_NACK = 'nack',
MODE_REJECT = 'reject'
;

public const
OPTION_ATTEMPTS = 'attempts',
OPTION_INTERVAL = 'interval',
OPTION_PREFETCH = 'prefetch'
OPTION_PREFETCH = 'prefetch',
OPTION_REQUEUE = 'requeue',
OPTION_MULTIPLE = 'multiple',
OPTION_MODE = self::MODE_REJECT
;

private const
Expand Down Expand Up @@ -74,7 +83,11 @@ public function run(Dispatcher $dispatcher, array $options = []): void

$channel = $this->channel($dispatcher, $options);
} catch (Throwable $error) {
$this->logger->error((string) $error);
$this->logger->error($error->getMessage(), [
'type' => get_class($error),
'file' => $error->getFile(),
'line' => $error->getLine(),
]);
}
}

Expand All @@ -92,15 +105,37 @@ public function stop(): void
private function channel(Dispatcher $dispatcher, array $options): AMQPChannel
{
$channel = $this->connect($options);
$handler = function (AMQPMessage $message) use ($channel, $dispatcher) {
$mode = $options[self::OPTION_MODE] ?? self::MODE_REJECT;
$requeue = filter_var($options[self::OPTION_REQUEUE] ?? false, FILTER_VALIDATE_BOOLEAN);
$multiple = filter_var($options[self::OPTION_MULTIPLE] ?? false, FILTER_VALIDATE_BOOLEAN);

if (!in_array($mode, [self::MODE_ACK, self::MODE_NACK, self::MODE_REJECT])) {
$mode = self::MODE_REJECT;
}

$handler = function (AMQPMessage $message) use ($channel, $mode, $multiple, $requeue, $dispatcher) {
try {
$this->handle($message, $channel, $dispatcher);

$message->ack();
$message->ack($multiple);
} catch (Throwable $error) {
$this->logger->error((string) $error);
switch ($mode) {
case self::MODE_ACK:
$message->ack($multiple);
break;
case self::MODE_NACK:
$message->nack($requeue, $multiple);
break;
case self::MODE_REJECT:
$message->reject($requeue);
break;
}

$message->nack();
$this->logger->error($error->getMessage(), [
'type' => get_class($error),
'file' => $error->getFile(),
'line' => $error->getLine(),
]);
}
};

Expand Down
3 changes: 2 additions & 1 deletion tests/Middleware/LoggerMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public function testLogErrors(): void
->expects(self::once())
->method('log')
->with($level, 'expected', [
'type' => LogicException::class,
'file' => __FILE__,
'line' => 46,
'line' => 47,
]);

$dispatcher = (new Builder())
Expand Down

0 comments on commit ff1ec22

Please sign in to comment.