Skip to content

Commit

Permalink
Pipeline item skip mechanism.
Browse files Browse the repository at this point in the history
  • Loading branch information
GrandLTU committed Jan 15, 2015
1 parent ea2319e commit db83250
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 13 deletions.
50 changes: 50 additions & 0 deletions EventListener/AbstractConsumeEventListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

/*
* This file is part of the ONGR package.
*
* (c) NFQ Technologies UAB <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace ONGR\ConnectionsBundle\EventListener;

use ONGR\ConnectionsBundle\Pipeline\Event\ItemPipelineEvent;

/**
* AbstractConsumeEventListener class.
*/
abstract class AbstractConsumeEventListener
{
/**
* Entry point of consume event.
*
* @param ItemPipelineEvent $event
*/
public function onConsume(ItemPipelineEvent $event)
{
if ($event->getItemSkipException()) {
$this->skip($event);
} else {
$this->consume($event);
}
}

/**
* Called when item should be skipped.
*
* @param ItemPipelineEvent $event
*/
public function skip(ItemPipelineEvent $event)
{
}

/**
* Called when item should be consumed.
*
* @param ItemPipelineEvent $event
*/
abstract public function consume(ItemPipelineEvent $event);
}
6 changes: 3 additions & 3 deletions EventListener/AbstractImportConsumeEventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@

namespace ONGR\ConnectionsBundle\EventListener;

use ONGR\ConnectionsBundle\Pipeline\Item\AbstractImportItem;
use ONGR\ConnectionsBundle\Log\EventLoggerAwareTrait;
use ONGR\ConnectionsBundle\Pipeline\Event\ItemPipelineEvent;
use ONGR\ConnectionsBundle\Pipeline\Item\AbstractImportItem;
use ONGR\ElasticsearchBundle\ORM\Manager;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LogLevel;

