Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated bunny to the latest version #56

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ jobs:
env:
BUNDLE_GEMFILE: gemfiles/redis_${{ matrix.redis-version }}.gemfile

- name: Collect Docker Logs
uses: jwalton/[email protected]
if: failure()
with:
images: 'rabbitmq'

- name: Stop services
run: docker-compose down
if: always()
2 changes: 1 addition & 1 deletion beetle.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Gem::Specification.new do |s|
}

s.specification_version = 3
s.add_runtime_dependency "bunny", "~> 0.7.13"
s.add_runtime_dependency "bunny", "~> 2.22.0"
s.add_runtime_dependency "redis", ">= 4.2.1"
s.add_runtime_dependency "hiredis", ">= 0.4.5"
s.add_runtime_dependency "amq-protocol", "= 2.3.2"
Expand Down
2 changes: 1 addition & 1 deletion cucumber.yml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
default: --publish-quiet
default: --publish-quiet --tags 'not @ignored' --color
12 changes: 12 additions & 0 deletions features/redis_auto_failover.feature
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,15 @@ Feature: Redis auto failover
And the redis master of "rc-client-2" should be "redis-1"
And the redis master of the beetle handler should be "redis-1"
And the role of redis server "redis-2" should be "slave"

@ignored
Scenario: Running the system for a few seconds to perform manual testing
Given a redis configuration server using redis servers "redis-1,redis-2" with clients "rc-client-1,rc-client-2" exists
And a redis configuration client "rc-client-1" using redis servers "redis-1,redis-2" exists
And a redis configuration client "rc-client-2" using redis servers "redis-1,redis-2" exists
And a beetle handler using the redis-master file from "rc-client-1" exists
And the redis master of "rc-client-1" should be "redis-1"
And the redis master of "rc-client-2" should be "redis-1"
And the redis master of the beetle handler should be "redis-1"
And the role of redis server "redis-2" should be "slave"
Then the system can run for a while without dying
2 changes: 1 addition & 1 deletion features/step_definitions/redis_auto_failover_steps.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@

