diff --git a/beetle.gemspec b/beetle.gemspec index 69509ce4..35e9a44d 100644 --- a/beetle.gemspec +++ b/beetle.gemspec @@ -23,7 +23,7 @@ Gem::Specification.new do |s| } s.specification_version = 3 - s.add_runtime_dependency "bunny", "~> 0.7.12" + s.add_runtime_dependency "bunny", "~> 2.19.0" s.add_runtime_dependency "redis", ">= 4.2.1" s.add_runtime_dependency "hiredis", ">= 0.4.5" s.add_runtime_dependency "amq-protocol", "= 2.3.2" diff --git a/features/step_definitions/redis_auto_failover_steps.rb b/features/step_definitions/redis_auto_failover_steps.rb index ffe36903..3b4ba3d0 100644 --- a/features/step_definitions/redis_auto_failover_steps.rb +++ b/features/step_definitions/redis_auto_failover_steps.rb @@ -165,7 +165,11 @@ response = client.rpc(:echo, 'echo') t2 = Time.now # puts "OK,#{redis_master} =?= #{response.join(',')} after #{t2-t1}, attempt #{i+1}" - break if expected_response == response + if expected_response == response + client.stop_publishing + break + end + sleep 0.1 end assert_equal expected_response, response end diff --git a/features/support/beetle_handler b/features/support/beetle_handler index eb97adac..c70e5128 100755 --- a/features/support/beetle_handler +++ b/features/support/beetle_handler @@ -35,14 +35,16 @@ Daemons.run_proc("beetle_handler", :log_output => true, :dir_mode => :normal, :d end config.handler(Beetle.config.beetle_policy_updates_queue_name) do |message| begin - Beetle.config.logger.info "received beetle policy update message': #{message.data}" + Beetle.config.logger.info "Received beetle policy update message': #{message.data}" client.update_queue_properties!(JSON.parse(message.data)) rescue => e Beetle.config.logger.error("#{e}:#{e.backtrace.join("\n")}") end end end + Beetle.config.logger.info "Starting beetle handler for system: #{Beetle.config.system_name}" client.listen do - puts "Started beetle handler for system: #{Beetle.config.system_name}" + trap("TERM"){ client.stop_listening } end + Beetle.config.logger.info "Terminated beetle handler for system: #{Beetle.config.system_name}" end diff --git a/features/support/system_notification_logger b/features/support/system_notification_logger index 30e4c5db..4c1721c9 100755 --- a/features/support/system_notification_logger +++ b/features/support/system_notification_logger @@ -4,7 +4,6 @@ require "rubygems" require "daemons" require "eventmachine" require "websocket-eventmachine-client" -require File.expand_path("../../lib/beetle", File.dirname(__FILE__)) tmp_path = File.expand_path("../../tmp", File.dirname(__FILE__)) system_notification_log_file_path = "#{tmp_path}/system_notifications.log" @@ -12,11 +11,6 @@ system_notification_log_file_path = "#{tmp_path}/system_notifications.log" DEBUG = false Daemons.run_proc("system_notification_logger", :log_output => true, :dir_mode => :normal, :dir => tmp_path) do - Beetle.config.servers = "127.0.0.1:5672" # rabbitmq - - # set Beetle log level to info, less noisy than debug - Beetle.config.logger.level = Logger::DEBUG - log_file = File.open(system_notification_log_file_path, "a+") log_file.sync = true @@ -26,23 +20,33 @@ Daemons.run_proc("system_notification_logger", :log_output => true, :dir_mode => ws.unbind end + puts "Starting system notification logger: #{Process.pid}" + + ws_uri = 'ws://127.0.0.1:9650/notifications' + @connected = false + while !@interrupted + sleep 0.1 + puts "Trying to connect to notification socket: #{ws_uri}" if DEBUG EventMachine.run do - ws = WebSocket::EventMachine::Client.connect(:uri => 'ws://127.0.0.1:9650/notifications') + ws = WebSocket::EventMachine::Client.connect(:uri => ws_uri) ws.onopen do - puts "established connection" if DEBUG + @connected = true + puts "Established connection to notifications server" end ws.onclose do - puts "server closed connection" if DEBUG && !@interrupted + puts "Notifications server closed connection" if DEBUG && @connected + @connected = false EM.add_timer(0){ EM.stop_event_loop } end ws.onmessage do |text| - puts "writing message to #{system_notification_log_file_path}: #{text}" + puts "Writing message to #{system_notification_log_file_path}: #{text}" log_file << (text + "\n") end - puts "Started system notification logger" trap("INT") { shutdown(ws) } trap("TERM") { shutdown(ws) } end end + + puts "Terminated system notification logger: #{Process.pid}" end diff --git a/lib/beetle.rb b/lib/beetle.rb index b6eb94e6..4d84ff13 100644 --- a/lib/beetle.rb +++ b/lib/beetle.rb @@ -1,6 +1,5 @@ $:.unshift(File.expand_path('..', __FILE__)) require 'bunny' # which bunny picks up -require 'qrack/errors' # needed by the publisher require 'redis/connection/hiredis' # require *before* redis as specified in the redis-rb gem docs require 'redis' require 'active_support/all' diff --git a/lib/beetle/configuration.rb b/lib/beetle/configuration.rb index b82db2ff..644306c9 100644 --- a/lib/beetle/configuration.rb +++ b/lib/beetle/configuration.rb @@ -81,6 +81,8 @@ class Configuration # to 2047, which is the RabbitMQ default in 3.7. We can't set this to 0 because of a bug # in bunny. attr_accessor :channel_max + # the heartbeat setting to be used for RabbitMQ heartbeats (defaults to 0). + attr_accessor :heartbeat # Lazy queues have the advantage of consuming a lot less memory on the broker. For backwards # compatibility, they are disabled by default. @@ -115,8 +117,8 @@ class Configuration # Returns the port on which the Rabbit API is hosted attr_accessor :api_port - # the socket timeout in seconds for message publishing (defaults to 0). - # consider this a highly experimental feature for now. + # The socket timeout in seconds for message publishing (defaults to 15). + # Consider this a highly experimental feature for now. attr_accessor :publishing_timeout # the connect/disconnect timeout in seconds for the publishing connection @@ -176,6 +178,7 @@ def initialize #:nodoc: self.frame_max = 131072 self.channel_max = 2047 self.prefetch_count = 1 + self.heartbeat = 0 self.dead_lettering_enabled = false self.dead_lettering_msg_ttl = 1000 # 1 second @@ -187,7 +190,7 @@ def initialize #:nodoc: self.update_queue_properties_synchronously = false - self.publishing_timeout = 0 + self.publishing_timeout = 15 self.publisher_connect_timeout = 5 # seconds self.tmpdir = "/tmp" diff --git a/lib/beetle/publisher.rb b/lib/beetle/publisher.rb index d4b4a9bd..ef93afe4 100644 --- a/lib/beetle/publisher.rb +++ b/lib/beetle/publisher.rb @@ -9,6 +9,7 @@ def initialize(client, options = {}) #:nodoc: @exchanges_with_bound_queues = {} @dead_servers = {} @bunnies = {} + @channels = {} @throttling_options = {} @next_throttle_refresh = Time.now @throttled = false @@ -27,14 +28,10 @@ def throttling_status @throttled ? 'throttled' : 'unthrottled' end - # list of exceptions potentially raised by bunny - # these need to be lazy, because qrack exceptions are only defined after a connection has been established + # List of exceptions potentially raised by bunny. def bunny_exceptions [ - Bunny::ConnectionError, Bunny::ForcedChannelCloseError, Bunny::ForcedConnectionCloseError, - Bunny::MessageError, Bunny::ProtocolError, Bunny::ServerDownError, Bunny::UnsubscribeError, - Bunny::AcknowledgementError, Qrack::BufferOverflowError, Qrack::InvalidTypeError, - Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error + Bunny::Exception, Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error ] end @@ -46,9 +43,9 @@ def publish(message_name, data, opts={}) #:nodoc: recycle_dead_servers unless @dead_servers.empty? throttle! if opts[:redundant] - publish_with_redundancy(exchange_name, message_name, data, opts) + publish_with_redundancy(exchange_name, message_name, data.to_s, opts) else - publish_with_failover(exchange_name, message_name, data, opts) + publish_with_failover(exchange_name, message_name, data.to_s, opts) end end end @@ -120,6 +117,7 @@ def publish_with_redundancy(exchange_name, message_name, data, opts) #:nodoc: def rpc(message_name, data, opts={}) #:nodoc: opts = @client.messages[message_name].merge(opts.symbolize_keys) + timeout = opts.delete(:timeout) || RPC_DEFAULT_TIMEOUT exchange_name = opts.delete(:exchange) opts.delete(:queue) recycle_dead_servers unless @dead_servers.empty? @@ -131,17 +129,21 @@ def rpc(message_name, data, opts={}) #:nodoc: select_next_server bind_queues_for_exchange(exchange_name) # create non durable, autodeleted temporary queue with a server assigned name - queue = bunny.queue + queue = channel.queue("", :durable => false, :auto_delete => true) opts = Message.publishing_options(opts.merge :reply_to => queue.name) logger.debug "Beetle: trying to send #{message_name}:#{opts[:message_id]} to #{@server}" exchange(exchange_name).publish(data, opts) logger.debug "Beetle: message sent!" - logger.debug "Beetle: listening on reply queue #{queue.name}" - queue.subscribe(:message_max => 1, :timeout => opts[:timeout] || RPC_DEFAULT_TIMEOUT) do |msg| + q = Queue.new + consumer = queue.subscribe do |info, properties, payload| logger.debug "Beetle: received reply!" - result = msg[:payload] - status = msg[:header].properties[:headers][:status] + result = payload + q.push properties[:headers]["status"] end + Timeout.timeout(timeout) do + status = q.pop + end + consumer.cancel logger.debug "Beetle: rpc complete!" rescue *bunny_exceptions => e stop!(e) @@ -194,26 +196,38 @@ def bunny end def bunny? - @bunnies[@server] + !!@bunnies[@server] end def new_bunny b = Bunny.new( - :host => current_host, - :port => current_port, - :logging => !!@options[:logging], - :user => @client.config.user, - :pass => @client.config.password, - :vhost => @client.config.vhost, - :frame_max => @client.config.frame_max, - :channel_max => @client.config.channel_max, - :socket_timeout => @client.config.publishing_timeout, - :connect_timeout => @client.config.publisher_connect_timeout, - :spec => '09') + :host => current_host, + :port => current_port, + :logger => @client.config.logger, + :username => @client.config.user, + :password => @client.config.password, + :vhost => @client.config.vhost, + :automatically_recover => false, + :frame_max => @client.config.frame_max, + :channel_max => @client.config.channel_max, + :read_timeout => @client.config.publishing_timeout, + :write_timeout => @client.config.publishing_timeout, + :continuation_timeout => @client.config.publishing_timeout, + :connection_timeout => @client.config.publisher_connect_timeout, + :heartbeat => @client.config.heartbeat, + ) b.start b end + def channel + @channels[@server] ||= bunny.create_channel + end + + def channel? + !!@channels[@server] + end + # retry dead servers after ignoring them for 10.seconds # if all servers are dead, retry the one which has been dead for the longest time def recycle_dead_servers @@ -244,7 +258,7 @@ def select_next_server end def create_exchange!(name, opts) - bunny.exchange(name, opts) + channel.exchange(name, opts) end def bind_queues_for_exchange(exchange_name) @@ -255,9 +269,8 @@ def bind_queues_for_exchange(exchange_name) def declare_queue!(queue_name, creation_options) logger.debug("Beetle: creating queue with opts: #{creation_options.inspect}") - queue = bunny.queue(queue_name, creation_options) - - policy_options = bind_dead_letter_queue!(bunny, queue_name, creation_options) + queue = channel.queue(queue_name, creation_options) + policy_options = bind_dead_letter_queue!(channel, queue_name, creation_options) publish_policy_options(policy_options) queue end @@ -272,8 +285,9 @@ def stop!(exception=nil) Beetle::Timer.timeout(timeout) do logger.debug "Beetle: closing connection from publisher to #{server}" if exception - bunny.__send__ :close_socket + bunny.__send__ :close_connection else + channel.close if channel? bunny.stop end end @@ -282,6 +296,7 @@ def stop!(exception=nil) Beetle::reraise_expectation_errors! ensure @bunnies[@server] = nil + @channels[@server] = nil @exchanges[@server] = {} @queues[@server] = {} end diff --git a/test/beetle/amqp_gem_behavior_test.rb b/test/beetle/amqp_gem_behavior_test.rb index 937c978e..fea10271 100644 --- a/test/beetle/amqp_gem_behavior_test.rb +++ b/test/beetle/amqp_gem_behavior_test.rb @@ -11,8 +11,8 @@ class AMQPGemBehaviorTest < Minitest::Test EM::Timer.new(1){ connection.close { EM.stop }} channel = AMQP::Channel.new(connection) channel.on_error { puts "woot"} - exchange = channel.topic("beetle_tests") - queue = AMQP::Queue.new(channel) + exchange = channel.topic("beetle_tests", :durable => false, :auto_delete => true) + queue = AMQP::Queue.new(channel, :durable => false, :auto_delete => true) queue.bind(exchange, :key => "#") queue.subscribe { } queue.subscribe { } diff --git a/test/beetle/beetle_test.rb b/test/beetle/beetle_test.rb index cfe266c1..a5e1bccb 100644 --- a/test/beetle/beetle_test.rb +++ b/test/beetle/beetle_test.rb @@ -2,7 +2,7 @@ module Beetle class HostnameTest < Minitest::Test - test "should use canonical name if possible " do + test "should use canonical name if possible " do addr = mock("addr") addr.expects(:canonname).returns("a.b.com") Socket.expects(:gethostname).returns("a.b.com") diff --git a/test/beetle/publisher_test.rb b/test/beetle/publisher_test.rb index 80db6eee..3c7e8342 100644 --- a/test/beetle/publisher_test.rb +++ b/test/beetle/publisher_test.rb @@ -4,8 +4,8 @@ module Beetle class PublisherTest < Minitest::Test def setup - client = Client.new - @pub = Publisher.new(client) + @client = Client.new + @pub = Publisher.new(@client) end test "acccessing a bunny for a server which doesn't have one should create it and associate it with the server" do @@ -18,16 +18,20 @@ def setup test "new bunnies should be created using current host and port and they should be started" do m = mock("dummy") expected_bunny_options = { - :host => @pub.send(:current_host), :port => @pub.send(:current_port), - :logging => false, - :user => "guest", - :pass => "guest", + :host => @pub.send(:current_host), + :port => @pub.send(:current_port), + :logger => @client.config.logger, + :username => "guest", + :password => "guest", :vhost => "/", - :socket_timeout => 0, - :connect_timeout => 5, + :automatically_recover => false, + :read_timeout => 15, + :write_timeout => 15, + :continuation_timeout => 15, + :connection_timeout => 5, :frame_max => 131072, :channel_max => 2047, - :spec => '09' + :heartbeat => 0, } Bunny.expects(:new).with(expected_bunny_options).returns(m) m.expects(:start) @@ -55,7 +59,7 @@ def setup test "stop!(exception) should close the bunny socket if an exception is not nil" do b = mock("bunny") - b.expects(:close_socket) + b.expects(:close_connection) @pub.expects(:bunny?).returns(true) @pub.expects(:bunny).returns(b) @pub.send(:stop!, Exception.new) @@ -114,7 +118,7 @@ def setup nice_exchange = mock("nice exchange") @pub.stubs(:exchange).with("mama-exchange").returns(raising_exchange).then.returns(raising_exchange).then.returns(nice_exchange) - raising_exchange.expects(:publish).raises(Bunny::ConnectionError).twice + raising_exchange.expects(:publish).raises(Bunny::ConnectionError,'').twice nice_exchange.expects(:publish) @pub.expects(:set_current_server).twice @pub.expects(:stop!).twice @@ -143,9 +147,9 @@ def setup e = mock("exchange") @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) e.expects(:publish).in_sequence(redundant) @@ -159,13 +163,13 @@ def setup e = mock("exchange") @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) assert_raises Beetle::NoMessageSent do @pub.publish_with_redundancy("mama-exchange", "mama", @data, @opts) @@ -189,7 +193,7 @@ def setup e.expects(:publish).in_sequence(redundant) @pub.expects(:exchange).returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:stop!).in_sequence(redundant) @pub.expects(:exchange).returns(e).in_sequence(redundant) @@ -261,13 +265,13 @@ def setup e = mock("exchange") @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) assert_raises Beetle::NoMessageSent do @pub.publish_with_failover("mama-exchange", "mama", @data, @opts) @@ -296,8 +300,10 @@ def setup q = mock("queue") q.expects(:bind).with(:the_exchange, {:key => "haha.#"}) m = mock("Bunny") - m.expects(:queue).with("some_queue", :durable => true, :passive => false, :auto_delete => false, :exclusive => false, :arguments => {"foo" => "fighter"}).returns(q) - @pub.expects(:bunny).returns(m).twice + channel= mock("channel") + m.expects(:create_channel).returns(channel) + channel.expects(:queue).with("some_queue", :durable => true, :passive => false, :auto_delete => false, :exclusive => false, :arguments => {"foo" => "fighter"}).returns(q) + @pub.expects(:bunny).returns(m) @pub.send(:queue, "some_queue") assert_equal q, @pub.send(:queues)["some_queue"] @@ -468,10 +474,12 @@ def setup end test "accessing a given exchange should create it using the config. further access should return the created exchange" do - m = mock("Bunny") - m.expects(:exchange).with("some_exchange", :type => :topic, :durable => true, :queues => []).returns(42) + bunny = mock("bunny") + channel = mock("channel") + channel.expects(:exchange).with("some_exchange", :type => :topic, :durable => true, :queues => []).returns(42) @client.register_exchange("some_exchange", :type => :topic, :durable => true) - @pub.expects(:bunny).returns(m) + @pub.expects(:bunny).returns(bunny) + bunny.expects(:create_channel).returns(channel) ex = @pub.send(:exchange, "some_exchange") assert @pub.send(:exchanges).include?("some_exchange") ex2 = @pub.send(:exchange, "some_exchange") @@ -544,15 +552,16 @@ def setup test "stop should shut down all bunnies" do @pub.servers = ["localhost:1111", "localhost:2222"] s = sequence("shutdown") - bunny = mock("bunny") + bunny1 = mock("bunny1") + bunny2 = mock("bunny2") @pub.expects(:set_current_server).with("localhost:1111").in_sequence(s) @pub.expects(:bunny?).returns(true).in_sequence(s) - @pub.expects(:bunny).returns(bunny).in_sequence(s) - bunny.expects(:stop).in_sequence(s) + @pub.expects(:bunny).returns(bunny1).in_sequence(s) + bunny1.expects(:stop).in_sequence(s) @pub.expects(:set_current_server).with("localhost:2222").in_sequence(s) @pub.expects(:bunny?).returns(true).in_sequence(s) - @pub.expects(:bunny).returns(bunny).in_sequence(s) - bunny.expects(:stop).in_sequence(s) + @pub.expects(:bunny).returns(bunny2).in_sequence(s) + bunny2.expects(:stop).in_sequence(s) @pub.stop end end @@ -568,8 +577,10 @@ def setup test "rpc should return a timeout status if bunny throws an exception" do bunny = mock("bunny") + channel = mock("channel") @pub.expects(:bunny).returns(bunny) - bunny.expects(:queue).raises(Bunny::ConnectionError.new) + bunny.expects(:create_channel).returns(channel) + channel.expects(:queue).raises(Bunny::ConnectionError, '') s = sequence("rpc") @pub.expects(:select_next_server).in_sequence(s) @pub.expects(:bind_queues_for_exchange).with("some_exchange").in_sequence(s) @@ -579,28 +590,32 @@ def setup test "rpc should return a timeout status if the answer doesn't arrive in time" do bunny = mock("bunny") + @pub.expects(:bunny).returns(bunny) + channel = mock("channel") reply_queue = mock("reply_queue") exchange = mock("exchange") - @pub.expects(:bunny).returns(bunny) - bunny.expects(:queue).returns(reply_queue) + bunny.expects(:create_channel).returns(channel) + channel.expects(:queue).returns(reply_queue) reply_queue.stubs(:name).returns("reply_queue") s = sequence("rpc") @pub.expects(:select_next_server).in_sequence(s) @pub.expects(:bind_queues_for_exchange).with("some_exchange").in_sequence(s) @pub.expects(:exchange).with("some_exchange").returns(exchange).in_sequence(s) exchange.expects(:publish).in_sequence(s) - reply_queue.expects(:subscribe).with(:message_max => 1, :timeout => 10).in_sequence(s) - assert_equal "TIMEOUT", @pub.rpc("test", "hello").first + reply_queue.expects(:subscribe).in_sequence(s) + assert_equal "TIMEOUT", @pub.rpc("test", "hello", :timeout => 0.001).first end test "rpc should recover dead servers before selecting the next server" do + bunny = mock("bunny") + @pub.expects(:bunny).returns(bunny) @pub.servers << "localhost:3333" @pub.send(:mark_server_dead) - bunny = mock("bunny") reply_queue = mock("reply_queue") exchange = mock("exchange") - @pub.expects(:bunny).returns(bunny) - bunny.expects(:queue).returns(reply_queue) + channel = mock("channel") + bunny.expects(:create_channel).returns(channel) + channel.expects(:queue).returns(reply_queue) reply_queue.stubs(:name).returns("reply_queue") s = sequence("rpc") @pub.expects(:recycle_dead_servers).in_sequence(s) @@ -608,27 +623,28 @@ def setup @pub.expects(:bind_queues_for_exchange).with("some_exchange").in_sequence(s) @pub.expects(:exchange).with("some_exchange").returns(exchange).in_sequence(s) exchange.expects(:publish).in_sequence(s) - reply_queue.expects(:subscribe).with(:message_max => 1, :timeout => 10).in_sequence(s) - assert_equal "TIMEOUT", @pub.rpc("test", "hello").first - end + reply_queue.expects(:subscribe).in_sequence(s) + assert_equal "TIMEOUT", @pub.rpc("test", "hello", :timeout => 0.001).first + end test "rpc should fetch the result and the status code from the reply message" do bunny = mock("bunny") + @pub.expects(:bunny).returns(bunny) reply_queue = mock("reply_queue") exchange = mock("exchange") - @pub.expects(:bunny).returns(bunny) - bunny.expects(:queue).returns(reply_queue) + channel = mock("channel") + bunny.expects(:create_channel).returns(channel) + channel.expects(:queue).returns(reply_queue) reply_queue.stubs(:name).returns("reply_queue") s = sequence("rpc") @pub.expects(:select_next_server).in_sequence(s) @pub.expects(:bind_queues_for_exchange).with("some_exchange").in_sequence(s) @pub.expects(:exchange).with("some_exchange").returns(exchange).in_sequence(s) exchange.expects(:publish).in_sequence(s) - header = mock("header") - header.expects(:properties).returns({:headers => {:status => "OK"}}) - msg = {:payload => 1, :header => header} - reply_queue.expects(:subscribe).with(:message_max => 1, :timeout => 10).in_sequence(s).yields(msg) - assert_equal ["OK",1], @pub.rpc("test", "hello") + consumer = mock("consumer") + consumer.expects(:cancel) + reply_queue.expects(:subscribe).in_sequence(s).yields([{},{:headers => {"status" => "OK"}},'1']).returns(consumer) + assert_equal ["OK",'1'], @pub.rpc("test", "hello") end end