Skip to content

Commit

Permalink
adds a reject method to do the opposite of acknowledge (#43)
Browse files Browse the repository at this point in the history
* adds a reject method to do the opposite of acknowledge

* lint fix

* add tests for 7.2 / nightly

* some boy scouting

* add 7.2 to allowed failures list

* change install to build

* use dist for travis

* fix singluar/plural mixing
  • Loading branch information
Harry Bragg authored Jan 16, 2018
1 parent 3084e3f commit d469f2a
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 68 deletions.
17 changes: 14 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
language: php

sudo: false
dist: trusty

cache:
directories:
- $HOME/.composer/cache/files

php:
- 5.5
- 5.6
- 7.0
- 7.1
- 7.2
- nightly
- hhvm

env:
- 'COMPOSER_FLAGS="--prefer-lowest --prefer-stable"'
- 'COMPOSER_FLAGS=""'

matrix:
allow_failures:
- php: nightly
- php: 7.2

install:
- travis_retry composer update ${COMPOSER_FLAGS} --no-interaction --prefer-source
- travis_retry composer update ${COMPOSER_FLAGS} --no-interaction --prefer-dist

script:
- vendor/bin/phpcs -p --warning-severity=0 src/ tests/
- vendor/bin/phpunit --coverage-clover=./tests/report/coverage.clover

after_script:
- ./build/coverage_to_scruitinizer.sh
- test -f ./tests/report/coverage.clover && (wget https://scrutinizer-ci.com/ocular.phar; php ocular.phar code-coverage:upload --format=php-clover ./tests/report/coverage.clover)
18 changes: 10 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
SHELL = /bin/sh

DOCKER ?= $(shell which docker)
DOCKER_REPOSITORY := graze/php-alpine:test
DOCKER_REPOSITORY := graze/php-alpine:7.1-test
VOLUME := /opt/graze/queue
VOLUME_MAP := -v $$(pwd):${VOLUME}
DOCKER_RUN_BASE := ${DOCKER} run --rm -t ${VOLUME_MAP} -w ${VOLUME}
Expand All @@ -14,15 +14,17 @@ DOCKER_RUN := ${DOCKER_RUN_BASE} ${DOCKER_REPOSITORY}

# Building

install: ## Download the dependencies then build the image :rocket:.
make 'composer-install --optimize-autoloader --ignore-platform-reqs'
build: ## Download the dependencies
make 'composer-install --optimize-autoloader'

build-update: ## Update and download the dependencies
make 'composer-update --optimize-autoloader'

composer-%: ## Run a composer command, `make "composer-<command> [...]"`.
${DOCKER} run -t --rm \
-v $$(pwd):/usr/src/app \
-v ~/.composer:/root/composer \
-v ~/.ssh:/root/.ssh:ro \
graze/composer --no-interaction --prefer-dist $* $(filter-out $@,$(MAKECMDGOALS))
-v $$(pwd):/app:delegated \
-v ~/.composer:/tmp:delegated \
composer --no-interaction --prefer-dist $* $(filter-out $@,$(MAKECMDGOALS))

# Testing

Expand All @@ -42,10 +44,10 @@ test-integration: ## Run the integration testsuite.
${DOCKER_RUN} vendor/bin/phpunit --colors=always --testsuite integration

test-matrix: ## Run the unit tests against multiple targets.
make DOCKER_REPOSITORY="php:5.5-alpine" test
make DOCKER_REPOSITORY="php:5.6-alpine" test
make DOCKER_REPOSITORY="php:7.0-alpine" test
make DOCKER_REPOSITORY="php:7.1-alpine" test
make DOCKER_REPOSITORY="php:7.2-alpine" test
make DOCKER_REPOSITORY="hhvm/hhvm:latest" test

test-coverage: ## Run all tests and output coverage to the console.
Expand Down
7 changes: 0 additions & 7 deletions build/coverage_to_scruitinizer.sh

This file was deleted.

25 changes: 25 additions & 0 deletions src/Adapter/AdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,30 @@
interface AdapterInterface
{
/**
* Acknowledge the messages (delete them from the queue)
*
* @param MessageInterface[] $messages
*
* @return void
*
* @throws FailedAcknowledgementException
*/
public function acknowledge(array $messages);

/**
* Attempt to reject all the following messages (make the message immediately visible to other consumers)
*
* @param MessageInterface[] $messages
*
* @return void
*
* @throws FailedAcknowledgementException
*/
public function reject(array $messages);

/**
* Remove up to {$limit} messages from the queue
*
* @param MessageFactoryInterface $factory
* @param int $limit
*
Expand All @@ -39,18 +56,26 @@ public function acknowledge(array $messages);
public function dequeue(MessageFactoryInterface $factory, $limit);

/**
* Add all the messages to the queue
*
* @param MessageInterface[] $messages
*
* @return void
*
* @throws FailedEnqueueException
*/
public function enqueue(array $messages);

/**
* Empty the queue
*
* @return void
*/
public function purge();

/**
* Delete the queue
*
* @return void
*/
public function delete();
Expand Down
15 changes: 15 additions & 0 deletions src/Adapter/ArrayAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
namespace Graze\Queue\Adapter;

use ArrayIterator;
use Graze\Queue\Adapter\Exception\FailedAcknowledgementException;
use Graze\Queue\Message\MessageFactoryInterface;
use Graze\Queue\Message\MessageInterface;
use LimitIterator;
Expand Down Expand Up @@ -43,6 +44,20 @@ public function acknowledge(array $messages)
}));
}