Then /^the redis master of the beetle handler should be "([^\"]*)"$/ do |redis_name|
Beetle.config.servers = "127.0.0.1:5672" # rabbitmq
Beetle.config.logger.level = Logger::INFO
Beetle.config.logger.level = Logger::FATAL
redis_master = TestDaemons::Redis[redis_name].ip_with_port
response = `curl -s 127.0.0.1:10254/redis_master`.chomp
assert_equal redis_master, response
Expand Down
11 changes: 9 additions & 2 deletions features/support/beetle_handler
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,32 @@ Daemons.run_proc("beetle_handler", :log_output => true, :dir_mode => :normal, :d
config.message(:echo)
config.handler(:echo) do |message|
begin
Beetle.config.logger.info "Received echo request: reply_to: #{message.header.attributes[:reply_to]}"
client.deduplication_store.redis.server
rescue
master_file_content = File.read(Beetle.config.redis_server)
"no redis master: exception: #{$!.class}(#{$!}), master_file: '#{master_file_content}'"
msg = "no redis master: exception: #{$!.class}(#{$!}), master_file: '#{master_file_content}'"
Beetle.config.logger.error msg
msg
end
end
config.handler(Beetle.config.beetle_policy_updates_queue_name) do |message|
begin
Beetle.config.logger.info "received beetle policy update message': #{message.data}"
Beetle.config.logger.info "Received beetle policy update message': #{message.data}"
client.update_queue_properties!(JSON.parse(message.data))
rescue => e
Beetle.config.logger.error("#{e}:#{e.backtrace.join("\n")}")
end
end
end
Beetle.config.logger.info "Starting beetle handler for system: #{Beetle.config.system_name}"
client.listen do
puts "Started beetle handler for system: #{Beetle.config.system_name}"

BeetleStatusServer.setup(client)
EM.start_server '0.0.0.0', 10254, BeetleStatusServer

trap("TERM"){ client.stop_listening }
end
Beetle.config.logger.info "Terminated beetle handler for system: #{Beetle.config.system_name}"
end
3 changes: 3 additions & 0 deletions features/support/env.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
extend Minitest::Assertions
end

# Executed before each scenario
Before do
cleanup_master_files
`ruby features/support/system_notification_logger start`
`ruby features/support/beetle_handler start -- --redis-master-file=#{redis_master_file('rc-client-1')}`
end

# Executed after each scenario
After do
cleanup_test_env
end
Expand Down
8 changes: 2 additions & 6 deletions features/support/system_notification_logger
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ require "daemons"
require "eventmachine"
require "em-http-server"
require "websocket-eventmachine-client"
require File.expand_path("../../lib/beetle", File.dirname(__FILE__))

tmp_path = File.expand_path("../../tmp", File.dirname(__FILE__))
system_notification_log_file_path = "#{tmp_path}/system_notifications.log"
Expand All @@ -29,11 +28,6 @@ end


Daemons.run_proc("system_notification_logger", :log_output => true, :dir_mode => :normal, :dir => tmp_path) do
Beetle.config.servers = "127.0.0.1:5672" # rabbitmq

# set Beetle log level to info, less noisy than debug
Beetle.config.logger.level = Logger::DEBUG

log_file = File.open(system_notification_log_file_path, "a+")
log_file.sync = true

Expand Down Expand Up @@ -73,4 +67,6 @@ Daemons.run_proc("system_notification_logger", :log_output => true, :dir_mode =>
end

end

puts "Terminated system notification logger: #{Process.pid}"
end
5 changes: 3 additions & 2 deletions lib/beetle.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
$:.unshift(File.expand_path('..', __FILE__))
require 'bunny' # which bunny picks up
require 'qrack/errors' # needed by the publisher

begin
require 'redis/connection/hiredis' # require *before* redis as specified in the redis-rb gem docs
Expand Down Expand Up @@ -28,8 +27,10 @@ class UnknownMessage < Error; end
class UnknownQueue < Error; end
# raised when no redis master server can be found
class NoRedisMaster < Error; end
# raise when no message could be sent by the publisher
# raised when no message could be sent by the publisher
class NoMessageSent < Error; end
# logged when an RPC call timed outdated
class RPCTimedOut < Error; end

# AMQP options for exchange creation
EXCHANGE_CREATION_KEYS = [:auto_delete, :durable, :internal, :nowait, :passive]
Expand Down
9 changes: 6 additions & 3 deletions lib/beetle/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class Configuration
# to 2047, which is the RabbitMQ default in 3.7. We can't set this to 0 because of a bug
# in bunny.
attr_accessor :channel_max
# the heartbeat setting to be used for RabbitMQ heartbeats (defaults to 0).
attr_accessor :heartbeat

# Lazy queues have the advantage of consuming a lot less memory on the broker. For backwards
# compatibility, they are disabled by default.
Expand Down Expand Up @@ -121,8 +123,8 @@ class Configuration
# Returns the port on which the Rabbit API is hosted
attr_accessor :api_port

# the socket timeout in seconds for message publishing (defaults to <tt>0</tt>).
# consider this a highly experimental feature for now.
# The socket timeout in seconds for message publishing (defaults to <tt>15</tt>).
# Consider this a highly experimental feature for now.
attr_accessor :publishing_timeout

# the connect/disconnect timeout in seconds for the publishing connection
Expand Down Expand Up @@ -185,6 +187,7 @@ def initialize #:nodoc:
self.frame_max = 131072
self.channel_max = 2047
self.prefetch_count = 1
self.heartbeat = 0

self.dead_lettering_enabled = false
self.dead_lettering_msg_ttl = 1000 # 1 second
Expand All @@ -196,7 +199,7 @@ def initialize #:nodoc:

self.update_queue_properties_synchronously = false

self.publishing_timeout = 0
self.publishing_timeout = 15
self.publisher_connect_timeout = 5 # seconds
self.tmpdir = "/tmp"

Expand Down
62 changes: 37 additions & 25 deletions lib/beetle/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def initialize(client, options = {}) #:nodoc:
@exchanges_with_bound_queues = {}
@dead_servers = {}
@bunnies = {}
@channels = {}
@throttling_options = {}
@next_throttle_refresh = Time.now
@throttled = false
Expand All @@ -27,14 +28,10 @@ def throttling_status
@throttled ? 'throttled' : 'unthrottled'
end

# list of exceptions potentially raised by bunny
# these need to be lazy, because qrack exceptions are only defined after a connection has been established
# List of exceptions potentially raised by bunny.
def bunny_exceptions
[
Bunny::ConnectionError, Bunny::ForcedChannelCloseError, Bunny::ForcedConnectionCloseError,
Bunny::MessageError, Bunny::ProtocolError, Bunny::ServerDownError, Bunny::UnsubscribeError,
Bunny::AcknowledgementError, Qrack::BufferOverflowError, Qrack::InvalidTypeError,
Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error
Bunny::Exception, Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error
]
end

Expand All @@ -46,9 +43,9 @@ def publish(message_name, data, opts={}) #:nodoc:
recycle_dead_servers unless @dead_servers.empty?
throttle!
if opts[:redundant]
publish_with_redundancy(exchange_name, message_name, data, opts)
publish_with_redundancy(exchange_name, message_name, data.to_s, opts)
else
publish_with_failover(exchange_name, message_name, data, opts)
publish_with_failover(exchange_name, message_name, data.to_s, opts)
end
end
end
Expand Down Expand Up @@ -157,26 +154,38 @@ def bunny
end

def bunny?
@bunnies[@server]
!!@bunnies[@server]
end

def new_bunny
b = Bunny.new(
:host => current_host,
:port => current_port,
:logging => !!@options[:logging],
:user => @client.config.user,
:pass => @client.config.password,
:vhost => @client.config.vhost,
:frame_max => @client.config.frame_max,
:channel_max => @client.config.channel_max,
:socket_timeout => @client.config.publishing_timeout,
:connect_timeout => @client.config.publisher_connect_timeout,
:spec => '09')
:host => current_host,
:port => current_port,
:logger => @client.config.logger,
:username => @client.config.user,
:password => @client.config.password,
:vhost => @client.config.vhost,
:automatically_recover => false,
:frame_max => @client.config.frame_max,
:channel_max => @client.config.channel_max,
:read_timeout => @client.config.publishing_timeout,
:write_timeout => @client.config.publishing_timeout,
:continuation_timeout => @client.config.publishing_timeout,
:connection_timeout => @client.config.publisher_connect_timeout,
:heartbeat => @client.config.heartbeat,
)
b.start
b
end

def channel
@channels[@server] ||= bunny.create_channel
end

def channel?
!!@channels[@server]
end

# retry dead servers after ignoring them for 10.seconds
# if all servers are dead, retry the one which has been dead for the longest time
def recycle_dead_servers
Expand Down Expand Up @@ -207,7 +216,7 @@ def select_next_server
end

def create_exchange!(name, opts)
bunny.exchange(name, opts)
channel.exchange(name, opts)
end

def bind_queues_for_exchange(exchange_name)
Expand All @@ -218,9 +227,8 @@ def bind_queues_for_exchange(exchange_name)

def declare_queue!(queue_name, creation_options)
logger.debug("Beetle: creating queue with opts: #{creation_options.inspect}")
queue = bunny.queue(queue_name, creation_options)

policy_options = bind_dead_letter_queue!(bunny, queue_name, creation_options)
queue = channel.queue(queue_name, creation_options)
policy_options = bind_dead_letter_queue!(channel, queue_name, creation_options)
publish_policy_options(policy_options)
queue
end
Expand All @@ -235,8 +243,11 @@ def stop!(exception=nil)
Beetle::Timer.timeout(timeout) do
logger.debug "Beetle: closing connection from publisher to #{server}"
if exception
bunny.__send__ :close_socket
bunny.__send__ :close_connection, false
reader_loop = bunny.__send__ :reader_loop
reader_loop.kill if reader_loop
else
channel.close if channel?
bunny.stop
end
end
Expand All @@ -245,6 +256,7 @@ def stop!(exception=nil)
Beetle::reraise_expectation_errors!
ensure
@bunnies[@server] = nil
@channels[@server] = nil
@exchanges[@server] = {}
@queues[@server] = {}
end
Expand Down
4 changes: 2 additions & 2 deletions test/beetle/amqp_gem_behavior_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ class AMQPGemBehaviorTest < Minitest::Test
EM::Timer.new(1){ connection.close { EM.stop }}
channel = AMQP::Channel.new(connection)
channel.on_error { puts "woot"}
exchange = channel.topic("beetle_tests")
queue = AMQP::Queue.new(channel)
exchange = channel.topic("beetle_tests", :durable => false, :auto_delete => true)
queue = AMQP::Queue.new(channel, :durable => false, :auto_delete => true)
queue.bind(exchange, :key => "#")
queue.subscribe { }
queue.subscribe { }
Expand Down
2 changes: 1 addition & 1 deletion test/beetle/beetle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Beetle
class HostnameTest < Minitest::Test
test "should use canonical name if possible " do
test "should use canonical name if possible " do
addr = mock("addr")
addr.expects(:canonname).returns("a.b.com")
Socket.expects(:gethostname).returns("a.b.com")
Expand Down
Loading
Loading