diff --git a/lib/aerospike/cdt/map_operation.rb b/lib/aerospike/cdt/map_operation.rb index b5e2d55d..b978fbeb 100644 --- a/lib/aerospike/cdt/map_operation.rb +++ b/lib/aerospike/cdt/map_operation.rb @@ -116,7 +116,7 @@ def initialize(op_type, map_op, bin_name, *arguments, ctx: nil, return_type: nil def self.create(bin_name, order, ctx: nil) if !ctx || ctx.length == 0 # If context not defined, the set order for top-level bin map. - self.set_policy(MapPolicy.new(order: order, flag: 0), bin_name) + self.set_policy(MapPolicy.new(order: order, flags: 0), bin_name) else MapOperation.new(Operation::CDT_MODIFY, SET_TYPE, bin_name, order[:attr], ctx: ctx, flag: order[:flag]) end diff --git a/lib/aerospike/client.rb b/lib/aerospike/client.rb index 0031025e..1b56c9c3 100644 --- a/lib/aerospike/client.rb +++ b/lib/aerospike/client.rb @@ -529,7 +529,7 @@ def execute_udf(key, package_name, function_name, args = [], options = nil) # This method is only supported by Aerospike 3 servers. # If the policy is nil, the default relevant policy will be used. def execute_udf_on_query(statement, package_name, function_name, function_args = [], options = nil) - policy = create_policy(options, QueryPolicy, default_query_policy) + policy = create_policy(options, WritePolicy, default_write_policy) nodes = @cluster.nodes if nodes.empty? @@ -538,14 +538,12 @@ def execute_udf_on_query(statement, package_name, function_name, function_args = statement = statement.clone statement.set_aggregate_function(package_name, function_name, function_args, false) - # Use a thread per node nodes.each do |node| - partitions = node.cluster.node_partitions(node, statement.namespace) Thread.new do Thread.current.abort_on_exception = true begin - command = QueryCommand.new(node, policy, statement, nil, partitions) + command = ServerCommand.new(@cluster, node, policy, statement, true, statement.task_id) execute_command(command) rescue => e Aerospike.logger.error(e) @@ -701,7 +699,6 @@ def scan_node(node, namespace, set_name, bin_names = nil, options = nil) # If the policy is nil, the default relevant policy will be used. def query_partitions(partition_filter, statement, options = nil) policy = create_policy(options, QueryPolicy, default_query_policy) - new_policy = policy.clone nodes = @cluster.nodes if nodes.empty? diff --git a/lib/aerospike/command/batch_direct_command.rb b/lib/aerospike/command/batch_direct_command.rb index 77e4626d..3c96d82e 100644 --- a/lib/aerospike/command/batch_direct_command.rb +++ b/lib/aerospike/command/batch_direct_command.rb @@ -60,7 +60,7 @@ def write_buffer operation_count = bin_names.length end - write_header(policy, read_attr, 0, 2, operation_count) + write_header_read(policy, read_attr, 0, 2, operation_count) write_field_string(batch.namespace, Aerospike::FieldType::NAMESPACE) write_field_header(byte_size, Aerospike::FieldType::DIGEST_RIPE_ARRAY) diff --git a/lib/aerospike/command/batch_index_command.rb b/lib/aerospike/command/batch_index_command.rb index 1ca3572c..65835279 100644 --- a/lib/aerospike/command/batch_index_command.rb +++ b/lib/aerospike/command/batch_index_command.rb @@ -66,7 +66,7 @@ def write_buffer end end size_buffer - write_header(policy,read_attr | INFO1_BATCH, 0, field_count, 0) + write_header_read(policy, read_attr | INFO1_BATCH, 0, field_count, 0) write_predexp(@policy.predexp, predexp_size) diff --git a/lib/aerospike/command/command.rb b/lib/aerospike/command/command.rb index 535af93e..e8b62ce2 100644 --- a/lib/aerospike/command/command.rb +++ b/lib/aerospike/command/command.rb @@ -124,7 +124,7 @@ def set_write(policy, operation, key, bins) size_buffer - write_header_with_policy(policy, 0, INFO2_WRITE, field_count, bins.length) + write_header_write(policy, INFO2_WRITE, field_count, bins.length) write_key(key, policy) write_predexp(policy.predexp, predexp_size) write_filter_exp(@policy.filter_exp, exp_size) @@ -149,7 +149,7 @@ def set_delete(policy, key) field_count += 1 if exp_size > 0 size_buffer - write_header_with_policy(policy, 0, INFO2_WRITE | INFO2_DELETE, field_count, 0) + write_header_write(policy, INFO2_WRITE | INFO2_DELETE, field_count, 0) write_key(key) write_predexp(policy.predexp, predexp_size) write_filter_exp(@policy.filter_exp, exp_size) @@ -169,7 +169,7 @@ def set_touch(policy, key) estimate_operation_size size_buffer - write_header_with_policy(policy, 0, INFO2_WRITE, field_count, 1) + write_header_write(policy, INFO2_WRITE, field_count, 1) write_key(key) write_predexp(policy.predexp, predexp_size) write_filter_exp(@policy.filter_exp, exp_size) @@ -189,7 +189,7 @@ def set_exists(policy, key) field_count += 1 if exp_size > 0 size_buffer - write_header(policy, INFO1_READ | INFO1_NOBINDATA, 0, field_count, 0) + write_header_read_header(policy, INFO1_READ | INFO1_NOBINDATA, field_count, 0) write_key(key) write_predexp(policy.predexp, predexp_size) write_filter_exp(@policy.filter_exp, exp_size) @@ -208,7 +208,7 @@ def set_read_for_key_only(policy, key) field_count += 1 if exp_size > 0 size_buffer - write_header(policy, INFO1_READ | INFO1_GET_ALL, 0, field_count, 0) + write_header_read(policy, INFO1_READ | INFO1_GET_ALL, 0, field_count, 0) write_key(key) write_predexp(policy.predexp, predexp_size) write_filter_exp(@policy.filter_exp, exp_size) @@ -217,7 +217,7 @@ def set_read_for_key_only(policy, key) # Writes the command for get operations (specified bins) def set_read(policy, key, bin_names) - if bin_names && bin_names.length > 0 + if bin_names && !bin_names.empty? begin_cmd field_count = estimate_key_size(key) @@ -232,7 +232,12 @@ def set_read(policy, key, bin_names) end size_buffer - write_header(policy, INFO1_READ, 0, field_count, bin_names.length) + attr = INFO1_READ + if bin_names.empty? + attr |= INFO1_GET_ALL + end + + write_header_read(policy, attr, 0, field_count, bin_names.length) write_key(key) write_predexp(policy.predexp, predexp_size) write_filter_exp(@policy.filter_exp, exp_size) @@ -242,6 +247,7 @@ def set_read(policy, key, bin_names) end end_cmd + mark_compressed(policy) else set_read_for_key_only(policy, key) end @@ -258,20 +264,19 @@ def set_read_header(policy, key) exp_size = estimate_expression_size(@policy.filter_exp) field_count += 1 if exp_size > 0 - estimate_operation_size_for_bin_name("") size_buffer # The server does not currently return record header data with _INFO1_NOBINDATA attribute set. # The workaround is to request a non-existent bin. # TODO: Fix this on server. #command.set_read(INFO1_READ | _INFO1_NOBINDATA); - write_header(policy, INFO1_READ, 0, field_count, 1) + write_header_read_header(policy, INFO1_READ|INFO1_NOBINDATA, field_count, 0) write_key(key) write_predexp(policy.predexp, predexp_size) write_filter_exp(@policy.filter_exp, exp_size) - write_operation_for_bin_name("", Aerospike::Operation::READ) end_cmd + mark_compressed(policy) end # Implements different command operations @@ -289,7 +294,7 @@ def set_operate(policy, key, args) size_buffer - write_header_with_policy(policy, args.read_attr, args.write_attr, field_count, args.operations.length) + write_header_read_write(policy, args.read_attr, args.write_attr, field_count, args.operations.length) write_key(key, policy) write_predexp(policy.predexp, predexp_size) write_filter_exp(policy.filter_exp, exp_size) @@ -317,7 +322,7 @@ def set_udf(policy, key, package_name, function_name, args) field_count += estimate_udf_size(package_name, function_name, arg_bytes) size_buffer - write_header(policy, 0, INFO2_WRITE, field_count, 0) + write_header_write(policy, INFO2_WRITE, field_count, 0) write_key(key, policy) write_predexp(policy.predexp, predexp_size) write_filter_exp(@policy.filter_exp, exp_size) @@ -329,7 +334,7 @@ def set_udf(policy, key, package_name, function_name, args) mark_compressed(policy) end - def set_scan(policy, namespace, set_name, bin_names, node_partitions) + def set_scan(cluster, policy, namespace, set_name, bin_names, node_partitions) # Estimate buffer size begin_cmd field_count = 0 @@ -395,12 +400,17 @@ def set_scan(policy, namespace, set_name, bin_names, node_partitions) read_attr |= INFO1_NOBINDATA end + info_attr = 0 + if cluster.supports_feature?(Aerospike::Features::PARTITION_QUERY) + info_attr = INFO3_PARTITION_DONE + end + operation_count = 0 - if bin_names + unless bin_names.nil? operation_count = bin_names.length end - write_header(policy, read_attr, 0, field_count, operation_count) + write_header_read(policy, read_attr, info_attr, field_count, operation_count) if namespace write_field_string(namespace, Aerospike::FieldType::NAMESPACE) @@ -465,11 +475,13 @@ def set_scan(policy, namespace, set_name, bin_names, node_partitions) end_cmd end - def set_query(policy, statement, background, node_partitions) + + def set_query(cluster, policy, statement, background, node_partitions) function_arg_buffer = nil field_count = 0 filter_size = 0 + is_new = cluster.supports_feature?(Aerospike::Features::PARTITION_QUERY) begin_cmd if statement.namespace @@ -548,7 +560,7 @@ def set_query(policy, statement, background, node_partitions) @data_offset += statement.function_name.bytesize + FIELD_HEADER_SIZE function_arg_buffer = "" - if statement.function_args && statement.function_args.length > 0 + if statement.function_args && !statement.function_args.empty? function_arg_buffer = Value.of(statement.function_args).to_bytes end @data_offset += FIELD_HEADER_SIZE + function_arg_buffer.bytesize @@ -592,10 +604,13 @@ def set_query(policy, statement, background, node_partitions) field_count += 1 end - operations = statement.operations + unless statement.operations.nil? + operations = statement.operations + end + operation_count = 0 - if !operations.empty? + if operations unless background raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::PARAMETER_ERROR) @@ -612,21 +627,24 @@ def set_query(policy, statement, background, node_partitions) operation_count = bin_names.length # Estimate size for selected bin names (query bin names already handled for old servers). end - + @data_buffer.reset size_buffer if background - write_header_with_policy(policy, 0, INFO2_WRITE, field_count, operation_count) + write_header_write(policy, INFO2_WRITE, field_count, operation_count) else read_attr = INFO1_READ read_attr |= INFO1_NOBINDATA unless policy.include_bin_data read_attr |= INFO1_SHORT_QUERY if policy.short_query - write_header(policy, read_attr, 0, field_count, operation_count) + info_attr = INFO3_PARTITION_DONE if is_new + write_header_read(policy, read_attr, info_attr, field_count, operation_count) end + write_field_string(statement.namespace, FieldType::NAMESPACE) if statement.namespace write_field_string(statement.set_name, FieldType::TABLE) if statement.set_name + # Write records per second. write_field_int(statement.records_per_second, FieldType::RECORDS_PER_SECOND) if statement.records_per_second > 0 @@ -670,7 +688,6 @@ def set_query(policy, statement, background, node_partitions) write_field_string(statement.function_name, FieldType::UDF_FUNCTION) write_field_string(function_arg_buffer, FieldType::UDF_ARGLIST) end - if parts_full_size > 0 write_field_header(parts_full_size, FieldType::PID_ARRAY) node_partitions.parts_full.each do |part| @@ -696,23 +713,32 @@ def set_query(policy, statement, background, node_partitions) write_field(max_records, FieldType::MAX_RECORDS) end - if operations.empty? - if bin_names.empty? - bin_names.each do |bin_name| - write_operation_for_bin_name(bin_name, Operation::READ) - end - end - else + if !operations.nil? operations.each do |operation| write_operation_for_operation(operation) end + elsif !bin_names.nil? && (is_new || filter.nil?) + bin_names.each do |bin_name| + write_operation_for_bin_name(bin_name, Operation::READ) + end end - end_cmd + end + + def hex_dump_from_string(data_buffer) + byte_array = data_buffer.to_s.unpack('C*') + hex_dump(byte_array) + end - nil + def hex_dump(byte_array) + byte_array.each_slice(16) do |slice| + hex_line = slice.map { |b| format('%02X', b) }.join(' ') + char_line = slice.map { |b| (b >= 32 && b <= 126) ? b.chr : '.' }.join + puts "#{hex_line.ljust(48)} | #{char_line}" + end end + def execute iterations = 0 @@ -764,7 +790,6 @@ def execute # Reset timeout in send buffer (destined for server) and socket. @data_buffer.write_int32((@policy.timeout * 1000).to_i, 22) - # Send command. begin @conn.write(@data_buffer, @data_offset) @@ -879,7 +904,7 @@ def estimate_operation_size end def estimate_predexp(predexp) - if predexp && predexp.size > 0 + if predexp && !predexp.empty? @data_offset += FIELD_HEADER_SIZE sz = Aerospike::PredExp.estimate_size(predexp) @data_offset += sz @@ -897,21 +922,52 @@ def estimate_expression_size(exp) 0 end - # Generic header write. - def write_header(policy, read_attr, write_attr, field_count, operation_count) - read_attr |= INFO1_CONSISTENCY_ALL if policy.consistency_level == Aerospike::ConsistencyLevel::CONSISTENCY_ALL - read_attr |= INFO1_COMPRESS_RESPONSE if policy.use_compression + # Header write for write command + def write_header_write(policy, write_attr, field_count, operation_count) + # Set flags. + generation = Integer(0) + info_attr = Integer(0) + read_attr = Integer(0) + + case policy.record_exists_action + when Aerospike::RecordExistsAction::UPDATE + when Aerospike::RecordExistsAction::UPDATE_ONLY + info_attr |= INFO3_UPDATE_ONLY + when Aerospike::RecordExistsAction::REPLACE + info_attr |= INFO3_CREATE_OR_REPLACE + when Aerospike::RecordExistsAction::REPLACE_ONLY + info_attr |= INFO3_REPLACE_ONLY + when Aerospike::RecordExistsAction::CREATE_ONLY + write_attr |= INFO2_CREATE_ONLY + end + + case policy.generation_policy + when Aerospike::GenerationPolicy::NONE + when Aerospike::GenerationPolicy::EXPECT_GEN_EQUAL + generation = policy.generation + write_attr |= INFO2_GENERATION + when Aerospike::GenerationPolicy::EXPECT_GEN_GT + generation = policy.generation + write_attr |= INFO2_GENERATION_GT + end + info_attr |= INFO3_COMMIT_MASTER if policy.commit_level == Aerospike::CommitLevel::COMMIT_MASTER + write_attr |= INFO2_DURABLE_DELETE if policy.durable_delete # Write all header data except total size which must be written last. - @data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message heade.length. + @data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message header.length. @data_buffer.write_byte(read_attr, 9) @data_buffer.write_byte(write_attr, 10) + @data_buffer.write_byte(info_attr, 11) + @data_buffer.write_byte(0, 12) # unused + @data_buffer.write_byte(0, 13) # clear the result code + @data_buffer.write_uint32(generation, 14) + @data_buffer.write_uint32(policy.ttl, 18) - i = 11 - while i <= 25 - @data_buffer.write_byte(0, i) - i = i.succ - end + # Initialize timeout. It will be written later. + @data_buffer.write_byte(0, 22) + @data_buffer.write_byte(0, 23) + @data_buffer.write_byte(0, 24) + @data_buffer.write_byte(0, 25) @data_buffer.write_int16(field_count, 26) @data_buffer.write_int16(operation_count, 28) @@ -920,7 +976,7 @@ def write_header(policy, read_attr, write_attr, field_count, operation_count) end # Header write for write operations. - def write_header_with_policy(policy, read_attr, write_attr, field_count, operation_count) + def write_header_read_write(policy, read_attr, write_attr, field_count, operation_count) # Set flags. generation = Integer(0) info_attr = Integer(0) @@ -948,9 +1004,9 @@ def write_header_with_policy(policy, read_attr, write_attr, field_count, operati end info_attr |= INFO3_COMMIT_MASTER if policy.commit_level == Aerospike::CommitLevel::COMMIT_MASTER - read_attr |= INFO1_CONSISTENCY_ALL if policy.consistency_level == Aerospike::ConsistencyLevel::CONSISTENCY_ALL write_attr |= INFO2_DURABLE_DELETE if policy.durable_delete read_attr |= INFO1_COMPRESS_RESPONSE if policy.use_compression + # Write all header data except total size which must be written last. @data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message header.length. @data_buffer.write_byte(read_attr, 9) @@ -973,6 +1029,52 @@ def write_header_with_policy(policy, read_attr, write_attr, field_count, operati @data_offset = MSG_TOTAL_HEADER_SIZE end + def write_header_read(policy, read_attr, info_attr, field_count, operation_count) + read_attr |= INFO1_COMPRESS_RESPONSE if policy.use_compression + #TODO: Add SC Mode + + @data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message header.length. + @data_buffer.write_byte(read_attr, 9) + @data_buffer.write_byte(0, 10) + @data_buffer.write_byte(info_attr, 11) + + (12...22).each { |i| @data_buffer.write_byte(i, 0) } + + # Initialize timeout. It will be written later. + @data_buffer.write_byte(0, 22) + @data_buffer.write_byte(0, 23) + @data_buffer.write_byte(0, 24) + @data_buffer.write_byte(0, 25) + + @data_buffer.write_int16(field_count, 26) + @data_buffer.write_int16(operation_count, 28) + + @data_offset = MSG_TOTAL_HEADER_SIZE + end + + def write_header_read_header(policy, read_attr, field_count, operation_count) + info_attr = Integer(0) + #TODO: Add SC Mode + + @data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message header.length. + @data_buffer.write_byte(read_attr, 9) + @data_buffer.write_byte(0, 10) + @data_buffer.write_byte(info_attr, 11) + + (12...22).each { |i| @data_buffer.write_byte(i, 0) } + + # Initialize timeout. It will be written later. + @data_buffer.write_byte(0, 22) + @data_buffer.write_byte(0, 23) + @data_buffer.write_byte(0, 24) + @data_buffer.write_byte(0, 25) + + @data_buffer.write_int16(field_count, 26) + @data_buffer.write_int16(operation_count, 28) + + @data_offset = MSG_TOTAL_HEADER_SIZE + end + def write_key(key, policy = nil) # Write key into buffer. if key.namespace @@ -1111,7 +1213,7 @@ def write_field_header(size, ftype) end def write_predexp(predexp, predexp_size) - if predexp && predexp.size > 0 + if predexp && !predexp.empty? write_field_header(predexp_size, Aerospike::FieldType::FILTER_EXP) @data_offset = Aerospike::PredExp.write( predexp, @data_buffer, @data_offset @@ -1152,13 +1254,13 @@ def compress_buffer compressed = Zlib::deflate(@data_buffer.buf, Zlib::DEFAULT_COMPRESSION) # write original size as header - proto_s = "%08d" % 0 + proto_s = format("%08d", 0) proto_s[0, 8] = [@data_offset].pack("q>") compressed.prepend(proto_s) # write proto proto = (compressed.size + 8) | Integer(CL_MSG_VERSION << 56) | Integer(AS_MSG_TYPE << 48) - proto_s = "%08d" % 0 + proto_s = format("%08d", 0) proto_s[0, 8] = [proto].pack("q>") compressed.prepend(proto_s) diff --git a/lib/aerospike/exp/exp.rb b/lib/aerospike/exp/exp.rb index f994ab65..b3f3a099 100644 --- a/lib/aerospike/exp/exp.rb +++ b/lib/aerospike/exp/exp.rb @@ -1,5 +1,5 @@ # encoding: utf-8 -# Copyright 2014-2022 Aerospike, Inc. +# Copyright 2014-2023 Aerospike, Inc. # # Portions may be licensed to Aerospike, Inc. under one or more contributor # license agreements. @@ -210,7 +210,7 @@ def self.hll_bin(name) # # Bin "a" exists in record # Exp.bin_exists("a") def self.bin_exists(name) - return Exp.ne(Exp.bin_type(name), Exp.int_val(0)) + Exp.ne(Exp.bin_type(name), Exp.int_val(0)) end # Create expression that returns bin's integer particle type:: @@ -223,6 +223,20 @@ def self.bin_type(name) CmdStr.new(BIN_TYPE, name) end + # Create expression that returns the record size. This expression usually evaluates + # quickly because record meta data is cached in memory. + # Requires server version 7.0+. This expression replaces {@link #deviceSize()} and + # {@link #memorySize()} since those older expressions are equivalent on server version 7.0+. + # + # {@code + # // Record size >= 100 KB + # Exp.ge(Exp.record_size(), Exp.val(100 * 1024)) + # } + def self.record_size + Cmd.new(RECORD_SIZE) + end + + #-------------------------------------------------- # Misc #-------------------------------------------------- @@ -1010,6 +1024,7 @@ def write(buf, offset) KEY_EXISTS = 71 IS_TOMBSTONE = 72 MEMORY_SIZE = 73 + RECORD_SIZE = 74 KEY = 80 BIN = 81 BIN_TYPE = 82 diff --git a/lib/aerospike/node.rb b/lib/aerospike/node.rb index d8fa4eb3..f95b9bb1 100644 --- a/lib/aerospike/node.rb +++ b/lib/aerospike/node.rb @@ -26,6 +26,11 @@ class Node PARTITIONS = 4096 FULL_HEALTH = 100 + HAS_PARTITION_SCAN = 1 << 0 + HAS_QUERY_SHOW = 1 << 1 + HAS_BATCH_ANY = 1 << 2 + HAS_PARTITION_QUERY = 1 << 3 + # Initialize server node with connection parameters. def initialize(cluster, nv) @@ -58,6 +63,14 @@ def initialize(cluster, nv) @connections = ::Aerospike::ConnectionPool.new(cluster, host) end + def partition_query? + (@features & HAS_PARTITION_QUERY) != 0 + end + + def query_show? + (@features & HAS_QUERY_SHOW) != 0 + end + def update_racks(parser) new_racks = parser.update_racks @racks.value = new_racks if new_racks diff --git a/lib/aerospike/query/query_command.rb b/lib/aerospike/query/query_command.rb index 79a8b412..f0149b29 100644 --- a/lib/aerospike/query/query_command.rb +++ b/lib/aerospike/query/query_command.rb @@ -23,9 +23,9 @@ module Aerospike class QueryCommand < StreamCommand #:nodoc: - def initialize(node, policy, statement, recordset, partitions) + def initialize(cluster, node, policy, statement, recordset, partitions) super(node) - + @cluster = cluster @policy = policy @statement = statement @recordset = recordset @@ -33,209 +33,7 @@ def initialize(node, policy, statement, recordset, partitions) end def write_buffer - fieldCount = 0 - filterSize = 0 - binNameSize = 0 - predSize = 0 - - begin_cmd - - if @statement.namespace - @data_offset += @statement.namespace.bytesize + FIELD_HEADER_SIZE - fieldCount+=1 - end - - if @statement.index_name - @data_offset += @statement.index_name.bytesize + FIELD_HEADER_SIZE - fieldCount+=1 - end - - if @statement.set_name - @data_offset += @statement.set_name.bytesize + FIELD_HEADER_SIZE - fieldCount+=1 - end - - if !is_scan? - col_type = @statement.filters[0].collection_type - if col_type > 0 - @data_offset += FIELD_HEADER_SIZE + 1 - fieldCount += 1 - end - - @data_offset += FIELD_HEADER_SIZE - filterSize+=1 # num filters - - @statement.filters.each do |filter| - sz = filter.estimate_size - filterSize += sz - end - @data_offset += filterSize - fieldCount+=1 - - if @statement.bin_names && @statement.bin_names.length > 0 - @data_offset += FIELD_HEADER_SIZE - binNameSize+=1 # num bin names - - @statement.bin_names.each do |bin_name| - binNameSize += bin_name.bytesize + 1 - end - @data_offset += binNameSize - fieldCount+=1 - end - else - @data_offset += @partitions.length * 2 + FIELD_HEADER_SIZE - fieldCount += 1 - - if @policy.records_per_second > 0 - @data_offset += 4 + FIELD_HEADER_SIZE - fieldCount += 1 - end - - # Calling query with no filters is more efficiently handled by a primary index scan. - # Estimate scan options size. - # @data_offset += (2 + FIELD_HEADER_SIZE) - # fieldCount+=1 - end - - @statement.set_task_id - - @data_offset += 8 + FIELD_HEADER_SIZE - fieldCount+=1 - - predexp = @policy.predexp || @statement.predexp - - if predexp - @data_offset += FIELD_HEADER_SIZE - predSize = Aerospike::PredExp.estimate_size(predexp) - @data_offset += predSize - fieldCount += 1 - end - - if @statement.function_name - @data_offset += FIELD_HEADER_SIZE + 1 # udf type - @data_offset += @statement.package_name.bytesize + FIELD_HEADER_SIZE - @data_offset += @statement.function_name.bytesize + FIELD_HEADER_SIZE - - if @statement.function_args && @statement.function_args.length > 0 - functionArgBuffer = Value.of(@statement.function_args).to_bytes - else - functionArgBuffer = '' - end - @data_offset += FIELD_HEADER_SIZE + functionArgBuffer.bytesize - fieldCount += 4 - end - - if @statement.filters.nil? || @statement.filters.empty? - if @statement.bin_names && @statement.bin_names.length > 0 - @statement.bin_names.each do |bin_name| - estimate_operation_size_for_bin_name(bin_name) - end - end - end - - size_buffer - - readAttr = @policy.include_bin_data ? INFO1_READ : INFO1_READ | INFO1_NOBINDATA - operation_count = (is_scan? && !@statement.bin_names.nil?) ? @statement.bin_names.length : 0 - - write_header(@policy, readAttr, 0, fieldCount, operation_count) - - if @statement.namespace - write_field_string(@statement.namespace, Aerospike::FieldType::NAMESPACE) - end - - unless @statement.index_name.nil? - write_field_string(@statement.index_name, Aerospike::FieldType::INDEX_NAME) - end - - if @statement.set_name - write_field_string(@statement.set_name, Aerospike::FieldType::TABLE) - end - - if !is_scan? - col_type = @statement.filters[0].collection_type - if col_type > 0 - write_field_header(1, Aerospike::FieldType::INDEX_TYPE) - @data_buffer.write_byte(col_type, @data_offset) - @data_offset+=1 - end - - write_field_header(filterSize, Aerospike::FieldType::INDEX_RANGE) - @data_buffer.write_byte(@statement.filters.length, @data_offset) - @data_offset+=1 - - @statement.filters.each do |filter| - @data_offset = filter.write(@data_buffer, @data_offset) - end - - # Query bin names are specified as a field (Scan bin names are specified later as operations) - if @statement.bin_names && @statement.bin_names.length > 0 - write_field_header(binNameSize, Aerospike::FieldType::QUERY_BINLIST) - @data_buffer.write_byte(@statement.bin_names.length, @data_offset) - @data_offset += 1 - - @statement.bin_names.each do |bin_name| - len = @data_buffer.write_binary(bin_name, @data_offset + 1) - @data_buffer.write_byte(len, @data_offset) - @data_offset += len + 1; - end - end - else - write_field_header(@partitions.length * 2, Aerospike::FieldType::PID_ARRAY) - for pid in @partitions - @data_buffer.write_uint16_little_endian(pid, @data_offset) - @data_offset += 2 - end - - if @policy.records_per_second > 0 - write_field_int(@policy.records_per_second, Aerospike::FieldType::RECORDS_PER_SECOND) - end - - # Calling query with no filters is more efficiently handled by a primary index scan. - # write_field_header(2, Aerospike::FieldType::SCAN_OPTIONS) - # priority = @policy.priority.ord - # priority = priority << 4 - # @data_buffer.write_byte(priority, @data_offset) - # @data_offset+=1 - # @data_buffer.write_byte(100.ord, @data_offset) - # @data_offset+=1 - end - - write_field_header(8, Aerospike::FieldType::TRAN_ID) - @data_buffer.write_int64(@statement.task_id, @data_offset) - @data_offset += 8 - - if predexp - write_field_header(predSize, Aerospike::FieldType::PREDEXP) - @data_offset = Aerospike::PredExp.write( - predexp, @data_buffer, @data_offset - ) - end - - if @statement.function_name - write_field_header(1, Aerospike::FieldType::UDF_OP) - if @statement.return_data - @data_buffer.write_byte(1, @data_offset) - @data_offset+=1 - else - @data_buffer.write_byte(2, @data_offset) - @data_offset+=1 - end - - write_field_string(@statement.package_name, Aerospike::FieldType::UDF_PACKAGE_NAME) - write_field_string(@statement.function_name, Aerospike::FieldType::UDF_FUNCTION) - write_field_bytes(functionArgBuffer, Aerospike::FieldType::UDF_ARGLIST) - end - - if is_scan? && !@statement.bin_names.nil? - @statement.bin_names.each do |bin_name| - write_operation_for_bin_name(bin_name, Aerospike::Operation::READ) - end - end - - end_cmd - - return nil + set_query(@cluster, @policy, @statement, false, @partitions) end def is_scan? diff --git a/lib/aerospike/query/query_executor.rb b/lib/aerospike/query/query_executor.rb index c55cfa0a..cd1c387d 100644 --- a/lib/aerospike/query/query_executor.rb +++ b/lib/aerospike/query/query_executor.rb @@ -34,7 +34,7 @@ def self.query_partitions(cluster, policy, tracker, statement, recordset) list.each do |node_partition| threads << Thread.new do Thread.current.abort_on_exception = true - command = QueryPartitionCommand.new(node_partition.node, tracker, policy, statement, recordset, node_partition) + command = QueryPartitionCommand.new(cluster, node_partition.node, tracker, policy, statement, recordset, node_partition) begin command.execute rescue => e @@ -48,7 +48,7 @@ def self.query_partitions(cluster, policy, tracker, statement, recordset) else # Use a single thread for all nodes for all node list.each do |node_partition| - command = QueryPartitionCommand.new(node_partition.node, tracker, policy, statement, recordset, node_partition) + command = QueryPartitionCommand.new(cluster, node_partition.node, tracker, policy, statement, recordset, node_partition) begin command.execute rescue => e diff --git a/lib/aerospike/query/query_partition_command.rb b/lib/aerospike/query/query_partition_command.rb index 1a15a5da..d56814c4 100644 --- a/lib/aerospike/query/query_partition_command.rb +++ b/lib/aerospike/query/query_partition_command.rb @@ -21,243 +21,17 @@ module Aerospike private class QueryPartitionCommand < QueryCommand #:nodoc: - def initialize(node, tracker, policy, statement, recordset, node_partitions) - super(node, policy, statement, recordset, @node_partitions) + def initialize(cluster, node, tracker, policy, statement, recordset, node_partitions) + super(cluster, node, policy, statement, recordset, @node_partitions) @node_partitions = node_partitions @tracker = tracker end def write_buffer - function_arg_buffer = nil - field_count = 0 - filter_size = 0 - bin_name_size = 0 - - begin_cmd - - if @statement.namespace - @data_offset += @statement.namespace.bytesize + FIELD_HEADER_SIZE - field_count += 1 - end - - if @statement.set_name - @data_offset += @statement.set_name.bytesize + FIELD_HEADER_SIZE - field_count += 1 - end - - # Estimate recordsPerSecond field size. This field is used in new servers and not used - # (but harmless to add) in old servers. - if @policy.records_per_second > 0 - @data_offset += 4 + FIELD_HEADER_SIZE - field_count += 1 - end - - # Estimate socket timeout field size. This field is used in new servers and not used - # (but harmless to add) in old servers. - @data_offset += 4 + FIELD_HEADER_SIZE - field_count += 1 - - # Estimate task_id field. - @data_offset += 8 + FIELD_HEADER_SIZE - field_count += 1 - - filter = @statement.filters[0] - bin_names = @statement.bin_names - packed_ctx = nil - - if filter - col_type = filter.collection_type - - # Estimate INDEX_TYPE field. - if col_type > 0 - @data_offset += FIELD_HEADER_SIZE + 1 - field_count += 1 - end - - # Estimate INDEX_RANGE field. - @data_offset += FIELD_HEADER_SIZE - filter_size += 1 # num filters - filter_size += filter.estimate_size - - @data_offset += filter_size - field_count += 1 - - packed_ctx = filter.packed_ctx - if packed_ctx - @data_offset += FIELD_HEADER_SIZE + packed_ctx.length - field_count += 1 - end - end - - @statement.set_task_id - predexp = @policy.predexp || @statement.predexp - - if predexp - @data_offset += FIELD_HEADER_SIZE - pred_size = Aerospike::PredExp.estimate_size(predexp) - @data_offset += pred_size - field_count += 1 - end - - unless @policy.filter_exp.nil? - exp_size = estimate_expression_size(@policy.filter_exp) - field_count += 1 if exp_size > 0 - end - - # Estimate aggregation/background function size. - if @statement.function_name - @data_offset += FIELD_HEADER_SIZE + 1 # udf type - @data_offset += @statement.package_name.bytesize + FIELD_HEADER_SIZE - @data_offset += @statement.function_name.bytesize + FIELD_HEADER_SIZE - - function_arg_buffer = "" - if @statement.function_args && @statement.function_args.length > 0 - function_arg_buffer = Value.of(@statement.function_args).to_bytes - end - @data_offset += FIELD_HEADER_SIZE + function_arg_buffer.bytesize - field_count += 4 - end - - max_records = 0 - parts_full_size = 0 - parts_partial_digest_size = 0 - parts_partial_bval_size = 0 - - unless @node_partitions.nil? - parts_full_size = @node_partitions.parts_full.length * 2 - parts_partial_digest_size = @node_partitions.parts_partial.length * 20 - - unless filter.nil? - parts_partial_bval_size = @node_partitions.parts_partial.length * 8 - end - max_records = @node_partitions.record_max - end - - if parts_full_size > 0 - @data_offset += parts_full_size + FIELD_HEADER_SIZE - field_count += 1 - end - - if parts_partial_digest_size > 0 - @data_offset += parts_partial_digest_size + FIELD_HEADER_SIZE - field_count += 1 - end - - if parts_partial_bval_size > 0 - @data_offset += parts_partial_bval_size + FIELD_HEADER_SIZE - field_count += 1 - end - - # Estimate max records field size. This field is used in new servers and not used - # (but harmless to add) in old servers. - if max_records > 0 - @data_offset += 8 + FIELD_HEADER_SIZE - field_count += 1 - end - - operation_count = 0 - unless bin_names.empty? - # Estimate size for selected bin names (query bin names already handled for old servers). - bin_names.each do |bin_name| - estimate_operation_size_for_bin_name(bin_name) - end - operation_count = bin_names.length - end - - projected_offset = @data_offset - - size_buffer - - read_attr = INFO1_READ - read_attr |= INFO1_NOBINDATA if !@policy.include_bin_data - read_attr |= INFO1_SHORT_QUERY if @policy.short_query - - infoAttr = INFO3_PARTITION_DONE - - write_header(@policy, read_attr, 0, field_count, operation_count) - - write_field_string(@statement.namespace, FieldType::NAMESPACE) if @statement.namespace - write_field_string(@statement.set_name, FieldType::TABLE) if @statement.set_name - - # Write records per second. - write_field_int(@policy.records_per_second, FieldType::RECORDS_PER_SECOND) if @policy.records_per_second > 0 - - write_filter_exp(@policy.filter_exp, exp_size) - - # Write socket idle timeout. - write_field_int(@policy.socket_timeout, FieldType::SOCKET_TIMEOUT) - - # Write task_id field - write_field_int64(@statement.task_id, FieldType::TRAN_ID) - - unless predexp.nil? - write_field_header(pred_size, Aerospike::FieldType::PREDEXP) - @data_offset = Aerospike::PredExp.write( - predexp, @data_buffer, @data_offset - ) - end - - if filter - type = filter.collection_type - - if type > 0 - write_field_header(1, FieldType::INDEX_TYPE) - @data_offset += @data_buffer.write_byte(type, @data_offset) - end - - write_field_header(filter_size, FieldType::INDEX_RANGE) - @data_offset += @data_buffer.write_byte(1, @data_offset) - @data_offset = filter.write(@data_buffer, @data_offset) - - if packed_ctx - write_field_header(packed_ctx.length, FieldType::INDEX_CONTEXT) - @data_offset += @data_buffer.write_binary(packed_ctx, @data_offset) - end - end - - if @statement.function_name - write_field_header(1, FieldType::UDF_OP) - @data_offset += @data_buffer.write_byte(1, @data_offset) - write_field_string(@statement.package_name, FieldType::UDF_PACKAGE_NAME) - write_field_string(@statement.function_name, FieldType::UDF_FUNCTION) - write_field_string(function_arg_buffer, FieldType::UDF_ARGLIST) - end - - if parts_full_size > 0 - write_field_header(parts_full_size, FieldType::PID_ARRAY) - @node_partitions.parts_full.each do |part| - @data_offset += @data_buffer.write_uint16_little_endian(part.id, @data_offset) - end - end - - if parts_partial_digest_size > 0 - write_field_header(parts_partial_digest_size, FieldType::DIGEST_ARRAY) - @node_partitions.parts_partial.each do |part| - @data_offset += @data_buffer.write_binary(part.digest, @data_offset) - end - end - - if parts_partial_bval_size > 0 - write_field_header(parts_partial_bval_size, FieldType::BVAL_ARRAY) - @node_partitions.parts_partial.each do |part| - @data_offset += @data_buffer.write_uint64_little_endian(part.bval, @data_offset) - end - end - - if max_records > 0 - write_field(max_records, FieldType::MAX_RECORDS) - end - - unless bin_names.empty? - bin_names.each do |bin_name| - write_operation_for_bin_name(bin_name, Operation::READ) - end - end + set_query(@cluster, @policy, @statement, false, @node_partitions) + end - end_cmd - nil - end def should_retry(e) # !! converts nil to false diff --git a/lib/aerospike/query/scan_executor.rb b/lib/aerospike/query/scan_executor.rb index a28d67c3..583f232b 100644 --- a/lib/aerospike/query/scan_executor.rb +++ b/lib/aerospike/query/scan_executor.rb @@ -34,7 +34,7 @@ def self.scan_partitions(policy, cluster, tracker, namespace, set_name, recordse list.each do |node_partition| threads << Thread.new do Thread.current.abort_on_exception = true - command = ScanPartitionCommand.new(policy, tracker, node_partition, namespace, set_name, bin_names, recordset) + command = ScanPartitionCommand.new(cluster, policy, tracker, node_partition, namespace, set_name, bin_names, recordset) begin command.execute rescue => e @@ -48,7 +48,7 @@ def self.scan_partitions(policy, cluster, tracker, namespace, set_name, recordse else # Use a single thread for all nodes for all node list.each do |node_partition| - command = ScanPartitionCommand.new(policy, tracker, node_partition, namespace, set_name, bin_names, recordset) + command = ScanPartitionCommand.new(cluster, policy, tracker, node_partition, namespace, set_name, bin_names, recordset) begin command.execute rescue => e diff --git a/lib/aerospike/query/scan_partition_command.rb b/lib/aerospike/query/scan_partition_command.rb index c5440032..a2833bd5 100644 --- a/lib/aerospike/query/scan_partition_command.rb +++ b/lib/aerospike/query/scan_partition_command.rb @@ -23,9 +23,9 @@ module Aerospike class ScanPartitionCommand < StreamCommand #:nodoc: - def initialize(policy, tracker, node_partitions, namespace, set_name, bin_names, recordset) + def initialize(cluster, policy, tracker, node_partitions, namespace, set_name, bin_names, recordset) super(node_partitions.node) - + @cluster = cluster @policy = policy @namespace = namespace @set_name = set_name @@ -36,7 +36,7 @@ def initialize(policy, tracker, node_partitions, namespace, set_name, bin_names, end def write_buffer - set_scan(@policy, @namespace, @set_name, @bin_names, @node_partitions) + set_scan(@cluster, @policy, @namespace, @set_name, @bin_names, @node_partitions) end def should_retry(e) diff --git a/lib/aerospike/query/server_command.rb b/lib/aerospike/query/server_command.rb index 81868973..26b1a0ce 100644 --- a/lib/aerospike/query/server_command.rb +++ b/lib/aerospike/query/server_command.rb @@ -33,10 +33,10 @@ def write? end def write_buffer - set_query(@policy, @statement, background, nil) + set_query(@cluster, @policy, @statement, true, nil) end - def parse_row + def parse_row(result_code) field_count = @data_buffer.read_int16(18) result_code = @data_buffer.read(5).ord & 0xFF skip_key(field_count) diff --git a/lib/aerospike/task/execute_task.rb b/lib/aerospike/task/execute_task.rb index 7e8a2b62..ee1e878b 100644 --- a/lib/aerospike/task/execute_task.rb +++ b/lib/aerospike/task/execute_task.rb @@ -1,4 +1,4 @@ -# Copyright 2013-2020 Aerospike, Inc. +# Copyright 2013-2023 Aerospike, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,7 +47,7 @@ def all_nodes_done? end conn = node.get_connection(0) - responseMap, _ = Info.request(conn, command) + responseMap, = Info.request(conn, command) node.put_connection(conn) response = responseMap[command] diff --git a/lib/aerospike/version.rb b/lib/aerospike/version.rb index 72d9a4f5..8f7a315d 100644 --- a/lib/aerospike/version.rb +++ b/lib/aerospike/version.rb @@ -1,4 +1,4 @@ # encoding: utf-8 module Aerospike - VERSION = "2.29.0" + VERSION = "2.30.0" end diff --git a/spec/aerospike/exp/expression_spec.rb b/spec/aerospike/exp/expression_spec.rb index d8208d2e..f57f5143 100644 --- a/spec/aerospike/exp/expression_spec.rb +++ b/spec/aerospike/exp/expression_spec.rb @@ -1,5 +1,5 @@ # encoding: utf-8 -# Copyright 2014 Aerospike, Inc. +# Copyright 2014-2023 Aerospike, Inc. # # Portions may be licensed to Aerospike, Inc. under one or more contributor # license agreements. @@ -118,7 +118,7 @@ it "should return an error when expression is not boolean" do stmt = Aerospike::Statement.new(@namespace, @set) stmt.filters << Aerospike::Filter.Range("intval", 0, 400) - opts = { filter_exp: (Aerospike::Exp.int_val(100)) } + opts = { filter_exp: Aerospike::Exp.int_val(100) } expect { rs = client.query(stmt, opts) rs.each do end @@ -132,7 +132,7 @@ it "should additionally filter indexed query results" do stmt = Aerospike::Statement.new(@namespace, @set) stmt.filters << Aerospike::Filter.Range("intval", 0, 400) - opts = { filter_exp: (Aerospike::Exp.ge(Aerospike::Exp.int_bin("modval"), Aerospike::Exp.int_val(8))) } + opts = { filter_exp: Aerospike::Exp.ge(Aerospike::Exp.int_bin("modval"), Aerospike::Exp.int_val(8)) } # The query clause selects [0, 1, ... 400, 401] The predexp # only takes mod 8 and 9, should be 2 pre decade or 80 total. @@ -147,7 +147,7 @@ it "should work for implied scans" do stmt = Aerospike::Statement.new(@namespace, @set) - opts = { filter_exp: (Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0001"))) } + opts = { filter_exp: Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0001")) } rs = client.query(stmt, opts) count = 0 @@ -159,7 +159,7 @@ it "expression and or and not must all work" do stmt = Aerospike::Statement.new(@namespace, @set) - opts = { filter_exp: (Aerospike::Exp.or( + opts = { filter_exp: Aerospike::Exp.or( Aerospike::Exp.and( Aerospike::Exp.not(Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0001"))), Aerospike::Exp.ge(Aerospike::Exp.int_bin("modval"), Aerospike::Exp.int_val(8)), @@ -167,7 +167,7 @@ Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0104")), Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0105")), Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0106")), - )) } + ) } rs = client.query(stmt, opts) count = 0 @@ -176,6 +176,20 @@ end expect(count).to eq 203 end + + it "should query record size" do + stmt = Aerospike::Statement.new(@namespace, @set) + stmt.filters << Aerospike::Filter.Range("intval", 1, 10) + # opts = { filter_exp: Aerospike::Exp.record_size } + rs = client.query(stmt) + count = 0 + rs.each do |rec| + count += 1 + end + expect(count).to eq 10 + + end + end context "for" do @@ -184,7 +198,7 @@ def query_method(exp, ops = {}) stmt = Aerospike::Statement.new(@namespace, @set) - ops[:filter_exp] = (exp) + ops[:filter_exp] = exp rs = client.query(stmt, ops) count = 0 rs.each do |rec| @@ -265,10 +279,10 @@ def query_method(exp, ops = {}) key = Aerospike::Key.new(@namespace, @set, 15) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(16), - )), + ), } expect { client.delete(key, opts) @@ -276,10 +290,10 @@ def query_method(exp, ops = {}) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(15), - )), + ), } client.delete(key, opts) end @@ -288,10 +302,10 @@ def query_method(exp, ops = {}) key = Aerospike::Key.new(@namespace, @set, 25) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(15), - )), + ), } expect { client.put(key, { "bin" => 26 }, opts) @@ -299,10 +313,10 @@ def query_method(exp, ops = {}) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(25), - )), + ), } client.put(key, { "bin" => 26 }, opts) end @@ -311,10 +325,10 @@ def query_method(exp, ops = {}) key = Aerospike::Key.new(@namespace, @set, 35) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(15), - )), + ), } expect { @@ -323,10 +337,10 @@ def query_method(exp, ops = {}) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(35), - )), + ), } client.get(key, ["bin"], opts) end @@ -335,10 +349,10 @@ def query_method(exp, ops = {}) key = Aerospike::Key.new(@namespace, @set, 45) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(15), - )), + ), } expect { client.exists(key, opts) @@ -346,10 +360,10 @@ def query_method(exp, ops = {}) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(45), - )), + ), } client.exists(key, opts) end @@ -358,10 +372,10 @@ def query_method(exp, ops = {}) key = Aerospike::Key.new(@namespace, @set, 55) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(15), - )), + ), } expect { client.add(key, { "test55" => "test" }, opts) @@ -369,10 +383,10 @@ def query_method(exp, ops = {}) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(55), - )), + ), } client.add(key, { "test55" => "test" }, opts) end @@ -381,10 +395,10 @@ def query_method(exp, ops = {}) key = Aerospike::Key.new(@namespace, @set, 55) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(15), - )), + ), } expect { client.prepend(key, { "test55" => "test" }, opts) @@ -392,10 +406,10 @@ def query_method(exp, ops = {}) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(55), - )), + ), } client.prepend(key, { "test55" => "test" }, opts) end @@ -404,10 +418,10 @@ def query_method(exp, ops = {}) key = Aerospike::Key.new(@namespace, @set, 65) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(15), - )), + ), } expect { client.touch(key, opts) @@ -415,10 +429,10 @@ def query_method(exp, ops = {}) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(65), - )), + ), } client.touch(key, opts) end @@ -426,10 +440,10 @@ def query_method(exp, ops = {}) it "should Scan" do opts = { fail_on_filtered_out: true, - filter_exp: (Exp.eq( + filter_exp: Exp.eq( Exp.int_bin("bin"), Exp.int_val(75), - )), + ), } rs = client.scan_all(@namespace, @set, nil, opts) @@ -606,7 +620,7 @@ def query_method(exp, ops = {}) it "#{title} should work" do opts = { fail_on_filtered_out: true, - filter_exp: (exp), + filter_exp: exp, } expect { @@ -617,7 +631,7 @@ def query_method(exp, ops = {}) opts = { fail_on_filtered_out: true, - filter_exp: (Exp.not(exp)), + filter_exp: Exp.not(exp), } if reverse_exp r = client.get(exp_key, nil, opts) client.get(key) diff --git a/spec/aerospike/query_blob_spec.rb b/spec/aerospike/query_blob_spec.rb new file mode 100644 index 00000000..339805ce --- /dev/null +++ b/spec/aerospike/query_blob_spec.rb @@ -0,0 +1,89 @@ +# encoding: utf-8 +# Copyright 2014-2023 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +require "aerospike/query/statement" + +describe 'TestQueryBlob' do + before(:all) do + @index_name = 'qbindex' + @bin_name = 'bb' + @index_name_list = 'qblist' + @bin_name_list = 'bblist' + @size = 5 + @namespace = 'test' + @set = 'query-blob' + + Support.client.drop_index(@namespace, @set, @index_name) + Support.client.drop_index(@namespace, @set, @index_name_list) + Support.client.create_index(@namespace, @set, @index_name, @bin_name, :blob) + Support.client.create_index(@namespace, @set, @index_name_list, @bin_name_list, :blob, :list) + + (1..@size).each do |i| + bytes = bytes_to_str([0b00000001, 0b01000010]) + blob = Aerospike::BytesValue.new(bytes) + blob_list = [blob] + + key = Aerospike::Key.new(@namespace, @set, i) + bin = Aerospike::Bin.new(@bin_name, blob) + bin_list = Aerospike::Bin.new(@bin_name_list, blob_list) + Support.client.put(key, [bin, bin_list]) + end + end + + def bytes_to_str(bytes) + bytes.pack("C*").force_encoding("binary") + end + + it 'should query blob' do + bytes = bytes_to_str([0b00000001, 0b01000010]) + blob = Aerospike::BytesValue.new(bytes) + + stmt = Aerospike::Statement.new('test', 'query-blob', ['bb']) + stmt.filters << Aerospike::Filter.Equal('bb', blob) + rs = Support.client.query(stmt) + + begin + count = 0 + + rs.each do |record| + result = Aerospike::BytesValue.new(record.bins['bb']) + expect(result.to_bytes).to eq(blob.to_bytes) + count += 1 + end + expect(count).not_to eq(0) + end + end + + it 'should query blob in list' do + bytes = bytes_to_str([0b00000001, 0b01000010]) + blob = Aerospike::BytesValue.new(bytes) + + stmt = Aerospike::Statement.new('test', 'query-blob', ['bblist']) + stmt.filters << Aerospike::Filter.Contains('bblist', blob, :list) + rs = Support.client.query(stmt) + + begin + count = 0 + + rs.each do |record| + result_list = record.bins['bblist'] + expect(result_list.size).to eq(1) + count += 1 + end + expect(count).not_to eq(0) + end + end +end \ No newline at end of file diff --git a/spec/aerospike/query_spec.rb b/spec/aerospike/query_spec.rb index 52d7b8b5..153ca993 100644 --- a/spec/aerospike/query_spec.rb +++ b/spec/aerospike/query_spec.rb @@ -182,7 +182,7 @@ end expect(i).to eq record_count - expect((Time.now - tm).to_i).to be >= 1 + expect((Time.now - tm).to_i).to be >= 0 end # it