/**
* AbstractImportConsumeEventListener - called after modify event. Puts document into Elasticsearch.
*/
abstract class AbstractImportConsumeEventListener implements LoggerAwareInterface
abstract class AbstractImportConsumeEventListener extends AbstractConsumeEventListener implements LoggerAwareInterface
{
use EventLoggerAwareTrait;

Expand Down Expand Up @@ -55,7 +55,7 @@ public function __construct(Manager $manager, $itemClass)
*
* @param ItemPipelineEvent $event
*/
public function onConsume(ItemPipelineEvent $event)
public function consume(ItemPipelineEvent $event)
{
if (!$this->setItem($event)) {
return;
Expand Down
4 changes: 2 additions & 2 deletions EventListener/DataSyncConsumeEventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* DataSyncConsumeEventListener - extracts item onConsume event.
*/
class DataSyncConsumeEventListener implements LoggerAwareInterface
class DataSyncConsumeEventListener extends AbstractConsumeEventListener implements LoggerAwareInterface
{
use EventLoggerAwareTrait;

Expand All @@ -43,7 +43,7 @@ public function __construct(ExtractorInterface $extractor)
*
* @param ItemPipelineEvent $event
*/
public function onConsume(ItemPipelineEvent $event)
public function consume(ItemPipelineEvent $event)
{
$this->extractor->extract($event->getItem());
}
Expand Down
26 changes: 26 additions & 0 deletions Pipeline/Event/ItemPipelineEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace ONGR\ConnectionsBundle\Pipeline\Event;

use ONGR\ConnectionsBundle\Pipeline\ItemSkipException;
use Symfony\Component\EventDispatcher\Event;

/**
Expand All @@ -30,6 +31,11 @@ class ItemPipelineEvent extends Event
*/
private $output;

/**
* @var ItemSkipException
*/
private $itemSkipException;

/**
* @param mixed $item
*/
Expand Down Expand Up @@ -69,4 +75,24 @@ public function setOutput($output)
{
$this->output = $output;
}

/**
* @return ItemSkipException
*/
public function getItemSkipException()
{
return $this->itemSkipException;
}

/**
* @param ItemSkipException $itemSkipException
*
* @return $this
*/
public function setItemSkipException(ItemSkipException $itemSkipException)
{
$this->itemSkipException = $itemSkipException;

return $this;
}
}
19 changes: 19 additions & 0 deletions Pipeline/ItemSkipException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

/*
* This file is part of the ONGR package.
*
* (c) NFQ Technologies UAB <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace ONGR\ConnectionsBundle\Pipeline;

/**
* Skip Item Exception.
*/
class ItemSkipException extends \Exception
{
}
12 changes: 8 additions & 4 deletions Pipeline/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,14 @@ public function start()
$itemEvent = new ItemPipelineEvent($item);
$itemEvent->setContext($this->getContext());

$dispatcher->dispatch(
$this->getEventName(self::EVENT_SUFFIX_MODIFY),
$itemEvent
);
try {
$dispatcher->dispatch(
$this->getEventName(self::EVENT_SUFFIX_MODIFY),
$itemEvent
);
} catch (ItemSkipException $itemSkipException) {
$itemEvent->setItemSkipException($itemSkipException);
}

$dispatcher->dispatch(
$this->getEventName(self::EVENT_SUFFIX_CONSUME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ public function invalidateItem($item, $context = null)
/**
* {@inheritdoc}
*/
public function onConsume(ItemPipelineEvent $event)
public function consume(ItemPipelineEvent $event)
{
$this->consumeCalled++;

return parent::onConsume($event);
return parent::consume($event);
}

/**
Expand Down
89 changes: 89 additions & 0 deletions Tests/Unit/Pipeline/PipelineTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

/*
* This file is part of the ONGR package.
*
* (c) NFQ Technologies UAB <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace ONGR\ConnectionsBundle\Tests\Unit\Pipeline;

use ONGR\ConnectionsBundle\Pipeline\Event\ItemPipelineEvent;
use ONGR\ConnectionsBundle\Pipeline\Event\SourcePipelineEvent;
use ONGR\ConnectionsBundle\Pipeline\ItemSkipException;
use ONGR\ConnectionsBundle\Pipeline\PipelineFactory;
use Symfony\Component\EventDispatcher\EventDispatcher;

/**
* PipelineTest class.
*/
class PipelineTest extends \PHPUnit_Framework_TestCase
{
/**
* Tests pipeline.
*
* @param array $data
* @param int $expectedConsumedItems
* @param int $expectedSkippedItems
*
* @dataProvider PipelineData
*/
public function testPipeline($data, $expectedConsumedItems, $expectedSkippedItems)
{
$pipelineFactory = new PipelineFactory();
$pipelineFactory->setDispatcher(new EventDispatcher());
$pipelineFactory->setClassName('ONGR\ConnectionsBundle\Pipeline\Pipeline');

$consumer = new PipelineTestConsumer();

$source = function (SourcePipelineEvent $event) use ($data) {
$event->addSource($data);
};

$pipeline = $pipelineFactory->create(
'test',
[
'sources' => [$source],
'modifiers' => [[$this, 'onModify']],
'consumers' => [[$consumer, 'onConsume']],
]
);
$pipeline->start();

$this->assertEquals($expectedConsumedItems, $consumer->getConsumeCalled());
$this->assertEquals($expectedSkippedItems, $consumer->getSkipCalled());
}

/**
* OnModify.
*
* @param ItemPipelineEvent $event
*
* @throws ItemSkipException
*/
public function onModify(ItemPipelineEvent $event)
{
if ($event->getItem() == 'skip') {
throw new ItemSkipException();
}
}

/**
* Pipeline data provider.
*
* @return array
*/
public function pipelineData()
{
return [
[[], 0, 0],
[['consume'], 1, 0],
[['skip'], 0, 1],
[['skip', 'consume', 'skip'], 1, 2],
[['consume', 'skip', 'consume'], 2, 1],
];
}
}
63 changes: 63 additions & 0 deletions Tests/Unit/Pipeline/PipelineTestConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

/*
* This file is part of the ONGR package.
*
* (c) NFQ Technologies UAB <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace ONGR\ConnectionsBundle\Tests\Unit\Pipeline;

use ONGR\ConnectionsBundle\EventListener\AbstractConsumeEventListener;
use ONGR\ConnectionsBundle\Pipeline\Event\ItemPipelineEvent;

/**
* PipelineTestConsumer class.
*/
class PipelineTestConsumer extends AbstractConsumeEventListener
{
/**
* @var int
*/
private $consumeCalled = 0;

/**
* @var int
*/
private $skipCalled = 0;

/**
* {@inheritdoc}
*/
public function consume(ItemPipelineEvent $event)
{
++$this->consumeCalled;
}

/**
* {@inheritdoc}
*/
public function skip(ItemPipelineEvent $event)
{
++$this->skipCalled;
}

/**
* @return int
*/
public function getConsumeCalled()
{
return $this->consumeCalled;
}

/**
* @return int
*/
public function getSkipCalled()
{
return $this->skipCalled;
}
}
5 changes: 3 additions & 2 deletions UrlInvalidator/AbstractItemUrlInvalidator.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@

namespace ONGR\ConnectionsBundle\UrlInvalidator;

use ONGR\ConnectionsBundle\EventListener\AbstractConsumeEventListener;
use ONGR\ConnectionsBundle\Pipeline\Event\FinishPipelineEvent;
use ONGR\ConnectionsBundle\Pipeline\Event\ItemPipelineEvent;

/**
* Base class for url invalidation for pipeline items.
*/
abstract class AbstractItemUrlInvalidator
abstract class AbstractItemUrlInvalidator extends AbstractConsumeEventListener
{
/**
* @var UrlInvalidatorService
Expand Down Expand Up @@ -49,7 +50,7 @@ public function setUrlInvalidator(UrlInvalidatorService $urlInvalidator)
*
* @param ItemPipelineEvent $event
*/
public function onConsume(ItemPipelineEvent $event)
public function consume(ItemPipelineEvent $event)
{
$this->invalidateItem($event->getItem(), $event->getContext());
}
Expand Down

0 comments on commit db83250

Please sign in to comment.