[Client-618] max connections enforced #114

Open
wants to merge 6 commits into
base: stage
Changes from 4 commits
12 changes: 6 additions & 6 deletions .github/workflows/development.yml
Expand Up @@ -6,19 +6,19 @@ jobs:
test:
runs-on: ${{matrix.os}}-latest
continue-on-error: ${{matrix.experimental}}

strategy:
matrix:
os:
- ubuntu

ruby:
- 2.6
- 2.7

experimental: [false]
env: [""]

include:
- os: ubuntu
ruby: head
Expand All @@ -30,14 +30,14 @@ jobs:
with:
ruby-version: ${{matrix.ruby}}
bundler-cache: true

- name: Start server
timeout-minutes: 5
env:
TERM: dumb
run:
.github/workflows/start_cluster.sh 2

- name: Run tests
timeout-minutes: 30
env:
Expand Down
6 changes: 6 additions & 0 deletions lib/aerospike/aerospike_exception.rb
Expand Up @@ -96,5 +96,11 @@ def initialize(msg=nil)
super(ResultCode::INVALID_NAMESPACE, msg)
end
end

class MaxConnectionsExceeded < Aerospike
def initialize(msg = nil)
super(ResultCode::MAX_CONNECTION_EXCEEDED, msg)
end
end
end
end
1 change: 0 additions & 1 deletion lib/aerospike/client.rb
Expand Up @@ -225,7 +225,6 @@ def truncate(namespace, set_name = nil, before_last_update = nil, options = {})
policy = create_policy(options, Policy, default_info_policy)

node = @cluster.random_node
conn = node.get_connection(policy.timeout)

if set_name && !set_name.to_s.strip.empty?
str_cmd = "truncate:namespace=#{namespace}"
Expand Down
12 changes: 10 additions & 2 deletions lib/aerospike/cluster.rb
Expand Up @@ -321,14 +321,22 @@ def update_partitions(parser)

def request_info(policy, *commands)
node = random_node
conn = node.get_connection(policy.timeout)
begin
conn = node.get_connection(policy.timeout)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
end
Info.request(conn, *commands).tap do
node.put_connection(conn)
end
end

def request_node_info(node, policy, *commands)
conn = node.get_connection(policy.timeout)
begin
conn = node.get_connection(policy.timeout)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
end
Info.request(conn, *commands).tap do
node.put_connection(conn)
end
Expand Down
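In `request_info` and `request_node_info` above, the rescue block logs the failure and then falls through to `Info.request` with `conn` still `nil`. A minimal sketch of logging and then re-raising, so the caller never continues without a connection. `FakeNode`, `FakeConnection`, and `LOGGER` are hypothetical stand-ins for illustration, not the client's real classes:

```ruby
require "logger"

# Hypothetical stand-in for a connection that answers info commands.
class FakeConnection
  def request(*commands)
    commands.to_h { |command| [command, "ok"] }
  end
end

# Hypothetical stand-in for a cluster node that hands out connections.
class FakeNode
  attr_reader :returned

  def initialize(fail_connect: false)
    @fail_connect = fail_connect
    @returned = []
  end

  def get_connection(_timeout)
    raise IOError, "connection refused" if @fail_connect

    FakeConnection.new
  end

  def put_connection(conn)
    @returned << conn
  end
end

# Logger with no output device; a real client would use its own logger.
LOGGER = Logger.new(nil)

# Log the failure for diagnostics, then re-raise so that execution never
# reaches the request with conn == nil.
def request_info(node, timeout, *commands)
  begin
    conn = node.get_connection(timeout)
  rescue => e
    LOGGER.error("Get connection failed with exception: #{e}")
    raise
  end

  # As in the diff, the connection is returned to the node only on success.
  conn.request(*commands).tap { node.put_connection(conn) }
end
```

With this shape, a failed `get_connection` surfaces to the caller as the original exception instead of a later `NoMethodError` on `nil`.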
3 changes: 1 addition & 2 deletions lib/aerospike/command/admin_command.rb
Expand Up @@ -345,9 +345,8 @@ def execute_command(cluster, policy)
timeout = 1
timeout = policy.timeout if policy && policy.timeout > 0

conn = node.get_connection(timeout)

begin
conn = node.get_connection(timeout)
conn.write(@data_buffer, @data_offset)
conn.read(@data_buffer, HEADER_SIZE)
node.put_connection(conn)
Expand Down
3 changes: 3 additions & 0 deletions lib/aerospike/command/command.rb
Expand Up @@ -487,6 +487,9 @@ def execute
@node = get_node
@conn = @node.get_connection(@policy.timeout)
rescue => e
if e.is_a?(Aerospike::Exceptions::MaxConnectionsExceeded)
Aerospike.logger.error("Maximum connections established. No new connection can be created. #{e}")
Review comment (Collaborator): Same issue. Logging without re-raising the exception.

Review comment (Collaborator): Still not addressed.

Review comment (Contributor Author): I have not re-raised the exception here because this code is inside a retry loop.

end
if @node
# Socket connection error has occurred. Decrease health and retry.
@node.decrease_health
Expand Down
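The exchange above turns on whether a retry loop makes re-raising unnecessary. One way to reconcile the two positions, sketched under assumptions: retry inside the loop, remember the last error, and raise it once the attempts are exhausted so the failure is never silently dropped. `with_retries` and the local `MaxConnectionsExceeded` class are hypothetical names for illustration and are not taken from the client's `execute` method:

```ruby
# Hypothetical stand-in for Aerospike::Exceptions::MaxConnectionsExceeded.
class MaxConnectionsExceeded < StandardError; end

# Runs the block up to max_retries + 1 times. A MaxConnectionsExceeded
# error triggers another attempt; after the final attempt the last error
# is re-raised to the caller.
def with_retries(max_retries:, sleep_between: 0)
  last_error = nil

  (max_retries + 1).times do |attempt|
    begin
      return yield(attempt)
    rescue MaxConnectionsExceeded => e
      last_error = e
      sleep(sleep_between) if sleep_between > 0
    end
  end

  raise last_error
end
```

A caller would wrap the connection attempt, for example `with_retries(max_retries: 2) { node.get_connection(timeout) }`, where `node` and `timeout` are whatever the surrounding code provides.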
7 changes: 7 additions & 0 deletions lib/aerospike/result_code.rb
Expand Up @@ -20,6 +20,9 @@ module ResultCode

attr_reader :code

#Maximum connections established. New connections cannot be created.
MAX_CONNECTION_EXCEEDED = -21

# One or more keys failed in a batch.
BATCH_FAILED = -20

Expand Down Expand Up @@ -299,6 +302,7 @@ module ResultCode
# Internal error.
QUERY_DUPLICATE = 215


def self.message(code)
case code
when BATCH_FAILED
Expand Down Expand Up @@ -577,6 +581,9 @@ def self.message(code)
when QUERY_DUPLICATE
"Internal query error"

when MAX_CONNECTION_EXCEEDED
"Maximum new connections exceeded."

else
"ResultCode #{code} unknown in the client. Please file a github issue."
end # case
Expand Down
7 changes: 5 additions & 2 deletions lib/aerospike/task/execute_task.rb
Expand Up @@ -45,8 +45,11 @@ def all_nodes_done?
elsif node.supports_feature?(Aerospike::Features::QUERY_SHOW)
command = cmd2
end

conn = node.get_connection(0)
begin
conn = node.get_connection(0)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
end
responseMap, _ = Info.request(conn, command)
node.put_connection(conn)

Expand Down
6 changes: 5 additions & 1 deletion lib/aerospike/task/index_task.rb
Expand Up @@ -42,7 +42,11 @@ def all_nodes_done?
nodes = @cluster.nodes

nodes.each do |node|
conn = node.get_connection(1)
begin
conn = node.get_connection(1)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
end
response_map = Info.request(conn, command)
_, response = response_map.first
match = response.to_s.match(MATCHER)
Expand Down
6 changes: 5 additions & 1 deletion lib/aerospike/task/udf_register_task.rb
Expand Up @@ -39,7 +39,11 @@ def all_nodes_done?
nodes = @cluster.nodes

nodes.each do |node|
conn = node.get_connection(1)
begin
conn = node.get_connection(1)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
end
response_map = Info.request(conn, command)
_, response = response_map.first
index = response.to_s.index("filename=#{@package_name}")
Expand Down
6 changes: 5 additions & 1 deletion lib/aerospike/task/udf_remove_task.rb
Expand Up @@ -39,7 +39,11 @@ def all_nodes_done?
nodes = @cluster.nodes

nodes.each do |node|
conn = node.get_connection(1)
begin
conn = node.get_connection(1)
rescue => e
Aerospike.logger.error("Get connection failed with exception: #{e}")
khaf marked this conversation as resolved.
end
response_map = Info.request(conn, command)
_, response = response_map.first
index = response.to_s.index("filename=#{@package_name}")
Expand Down
16 changes: 12 additions & 4 deletions lib/aerospike/utils/connection_pool.rb
Expand Up @@ -17,19 +17,27 @@
module Aerospike
class ConnectionPool < Pool

attr_accessor :cluster, :host
attr_accessor :cluster, :host, :number_of_creations
Review comment (Collaborator): I believe we can use a better name, e.g. :total_conns.


def initialize(cluster, host)
self.cluster = cluster
self.host = host
@number_of_creations = 0
@mutex = Mutex.new
super(cluster.connection_queue_size)
end

def create
conn = nil
loop do
conn = cluster.create_connection(host)
break if conn.connected?
@mutex.synchronize do
if @number_of_creations >= @max_size
raise Aerospike::Exceptions::MaxConnectionsExceeded
else
conn = cluster.create_connection(host)
if conn.connected?
@number_of_creations += 1
Review comment (Collaborator): Where is this number decreased? Once max_size connections have been created, the counter never goes down, so after those connections are disconnected the pool will refuse to create the new connections it needs.

end
end
end
conn
end
Expand Down
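The review comments on this file ask where the counter is decreased. A minimal sketch of a bounded pool that releases its slot when a connection is closed or fails to open, so the limit applies to live connections rather than to the lifetime total. `BoundedPool`, `FakeConnection`, `cleanup`, and `release_slot` are hypothetical names for illustration, and `total_conns` follows the reviewer's naming suggestion; none of this is the client's actual `ConnectionPool`:

```ruby
# Hypothetical stand-in for Aerospike::Exceptions::MaxConnectionsExceeded.
class MaxConnectionsExceeded < StandardError; end

# Hypothetical connection that is open until closed.
class FakeConnection
  def initialize
    @open = true
  end

  def connected?
    @open
  end

  def close
    @open = false
  end
end

class BoundedPool
  attr_reader :total_conns

  # The block is the connection factory (assumed for this sketch).
  def initialize(max_size, &factory)
    @max_size = max_size
    @factory = factory
    @total_conns = 0
    @mutex = Mutex.new
  end

  def create
    # Reserve a slot under the lock so concurrent callers cannot overshoot.
    @mutex.synchronize do
      raise MaxConnectionsExceeded, "limit of #{@max_size} reached" if @total_conns >= @max_size

      @total_conns += 1
    end

    begin
      conn = @factory.call
      raise IOError, "connection failed" unless conn.connected?

      conn
    rescue
      # The connection never came up, so give the slot back.
      release_slot
      raise
    end
  end

  # Close a connection and free its slot for future creations.
  def cleanup(conn)
    conn.close
    release_slot
  end

  private

  def release_slot
    @mutex.synchronize { @total_conns -= 1 if @total_conns > 0 }
  end
end
```

With `pool = BoundedPool.new(2) { FakeConnection.new }`, a third `pool.create` raises until one of the first two connections is passed to `pool.cleanup`. This sketch assumes `cleanup` is called exactly once per connection; calling it twice would free two slots.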
19 changes: 16 additions & 3 deletions spec/aerospike/util/connection_pool_spec.rb
Expand Up @@ -16,13 +16,16 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

require 'rspec'
require 'aerospike'
RSpec.describe Aerospike::ConnectionPool do
let(:pool_size) { 5 }
let(:cluster) { double(connection_queue_size: pool_size) }
let(:host) { double() }
let(:instance) { described_class.new(cluster, host) }
let(:good_connection) { double(:connected? => true, :alive? => true) }
let(:connection) { double('connection') }


describe ".poll" do
context "when pool is empty" do
Expand All @@ -31,8 +34,7 @@
end

it "creates a new connection" do
connection = instance.poll()

connection = instance.poll
expect(connection).to be(good_connection)
end
end
Expand All @@ -49,6 +51,17 @@
end
end

context "enforce max connections as a hard limit" do
Review comment (Collaborator): This test covers the expected behavior, but it does not consider what happens after a few connections are closed. Will the client allow new connections to be created afterwards, or will it be stuck raising MaxConnectionsExceeded forever? From the code, I think it will be stuck.

before do
allow(cluster).to receive(:create_connection).with(host).and_return(good_connection)
pool_size.times { instance.poll }
end

it "raises a max connections exceeded exception" do
expect { instance.poll }.to raise_aerospike_error(-21)
end
end

context "when pool contains a dead connection" do
let(:dead_connection) { spy(:connected? => true, :alive? => false) }

Expand Down