From 3084e3f10f88b80d971e1962881b0e628ac06e61 Mon Sep 17 00:00:00 2001 From: joemeehan Date: Wed, 25 Oct 2017 08:53:12 +0100 Subject: [PATCH] Add Kinesis Firehose adapter and tests (#42) * Add Kinesis Firehose adapter and tests * Simplify message structure and support configurable batch size through options * Remove unused use statement * Add (and fix) failed enqueue test * Move enqueue failure test to integration tests --- src/Adapter/FirehoseAdapter.php | 159 ++++++++++++++++++ tests/integration/FirehoseIntegrationTest.php | 80 +++++++++ tests/unit/Adapter/FirehoseAdapterTest.php | 118 +++++++++++++ 3 files changed, 357 insertions(+) create mode 100644 src/Adapter/FirehoseAdapter.php create mode 100644 tests/integration/FirehoseIntegrationTest.php create mode 100644 tests/unit/Adapter/FirehoseAdapterTest.php diff --git a/src/Adapter/FirehoseAdapter.php b/src/Adapter/FirehoseAdapter.php new file mode 100644 index 0000000..cebfd9d --- /dev/null +++ b/src/Adapter/FirehoseAdapter.php @@ -0,0 +1,159 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * @license https://github.com/graze/queue/blob/master/LICENSE MIT + * + * @link https://github.com/graze/queue + */ + +namespace Graze\Queue\Adapter; + +use Aws\Firehose\FirehoseClient; +use Graze\Queue\Adapter\Exception\MethodNotSupportedException; +use Graze\Queue\Adapter\Exception\FailedEnqueueException; +use Graze\Queue\Message\MessageFactoryInterface; +use Graze\Queue\Message\MessageInterface; + +/** + * Amazon AWS Kinesis Firehose Adapter. + * + * This method only supports the enqueue method to send messages to a Kinesiss + * Firehose stream + * + * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Firehose.FirehoseClient.html#putRecordBatch + */ +final class FirehoseAdapter implements AdapterInterface +{ + const BATCHSIZE_SEND = 100; + + /** @var FirehoseClient */ + protected $client; + + /** @var array */ + protected $options; + + /** @var string */ + protected $deliveryStreamName; + + /** + * @param FirehoseClient $client + * @param string $deliveryStreamName + * @param array $options - BatchSize The number of messages to send in each batch. + */ + public function __construct(FirehoseClient $client, $deliveryStreamName, array $options = []) + { + $this->client = $client; + $this->deliveryStreamName = $deliveryStreamName; + $this->options = $options; + } + + /** + * @param MessageInterface[] $messages + * + * @throws MethodNotSupportedException + */ + public function acknowledge(array $messages) + { + throw new MethodNotSupportedException( + __FUNCTION__, + $this, + $messages + ); + } + + /** + * @param MessageFactoryInterface $factory + * @param int $limit + * + * @throws MethodNotSupportedException + */ + public function dequeue(MessageFactoryInterface $factory, $limit) + { + throw new MethodNotSupportedException( + __FUNCTION__, + $this, + [] + ); + } + + /** + * @param MessageInterface[] $messages + * + * @throws FailedEnqueueException + */ + public function enqueue(array $messages) + { + $failed = []; + $batches = array_chunk( + $messages, + $this->getOption('BatchSize', self::BATCHSIZE_SEND) + ); + + foreach ($batches as $batch) { + $requestRecords = array_map(function (MessageInterface $message) { + return [ + 'Data' => $message->getBody() + ]; + }, $batch); + + $request = [ + 'DeliveryStreamName' => $this->deliveryStreamName, + 'Records' => $requestRecords, + ]; + + $results = $this->client->putRecordBatch($request); + + foreach ($results->get('RequestResponses') as $idx => $response) { + if (isset($response['ErrorCode'])) { + $failed[] = $batch[$idx]; + } + } + } + + if (!empty($failed)) { + throw new FailedEnqueueException($this, $failed); + } + } + + /** + * @param string $name + * @param mixed $default + * + * @return mixed + */ + protected function getOption($name, $default = null) + { + return isset($this->options[$name]) ? $this->options[$name] : $default; + } + + /** + * @throws MethodNotSupportedException + */ + public function purge() + { + throw new MethodNotSupportedException( + __FUNCTION__, + $this, + [] + ); + } + + /** + * @throws MethodNotSupportedException + */ + public function delete() + { + throw new MethodNotSupportedException( + __FUNCTION__, + $this, + [] + ); + } +} diff --git a/tests/integration/FirehoseIntegrationTest.php b/tests/integration/FirehoseIntegrationTest.php new file mode 100644 index 0000000..ab93151 --- /dev/null +++ b/tests/integration/FirehoseIntegrationTest.php @@ -0,0 +1,80 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * @license https://github.com/graze/queue/blob/master/LICENSE MIT + * + * @link https://github.com/graze/queue + */ + +namespace Graze\Queue; + +use Aws\ResultInterface; +use Aws\Firehose\FirehoseClient; +use Graze\Queue\Adapter\Exception\FailedEnqueueException; +use Graze\Queue\Adapter\FirehoseAdapter; +use Mockery as m; +use Mockery\MockInterface; +use PHPUnit_Framework_TestCase as TestCase; + +class FirehoseIntegrationTest extends TestCase +{ + /** @var string */ + private $deliveryStreamName; + /** @var FirehoseClient|MockInterface */ + private $firehoseClient; + /** @var Client */ + private $client; + + public function setUp() + { + $this->deliveryStreamName = 'delivery_stream_foo'; + $this->firehoseClient = m::mock(FirehoseClient::class); + $this->client = new Client(new FirehoseAdapter($this->firehoseClient, 'delivery_stream_foo')); + } + + public function testSend() + { + $model = m::mock(ResultInterface::class); + $model->shouldReceive('get')->once()->with('RequestResponses')->andReturn([]); + + $this->firehoseClient->shouldReceive('putRecordBatch')->once()->with([ + 'DeliveryStreamName' => $this->deliveryStreamName, + 'Records' => [ + ['Data' => 'foo'] + ] + ])->andReturn($model); + + $this->client->send([$this->client->create('foo')]); + } + + /** + * @expectedException \Graze\Queue\Adapter\Exception\FailedEnqueueException + */ + public function testSendError() + { + $model = m::mock(ResultInterface::class); + $model->shouldReceive('get')->once()->with('RequestResponses')->andReturn([ + [ + 'ErrorCode' => 'fooError', + 'ErrorMessage' => 'Some error message', + 'RecordId' => 'foo', + ] + ]); + + $this->firehoseClient->shouldReceive('putRecordBatch')->once()->with([ + 'DeliveryStreamName' => $this->deliveryStreamName, + 'Records' => [ + ['Data' => 'foo'], + ], + ])->andReturn($model); + + $this->client->send([$this->client->create('foo')]); + } +} diff --git a/tests/unit/Adapter/FirehoseAdapterTest.php b/tests/unit/Adapter/FirehoseAdapterTest.php new file mode 100644 index 0000000..564139b --- /dev/null +++ b/tests/unit/Adapter/FirehoseAdapterTest.php @@ -0,0 +1,118 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * @license https://github.com/graze/queue/blob/master/LICENSE MIT + * + * @link https://github.com/graze/queue + */ + +namespace Graze\Queue\Adapter; + +use Aws\ResultInterface; +use Aws\Firehose\FirehoseClient; +use Graze\Queue\Adapter\Exception\MethodNotSupportedException; +use Graze\Queue\Message\MessageFactoryInterface; +use Graze\Queue\Message\MessageInterface; +use Mockery as m; +use Mockery\MockInterface; +use PHPUnit_Framework_TestCase as TestCase; + +class FirehoseAdapterTest extends TestCase +{ + /** @var MessageInterface|MockInterface */ + private $messageA; + /** @var MessageInterface|MockInterface */ + private $messageB; + /** @var MessageInterface|MockInterface */ + private $messageC; + /** @var MessageInterface[]|MockInterface[] */ + private $messages; + /** @var ResultInterface|MockInterface */ + private $model; + /** @var MessageFactoryInterface|MockInterface */ + private $factory; + /** @var FirehoseClient */ + private $client; + + public function setUp() + { + $this->client = m::mock(FirehoseClient::class); + $this->model = m::mock(ResultInterface::class); + $this->factory = m::mock(MessageFactoryInterface::class); + + $this->messageA = $a = m::mock(MessageInterface::class); + $this->messageB = $b = m::mock(MessageInterface::class); + $this->messageC = $c = m::mock(MessageInterface::class); + $this->messages = [$a, $b, $c]; + } + + public function testInterface() + { + assertThat(new FirehoseAdapter($this->client, 'foo'), is(anInstanceOf('Graze\Queue\Adapter\AdapterInterface'))); + } + + public function testEnqueue() + { + $adapter = new FirehoseAdapter($this->client, 'foo'); + + $this->messageA->shouldReceive('getBody')->once()->withNoArgs()->andReturn('foo'); + $this->messageB->shouldReceive('getBody')->once()->withNoArgs()->andReturn('bar'); + $this->messageC->shouldReceive('getBody')->once()->withNoArgs()->andReturn('baz'); + + $this->model->shouldReceive('get')->once()->with('RequestResponses')->andReturn([]); + + $this->client->shouldReceive('putRecordBatch')->once()->with([ + 'DeliveryStreamName' => 'foo', + 'Records' => [ + ['Data' => 'foo'], + ['Data' => 'bar'], + ['Data' => 'baz'], + ], + ])->andReturn($this->model); + + $adapter->enqueue($this->messages); + } + + /** + * @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException + */ + public function testAcknowledge() + { + $adapter = new FirehoseAdapter($this->client, 'foo'); + $adapter->acknowledge($this->messages); + } + + /** + * @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException + */ + public function testDequeue() + { + $adapter = new FirehoseAdapter($this->client, 'foo'); + $adapter->dequeue($this->factory, 10); + } + + /** + * @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException + */ + public function testPurge() + { + $adapter = new FirehoseAdapter($this->client, 'foo'); + $adapter->purge(); + } + + /** + * @expectedException \Graze\Queue\Adapter\Exception\MethodNotSupportedException + */ + public function testDelete() + { + $adapter = new FirehoseAdapter($this->client, 'foo'); + $adapter->delete(); + } +}