From ff1ec22649397fb1e9abf56b65c4634f23a77d5b Mon Sep 17 00:00:00 2001 From: Anton Shabouta Date: Mon, 19 Aug 2024 14:19:39 +0300 Subject: [PATCH] Allow different modes for AMQP consumer --- src/Middleware/LoggerMiddleware.php | 1 + src/Remote/AMQP/Consumer.php | 47 ++++++++++++++++++++--- tests/Middleware/LoggerMiddlewareTest.php | 3 +- 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/src/Middleware/LoggerMiddleware.php b/src/Middleware/LoggerMiddleware.php index 84e8c44..d081d3e 100644 --- a/src/Middleware/LoggerMiddleware.php +++ b/src/Middleware/LoggerMiddleware.php @@ -23,6 +23,7 @@ public function call(object $message, Context $context, callable $next): void $next($message, $context); } catch (Throwable $error) { $this->logger->log($this->level, $error->getMessage(), [ + 'type' => get_class($error), 'file' => $error->getFile(), 'line' => $error->getLine(), ]); diff --git a/src/Remote/AMQP/Consumer.php b/src/Remote/AMQP/Consumer.php index cb52c60..397b8c3 100644 --- a/src/Remote/AMQP/Consumer.php +++ b/src/Remote/AMQP/Consumer.php @@ -16,10 +16,19 @@ final class Consumer implements ConsumerContract { + public const + MODE_ACK = 'ack', + MODE_NACK = 'nack', + MODE_REJECT = 'reject' + ; + public const OPTION_ATTEMPTS = 'attempts', OPTION_INTERVAL = 'interval', - OPTION_PREFETCH = 'prefetch' + OPTION_PREFETCH = 'prefetch', + OPTION_REQUEUE = 'requeue', + OPTION_MULTIPLE = 'multiple', + OPTION_MODE = self::MODE_REJECT ; private const @@ -74,7 +83,11 @@ public function run(Dispatcher $dispatcher, array $options = []): void $channel = $this->channel($dispatcher, $options); } catch (Throwable $error) { - $this->logger->error((string) $error); + $this->logger->error($error->getMessage(), [ + 'type' => get_class($error), + 'file' => $error->getFile(), + 'line' => $error->getLine(), + ]); } } @@ -92,15 +105,37 @@ public function stop(): void private function channel(Dispatcher $dispatcher, array $options): AMQPChannel { $channel = $this->connect($options); - $handler = function (AMQPMessage $message) use ($channel, $dispatcher) { + $mode = $options[self::OPTION_MODE] ?? self::MODE_REJECT; + $requeue = filter_var($options[self::OPTION_REQUEUE] ?? false, FILTER_VALIDATE_BOOLEAN); + $multiple = filter_var($options[self::OPTION_MULTIPLE] ?? false, FILTER_VALIDATE_BOOLEAN); + + if (!in_array($mode, [self::MODE_ACK, self::MODE_NACK, self::MODE_REJECT])) { + $mode = self::MODE_REJECT; + } + + $handler = function (AMQPMessage $message) use ($channel, $mode, $multiple, $requeue, $dispatcher) { try { $this->handle($message, $channel, $dispatcher); - $message->ack(); + $message->ack($multiple); } catch (Throwable $error) { - $this->logger->error((string) $error); + switch ($mode) { + case self::MODE_ACK: + $message->ack($multiple); + break; + case self::MODE_NACK: + $message->nack($requeue, $multiple); + break; + case self::MODE_REJECT: + $message->reject($requeue); + break; + } - $message->nack(); + $this->logger->error($error->getMessage(), [ + 'type' => get_class($error), + 'file' => $error->getFile(), + 'line' => $error->getLine(), + ]); } }; diff --git a/tests/Middleware/LoggerMiddlewareTest.php b/tests/Middleware/LoggerMiddlewareTest.php index 9705064..8a55a59 100644 --- a/tests/Middleware/LoggerMiddlewareTest.php +++ b/tests/Middleware/LoggerMiddlewareTest.php @@ -37,8 +37,9 @@ public function testLogErrors(): void ->expects(self::once()) ->method('log') ->with($level, 'expected', [ + 'type' => LogicException::class, 'file' => __FILE__, - 'line' => 46, + 'line' => 47, ]); $dispatcher = (new Builder())