Skip to content

Commit

Permalink
feat: make xread count and block parameters configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
VonDerBeck committed Apr 19, 2023
1 parent 52a7347 commit af6922f
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ packages/
*.userprefs
obj/
.vs/
/connector-java/src/test/java/io/zeebe/redis/ExporterMassDataTest.java
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ redisClient.shutdown();
#### Dealing with Lettuce versions

Under the hood the exporter and the connector uses Lettuce as Redis client.
Please be aware that the connector requires `io.lettuce:lettuce-core:6.2.2.RELEASE`. In case your project uses a parent POM with lower and potentially incompatible versions you have to take care to deactivate them.
Please be aware that the connector requires at minimum `io.lettuce:lettuce-core:6.2.2.RELEASE`. In case your project uses a parent POM with lower and potentially incompatible versions you have to take care to deactivate them.
E.g. do something like
```
<properties>
Expand Down Expand Up @@ -105,6 +105,18 @@ Enhanced exporter side algorithms can be found in the exporter's configuration s
Of course it is possible to combine this simple client side mechanism with the exporter mechanism.
Hence the choice is yours.

#### Tuning connector performance
*Since 0.9.6*

Reading Redis streams is carried out by using the `XREAD COUNT count BLOCK milliseconds ...` command. In order to tune
connector performance you are able to set the count (maximum number of messages read at once, default is 500) and block milliseconds (maximum blocking time, default is 2000) parameter:

```java
final ZeebeRedis zeebeRedis = ZeebeRedis.newBuilder(redisClient)
.xreadCount(500).xreadBlockMillis(2000)
...
```

## Install

### Docker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class ZeebeRedis implements AutoCloseable {

private static final Map <String, Class<? extends com.google.protobuf.Message>> RECORD_MESSAGE_TYPES;
private static final int XREAD_BLOCK_MILLISECONDS = 2000;
private static final int XREAD_COUNT = 1000;
private static final int XREAD_COUNT = 500;

static {
RECORD_MESSAGE_TYPES = Map.ofEntries(
Expand Down Expand Up @@ -53,6 +53,10 @@ private static AbstractMap.SimpleEntry<String, Class<? extends com.google.protob

private StatefulRedisConnection<String, byte[]> redisConnection;

private int xreadBlockMillis = XREAD_BLOCK_MILLISECONDS;

private int xreadCount = XREAD_COUNT;

private String consumerGroup;

private String consumerId;
Expand Down Expand Up @@ -80,6 +84,7 @@ private static AbstractMap.SimpleEntry<String, Class<? extends com.google.protob
private ZeebeRedis(RedisClient redisClient,
StatefulRedisConnection<String, byte[]> redisConnection,
boolean reconnectUsesNewConnection, Duration reconnectInterval,
int xreadBlockMillis, int xreadCount,
String consumerGroup, String consumerId,
String prefix, XReadArgs.StreamOffset<String>[] offsets,
Map<String, List<Consumer<?>>> listeners,
Expand All @@ -89,6 +94,8 @@ private ZeebeRedis(RedisClient redisClient,
this.redisConnection = redisConnection;
this.reconnectUsesNewConnection = reconnectUsesNewConnection;
this.reconnectIntervalMillis = reconnectInterval.toMillis();
this.xreadBlockMillis = xreadBlockMillis;
this.xreadCount = xreadCount;
this.consumerGroup = consumerGroup;
this.consumerId = consumerId;
this.prefix = prefix;
Expand Down Expand Up @@ -222,7 +229,7 @@ private void readNext() {
try {
List<StreamMessage<String, byte[]>> messages = redisConnection.sync()
.xreadgroup(io.lettuce.core.Consumer.from(consumerGroup, consumerId),
XReadArgs.Builder.block(XREAD_BLOCK_MILLISECONDS).count(XREAD_COUNT), offsets);
XReadArgs.Builder.block(xreadBlockMillis).count(xreadCount), offsets);

for (StreamMessage<String, byte[]> message : messages) {
LOGGER.trace("Consumer[id={}] received message {} from {}", consumerId, message.getId(), message.getStream());
Expand Down Expand Up @@ -312,6 +319,10 @@ public static class Builder {

private String offset = "0-0";

private int xreadBlockMillis = XREAD_BLOCK_MILLISECONDS;

private int xreadCount = XREAD_COUNT;

private boolean deleteMessages = false;

private Builder(RedisClient redisClient) {
Expand All @@ -328,6 +339,18 @@ public Builder reconnectInterval(Duration duration) {
return this;
}

/** Sets the XREAD [BLOCK milliseconds] parameter. Default is 2000. */
public Builder xreadBlockMillis(int xreadBlockMillis) {
this.xreadBlockMillis = xreadBlockMillis;
return this;
}

/** Sets the XREAD [COUNT count] parameter. Default is 1000. */
public Builder xreadCount(int xreadCount) {
this.xreadCount = xreadCount;
return this;
}

/** Set the consumer group, e.g. the application name. */
public Builder consumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
Expand Down Expand Up @@ -483,8 +506,9 @@ public ZeebeRedis build() {
});

final var zeebeRedis = new ZeebeRedis(redisClient, connection, reconnectUsesNewConnection, reconnectInterval,
consumerGroup, consumerId, prefix, offsets.toArray(new XReadArgs.StreamOffset[0]), listeners,
deleteMessages, shouldDestroyConsumerGroupOnClose);
xreadBlockMillis, xreadCount, consumerGroup, consumerId, prefix,
offsets.toArray(new XReadArgs.StreamOffset[0]), listeners, deleteMessages,
shouldDestroyConsumerGroupOnClose);
zeebeRedis.start();

return zeebeRedis;
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
</parent>

<properties>
<version.zeebe>8.2.1</version.zeebe>
<version.zeebe>8.2.2</version.zeebe>
<version.exporter.protobuf>1.4.0</version.exporter.protobuf>
<version.lettuce>6.2.3.RELEASE</version.lettuce>
<version.lettuce>6.2.4.RELEASE</version.lettuce>
<version.log4j>2.20.0</version.log4j>
<version.zeebe.testcontainers>3.5.2</version.zeebe.testcontainers>
<version.junit>5.9.2</version.junit>
Expand Down

0 comments on commit af6922f

Please sign in to comment.