Skip to content

Commit

Permalink
Updated to fix messenger component api changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Quaile authored and adamquaile committed Nov 12, 2020
1 parent 9a89d79 commit 6064536
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 34 deletions.
34 changes: 12 additions & 22 deletions src/AppBundle/Messenger/SpoolTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,6 @@ class SpoolTransport implements TransportInterface

private $envelopes = [];

/**
* Receive some messages to the given handler.
*
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
* Note that this envelope can be `null` if the timeout to receive something has expired.
*/
public function receive(callable $handler): void
{
foreach ($this->envelopes as $key => $envelope) {
unset($this->envelopes[$key]);
$handler($envelope);
}
}

/**
* Stop receiving some messages.
*/
public function stop(): void
{
}

/**
* Sends the given envelope.
*
Expand All @@ -46,14 +25,25 @@ public function send(Envelope $envelope): Envelope

public function get(): iterable
{
return [];
return $this->envelopes;
}

public function ack(Envelope $envelope): void
{
$this->removeEnvelopeFromQueue($envelope);
}

public function reject(Envelope $envelope): void
{
$this->removeEnvelopeFromQueue($envelope);
}

private function removeEnvelopeFromQueue(Envelope $envelope): void
{
foreach ($this->envelopes as $key => $queued) {
if ($queued->getMessage() === $envelope->getMessage()) {
unset($this->envelopes[$key]);
}
}
}
}
26 changes: 21 additions & 5 deletions src/AppBundle/Messenger/SpoolTransportEventSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
namespace AppBundle\Messenger;

use Psr\Container\ContainerInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\HttpKernel\Event\PostResponseEvent;
use Symfony\Component\HttpKernel\KernelEvents;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Worker;

Expand Down Expand Up @@ -59,12 +61,26 @@ public static function getSubscribedEvents()
public function onKernelTerminate(PostResponseEvent $event)
{
if ($this->enveloperQueueDsn === 'spool://memory') {

$stopWhenFinishedEventDispatcher = new EventDispatcher();


$worker = new Worker(
$this->receiverLocator->get('email_queue'),
// [
// 'email_queue' => $this->receiverLocator->get('email_queue')
// ],
$this->bus
// $this->receiverLocator->get('email_queue'),
[
'email_queue' => $this->receiverLocator->get('email_queue')
],
$this->bus,
$stopWhenFinishedEventDispatcher
);

$stopWhenFinishedEventDispatcher->addListener(
WorkerRunningEvent::class,
static function (WorkerRunningEvent $event) use ($worker) {
if ($event->isWorkerIdle()) {
$worker->stop();
}
}
);
$worker->run();
}
Expand Down
8 changes: 1 addition & 7 deletions src/Outstack/Enveloper/Application/QueueEmailRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ public function __invoke(EmailRequest $emailRequest)

$this->messageResolver->validate($template, $emailRequest->getParameters());
$this->emailRequestLog->recordInitialRequest($emailRequest);
$this->deliveryQueue->append(
$emailRequest,
$this->messageResolver->resolve(
$template,
$emailRequest->getParameters()
)
);
$this->deliveryQueue->append($emailRequest);
}
}

0 comments on commit 6064536

Please sign in to comment.