Skip to content

Commit

Permalink
updated bunny to the latest version
Browse files Browse the repository at this point in the history
The tests are green, but we need to study the interplay between the
various new timeout options in the multi threaded bunny implementation.

This is still WIP!!!
  • Loading branch information
skaes committed Aug 14, 2022
1 parent 7034bf0 commit 3dea802
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 102 deletions.
2 changes: 1 addition & 1 deletion beetle.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Gem::Specification.new do |s|
}

s.specification_version = 3
s.add_runtime_dependency "bunny", "~> 0.7.12"
s.add_runtime_dependency "bunny", "~> 2.19.0"
s.add_runtime_dependency "redis", ">= 4.2.1"
s.add_runtime_dependency "hiredis", ">= 0.4.5"
s.add_runtime_dependency "amq-protocol", "= 2.3.2"
Expand Down
6 changes: 5 additions & 1 deletion features/step_definitions/redis_auto_failover_steps.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@
response = client.rpc(:echo, 'echo')
t2 = Time.now
# puts "OK,#{redis_master} =?= #{response.join(',')} after #{t2-t1}, attempt #{i+1}"
break if expected_response == response
if expected_response == response
client.stop_publishing
break
end
sleep 0.1
end
assert_equal expected_response, response
end
Expand Down
6 changes: 4 additions & 2 deletions features/support/beetle_handler
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ Daemons.run_proc("beetle_handler", :log_output => true, :dir_mode => :normal, :d
end
config.handler(Beetle.config.beetle_policy_updates_queue_name) do |message|
begin
Beetle.config.logger.info "received beetle policy update message': #{message.data}"
Beetle.config.logger.info "Received beetle policy update message': #{message.data}"
client.update_queue_properties!(JSON.parse(message.data))
rescue => e
Beetle.config.logger.error("#{e}:#{e.backtrace.join("\n")}")
end
end
end
Beetle.config.logger.info "Starting beetle handler for system: #{Beetle.config.system_name}"
client.listen do
puts "Started beetle handler for system: #{Beetle.config.system_name}"
trap("TERM"){ client.stop_listening }
end
Beetle.config.logger.info "Terminated beetle handler for system: #{Beetle.config.system_name}"
end
26 changes: 15 additions & 11 deletions features/support/system_notification_logger
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,13 @@ require "rubygems"
require "daemons"
require "eventmachine"
require "websocket-eventmachine-client"
require File.expand_path("../../lib/beetle", File.dirname(__FILE__))

tmp_path = File.expand_path("../../tmp", File.dirname(__FILE__))
system_notification_log_file_path = "#{tmp_path}/system_notifications.log"

DEBUG = false

Daemons.run_proc("system_notification_logger", :log_output => true, :dir_mode => :normal, :dir => tmp_path) do
Beetle.config.servers = "127.0.0.1:5672" # rabbitmq

# set Beetle log level to info, less noisy than debug
Beetle.config.logger.level = Logger::DEBUG

log_file = File.open(system_notification_log_file_path, "a+")
log_file.sync = true

Expand All @@ -26,23 +20,33 @@ Daemons.run_proc("system_notification_logger", :log_output => true, :dir_mode =>
ws.unbind
end

puts "Starting system notification logger: #{Process.pid}"

ws_uri = 'ws://127.0.0.1:9650/notifications'
@connected = false

while !@interrupted
sleep 0.1
puts "Trying to connect to notification socket: #{ws_uri}" if DEBUG
EventMachine.run do
ws = WebSocket::EventMachine::Client.connect(:uri => 'ws://127.0.0.1:9650/notifications')
ws = WebSocket::EventMachine::Client.connect(:uri => ws_uri)
ws.onopen do
puts "established connection" if DEBUG
@connected = true
puts "Established connection to notifications server"
end
ws.onclose do
puts "server closed connection" if DEBUG && !@interrupted
puts "Notifications server closed connection" if DEBUG && @connected
@connected = false
EM.add_timer(0){ EM.stop_event_loop }
end
ws.onmessage do |text|
puts "writing message to #{system_notification_log_file_path}: #{text}"
puts "Writing message to #{system_notification_log_file_path}: #{text}"
log_file << (text + "\n")
end
puts "Started system notification logger"
trap("INT") { shutdown(ws) }
trap("TERM") { shutdown(ws) }
end
end

puts "Terminated system notification logger: #{Process.pid}"
end
1 change: 0 additions & 1 deletion lib/beetle.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
$:.unshift(File.expand_path('..', __FILE__))
require 'bunny' # which bunny picks up
require 'qrack/errors' # needed by the publisher
require 'redis/connection/hiredis' # require *before* redis as specified in the redis-rb gem docs
require 'redis'
require 'active_support/all'
Expand Down
9 changes: 6 additions & 3 deletions lib/beetle/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class Configuration
# to 2047, which is the RabbitMQ default in 3.7. We can't set this to 0 because of a bug
# in bunny.
attr_accessor :channel_max
# the heartbeat setting to be used for RabbitMQ heartbeats (defaults to 0).
attr_accessor :heartbeat

# Lazy queues have the advantage of consuming a lot less memory on the broker. For backwards
# compatibility, they are disabled by default.
Expand Down Expand Up @@ -115,8 +117,8 @@ class Configuration
# Returns the port on which the Rabbit API is hosted
attr_accessor :api_port

# the socket timeout in seconds for message publishing (defaults to <tt>0</tt>).
# consider this a highly experimental feature for now.
# The socket timeout in seconds for message publishing (defaults to <tt>15</tt>).
# Consider this a highly experimental feature for now.
attr_accessor :publishing_timeout

# the connect/disconnect timeout in seconds for the publishing connection
Expand Down Expand Up @@ -176,6 +178,7 @@ def initialize #:nodoc:
self.frame_max = 131072
self.channel_max = 2047
self.prefetch_count = 1
self.heartbeat = 0

self.dead_lettering_enabled = false
self.dead_lettering_msg_ttl = 1000 # 1 second
Expand All @@ -187,7 +190,7 @@ def initialize #:nodoc:

self.update_queue_properties_synchronously = false

self.publishing_timeout = 0
self.publishing_timeout = 15
self.publisher_connect_timeout = 5 # seconds
self.tmpdir = "/tmp"

Expand Down
75 changes: 45 additions & 30 deletions lib/beetle/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def initialize(client, options = {}) #:nodoc:
@exchanges_with_bound_queues = {}
@dead_servers = {}
@bunnies = {}
@channels = {}
@throttling_options = {}
@next_throttle_refresh = Time.now
@throttled = false
Expand All @@ -27,14 +28,10 @@ def throttling_status
@throttled ? 'throttled' : 'unthrottled'
end

# list of exceptions potentially raised by bunny
# these need to be lazy, because qrack exceptions are only defined after a connection has been established
# List of exceptions potentially raised by bunny.
def bunny_exceptions
[
Bunny::ConnectionError, Bunny::ForcedChannelCloseError, Bunny::ForcedConnectionCloseError,
Bunny::MessageError, Bunny::ProtocolError, Bunny::ServerDownError, Bunny::UnsubscribeError,
Bunny::AcknowledgementError, Qrack::BufferOverflowError, Qrack::InvalidTypeError,
Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error
Bunny::Exception, Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error
]
end

Expand All @@ -46,9 +43,9 @@ def publish(message_name, data, opts={}) #:nodoc:
recycle_dead_servers unless @dead_servers.empty?
throttle!
if opts[:redundant]
publish_with_redundancy(exchange_name, message_name, data, opts)
publish_with_redundancy(exchange_name, message_name, data.to_s, opts)
else
publish_with_failover(exchange_name, message_name, data, opts)
publish_with_failover(exchange_name, message_name, data.to_s, opts)
end
end
end
Expand Down Expand Up @@ -120,6 +117,7 @@ def publish_with_redundancy(exchange_name, message_name, data, opts) #:nodoc:

def rpc(message_name, data, opts={}) #:nodoc:
opts = @client.messages[message_name].merge(opts.symbolize_keys)
timeout = opts.delete(:timeout) || RPC_DEFAULT_TIMEOUT
exchange_name = opts.delete(:exchange)
opts.delete(:queue)
recycle_dead_servers unless @dead_servers.empty?
Expand All @@ -131,17 +129,21 @@ def rpc(message_name, data, opts={}) #:nodoc:
select_next_server
bind_queues_for_exchange(exchange_name)
# create non durable, autodeleted temporary queue with a server assigned name
queue = bunny.queue
queue = channel.queue("", :durable => false, :auto_delete => true)
opts = Message.publishing_options(opts.merge :reply_to => queue.name)
logger.debug "Beetle: trying to send #{message_name}:#{opts[:message_id]} to #{@server}"
exchange(exchange_name).publish(data, opts)
logger.debug "Beetle: message sent!"
logger.debug "Beetle: listening on reply queue #{queue.name}"
queue.subscribe(:message_max => 1, :timeout => opts[:timeout] || RPC_DEFAULT_TIMEOUT) do |msg|
q = Queue.new
consumer = queue.subscribe do |info, properties, payload|
logger.debug "Beetle: received reply!"
result = msg[:payload]
status = msg[:header].properties[:headers][:status]
result = payload
q.push properties[:headers]["status"]
end
Timeout.timeout(timeout) do
status = q.pop
end
consumer.cancel
logger.debug "Beetle: rpc complete!"
rescue *bunny_exceptions => e
stop!(e)
Expand Down Expand Up @@ -194,26 +196,38 @@ def bunny
end

def bunny?
@bunnies[@server]
!!@bunnies[@server]
end

def new_bunny
b = Bunny.new(
:host => current_host,
:port => current_port,
:logging => !!@options[:logging],
:user => @client.config.user,
:pass => @client.config.password,
:vhost => @client.config.vhost,
:frame_max => @client.config.frame_max,
:channel_max => @client.config.channel_max,
:socket_timeout => @client.config.publishing_timeout,
:connect_timeout => @client.config.publisher_connect_timeout,
:spec => '09')
:host => current_host,
:port => current_port,
:logger => @client.config.logger,
:username => @client.config.user,
:password => @client.config.password,
:vhost => @client.config.vhost,
:automatically_recover => false,
:frame_max => @client.config.frame_max,
:channel_max => @client.config.channel_max,
:read_timeout => @client.config.publishing_timeout,
:write_timeout => @client.config.publishing_timeout,
:continuation_timeout => @client.config.publishing_timeout,
:connection_timeout => @client.config.publisher_connect_timeout,
:heartbeat => @client.config.heartbeat,
)
b.start
b
end

def channel
@channels[@server] ||= bunny.create_channel
end

def channel?
!!@channels[@server]
end

# retry dead servers after ignoring them for 10.seconds
# if all servers are dead, retry the one which has been dead for the longest time
def recycle_dead_servers
Expand Down Expand Up @@ -244,7 +258,7 @@ def select_next_server
end

def create_exchange!(name, opts)
bunny.exchange(name, opts)
channel.exchange(name, opts)
end

def bind_queues_for_exchange(exchange_name)
Expand All @@ -255,9 +269,8 @@ def bind_queues_for_exchange(exchange_name)

def declare_queue!(queue_name, creation_options)
logger.debug("Beetle: creating queue with opts: #{creation_options.inspect}")
queue = bunny.queue(queue_name, creation_options)

policy_options = bind_dead_letter_queue!(bunny, queue_name, creation_options)
queue = channel.queue(queue_name, creation_options)
policy_options = bind_dead_letter_queue!(channel, queue_name, creation_options)
publish_policy_options(policy_options)
queue
end
Expand All @@ -272,8 +285,9 @@ def stop!(exception=nil)
Beetle::Timer.timeout(timeout) do
logger.debug "Beetle: closing connection from publisher to #{server}"
if exception
bunny.__send__ :close_socket
bunny.__send__ :close_connection
else
channel.close if channel?
bunny.stop
end
end
Expand All @@ -282,6 +296,7 @@ def stop!(exception=nil)
Beetle::reraise_expectation_errors!
ensure
@bunnies[@server] = nil
@channels[@server] = nil
@exchanges[@server] = {}
@queues[@server] = {}
end
Expand Down
4 changes: 2 additions & 2 deletions test/beetle/amqp_gem_behavior_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ class AMQPGemBehaviorTest < Minitest::Test
EM::Timer.new(1){ connection.close { EM.stop }}
channel = AMQP::Channel.new(connection)
channel.on_error { puts "woot"}
exchange = channel.topic("beetle_tests")
queue = AMQP::Queue.new(channel)
exchange = channel.topic("beetle_tests", :durable => false, :auto_delete => true)
queue = AMQP::Queue.new(channel, :durable => false, :auto_delete => true)
queue.bind(exchange, :key => "#")
queue.subscribe { }
queue.subscribe { }
Expand Down
2 changes: 1 addition & 1 deletion test/beetle/beetle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Beetle
class HostnameTest < Minitest::Test
test "should use canonical name if possible " do
test "should use canonical name if possible " do
addr = mock("addr")
addr.expects(:canonname).returns("a.b.com")
Socket.expects(:gethostname).returns("a.b.com")
Expand Down
Loading

0 comments on commit 3dea802

Please sign in to comment.