diff --git a/README.md b/README.md index 889b2c3..8d983fd 100644 --- a/README.md +++ b/README.md @@ -114,8 +114,27 @@ Operation | Description [TOUCH](https://redis.io/commands/touch) *key [key ...]* | Alters the last access time of a key(s). A key is ignored if it does not exist. [TTL](https://redis.io/commands/ttl) *key* | Returns the remaining time to live of a key that has a timeout. [UNLINK](https://redis.io/commands/unlink) *key [key ...]* | This command is an alias to DEL. +[ZADD](https://redis.io/commands/zadd) *key [NX/XX] [GT/LT] [CH] [INCR] score member [score member ...]* | Adds all the specified members with the specified scores to the sorted set stored at key. [ZCARD](https://redis.io/commands/zcard) *key* | Returns the sorted set cardinality (number of elements) of the sorted set stored at key. +[ZCOUNT](https://redis.io/commands/zcount) *key min max* | Returns the number of elements in the sorted set at key with a score between min and max. +[ZINCRBY](https://redis.io/commands/zincrby) *key increment member* | Increments the score of member in the sorted set stored at key by increment. +[ZLEXCOUNT](https://redis.io/commands/zlexcount) *key min max* | When all the elements in a sorted set are inserted with the same score, in order to force lexicographical ordering, this command returns the number of elements in the sorted set at key with a value between min and max. +[ZMSCORE](https://redis.io/commands/zmscore) *key member [member ...]* | Returns the scores associated with the specified members in the sorted set stored at key. +[ZPOPMAX](https://redis.io/commands/zpopmax) *key [count]* | Removes and returns up to count members with the highest scores in the sorted set stored at key. +[ZPOPMIN](https://redis.io/commands/zpopmin) *key [count]* | Removes and returns up to count members with the lowest scores in the sorted set stored at key. +[ZRANDMEMBER](https://redis.io/commands/zrandmember) *key [count [WITHSCORES]]* | When called with just the key argument, return a random element from the sorted set value stored at key. +[ZRANGE](https://redis.io/commands/zrange) *key min max [BYSCORE/BYLEX] [REV] [LIMIT offset count] [WITHSCORES]* | Returns the specified range of elements in the sorted set stored at . +[ZRANGEBYLEX](https://redis.io/commands/zrangebylex) *key min max [LIMIT offset count]* | When all the elements in a sorted set are inserted with the same score, in order to force lexicographical ordering, this command returns all the elements in the sorted set at key with a value between min and max. +[ZRANGEBYSCORE](https://redis.io/commands/zrangebyscore) *key min max [WITHSCORES] [LIMIT offset count]* | Returns all the elements in the sorted set at key with a score between min and max (including elements with score equal to min or max). The elements are considered to be ordered from low to high scores. +[ZRANGESTORE](https://redis.io/commands/zrangestore) *dst src min max [BYSCORE/BYLEX] [REV] [LIMIT offset count]* | This command is like ZRANGE, but stores the result in the destination key. +[ZRANK](https://redis.io/commands/zrank) *key member* | Returns the rank of member in the sorted set stored at key, with the scores ordered from low to high. [ZREM](https://redis.io/commands/zrem) *key member [member ...]* | Removes the specified members from the sorted set stored at key. +[ZREMRANGEBYLEX](https://redis.io/commands/zremrangebylex) *key min max* | When all the elements in a sorted set are inserted with the same score, in order to force lexicographical ordering, this command removes all elements in the sorted set stored at key between the lexicographical range specified by min and max. +[ZREMRANGEBYRANK](https://redis.io/commands/zremrangebyrank) *key start stop* | Removes all elements in the sorted set stored at key with rank between start and stop. +[ZREMRANGEBYSCORE](https://redis.io/commands/zremrangebyscore) *key min max* | Removes all elements in the sorted set stored at key with a score between min and max (inclusive). +[ZREVRANGE](https://redis.io/commands/zrevrange) *key start stop [WITHSCORES]* | Returns the specified range of elements in the sorted set stored at key. +[ZREVRANGEBYLEX](https://redis.io/commands/zrevrangebylex) *key max min [LIMIT offset count]* | Apart from the reversed ordering, ZREVRANGEBYLEX is similar to ZRANGEBYLEX. +[ZREVRANGEBYSCORE](https://redis.io/commands/zrevrangebyscore) *key max min [WITHSCORES] [LIMIT offset count]* | Returns all the elements in the sorted set at key with a score between max and min (including elements with score equal to max or min). In contrary to the default ordering of sorted sets, for this command the elements are considered to be ordered from high to low scores. diff --git a/build.gradle.kts b/build.gradle.kts index c96efdf..8fe005d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,7 +8,7 @@ plugins { } group = "com.aerospike" -version = "0.5.0" +version = "0.6.0" repositories { mavenCentral() diff --git a/src/main/kotlin/com/aerospike/skyhook/command/RedisCommand.kt b/src/main/kotlin/com/aerospike/skyhook/command/RedisCommand.kt index 6279b20..9795831 100644 --- a/src/main/kotlin/com/aerospike/skyhook/command/RedisCommand.kt +++ b/src/main/kotlin/com/aerospike/skyhook/command/RedisCommand.kt @@ -70,8 +70,27 @@ import com.aerospike.skyhook.util.RedisCommandsDetails.timeCommand import com.aerospike.skyhook.util.RedisCommandsDetails.touchCommand import com.aerospike.skyhook.util.RedisCommandsDetails.ttlCommand import com.aerospike.skyhook.util.RedisCommandsDetails.unlinkCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zaddCommand import com.aerospike.skyhook.util.RedisCommandsDetails.zcardCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zcountCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zincrbyCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zlexcountCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zmscoreCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zpopmaxCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zpopminCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zrandmemberCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zrangeCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zrangebylexCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zrangebyscoreCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zrangestoreCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zrankCommand import com.aerospike.skyhook.util.RedisCommandsDetails.zremCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zremrangebylexCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zremrangebyrankCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zremrangebyscoreCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zrevrangeCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zrevrangebylexCommand +import com.aerospike.skyhook.util.RedisCommandsDetails.zrevrangebyscoreCommand import io.netty.channel.ChannelHandlerContext import io.netty.handler.codec.redis.ArrayHeaderRedisMessage import mu.KotlinLogging @@ -135,9 +154,12 @@ enum class RedisCommand(private val details: RedisCommandDetails?) { HGETALL(hgetallCommand), HVALS(hvalsCommand), HKEYS(hkeysCommand), + ZMSCORE(zmscoreCommand), + ZRANK(zrankCommand), SMEMBERS(smembersCommand), HINCRBY(hincrbyCommand), HINCRBYFLOAT(hincrbyfloatCommand), + ZINCRBY(zincrbyCommand), HSTRLEN(hstrlenCommand), HLEN(hlenCommand), SCARD(scardCommand), @@ -149,6 +171,22 @@ enum class RedisCommand(private val details: RedisCommandDetails?) { SINTER(sinterCommand), SUNIONSTORE(sunionstoreCommand), SINTERSTORE(sinterstoreCommand), + ZADD(zaddCommand), + ZPOPMAX(zpopmaxCommand), + ZPOPMIN(zpopminCommand), + ZRANDMEMBER(zrandmemberCommand), + ZCOUNT(zcountCommand), + ZLEXCOUNT(zlexcountCommand), + ZREMRANGEBYSCORE(zremrangebyscoreCommand), + ZREMRANGEBYRANK(zremrangebyrankCommand), + ZREMRANGEBYLEX(zremrangebylexCommand), + ZRANGE(zrangeCommand), + ZRANGESTORE(zrangestoreCommand), + ZREVRANGE(zrevrangeCommand), + ZRANGEBYSCORE(zrangebyscoreCommand), + ZREVRANGEBYSCORE(zrevrangebyscoreCommand), + ZRANGEBYLEX(zrangebylexCommand), + ZREVRANGEBYLEX(zrevrangebylexCommand), FLUSHDB(flushdbCommand), FLUSHALL(flushallCommand), diff --git a/src/main/kotlin/com/aerospike/skyhook/handler/NettyAerospikeHandler.kt b/src/main/kotlin/com/aerospike/skyhook/handler/NettyAerospikeHandler.kt index 41b56c7..5974f75 100644 --- a/src/main/kotlin/com/aerospike/skyhook/handler/NettyAerospikeHandler.kt +++ b/src/main/kotlin/com/aerospike/skyhook/handler/NettyAerospikeHandler.kt @@ -91,9 +91,12 @@ class NettyAerospikeHandler @Inject constructor( RedisCommand.HGETALL, RedisCommand.HVALS, RedisCommand.HKEYS, + RedisCommand.ZMSCORE, + RedisCommand.ZRANK, RedisCommand.SMEMBERS -> MapGetCommandListener(aerospikeCtx, ctx).handle(cmd) RedisCommand.HINCRBY, - RedisCommand.HINCRBYFLOAT -> HincrbyCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.HINCRBYFLOAT, + RedisCommand.ZINCRBY -> HincrbyCommandListener(aerospikeCtx, ctx).handle(cmd) RedisCommand.HSTRLEN -> HstrlenCommandListener(aerospikeCtx, ctx).handle(cmd) RedisCommand.HLEN, RedisCommand.SCARD, @@ -105,6 +108,22 @@ class NettyAerospikeHandler @Inject constructor( RedisCommand.SINTER -> SinterCommandListener(aerospikeCtx, ctx).handle(cmd) RedisCommand.SUNIONSTORE -> SunionstoreCommandListener(aerospikeCtx, ctx).handle(cmd) RedisCommand.SINTERSTORE -> SinterstoreCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZADD -> ZaddCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZPOPMAX -> ZpopmaxCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZPOPMIN -> ZpopminCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZRANDMEMBER -> ZrandmemberCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZCOUNT -> ZcountCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZLEXCOUNT -> ZlexcountCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZREMRANGEBYSCORE -> ZremrangebyscoreCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZREMRANGEBYRANK -> ZremrangebyrankCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZREMRANGEBYLEX -> ZremrangebylexCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZRANGE -> ZrangeCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZRANGESTORE -> ZrangestoreCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZREVRANGE -> ZrevrangeCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZRANGEBYSCORE -> ZrangebyscoreCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZREVRANGEBYSCORE -> ZrevrangebyscoreCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZRANGEBYLEX -> ZrangebylexCommandListener(aerospikeCtx, ctx).handle(cmd) + RedisCommand.ZREVRANGEBYLEX -> ZrevrangebylexCommandListener(aerospikeCtx, ctx).handle(cmd) RedisCommand.FLUSHDB, RedisCommand.FLUSHALL -> FlushCommandHandler(aerospikeCtx, ctx).handle(cmd) @@ -129,7 +148,7 @@ class NettyAerospikeHandler @Inject constructor( is AerospikeException -> "internal error" else -> e.message } - log.warn { e } + log.warn(e) {} writeErrorString(ctx, msg) ctx.flush() } diff --git a/src/main/kotlin/com/aerospike/skyhook/listener/BaseListener.kt b/src/main/kotlin/com/aerospike/skyhook/listener/BaseListener.kt index 7553ce9..bc073a3 100644 --- a/src/main/kotlin/com/aerospike/skyhook/listener/BaseListener.kt +++ b/src/main/kotlin/com/aerospike/skyhook/listener/BaseListener.kt @@ -35,6 +35,13 @@ abstract class BaseListener( updateOnlyPolicy } + @JvmStatic + internal val createOnlyPolicy = run { + val updateOnlyPolicy = getWritePolicy() + updateOnlyPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY + updateOnlyPolicy + } + @JvmStatic internal val defaultWritePolicy: WritePolicy = getWritePolicy() diff --git a/src/main/kotlin/com/aerospike/skyhook/listener/map/HincrbyCommandListener.kt b/src/main/kotlin/com/aerospike/skyhook/listener/map/HincrbyCommandListener.kt index 420054b..7d30745 100644 --- a/src/main/kotlin/com/aerospike/skyhook/listener/map/HincrbyCommandListener.kt +++ b/src/main/kotlin/com/aerospike/skyhook/listener/map/HincrbyCommandListener.kt @@ -6,6 +6,7 @@ import com.aerospike.client.Value import com.aerospike.client.cdt.MapOperation import com.aerospike.client.cdt.MapPolicy import com.aerospike.client.listener.RecordListener +import com.aerospike.skyhook.command.RedisCommand import com.aerospike.skyhook.command.RequestCommand import com.aerospike.skyhook.config.AerospikeContext import com.aerospike.skyhook.listener.BaseListener @@ -17,15 +18,17 @@ class HincrbyCommandListener( ctx: ChannelHandlerContext ) : BaseListener(aeroCtx, ctx), RecordListener { + @Volatile + private lateinit var command: RedisCommand + override fun handle(cmd: RequestCommand) { require(cmd.argCount == 4) { argValidationErrorMsg(cmd) } + command = cmd.command val key = createKey(cmd.key) - val mapKey: Value = Typed.getValue(cmd.args[2]) - val incrValue: Value = Typed.getValue(cmd.args[3]) val operation = MapOperation.increment( MapPolicy(), aeroCtx.bin, - mapKey, incrValue + getMapKey(cmd), getIncrValue(cmd) ) aeroCtx.client.operate( null, this, defaultWritePolicy, @@ -33,13 +36,30 @@ class HincrbyCommandListener( ) } + private fun getMapKey(cmd: RequestCommand): Value { + return when (cmd.command) { + RedisCommand.ZINCRBY -> Typed.getValue(cmd.args[3]) + else -> Typed.getValue(cmd.args[2]) + } + } + + private fun getIncrValue(cmd: RequestCommand): Value { + return when (cmd.command) { + RedisCommand.ZINCRBY -> Typed.getValue(cmd.args[2]) + else -> Typed.getValue(cmd.args[3]) + } + } + override fun onSuccess(key: Key?, record: Record?) { if (record == null) { - writeNullString(ctx) + writeErrorString(ctx, "failed to create a record") ctx.flush() } else { try { - writeResponse(record.bins[aeroCtx.bin]) + when (command) { + RedisCommand.ZINCRBY -> writeResponse(record.getLong(aeroCtx.bin).toString()) + else -> writeResponse(record.bins[aeroCtx.bin]) + } ctx.flush() } catch (e: Exception) { closeCtx(e) diff --git a/src/main/kotlin/com/aerospike/skyhook/listener/map/MapGetCommandListener.kt b/src/main/kotlin/com/aerospike/skyhook/listener/map/MapGetCommandListener.kt index 885339c..6537261 100644 --- a/src/main/kotlin/com/aerospike/skyhook/listener/map/MapGetCommandListener.kt +++ b/src/main/kotlin/com/aerospike/skyhook/listener/map/MapGetCommandListener.kt @@ -19,7 +19,11 @@ class MapGetCommandListener( ctx: ChannelHandlerContext ) : BaseListener(aeroCtx, ctx), RecordListener { + @Volatile + private lateinit var command: RedisCommand + override fun handle(cmd: RequestCommand) { + command = cmd.command val key = createKey(cmd.key) aeroCtx.client.operate( @@ -44,7 +48,16 @@ class MapGetCommandListener( MapReturnType.VALUE ) } - RedisCommand.HMGET -> { + RedisCommand.ZRANK -> { + require(cmd.argCount == 3) { argValidationErrorMsg(cmd) } + + val mapKey = Typed.getValue(cmd.args[2]) + MapOperation.getByKey( + aeroCtx.bin, mapKey, + MapReturnType.RANK + ) + } + RedisCommand.HMGET, RedisCommand.ZMSCORE -> { require(cmd.argCount >= 3) { argValidationErrorMsg(cmd) } val mapKeys = getValues(cmd) @@ -85,7 +98,7 @@ class MapGetCommandListener( override fun onSuccess(key: Key?, record: Record?) { if (record == null) { - writeNullString(ctx) + writeNull() ctx.flush() } else { try { @@ -97,6 +110,17 @@ class MapGetCommandListener( } } + private fun writeNull() { + when (command) { + RedisCommand.HGETALL, + RedisCommand.HVALS, + RedisCommand.HKEYS, + RedisCommand.SMEMBERS -> + writeEmptyList(ctx) + else -> writeNullString(ctx) + } + } + private fun marshalOutput(data: Any?): Any? { return when (data) { is Map<*, *> -> data.toList() @@ -104,9 +128,13 @@ class MapGetCommandListener( when (data.firstOrNull()) { is Map.Entry<*, *> -> data.map { it as Map.Entry<*, *> } .map { it.toPair().toList() }.flatten() - else -> data + else -> when (command) { + RedisCommand.ZMSCORE -> data.map { it.toString() } + else -> data + } } } + -1L -> if (command == RedisCommand.ZRANK) null else data else -> data } } diff --git a/src/main/kotlin/com/aerospike/skyhook/listener/map/ZaddCommandListener.kt b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZaddCommandListener.kt new file mode 100644 index 0000000..167f828 --- /dev/null +++ b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZaddCommandListener.kt @@ -0,0 +1,157 @@ +package com.aerospike.skyhook.listener.map + +import com.aerospike.client.Key +import com.aerospike.client.Operation +import com.aerospike.client.Record +import com.aerospike.client.Value +import com.aerospike.client.cdt.MapOperation +import com.aerospike.client.cdt.MapOrder +import com.aerospike.client.cdt.MapPolicy +import com.aerospike.client.cdt.MapWriteFlags +import com.aerospike.client.listener.RecordListener +import com.aerospike.skyhook.command.RequestCommand +import com.aerospike.skyhook.config.AerospikeContext +import com.aerospike.skyhook.listener.BaseListener +import com.aerospike.skyhook.util.Typed +import io.netty.channel.ChannelHandlerContext + +class ZaddCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : BaseListener(aeroCtx, ctx), RecordListener { + + private class ZaddCommand(val cmd: RequestCommand) { + var XX: Boolean = false + private set + var NX: Boolean = false + private set + var LT: Boolean = false + private set + var GT: Boolean = false + private set + var CH: Boolean = false + private set + var INCR: Boolean = false + private set + + lateinit var values: Map + private set + + init { + for (i in 2 until cmd.args.size) { + if (!setFlag(String(cmd.args[i]))) { + setSortedSetValues(i) + break + } + } + validate() + } + + private fun setFlag(flagStr: String): Boolean { + when (flagStr.toUpperCase()) { + "XX" -> XX = true + "NX" -> NX = true + "LT" -> LT = true + "GT" -> GT = true + "CH" -> CH = true + "INCR" -> INCR = true + else -> return false + } + return true + } + + private fun setSortedSetValues(from: Int) { + values = cmd.args.drop(from).chunked(2) + .map { (it1, it2) -> + Typed.getStringValue(it2) to + Value.LongValue(Typed.getLong(it1)) + } + .toMap() + } + + private fun validate() { + require(!(NX && XX)) { "[NX|XX]" } + require(!(GT && LT)) { "[GT|LT]" } + require(!LT) { "LT flag not supported" } + require(!GT) { "GT flag not supported" } + require(!CH) { "CH flag not supported" } + } + } + + @Volatile + private var size: Long = 0L + + @Volatile + private lateinit var zaddCommand: ZaddCommand + + override fun handle(cmd: RequestCommand) { + require(cmd.argCount >= 4) { argValidationErrorMsg(cmd) } + + val key = createKey(cmd.key) + zaddCommand = ZaddCommand(cmd) + + val getSize = MapOperation.size(aeroCtx.bin) + size = aeroCtx.client.operate(defaultWritePolicy, key, getSize) + ?.getLong(aeroCtx.bin) ?: 0L + + aeroCtx.client.operate( + null, this, defaultWritePolicy, + key, getMapOperation() + ) + } + + private fun getMapOperation(): Operation { + return when { + zaddCommand.INCR -> { + require(zaddCommand.values.size == 1) { "INCR params" } + MapOperation.increment( + getMapPolicy(), + aeroCtx.bin, + zaddCommand.values.keys.first(), + zaddCommand.values.values.first() + ) + } + else -> { + MapOperation.putItems( + getMapPolicy(), + aeroCtx.bin, + zaddCommand.values + ) + } + } + } + + private fun getMapPolicy(): MapPolicy { + val writeFlag = when { + zaddCommand.XX -> { + MapWriteFlags.UPDATE_ONLY + } + zaddCommand.NX -> { + MapWriteFlags.CREATE_ONLY + } + else -> { + MapWriteFlags.DEFAULT + } + } + return MapPolicy(MapOrder.KEY_VALUE_ORDERED, writeFlag) + } + + override fun onSuccess(key: Key?, record: Record?) { + if (record == null) { + writeLong(ctx, 0L) + ctx.flush() + } else { + try { + if (zaddCommand.INCR) { + writeResponse(record.getString(aeroCtx.bin)) + } else { + val added = record.getLong(aeroCtx.bin) - size + writeLong(ctx, added) + } + ctx.flush() + } catch (e: Exception) { + closeCtx(e) + } + } + } +} diff --git a/src/main/kotlin/com/aerospike/skyhook/listener/map/ZcountCommandListener.kt b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZcountCommandListener.kt new file mode 100644 index 0000000..e81baff --- /dev/null +++ b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZcountCommandListener.kt @@ -0,0 +1,133 @@ +package com.aerospike.skyhook.listener.map + +import com.aerospike.client.* +import com.aerospike.client.cdt.MapOperation +import com.aerospike.client.cdt.MapReturnType +import com.aerospike.client.listener.RecordListener +import com.aerospike.skyhook.command.RequestCommand +import com.aerospike.skyhook.config.AerospikeContext +import com.aerospike.skyhook.listener.BaseListener +import com.aerospike.skyhook.util.Intervals +import com.aerospike.skyhook.util.Typed +import io.netty.channel.ChannelHandlerContext + +open class ZcountCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : BaseListener(aeroCtx, ctx), RecordListener { + + override fun handle(cmd: RequestCommand) { + require(cmd.argCount == 4) { argValidationErrorMsg(cmd) } + + val key = createKey(cmd.key) + aeroCtx.client.operate( + null, this, null, + key, getOperation(cmd) + ) + } + + protected open fun getOperation(cmd: RequestCommand): Operation { + return MapOperation.getByValueRange( + aeroCtx.bin, + Value.get(Intervals.fromScore(String(cmd.args[2]))), + Value.get(Intervals.upScore(String(cmd.args[3]))), + MapReturnType.COUNT + ) + } + + override fun writeError(e: AerospikeException?) { + writeLong(ctx, 0L) + } + + override fun onSuccess(key: Key?, record: Record?) { + if (record == null) { + writeLong(ctx, 0L) + ctx.flush() + } else { + try { + writeLong(ctx, record.getLong(aeroCtx.bin)) + ctx.flush() + } catch (e: Exception) { + closeCtx(e) + } + } + } +} + +class ZlexcountCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZcountCommandListener(aeroCtx, ctx) { + + override fun getOperation(cmd: RequestCommand): Operation { + return MapOperation.getByKeyRange( + aeroCtx.bin, + Value.get(Intervals.fromLex(String(cmd.args[2]))), + Value.get(Intervals.upLex(String(cmd.args[3]))), + MapReturnType.COUNT + ) + } +} + +class ZremrangebyscoreCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZcountCommandListener(aeroCtx, ctx) { + + override fun getOperation(cmd: RequestCommand): Operation { + return MapOperation.removeByValueRange( + aeroCtx.bin, + Value.get(Intervals.fromScore(String(cmd.args[2]))), + Value.get(Intervals.upScore(String(cmd.args[3]))), + MapReturnType.COUNT + ) + } +} + +class ZremrangebyrankCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZcountCommandListener(aeroCtx, ctx) { + + override fun getOperation(cmd: RequestCommand): Operation { + val from = Typed.getInteger(cmd.args[2]) + val count = getCount(from, cmd) + return MapOperation.removeByRankRange( + aeroCtx.bin, + from, + count, + MapReturnType.COUNT + ) + } + + private fun getCount(from: Int, cmd: RequestCommand): Int { + val to = Typed.getInteger(cmd.args[3]) + return maxOf( + if (to < 0) { + val key = createKey(cmd.key) + val mapSize = aeroCtx.client.operate( + null, + key, MapOperation.size(aeroCtx.bin) + ).getInt(aeroCtx.bin) + (mapSize + to + 1) - from + } else { + (to - from) + 1 + }, 0 + ) + } +} + +class ZremrangebylexCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZcountCommandListener(aeroCtx, ctx) { + + override fun getOperation(cmd: RequestCommand): Operation { + return MapOperation.removeByKeyRange( + aeroCtx.bin, + Value.get(Intervals.fromLex(String(cmd.args[2]))), + Value.get(Intervals.upLex(String(cmd.args[3]))), + MapReturnType.COUNT + ) + } +} diff --git a/src/main/kotlin/com/aerospike/skyhook/listener/map/ZpopCommandListener.kt b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZpopCommandListener.kt new file mode 100644 index 0000000..064de05 --- /dev/null +++ b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZpopCommandListener.kt @@ -0,0 +1,99 @@ +package com.aerospike.skyhook.listener.map + +import com.aerospike.client.Key +import com.aerospike.client.Record +import com.aerospike.client.Value +import com.aerospike.client.cdt.MapOperation +import com.aerospike.client.cdt.MapReturnType +import com.aerospike.client.listener.RecordListener +import com.aerospike.skyhook.command.RequestCommand +import com.aerospike.skyhook.config.AerospikeContext +import com.aerospike.skyhook.listener.BaseListener +import io.netty.channel.ChannelHandlerContext +import java.util.* + +abstract class ZpopCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : BaseListener(aeroCtx, ctx), RecordListener { + + protected var count: Int = 0 + + override fun handle(cmd: RequestCommand) { + require(cmd.argCount == 2 || cmd.argCount == 3) { + argValidationErrorMsg(cmd) + } + + val key = createKey(cmd.key) + setCount(cmd) + val record = aeroCtx.client.get(defaultWritePolicy, key).bins[aeroCtx.bin] + + aeroCtx.client.operate( + null, this, defaultWritePolicy, key, + MapOperation.removeByKeyList(aeroCtx.bin, getKeysToPop(record), MapReturnType.KEY_VALUE) + ) + } + + private fun setCount(cmd: RequestCommand) { + count = if (cmd.argCount == 3) String(cmd.args[2]).toInt() else 1 + } + + @Suppress("UNCHECKED_CAST") + private fun getKeysToPop(data: Any?): List { + val sorted = (data as TreeMap<*, Long>).toList() + .sortedBy { it.second }.map { it.first } + return take(sorted).map { Value.get(it) } + } + + protected abstract fun take(sorted: List): List + + override fun onSuccess(key: Key?, record: Record?) { + if (record == null) { + writeEmptyList(ctx) + ctx.flush() + } else { + try { + writeResponse(marshalOutput(record.bins[aeroCtx.bin])) + ctx.flush() + } catch (e: Exception) { + closeCtx(e) + } + } + } + + @Suppress("UNCHECKED_CAST") + private fun marshalOutput(data: Any?): List { + return sortOutput(data as List>) + .map { it.toPair().toList() }.flatten().map { it.toString() } + } + + protected abstract fun sortOutput(list: List>): List> +} + +class ZpopmaxCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZpopCommandListener(aeroCtx, ctx) { + + override fun take(sorted: List): List { + return sorted.takeLast(count) + } + + override fun sortOutput(list: List>): List> { + return list.sortedBy { it.value }.reversed() + } +} + +class ZpopminCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZpopCommandListener(aeroCtx, ctx) { + + override fun take(sorted: List): List { + return sorted.take(count) + } + + override fun sortOutput(list: List>): List> { + return list.sortedBy { it.value } + } +} diff --git a/src/main/kotlin/com/aerospike/skyhook/listener/map/ZrandmemberCommandListener.kt b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZrandmemberCommandListener.kt new file mode 100644 index 0000000..681ce25 --- /dev/null +++ b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZrandmemberCommandListener.kt @@ -0,0 +1,153 @@ +package com.aerospike.skyhook.listener.map + +import com.aerospike.client.Key +import com.aerospike.client.Operation +import com.aerospike.client.Record +import com.aerospike.client.cdt.MapOperation +import com.aerospike.client.cdt.MapReturnType +import com.aerospike.client.listener.RecordListener +import com.aerospike.skyhook.command.RequestCommand +import com.aerospike.skyhook.config.AerospikeContext +import com.aerospike.skyhook.listener.BaseListener +import io.netty.channel.ChannelHandlerContext +import java.util.concurrent.atomic.AtomicInteger + +class ZrandmemberCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : BaseListener(aeroCtx, ctx), RecordListener { + + @Volatile + private lateinit var zrandmemberCommand: ZrandmemberCommand + + private class ZrandmemberCommand(val cmd: RequestCommand) { + + private val total: AtomicInteger = AtomicInteger() + + val count by lazy { + if (cmd.argCount == 2) 1 else String(cmd.args[2]).toInt() + } + + val withScores by lazy { + if (cmd.argCount < 4) { + false + } else { + require(String(cmd.args[3]) == "WITHSCORES") { + argValidationErrorMsg(cmd) + } + true + } + } + + val step by lazy { + if (withScores) 2 else 1 + } + + fun set(size: Int) { + total.set(size * step) + } + + fun get(): Int { + return total.get() + } + + fun decrementAndGet(): Int { + return total.addAndGet(-step) + } + } + + override fun handle(cmd: RequestCommand) { + require(cmd.argCount >= 2 || cmd.argCount <= 4) { + argValidationErrorMsg(cmd) + } + + zrandmemberCommand = ZrandmemberCommand(cmd) + val key = createKey(cmd.key) + val indexList = getRandomList(key) + + zrandmemberCommand.set(indexList.size) + if (zrandmemberCommand.get() > 1) { + writeArrayHeader(ctx, zrandmemberCommand.get().toLong()) + } + for (i: Int in indexList) { + aeroCtx.client.operate( + null, this, defaultWritePolicy, + key, getMapOperation(i) + ) + } + } + + private fun getRandomList(key: Key): List { + val setSize = getSetSize(key) + return if (zrandmemberCommand.count >= 0) { + val count = minOf(zrandmemberCommand.count, setSize) + val mutableSet: MutableSet = mutableSetOf() + while (mutableSet.size < count) { + mutableSet.add((0 until setSize).random()) + } + mutableSet.toList() + } else { + val count = kotlin.math.abs(zrandmemberCommand.count) + val mutableList: MutableList = mutableListOf() + while (mutableList.size < count) { + mutableList.add((0 until setSize).random()) + } + mutableList.toList() + } + } + + private fun getMapOperation(i: Int): Operation { + val returnType = if (zrandmemberCommand.withScores) { + MapReturnType.KEY_VALUE + } else { + MapReturnType.KEY + } + return MapOperation.getByIndex(aeroCtx.bin, i, returnType) + } + + private fun getSetSize(key: Key): Int { + return aeroCtx.client.operate( + defaultWritePolicy, key, + MapOperation.size(aeroCtx.bin) + )?.getInt(aeroCtx.bin) ?: 0 + } + + override fun onSuccess(key: Key?, record: Record?) { + try { + if (record == null) { + writeNullString(ctx) + } else { + synchronized(zrandmemberCommand) { + writeResponse(record.bins[aeroCtx.bin]) + } + } + if (zrandmemberCommand.decrementAndGet() == 0) { + ctx.flush() + } + } catch (e: Exception) { + closeCtx(e) + } + } + + override fun writeResponse(mapped: Any?) { + when (mapped) { + is Map<*, *> -> mapped.toList().forEach { + super.writeResponse(it.first.toString()) + super.writeResponse(it.second.toString()) + } + is List<*> -> when (mapped.firstOrNull()) { + is Map.Entry<*, *> -> mapped.map { it as Map.Entry<*, *> }.map { it.toPair() } + .forEach { + super.writeResponse(it.first.toString()) + super.writeResponse(it.second.toString()) + } + else -> mapped.forEach { + super.writeResponse(it.toString()) + } + } + else -> { + super.writeResponse(mapped) + } + } + } +} diff --git a/src/main/kotlin/com/aerospike/skyhook/listener/map/ZrangeCommandListener.kt b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZrangeCommandListener.kt new file mode 100644 index 0000000..769d40b --- /dev/null +++ b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZrangeCommandListener.kt @@ -0,0 +1,253 @@ +package com.aerospike.skyhook.listener.map + +import com.aerospike.client.Key +import com.aerospike.client.Operation +import com.aerospike.client.Record +import com.aerospike.client.Value +import com.aerospike.client.cdt.MapOperation +import com.aerospike.client.cdt.MapReturnType +import com.aerospike.client.listener.RecordListener +import com.aerospike.skyhook.command.RequestCommand +import com.aerospike.skyhook.config.AerospikeContext +import com.aerospike.skyhook.listener.BaseListener +import com.aerospike.skyhook.util.Intervals +import io.netty.channel.ChannelHandlerContext + +open class ZrangeCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : BaseListener(aeroCtx, ctx), RecordListener { + + protected data class LimitArgument( + val offset: Int, + val count: Int, + ) + + protected class RangeCommand(val cmd: RequestCommand, flagIndex: Int) { + var BYSCORE: Boolean = false + var BYLEX: Boolean = false + var REV: Boolean = false + var LIMIT: LimitArgument? = null + private set + var WITHSCORES: Boolean = false + + val minScore: Int by lazy { + Intervals.fromScore(String(cmd.args[flagIndex - 2])) + } + + val maxScore: Int by lazy { + Intervals.upScore(String(cmd.args[flagIndex - 1])) + } + + val minLex: String by lazy { + Intervals.fromLex(String(cmd.args[flagIndex - 2])) + } + + val maxLex: String by lazy { + Intervals.upLex(String(cmd.args[flagIndex - 1])) + } + + val factor: Long by lazy { + if (WITHSCORES) 2L else 1L + } + + init { + var i = flagIndex + while (i < cmd.args.size) { + i += setFlag(i) + } + } + + private fun setFlag(i: Int): Int { + val flagStr = String(cmd.args[i]) + when (flagStr.toUpperCase()) { + "BYSCORE" -> BYSCORE = true + "BYLEX" -> BYLEX = true + "REV" -> REV = true + "WITHSCORES" -> WITHSCORES = true + "LIMIT" -> { + LIMIT = LimitArgument( + String(cmd.args[i + 1]).toInt(), + String(cmd.args[i + 2]).toInt(), + ) + return 3 + } + else -> throw IllegalArgumentException(flagStr) + } + return 1 + } + } + + @Volatile + protected lateinit var rangeCommand: RangeCommand + + override fun handle(cmd: RequestCommand) { + require(cmd.argCount >= 4) { argValidationErrorMsg(cmd) } + + val key = createKey(cmd.key) + rangeCommand = RangeCommand(cmd, 4) + validateAndSet() + + aeroCtx.client.operate( + null, this, defaultWritePolicy, + key, getMapOperation() + ) + } + + protected open fun validateAndSet() { + require(!(rangeCommand.BYSCORE && rangeCommand.BYLEX)) { "[BYSCORE|BYLEX]" } + } + + protected open fun getMapReturnType(): Int { + return if (rangeCommand.WITHSCORES) { + MapReturnType.KEY_VALUE + } else { + MapReturnType.KEY + } + } + + protected open fun getMapOperation(): Operation { + return when { + rangeCommand.BYSCORE -> { + MapOperation.getByValueRange( + aeroCtx.bin, + Value.get(rangeCommand.minScore), + Value.get(rangeCommand.maxScore), getMapReturnType() + ) + } + rangeCommand.BYLEX -> { + MapOperation.getByKeyRange( + aeroCtx.bin, + Value.get(rangeCommand.minLex), + Value.get(rangeCommand.maxLex), getMapReturnType() + ) + } + else -> { + val from = if (rangeCommand.minScore == Int.MIN_VALUE) 0 else rangeCommand.minScore + val count = rangeCommand.maxScore - from + MapOperation.getByIndexRange( + aeroCtx.bin, + from, + count, getMapReturnType() + ) + } + } + } + + override fun onSuccess(key: Key?, record: Record?) { + try { + if (record == null) { + writeEmptyList(ctx) + } else { + writeResponse(record.bins[aeroCtx.bin]) + } + ctx.flush() + } catch (e: Exception) { + closeCtx(e) + } + } + + override fun writeResponse(mapped: Any?) { + when (mapped) { + is Map<*, *> -> applyFilters(mapped.toList()).forEach { + super.writeResponse(it.first.toString()) + super.writeResponse(it.second.toString()) + } + is List<*> -> when (mapped.firstOrNull()) { + is Map.Entry<*, *> -> applyFilters(mapped) + .map { it as Map.Entry<*, *> }.map { it.toPair() } + .forEach { + super.writeResponse(it.first.toString()) + super.writeResponse(it.second.toString()) + } + else -> applyFilters(mapped).forEach { + super.writeResponse(it.toString()) + } + } + } + } + + private fun applyFilters(list: List): List { + return list.let { + if (rangeCommand.REV) { + it.reversed() + } else { + it + } + }.let { l -> + rangeCommand.LIMIT?.let { + l.subList(it.offset, it.offset + it.count) + } ?: l + }.also { + writeArrayHeader(ctx, it.size * rangeCommand.factor) + } + } + + protected fun validateCommon() { + require(!rangeCommand.BYSCORE) { "BYSCORE flag not supported" } + require(!rangeCommand.BYLEX) { "BYLEX flag not supported" } + require(!rangeCommand.REV) { "REV flag not supported" } + } +} + +class ZrevrangeCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZrangeCommandListener(aeroCtx, ctx) { + + override fun validateAndSet() { + validateCommon() + require(rangeCommand.LIMIT?.let { false } ?: true) { "LIMIT flag not supported" } + + rangeCommand.REV = true + } +} + +open class ZrangebyscoreCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZrangeCommandListener(aeroCtx, ctx) { + + override fun validateAndSet() { + validateCommon() + + rangeCommand.BYSCORE = true + } +} + +class ZrevrangebyscoreCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZrangebyscoreCommandListener(aeroCtx, ctx) { + + override fun validateAndSet() { + super.validateAndSet() + + rangeCommand.REV = true + } +} + +open class ZrangebylexCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZrangeCommandListener(aeroCtx, ctx) { + + override fun validateAndSet() { + validateCommon() + require(!rangeCommand.WITHSCORES) { "WITHSCORES flag not supported" } + + rangeCommand.BYLEX = true + } +} + +class ZrevrangebylexCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZrangebylexCommandListener(aeroCtx, ctx) { + + override fun validateAndSet() { + super.validateAndSet() + + rangeCommand.REV = true + } +} diff --git a/src/main/kotlin/com/aerospike/skyhook/listener/map/ZrangestoreCommandListener.kt b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZrangestoreCommandListener.kt new file mode 100644 index 0000000..3c80b01 --- /dev/null +++ b/src/main/kotlin/com/aerospike/skyhook/listener/map/ZrangestoreCommandListener.kt @@ -0,0 +1,61 @@ +package com.aerospike.skyhook.listener.map + +import com.aerospike.client.Key +import com.aerospike.client.Record +import com.aerospike.client.Value +import com.aerospike.client.cdt.MapOperation +import com.aerospike.client.cdt.MapPolicy +import com.aerospike.client.cdt.MapReturnType +import com.aerospike.skyhook.command.RequestCommand +import com.aerospike.skyhook.config.AerospikeContext +import com.aerospike.skyhook.util.Typed +import io.netty.channel.ChannelHandlerContext + +class ZrangestoreCommandListener( + aeroCtx: AerospikeContext, + ctx: ChannelHandlerContext +) : ZrangeCommandListener(aeroCtx, ctx) { + + override fun handle(cmd: RequestCommand) { + require(cmd.argCount >= 4) { argValidationErrorMsg(cmd) } + + val destKey = createKey(cmd.key) + val sourceKey = createKey(Value.get(cmd.args[2])) + rangeCommand = RangeCommand(cmd, 5) + validateAndSet() + + val record = aeroCtx.client.operate( + defaultWritePolicy, sourceKey, getMapOperation() + ) + + @Suppress("UNCHECKED_CAST") + val putOperation = MapOperation.putItems( + MapPolicy(), + aeroCtx.bin, + (record.bins[aeroCtx.bin] as List>).map { + Typed.getValue(it.key.toString().toByteArray()) to + Value.get(it.value) + }.toMap() + ) + aeroCtx.client.operate( + null, this, defaultWritePolicy, destKey, putOperation + ) + } + + override fun getMapReturnType(): Int { + return MapReturnType.KEY_VALUE + } + + override fun onSuccess(key: Key?, record: Record?) { + try { + if (record == null) { + writeLong(ctx, 0L) + } else { + writeLong(ctx, record.getLong(aeroCtx.bin)) + } + ctx.flush() + } catch (e: Exception) { + closeCtx(e) + } + } +} diff --git a/src/main/kotlin/com/aerospike/skyhook/util/Intervals.kt b/src/main/kotlin/com/aerospike/skyhook/util/Intervals.kt new file mode 100644 index 0000000..dc007f7 --- /dev/null +++ b/src/main/kotlin/com/aerospike/skyhook/util/Intervals.kt @@ -0,0 +1,71 @@ +package com.aerospike.skyhook.util + +object Intervals { + + private const val infHighest = "+inf" + private const val infLowest = "-inf" + + private const val lexHighest = "+" + private const val lexLowest = "-" + + private const val scoreExclusive = "(" + private const val lexExclusive = "[" + + fun fromScore(interval: String): Int { + return score(interval, 0, 1) + } + + fun upScore(interval: String): Int { + return score(interval, 1, 0) + } + + private fun score(interval: String, includeShift: Int, excludeShift: Int): Int { + return when (interval.toLowerCase()) { + infHighest -> Int.MAX_VALUE + infLowest -> Int.MIN_VALUE + else -> { + return if (interval.startsWith(scoreExclusive)) { + interval.drop(1).toInt() + excludeShift + } else { + interval.toInt() + includeShift + } + } + } + } + + fun fromLex(interval: String): String { + return lex(interval, true) + } + + fun upLex(interval: String): String { + return lex(interval, false) + } + + private fun lex(interval: String, from: Boolean): String { + return when (interval.toLowerCase()) { + lexHighest -> String(byteArrayOf(127)) + lexLowest -> String(byteArrayOf(0)) + else -> { + return if (interval.startsWith(lexExclusive)) { + if (from) { + fixLexExclusive(interval.drop(1)) + } else { + interval.drop(1) + } + } else { + if (from) { + interval + } else { + fixLexExclusive(interval) + } + } + } + } + } + + private fun fixLexExclusive(str: String): String { + val byteArray = str.toByteArray() + byteArray[byteArray.size - 1]++ + return String(byteArray) + } +} diff --git a/src/main/kotlin/com/aerospike/skyhook/util/RedisCommandsDetails.kt b/src/main/kotlin/com/aerospike/skyhook/util/RedisCommandsDetails.kt index 7d840dc..e703e65 100644 --- a/src/main/kotlin/com/aerospike/skyhook/util/RedisCommandsDetails.kt +++ b/src/main/kotlin/com/aerospike/skyhook/util/RedisCommandsDetails.kt @@ -68,6 +68,25 @@ object RedisCommandsDetails { val sinterCommand = RedisCommandDetails("sinter", -2, arrayListOf("readonly", "sort_for_script"), 1, -1, 1) val sunionstoreCommand = RedisCommandDetails("sunionstore", -3, arrayListOf("write", "denyoom"), 1, -1, 1) val sinterstoreCommand = RedisCommandDetails("sinterstore", -3, arrayListOf("write", "denyoom"), 1, -1, 1) + val zmscoreCommand = RedisCommandDetails("zmscore", -3, arrayListOf("readonly", "fast"), 1, 1, 1) + val zrankCommand = RedisCommandDetails("zrank", 3, arrayListOf("readonly", "fast"), 1, 1, 1) + val zincrbyCommand = RedisCommandDetails("zincrby", 4, arrayListOf("write", "denyoom", "fast"), 1, 1, 1) + val zaddCommand = RedisCommandDetails("zadd", -4, arrayListOf("write", "denyoom", "fast"), 1, 1, 1) + val zpopmaxCommand = RedisCommandDetails("zpopmax", -2, arrayListOf("write", "fast"), 1, 1, 1) + val zpopminCommand = RedisCommandDetails("zpopmin", -2, arrayListOf("write", "fast"), 1, 1, 1) + val zrandmemberCommand = RedisCommandDetails("zrandmember", -2, arrayListOf("readonly", "random"), 1, 1, 1) + val zcountCommand = RedisCommandDetails("zcount", 4, arrayListOf("readonly", "fast"), 1, 1, 1) + val zlexcountCommand = RedisCommandDetails("zlexcount", 4, arrayListOf("readonly", "fast"), 1, 1, 1) + val zremrangebyscoreCommand = RedisCommandDetails("zremrangebyscore", 4, arrayListOf("write"), 1, 1, 1) + val zremrangebyrankCommand = RedisCommandDetails("zremrangebyrank", 4, arrayListOf("write"), 1, 1, 1) + val zremrangebylexCommand = RedisCommandDetails("zremrangebylex", 4, arrayListOf("write"), 1, 1, 1) + val zrangeCommand = RedisCommandDetails("zrange", -4, arrayListOf("readonly"), 1, 1, 1) + val zrangestoreCommand = RedisCommandDetails("zrangestore", -5, arrayListOf("write", "denyoom"), 1, 2, 1) + val zrevrangeCommand = RedisCommandDetails("zrevrange", -4, arrayListOf("readonly"), 1, 1, 1) + val zrangebyscoreCommand = RedisCommandDetails("zrangebyscore", -4, arrayListOf("readonly"), 1, 1, 1) + val zrevrangebyscoreCommand = RedisCommandDetails("zrevrangebyscore", -4, arrayListOf("readonly"), 1, 1, 1) + val zrangebylexCommand = RedisCommandDetails("zrangebylex", -4, arrayListOf("readonly"), 1, 1, 1) + val zrevrangebylexCommand = RedisCommandDetails("zrevrangebylex", -4, arrayListOf("readonly"), 1, 1, 1) val flushdbCommand = RedisCommandDetails("flushdb", -1, arrayListOf("write"), 0, 0, 0) val flushallCommand = RedisCommandDetails("flushall", -1, arrayListOf("write"), 0, 0, 0) diff --git a/src/main/kotlin/com/aerospike/skyhook/util/Typed.kt b/src/main/kotlin/com/aerospike/skyhook/util/Typed.kt index 90a9fee..864d415 100644 --- a/src/main/kotlin/com/aerospike/skyhook/util/Typed.kt +++ b/src/main/kotlin/com/aerospike/skyhook/util/Typed.kt @@ -16,6 +16,10 @@ object Typed { return Value.StringValue(wireVal.toString(Charsets.UTF_8)) } + fun getStringValue(wireVal: ByteArray): Value { + return Value.StringValue(wireVal.toString(Charsets.UTF_8)) + } + fun getInteger(wireVal: ByteArray): Int { return String(wireVal).toInt() } diff --git a/src/test/kotlin/com/aerospike/skyhook/HashCommandsTest.kt b/src/test/kotlin/com/aerospike/skyhook/HashCommandsTest.kt index 495abe6..c3ff4b3 100644 --- a/src/test/kotlin/com/aerospike/skyhook/HashCommandsTest.kt +++ b/src/test/kotlin/com/aerospike/skyhook/HashCommandsTest.kt @@ -3,6 +3,7 @@ package com.aerospike.skyhook import com.aerospike.skyhook.command.RedisCommand import org.junit.jupiter.api.Test import kotlin.test.assertEquals +import kotlin.test.assertTrue class HashCommandsTest() : SkyhookIntegrationTestBase() { @@ -15,6 +16,17 @@ class HashCommandsTest() : SkyhookIntegrationTestBase() { } } + @Test + fun testHget() { + setup() + writeCommand("${RedisCommand.HGET.name} $_key key1") + assertEquals("val1", readFullBulkString()) + writeCommand("${RedisCommand.HGET.name} $_key key11") + assertEquals(nullString, readFullBulkString()) + writeCommand("${RedisCommand.HGET.name} ne key1") + assertEquals(nullString, readFullBulkString()) + } + @Test fun testHsetnx() { writeCommand("${RedisCommand.HSETNX.name} $_key key1 val1") @@ -65,6 +77,9 @@ class HashCommandsTest() : SkyhookIntegrationTestBase() { assertEquals("val2", r[3]) assertEquals("key3", r[4]) assertEquals("val3", r[5]) + writeCommand("${RedisCommand.HGETALL.name} ne") + val r2 = readStringArray() + assertTrue { r2.isEmpty() } } @Test @@ -75,6 +90,9 @@ class HashCommandsTest() : SkyhookIntegrationTestBase() { assertEquals("key1", r[0]) assertEquals("key2", r[1]) assertEquals("key3", r[2]) + writeCommand("${RedisCommand.HKEYS.name} ne") + val r2 = readStringArray() + assertTrue { r2.isEmpty() } } @Test @@ -85,6 +103,9 @@ class HashCommandsTest() : SkyhookIntegrationTestBase() { assertEquals("val1", r[0]) assertEquals("val2", r[1]) assertEquals("val3", r[2]) + writeCommand("${RedisCommand.HVALS.name} ne") + val r2 = readStringArray() + assertTrue { r2.isEmpty() } } @Test diff --git a/src/test/kotlin/com/aerospike/skyhook/SortedSetCommandsTest.kt b/src/test/kotlin/com/aerospike/skyhook/SortedSetCommandsTest.kt new file mode 100644 index 0000000..0d0fc8d --- /dev/null +++ b/src/test/kotlin/com/aerospike/skyhook/SortedSetCommandsTest.kt @@ -0,0 +1,411 @@ +package com.aerospike.skyhook + +import com.aerospike.skyhook.command.RedisCommand +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class SortedSetCommandsTest() : SkyhookIntegrationTestBase() { + + private val _key = "zset" + + private fun setup(n: Int = 3, key: String = _key, v: String = "val") { + for (i in 1..n) { + writeCommand("${RedisCommand.ZADD.name} $key $i $v$i") + assertEquals(1, readLong()) + } + } + + @Test + fun testZmscore() { + setup() + writeCommand("${RedisCommand.ZMSCORE.name} $_key val1 val2 val3") + val r = readStringArray() + assertTrue { r.size == 3 } + assertEquals("1", r[0]) + assertEquals("2", r[1]) + assertEquals("3", r[2]) + } + + @Test + fun testZcard() { + setup() + writeCommand("${RedisCommand.ZCARD.name} $_key") + assertEquals(3, readLong()) + } + + @Test + fun testZrem() { + setup() + writeCommand("${RedisCommand.ZREM.name} $_key val1") + assertEquals(1, readLong()) + writeCommand("${RedisCommand.ZREM.name} $_key val11") + assertEquals(0, readLong()) + writeCommand("${RedisCommand.ZCARD.name} $_key") + assertEquals(2, readLong()) + } + + @Test + fun testZincby() { + setup(1) + writeCommand("${RedisCommand.ZINCRBY.name} $_key 10 val1") + assertEquals("11", readFullBulkString()) + } + + @Test + fun testZrank() { + setup() + setup(3, _key, "v") + writeCommand("${RedisCommand.ZRANK.name} $_key v1") + assertEquals(0, readLong()) + writeCommand("${RedisCommand.ZRANK.name} $_key val3") + assertEquals(5, readLong()) + writeCommand("${RedisCommand.ZRANK.name} $_key val11") + assertEquals(nullString, readFullBulkString()) + writeCommand("${RedisCommand.ZRANK.name} ne val1") + assertEquals(nullString, readFullBulkString()) + } + + @Test + fun testZpopmax() { + setup() + setup(3, _key, "v") + writeCommand("${RedisCommand.ZPOPMAX.name} $_key 3") + val r = readStringArray() + assertTrue { r.size == 6 } + assertEquals("val3", r[0]) + assertEquals("3", r[1]) + assertEquals("v3", r[2]) + assertEquals("3", r[3]) + assertEquals("val2", r[4]) + assertEquals("2", r[5]) + + writeCommand("${RedisCommand.ZPOPMAX.name} $_key") + val r2 = readStringArray() + assertTrue { r2.size == 2 } + assertEquals("v2", r2[0]) + assertEquals("2", r2[1]) + + writeCommand("${RedisCommand.ZPOPMAX.name} $_key 0") + val r3 = readStringArray() + assertTrue { r3.isEmpty() } + } + + @Test + fun testZpopmain() { + setup() + setup(3, _key, "v") + writeCommand("${RedisCommand.ZPOPMIN.name} $_key 3") + val r = readStringArray() + assertTrue { r.size == 6 } + assertEquals("v1", r[0]) + assertEquals("1", r[1]) + assertEquals("val1", r[2]) + assertEquals("1", r[3]) + assertEquals("v2", r[4]) + assertEquals("2", r[5]) + + writeCommand("${RedisCommand.ZPOPMIN.name} $_key") + val r2 = readStringArray() + assertTrue { r2.size == 2 } + assertEquals("val2", r2[0]) + assertEquals("2", r2[1]) + + writeCommand("${RedisCommand.ZPOPMIN.name} $_key 0") + val r3 = readStringArray() + assertTrue { r3.isEmpty() } + } + + @Test + fun testZrandmember() { + setup() + writeCommand("${RedisCommand.ZRANDMEMBER.name} $_key 5") + val r = readStringArray() + assertTrue { r.size == 3 } + + writeCommand("${RedisCommand.ZRANDMEMBER.name} $_key -5") + val r2 = readStringArray() + assertTrue { r2.size == 5 } + + writeCommand("${RedisCommand.ZRANDMEMBER.name} $_key 5 WITHSCORES") + val r3 = readStringArray() + assertTrue { r3.size == 6 } + + writeCommand("${RedisCommand.ZRANDMEMBER.name} $_key -5 WITHSCORES") + val r4 = readStringArray() + assertTrue { r4.size == 10 } + } + + @Test + fun testZcount() { + setup(6) + writeCommand("${RedisCommand.ZCOUNT.name} $_key -inf +inf") + assertEquals(6, readLong()) + writeCommand("${RedisCommand.ZCOUNT.name} $_key 2 (5") + assertEquals(3, readLong()) + writeCommand("${RedisCommand.ZCOUNT.name} $_key (1 4") + assertEquals(3, readLong()) + writeCommand("${RedisCommand.ZCOUNT.name} ne 1 2") + assertEquals(0, readLong()) + writeCommand("${RedisCommand.ZCOUNT.name} $_key a b") + assertTrue { readError().isNotEmpty() } + } + + @Test + fun testZlexcount() { + writeCommand("${RedisCommand.ZADD.name} $_key 0 a 0 b 0 c 0 d 0 e 0 f 0 g") + assertEquals(7, readLong()) + writeCommand("${RedisCommand.ZLEXCOUNT.name} $_key - +") + assertEquals(7, readLong()) + writeCommand("${RedisCommand.ZLEXCOUNT.name} $_key b f") + assertEquals(5, readLong()) + writeCommand("${RedisCommand.ZLEXCOUNT.name} $_key [c e") + assertEquals(2, readLong()) + writeCommand("${RedisCommand.ZLEXCOUNT.name} $_key a [c") + assertEquals(2, readLong()) + writeCommand("${RedisCommand.ZLEXCOUNT.name} $_key - [d") + assertEquals(3, readLong()) + writeCommand("${RedisCommand.ZLEXCOUNT.name} ne 1 2") + assertEquals(0, readLong()) + } + + @Test + fun testZremrangebylex() { + writeCommand("${RedisCommand.ZADD.name} $_key 0 a 0 b 0 c 0 d 0 e 0 f 0 g") + assertEquals(7, readLong()) + writeCommand("${RedisCommand.ZREMRANGEBYLEX.name} $_key - +") + assertEquals(7, readLong()) + + writeCommand("${RedisCommand.ZADD.name} $_key 0 a 0 b 0 c 0 d 0 e 0 f 0 g") + assertEquals(7, readLong()) + writeCommand("${RedisCommand.ZREMRANGEBYLEX.name} $_key b f") + assertEquals(5, readLong()) + + writeCommand("${RedisCommand.ZADD.name} $_key 0 a 0 b 0 c 0 d 0 e 0 f 0 g") + assertEquals(5, readLong()) + writeCommand("${RedisCommand.ZREMRANGEBYLEX.name} $_key [c e") + assertEquals(2, readLong()) + + writeCommand("${RedisCommand.ZADD.name} $_key 0 a 0 b 0 c 0 d 0 e 0 f 0 g") + assertEquals(2, readLong()) + writeCommand("${RedisCommand.ZREMRANGEBYLEX.name} $_key a [c") + assertEquals(2, readLong()) + + writeCommand("${RedisCommand.ZREMRANGEBYLEX.name} ne 1 2") + assertEquals(0, readLong()) + } + + @Test + fun testZremrangebyscore() { + setup(6) + writeCommand("${RedisCommand.ZREMRANGEBYSCORE.name} $_key -inf +inf") + assertEquals(6, readLong()) + writeCommand("${RedisCommand.ZCARD.name} $_key") + assertEquals(0, readLong()) + + clear() + setup(6) + writeCommand("${RedisCommand.ZREMRANGEBYSCORE.name} $_key 2 (5") + assertEquals(3, readLong()) + writeCommand("${RedisCommand.ZCARD.name} $_key") + assertEquals(3, readLong()) + + clear() + setup(6) + writeCommand("${RedisCommand.ZREMRANGEBYSCORE.name} $_key (1 4") + assertEquals(3, readLong()) + writeCommand("${RedisCommand.ZCARD.name} $_key") + assertEquals(3, readLong()) + + writeCommand("${RedisCommand.ZREMRANGEBYSCORE.name} ne 1 2") + assertEquals(0, readLong()) + writeCommand("${RedisCommand.ZREMRANGEBYSCORE.name} $_key a b") + assertTrue { readError().isNotEmpty() } + } + + @Test + fun testZremrangebyrank() { + setup() + setup(3, _key, "v") + writeCommand("${RedisCommand.ZREMRANGEBYRANK.name} $_key 3 4") + assertEquals(2, readLong()) + writeCommand("${RedisCommand.ZRANGE.name} $_key -inf +inf BYSCORE") + val r = readStringArray() + assertTrue { r.size == 4 } + assertEquals("v1", r[0]) + assertEquals("val1", r[1]) + assertEquals("v2", r[2]) + assertEquals("val3", r[3]) + + clear() + setup() + setup(3, _key, "v") + writeCommand("${RedisCommand.ZREMRANGEBYRANK.name} $_key 3 -2") + assertEquals(2, readLong()) + writeCommand("${RedisCommand.ZRANGE.name} $_key -inf +inf BYSCORE") + val r2 = readStringArray() + assertTrue { r2.size == 4 } + assertEquals("v1", r2[0]) + assertEquals("val1", r2[1]) + assertEquals("v2", r2[2]) + assertEquals("val3", r2[3]) + + writeCommand("${RedisCommand.ZREMRANGEBYRANK.name} $_key 3 -4") + assertEquals(0, readLong()) + + writeCommand("${RedisCommand.ZREMRANGEBYRANK.name} ne 3 4") + assertEquals(0, readLong()) + } + + @Test + fun testZrange() { + setup(6) + writeCommand("${RedisCommand.ZRANGE.name} $_key -inf +inf WITHSCORES") + assertEquals(12, readStringArray().size) + + writeCommand("${RedisCommand.ZRANGE.name} $_key (2 3 WITHSCORES") + val r = readStringArray() + assertTrue { r.size == 2 } + assertEquals("val4", r[0]) + assertEquals("4", r[1]) + + writeCommand("${RedisCommand.ZRANGE.name} $_key (2 4 BYSCORE") + val r2 = readStringArray() + assertTrue { r2.size == 2 } + assertEquals("val3", r2[0]) + assertEquals("val4", r2[1]) + + writeCommand("${RedisCommand.ZRANGE.name} $_key 3 (6 BYSCORE REV LIMIT 1 2") + val r3 = readStringArray() + assertTrue { r3.size == 2 } + assertEquals("val4", r3[0]) + assertEquals("val3", r3[1]) + } + + @Test + fun testZrangestore() { + setup(6) + writeCommand("${RedisCommand.ZRANGESTORE.name} zset2 $_key (2 3") + assertEquals(1, readLong()) + + writeCommand("${RedisCommand.ZRANGE.name} zset2 -inf +inf WITHSCORES") + val r = readStringArray() + assertTrue { r.size == 2 } + assertEquals("val4", r[0]) + assertEquals("4", r[1]) + + writeCommand("${RedisCommand.ZRANGESTORE.name} zset3 $_key (2 3 BYSCORE") + assertEquals(1, readLong()) + + writeCommand("${RedisCommand.ZRANGE.name} zset3 -inf +inf WITHSCORES") + val r2 = readStringArray() + assertTrue { r2.size == 2 } + assertEquals("val3", r2[0]) + assertEquals("3", r2[1]) + } + + @Test + fun testZrevrange() { + setup(6) + writeCommand("${RedisCommand.ZREVRANGE.name} $_key -inf +inf WITHSCORES") + assertEquals(12, readStringArray().size) + + writeCommand("${RedisCommand.ZREVRANGE.name} $_key (2 4 WITHSCORES") + val r = readStringArray() + assertTrue { r.size == 4 } + assertEquals("val5", r[0]) + assertEquals("5", r[1]) + assertEquals("val4", r[2]) + assertEquals("4", r[3]) + + writeCommand("${RedisCommand.ZREVRANGE.name} $_key (2 4") + val r2 = readStringArray() + assertTrue { r2.size == 2 } + assertEquals("val5", r2[0]) + assertEquals("val4", r2[1]) + } + + @Test + fun testZrangebyscore() { + setup(6) + writeCommand("${RedisCommand.ZRANGEBYSCORE.name} $_key -inf +inf WITHSCORES") + assertEquals(12, readStringArray().size) + + writeCommand("${RedisCommand.ZRANGEBYSCORE.name} $_key (2 4 WITHSCORES") + val r = readStringArray() + assertTrue { r.size == 4 } + assertEquals("val3", r[0]) + assertEquals("3", r[1]) + assertEquals("val4", r[2]) + assertEquals("4", r[3]) + + writeCommand("${RedisCommand.ZRANGEBYSCORE.name} $_key (2 4") + val r2 = readStringArray() + assertTrue { r2.size == 2 } + assertEquals("val3", r2[0]) + assertEquals("val4", r2[1]) + } + + @Test + fun testZrevrangebyscore() { + setup(6) + writeCommand("${RedisCommand.ZREVRANGEBYSCORE.name} $_key -inf +inf WITHSCORES") + assertEquals(12, readStringArray().size) + + writeCommand("${RedisCommand.ZREVRANGEBYSCORE.name} $_key (2 4 WITHSCORES") + val r = readStringArray() + assertTrue { r.size == 4 } + assertEquals("val4", r[0]) + assertEquals("4", r[1]) + assertEquals("val3", r[2]) + assertEquals("3", r[3]) + + writeCommand("${RedisCommand.ZREVRANGEBYSCORE.name} $_key (2 4") + val r2 = readStringArray() + assertTrue { r2.size == 2 } + assertEquals("val4", r2[0]) + assertEquals("val3", r2[1]) + } + + @Test + fun testZrangebylex() { + writeCommand("${RedisCommand.ZADD.name} $_key 0 a 0 b 0 c 0 d 0 e 0 f 0 g") + assertEquals(7, readLong()) + writeCommand("${RedisCommand.ZRANGEBYLEX.name} $_key - + LIMIT 1 2") + assertEquals(2, readStringArray().size) + + writeCommand("${RedisCommand.ZRANGEBYLEX.name} $_key [a d") + val r = readStringArray() + assertTrue { r.size == 3 } + assertEquals("b", r[0]) + assertEquals("c", r[1]) + assertEquals("d", r[2]) + + writeCommand("${RedisCommand.ZRANGEBYLEX.name} $_key c [e") + val r2 = readStringArray() + assertTrue { r2.size == 2 } + assertEquals("c", r2[0]) + assertEquals("d", r2[1]) + } + + @Test + fun testZrevrangebylex() { + writeCommand("${RedisCommand.ZADD.name} $_key 0 a 0 b 0 c 0 d 0 e 0 f 0 g") + assertEquals(7, readLong()) + writeCommand("${RedisCommand.ZREVRANGEBYLEX.name} $_key - +") + assertEquals(7, readStringArray().size) + + writeCommand("${RedisCommand.ZREVRANGEBYLEX.name} $_key [a d") + val r = readStringArray() + assertTrue { r.size == 3 } + assertEquals("d", r[0]) + assertEquals("c", r[1]) + assertEquals("b", r[2]) + + writeCommand("${RedisCommand.ZREVRANGEBYLEX.name} $_key c [e") + val r2 = readStringArray() + assertTrue { r2.size == 2 } + assertEquals("d", r2[0]) + assertEquals("c", r2[1]) + } +}