From 34edb0190b518f0ba179ab0435e4e904f3ab9d31 Mon Sep 17 00:00:00 2001 From: Khosrow Afroozeh Date: Tue, 22 Oct 2024 20:44:51 +0200 Subject: [PATCH] [CLIENT-2826] Support QueryDuration enum in QueryPolicy --- CHANGELOG.md | 1 + lib/aerospike.rb | 1 + lib/aerospike/command/batch_index_command.rb | 2 +- lib/aerospike/command/command.rb | 23 ++++++--- lib/aerospike/policy/query_duration.rb | 48 +++++++++++++++++ lib/aerospike/policy/query_policy.rb | 21 +++++--- spec/aerospike/query_spec.rb | 54 ++++++++++++-------- 7 files changed, 112 insertions(+), 38 deletions(-) create mode 100644 lib/aerospike/policy/query_duration.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 284981b1..900719d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file. - **New Features** - [CLIENT-2833] Support `Policy#ReadTouchTtlPercent`. + - [CLIENT-2826] Support `QueryDuration` in `QueryPolicy#ExpectedDuration`. - **Fixes** - [CLIENT-3144] Various fixes. 
PR #132 and #133 Thanks to [Igor Pstyga](https://github.com/opti) diff --git a/lib/aerospike.rb b/lib/aerospike.rb index b6cc426c..de009a5f 100644 --- a/lib/aerospike.rb +++ b/lib/aerospike.rb @@ -90,6 +90,7 @@ require "aerospike/geo_json" require "aerospike/ttl" +require "aerospike/policy/query_duration" require "aerospike/policy/client_policy" require "aerospike/policy/priority" require "aerospike/policy/record_exists_action" diff --git a/lib/aerospike/command/batch_index_command.rb b/lib/aerospike/command/batch_index_command.rb index 495e52f5..8a6a8f45 100644 --- a/lib/aerospike/command/batch_index_command.rb +++ b/lib/aerospike/command/batch_index_command.rb @@ -62,7 +62,7 @@ def write_buffer end end size_buffer - write_header_read(policy, read_attr | INFO1_BATCH, 0, field_count, 0) + write_header_read(policy, read_attr | INFO1_BATCH, 0, 0, field_count, 0) write_filter_exp(@policy.filter_exp, exp_size) diff --git a/lib/aerospike/command/command.rb b/lib/aerospike/command/command.rb index 785a4f69..7b3095cf 100644 --- a/lib/aerospike/command/command.rb +++ b/lib/aerospike/command/command.rb @@ -58,7 +58,8 @@ module Aerospike INFO2_DURABLE_DELETE = Integer(1 << 4) # Create only. Fail if record already exists. INFO2_CREATE_ONLY = Integer(1 << 5) - + # Treat as long query, but relax read consistency. + INFO2_RELAX_AP_LONG_QUERY = (1 << 6) # Return a result for every operation. 
INFO2_RESPOND_ALL_OPS = Integer(1 << 7) @@ -195,7 +196,7 @@ def set_read_for_key_only(policy, key) field_count += 1 if exp_size > 0 size_buffer - write_header_read(policy, INFO1_READ | INFO1_GET_ALL, 0, field_count, 0) + write_header_read(policy, INFO1_READ | INFO1_GET_ALL, 0, 0, field_count, 0) write_key(key) write_filter_exp(@policy.filter_exp, exp_size) end_cmd @@ -220,7 +221,7 @@ def set_read(policy, key, bin_names) attr |= INFO1_GET_ALL end - write_header_read(policy, attr, 0, field_count, bin_names.length) + write_header_read(policy, attr, 0, 0, field_count, bin_names.length) write_key(key) write_filter_exp(@policy.filter_exp, exp_size) @@ -377,7 +378,7 @@ def set_scan(cluster, policy, namespace, set_name, bin_names, node_partitions) operation_count = bin_names.length end - write_header_read(policy, read_attr, info_attr, field_count, operation_count) + write_header_read(policy, read_attr, 0, info_attr, field_count, operation_count) if namespace write_field_string(namespace, Aerospike::FieldType::NAMESPACE) @@ -591,10 +592,16 @@ def set_query(cluster, policy, statement, background, node_partitions) write_header_write(policy, INFO2_WRITE, field_count, operation_count) else read_attr = INFO1_READ + write_attr = 0 + read_attr |= INFO1_NOBINDATA unless policy.include_bin_data - read_attr |= INFO1_SHORT_QUERY if policy.short_query + if policy.short_query || policy.expected_duration == QueryDuration::SHORT + read_attr |= INFO1_SHORT_QUERY + elsif policy.expected_duration == QueryDuration::LONG_RELAX_AP + write_attr |= INFO2_RELAX_AP_LONG_QUERY + end info_attr = INFO3_PARTITION_DONE if is_new - write_header_read(policy, read_attr, info_attr, field_count, operation_count) + write_header_read(policy, read_attr, write_attr, info_attr, field_count, operation_count) end @@ -960,13 +967,13 @@ def write_header_read_write(policy, args, field_count) @data_offset = MSG_TOTAL_HEADER_SIZE end - def write_header_read(policy, read_attr, info_attr, field_count, operation_count) + 
def write_header_read(policy, read_attr, write_attr, info_attr, field_count, operation_count) read_attr |= INFO1_COMPRESS_RESPONSE if policy.use_compression #TODO: Add SC Mode @data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message header.length. @data_buffer.write_byte(read_attr, 9) - @data_buffer.write_byte(0, 10) + @data_buffer.write_byte(write_attr, 10) @data_buffer.write_byte(info_attr, 11) (12...18).each { |i| @data_buffer.write_byte(0, i) } diff --git a/lib/aerospike/policy/query_duration.rb b/lib/aerospike/policy/query_duration.rb new file mode 100644 index 00000000..63704bdc --- /dev/null +++ b/lib/aerospike/policy/query_duration.rb @@ -0,0 +1,48 @@ +# encoding: utf-8 +# Copyright 2014-2024 Aerospike, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Aerospike + + # Defines the expected query duration. The server treats the query in different ways depending on the expected duration. + # This enum is ignored for aggregation queries, background queries and server versions < 6.0. + module QueryDuration + + # The query is expected to return more than 100 records per node. The server optimizes for a large record set in + # the following ways: + # + # Allow query to be run in multiple threads using the server's query threading configuration. + # Do not relax read consistency for AP namespaces. + # Add the query to the server's query monitor. + # Do not add the overall latency to the server's latency histogram. + # Do not allow server timeouts. 
+ LONG = 0 + + # The query is expected to return less than 100 records per node. The server optimizes for a small record set in + # the following ways: + # Always run the query in one thread and ignore the server's query threading configuration. + # Allow query to be inlined directly on the server's service thread. + # Relax read consistency for AP namespaces. + # Do not add the query to the server's query monitor. + # Add the overall latency to the server's latency histogram. + # Allow server timeouts. The default server timeout for a short query is 1 second. + SHORT = 1 + + # Treat query as a LONG query, but relax read consistency for AP namespaces. + # This value is treated exactly like LONG for server versions < 7.1. + LONG_RELAX_AP = 2 + + end # module + +end # module diff --git a/lib/aerospike/policy/query_policy.rb b/lib/aerospike/policy/query_policy.rb index 787b3b13..3645ce32 100644 --- a/lib/aerospike/policy/query_policy.rb +++ b/lib/aerospike/policy/query_policy.rb @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations under # the License. +require 'aerospike/policy/query_duration' require 'aerospike/policy/policy' module Aerospike @@ -22,16 +23,10 @@ module Aerospike # Container object for query policy command. class QueryPolicy < Policy - attr_accessor :concurrent_nodes - attr_accessor :max_records - attr_accessor :include_bin_data - attr_accessor :record_queue_size - attr_accessor :records_per_second - attr_accessor :socket_timeout - attr_accessor :short_query + attr_accessor :concurrent_nodes, :max_records, :include_bin_data, :record_queue_size, :records_per_second, :socket_timeout, :short_query, :expected_duration def initialize(opt={}) - super(opt) + super # Indicates if bin data is retrieved. If false, only record digests (and # user keys if stored on the server) are retrieved. 
@@ -74,11 +69,21 @@ def initialize(opt={}) # Default is 0 @records_per_second = opt[:records_per_second] || 0 + # Expected query duration. The server treats the query in different ways depending on the expected duration. + # This field is ignored for aggregation queries, background queries and server versions < 6.0. + # + # Default: QueryDuration::LONG + @expected_duration = opt[:expected_duration] || QueryDuration::LONG + + # DEPRECATED # Detemine wether query expected to return less than 100 records. # If true, the server will optimize the query for a small record set. # This field is ignored for aggregation queries, background queries # and server versions 6.0+. # + # This field is deprecated and will eventually be removed. Use {expected_duration} instead. + # For backwards compatibility: If {short_query} is true, the query is treated as a short query and + # {expected_duration} is ignored. If {short_query} is false, {expected_duration} is used and defaults to {QueryDuration::LONG}. 
# Default: false @short_query = opt[:short_query] ||false diff --git a/spec/aerospike/query_spec.rb b/spec/aerospike/query_spec.rb index 153ca993..bcf30928 100644 --- a/spec/aerospike/query_spec.rb +++ b/spec/aerospike/query_spec.rb @@ -57,8 +57,8 @@ bin_map = { 'bin1' => "value#{i}", 'bin2' => i, - 'bin3' => [ i, i + 1_000, i + 1_000_000 ], - 'bin4' => { "key#{i}" => i }, + 'bin3' => [i, i + 1_000, i + 1_000_000], + 'bin4' => { "key#{i}" => i } } Support.client.put(key, bin_map) end @@ -92,8 +92,20 @@ expect(count).to be > 0 end + it "returns all record bins, expected_duration: QueryDuration::SHORT", skip: !Support.min_version?("7.0.0") do + stmt = Aerospike::Statement.new(@namespace, @set) + stmt.filters << Aerospike::Filter.Equal('bin2', 1) + rs = client.query(stmt, expected_duration: Aerospike::QueryDuration::SHORT) + count = 0 + rs.each do |rec| + count += 1 + expect(rec.bins.keys).to contain_exactly("bin1", "bin2", "bin3", "bin4") + end + expect(count).to be > 0 + end + it "returns only the selected bins" do - bins = ["bin1", "bin2"] + bins = %w[bin1 bin2] stmt = Aerospike::Statement.new(@namespace, @set, bins) stmt.filters << Aerospike::Filter.Equal('bin2', 1) rs = client.query(stmt) @@ -152,7 +164,7 @@ rs = client.query(stmt) records = 0 - expect { rs.each { records += 1 } }.not_to raise_error() + expect { rs.each { records += 1 } }.not_to raise_error expect(records).to eql(0) end @@ -164,13 +176,13 @@ bin_map = { 'bin1' => "value#{i}", 'bin2' => i, - 'bin3' => [ i, i + 1_000, i + 1_000_000 ], - 'bin4' => { "key#{i}" => i }, + 'bin3' => [i, i + 1_000, i + 1_000_000], + 'bin4' => { "key#{i}" => i } } Support.client.put(key, bin_map) end - stmt = Aerospike::Statement.new(@namespace, set, ['bin1', 'bin2']) + stmt = Aerospike::Statement.new(@namespace, set, %w[bin1 bin2]) rs = client.query(stmt, :records_per_second => (@record_count / 4).to_i) i = 0 @@ -193,7 +205,7 @@ context "Numeric Bins" do it "should return relevant records" do - stmt = 
Aerospike::Statement.new(@namespace, @set, ['bin1', 'bin2']) + stmt = Aerospike::Statement.new(@namespace, @set, %w[bin1 bin2]) stmt.filters = [Aerospike::Filter.Equal('bin2', 1)] rs = client.query(stmt) @@ -236,7 +248,7 @@ context "List Bins" do it "should return relevant records" do - stmt = Aerospike::Statement.new(@namespace, @set, ['bin2', 'bin3']) + stmt = Aerospike::Statement.new(@namespace, @set, %w[bin2 bin3]) stmt.filters = [Aerospike::Filter.Contains('bin3', 42, :list)] rs = client.query(stmt) @@ -351,12 +363,12 @@ context "Geospatial Filter", skip: !Support.feature?(Aerospike::Features::GEO) do - let(:lon){ 103.9114 } - let(:lat){ 1.3083 } - let(:radius){ 1000 } - let(:point){ Aerospike::GeoJSON.new({type: "Point", coordinates: [lon, lat]}) } - let(:point2){ Aerospike::GeoJSON.new({type: "Point", coordinates: [lon + 1, lat + 1]}) } - let(:region){ Aerospike::GeoJSON.new({type: "Polygon", coordinates: [[[103.6055, 1.1587], [103.6055, 1.4707], [104.0884, 1.4707], [104.0884, 1.1587], [103.6055, 1.1587]]]}) } + let(:lon) { 103.9114 } + let(:lat) { 1.3083 } + let(:radius) { 1000 } + let(:point) { Aerospike::GeoJSON.new({ type: "Point", coordinates: [lon, lat] }) } + let(:point2) { Aerospike::GeoJSON.new({ type: "Point", coordinates: [lon + 1, lat + 1] }) } + let(:region) { Aerospike::GeoJSON.new({ type: "Polygon", coordinates: [[[103.6055, 1.1587], [103.6055, 1.4707], [104.0884, 1.4707], [104.0884, 1.1587], [103.6055, 1.1587]]] }) } before(:all) do tasks = [] @@ -386,7 +398,7 @@ rs = client.query(stmt) results = [] - rs.each{|record| results << record } + rs.each { |record| results << record } expect(results.map(&:key)).to eq [key] end # it @@ -399,7 +411,7 @@ rs = client.query(stmt) results = [] - rs.each{|record| results << record } + rs.each { |record| results << record } expect(results.map(&:key)).to eq [key] end # it @@ -412,7 +424,7 @@ rs = client.query(stmt) results = [] - rs.each{|record| results << record } + rs.each { |record| results << record } 
expect(results.map(&:key)).to eq [key] end # it @@ -425,7 +437,7 @@ rs = client.query(stmt) results = [] - rs.each{|record| results << record } + rs.each { |record| results << record } expect(results.map(&:key)).to eq [key] end # it @@ -438,7 +450,7 @@ rs = client.query(stmt) results = [] - rs.each{|record| results << record } + rs.each { |record| results << record } expect(results.map(&:key)).to eq [key] end # it @@ -451,7 +463,7 @@ rs = client.query(stmt) results = [] - rs.each{|record| results << record } + rs.each { |record| results << record } expect(results.map(&:key)).to eq [key] end # it