-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #30 from onliner/amqp-rediliver-handle
- AMQP redeliver handle - Allow different modes for AMQP consumer - Exchange settings separated from queues - Added mandatory support - Other improvements
- Loading branch information
Showing
83 changed files
with
769 additions
and
1,128 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,11 @@ | ||
phpstan: | ||
#!make | ||
|
||
help: ## Show this help | ||
@printf "\033[33m%s:\033[0m\n" 'Run: make <target> where <target> is one of the following' | ||
@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf " \033[32m%-18s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | ||
|
||
phpstan: ## Run PHPStan | ||
vendor/bin/phpstan analyse --level max src tests | ||
|
||
test: phpstan | ||
test: phpstan ## Run PHPUnit | ||
vendor/bin/phpunit --verbose |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
use Onliner\CommandBus\Builder; | ||
use Onliner\CommandBus\Remote\AMQP\Transport; | ||
use Onliner\CommandBus\Remote\RemoteExtension; | ||
|
||
require __DIR__ . '/../../../vendor/autoload.php'; | ||
require __DIR__ . '/messages.php'; | ||
|
||
$transport = Transport::create('amqp://guest:guest@localhost:5672', 'foo'); | ||
|
||
return (new Builder()) | ||
->use(new RemoteExtension($transport)); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
use Onliner\CommandBus\Builder; | ||
use Onliner\CommandBus\Context; | ||
use Onliner\CommandBus\Remote\AMQP\Exchange; | ||
use Onliner\CommandBus\Remote\AMQP\Flags; | ||
use Onliner\CommandBus\Remote\AMQP\Packager; | ||
use Onliner\CommandBus\Remote\AMQP\Transport; | ||
use Onliner\CommandBus\Remote\AMQP\Consumer; | ||
use Onliner\CommandBus\Remote\AMQP\Queue; | ||
|
||
/** @var Builder $builder */ | ||
$builder = require __DIR__ . '/builder.php'; | ||
$builder->handle(SendEmail::class, function (SendEmail $command, Context $context) { | ||
$exchange = $context->get(Packager::OPTION_EXCHANGE); | ||
$routingKey = $context->get(Packager::OPTION_ROUTING_KEY); | ||
|
||
echo sprintf('Received message from %s with routing key %s', $exchange, $routingKey), PHP_EOL; | ||
|
||
// Throw exception to trigger DLE | ||
if ($exchange === 'foo') { | ||
throw new Exception("Something went wrong..."); | ||
} | ||
}); | ||
|
||
$transport = Transport::create('amqp://guest:guest@localhost:5672'); | ||
$transport->declare(Exchange::create(['name' => 'dle'])); | ||
$transport->declare(Exchange::create(['name' => 'foo'])); | ||
|
||
$consumer = $transport->consume(); | ||
$consumer->consume(new Queue('my-queue', [ | ||
'foo' => '#', | ||
'dle' => 'sendemail', | ||
], Flags::default(), args: [ | ||
Queue::DEAD_LETTER => 'dle', | ||
])); | ||
|
||
$consumer->run($builder->build(), [ | ||
Consumer::OPTION_ATTEMPTS => 10, | ||
Consumer::OPTION_INTERVAL => 100000, // 100 ms | ||
]); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
class SendEmail | ||
{ | ||
public function __construct( | ||
public string $to, | ||
public string $subject, | ||
public string $content, | ||
) {} | ||
} |
File renamed without changes.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
use Onliner\CommandBus\Builder; | ||
use Onliner\CommandBus\Remote\AMQP\Transport; | ||
use Onliner\CommandBus\Remote\RemoteExtension; | ||
use Onliner\CommandBus\Remote\Transport\MultiTransport; | ||
|
||
require __DIR__ . '/../../../vendor/autoload.php'; | ||
require __DIR__ . '/messages.php'; | ||
|
||
$transportFoo = Transport::create('amqp://guest:guest@localhost:5672', 'foo'); | ||
$transportBar = Transport::create('amqp://guest:guest@localhost:5673', 'bar'); | ||
|
||
$transport = new MultiTransport($transportFoo); | ||
$transport->add('Bar\*', $transportBar); | ||
|
||
return (new Builder()) | ||
->use(new RemoteExtension($transport)); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
use Onliner\CommandBus\Builder; | ||
use Onliner\CommandBus\Remote\AMQP\Exchange; | ||
use Onliner\CommandBus\Remote\AMQP\Transport; | ||
use Onliner\CommandBus\Remote\AMQP\Consumer; | ||
|
||
/** @var Builder $builder */ | ||
$builder = require __DIR__ . '/builder.php'; | ||
$builder->handle(Bar\Hello::class, function (Bar\Hello $command) { | ||
echo sprintf('Hello %s from bar!', $command->name), PHP_EOL; | ||
}); | ||
|
||
$transport = Transport::create('amqp://guest:guest@localhost:5673'); | ||
$transport->declare(Exchange::create(['name' => 'bar'])); | ||
|
||
$consumer = $transport->consume(); | ||
$consumer->listen('#', 'bar'); | ||
$consumer->run($builder->build()); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
use Onliner\CommandBus\Builder; | ||
use Onliner\CommandBus\Remote\AMQP\Exchange; | ||
use Onliner\CommandBus\Remote\AMQP\Transport; | ||
use Onliner\CommandBus\Remote\AMQP\Consumer; | ||
|
||
/** @var Builder $builder */ | ||
$builder = require __DIR__ . '/builder.php'; | ||
$builder->handle(Foo\Hello::class, function (Foo\Hello $command) { | ||
echo sprintf('Hello %s from foo!', $command->name), PHP_EOL; | ||
}); | ||
|
||
$transport = Transport::create('amqp://guest:guest@localhost:5672'); | ||
$transport->declare(Exchange::create(['name' => 'foo'])); | ||
|
||
$consumer = $transport->consume(); | ||
$consumer->listen('#', 'foo'); | ||
$consumer->run($builder->build()); |
Oops, something went wrong.