/**
* Attempt to reject all the following messages (make the message immediately visible to other consumers)
*
* @param MessageInterface[] $messages
*
* @return void
*
* @throws FailedAcknowledgementException
*/
public function reject(array $messages)
{
// do nothing, timeouts not implemented, so messages are immediately available
}

/**
* @param MessageFactoryInterface $factory
* @param int $limit
Expand Down
35 changes: 25 additions & 10 deletions src/Adapter/FirehoseAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
namespace Graze\Queue\Adapter;

use Aws\Firehose\FirehoseClient;
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
use Graze\Queue\Message\MessageFactoryInterface;
use Graze\Queue\Message\MessageInterface;

Expand All @@ -31,7 +31,7 @@
*/
final class FirehoseAdapter implements AdapterInterface
{
const BATCHSIZE_SEND = 100;
const BATCHSIZE_SEND = 100;

/** @var FirehoseClient */
protected $client;
Expand All @@ -44,8 +44,8 @@ final class FirehoseAdapter implements AdapterInterface

/**
* @param FirehoseClient $client
* @param string $deliveryStreamName
* @param array $options - BatchSize <integer> The number of messages to send in each batch.
* @param string $deliveryStreamName
* @param array $options - BatchSize <integer> The number of messages to send in each batch.
*/
public function __construct(FirehoseClient $client, $deliveryStreamName, array $options = [])
{
Expand All @@ -68,6 +68,18 @@ public function acknowledge(array $messages)
);
}

/**
* @param MessageInterface[] $messages
*/
public function reject(array $messages)
{
throw new MethodNotSupportedException(
__FUNCTION__,
$this,
$messages
);
}

/**
* @param MessageFactoryInterface $factory
* @param int $limit
Expand Down Expand Up @@ -97,15 +109,18 @@ public function enqueue(array $messages)
);

foreach ($batches as $batch) {
$requestRecords = array_map(function (MessageInterface $message) {
return [
'Data' => $message->getBody()
];
}, $batch);
$requestRecords = array_map(
function (MessageInterface $message) {
return [
'Data' => $message->getBody(),
];
},
$batch
);

$request = [
'DeliveryStreamName' => $this->deliveryStreamName,
'Records' => $requestRecords,
'Records' => $requestRecords,
];

$results = $this->client->putRecordBatch($request);
Expand Down
Loading

0 comments on commit d469f2a

Please sign in to comment.