diff --git a/lib/flatware/sink.rb b/lib/flatware/sink.rb index e9b20ef..73073b0 100644 --- a/lib/flatware/sink.rb +++ b/lib/flatware/sink.rb @@ -1,5 +1,6 @@ require 'drb/drb' require 'set' +require 'flatware/sink/signal' module Flatware module Sink @@ -31,7 +32,7 @@ def initialize(jobs:, formatter:, sink:, worker_count: 0, **) end def start - trap_interrupt + @signal = Signal.listen(&method(:summarize_remaining)) formatter.jobs jobs DRb.start_service(sink, self, verbose: Flatware.verbose?) DRb.thread.join @@ -72,29 +73,8 @@ def respond_to_missing?(name, include_all) private - def trap_interrupt - Thread.main[:signals] = Queue.new - - Thread.new(&method(:handle_interrupt)) - - trap 'INT' do - Thread.main[:signals] << :int - end - end - - def handle_interrupt - Thread.main[:signals].pop - puts 'Interrupted!' - summarize_remaining - puts "\n\nCleaning up. Please wait...\n" - Process.waitall - puts 'done.' - abort - end - def interruped? - signals = Thread.main[:signals] - signals && !signals.empty? + @signal&.interruped? end def check_finished! diff --git a/lib/flatware/sink/signal.rb b/lib/flatware/sink/signal.rb new file mode 100644 index 0000000..c6621eb --- /dev/null +++ b/lib/flatware/sink/signal.rb @@ -0,0 +1,51 @@ +module Flatware + module Sink + class Signal + def initialize(&on_interrupt) + Thread.main[:signals] = Queue.new + + @on_interrupt = on_interrupt + end + + def interruped? + !signals.empty? + end + + def listen + Thread.new(&method(:handle_signals)) + + ::Signal.trap('INT') { signals << :int } + ::Signal.trap('CLD') { signals << :cld } + + self + end + + def self.listen(&block) + new(&block).listen + end + + private + + def handle_signals + puts signal_message(signals.pop) + Process.waitall + @on_interrupt.call + puts 'done.' + abort + end + + def signal_message(signal) + format(<<~MESSAGE, { cld: 'A worker died', int: 'Interrupted' }.fetch(signal)) + + %s! + + Cleaning up. Please wait... + MESSAGE + end + + def signals + Thread.main[:signals] + end + end + end +end diff --git a/spec/flatware/sink_spec.rb b/spec/flatware/sink_spec.rb index 36a8287..db27109 100644 --- a/spec/flatware/sink_spec.rb +++ b/spec/flatware/sink_spec.rb @@ -1,24 +1,27 @@ require 'spec_helper' require 'drb' +require 'timeout' describe Flatware::Sink do - let(:sink_endpoint) do + let!(:sink_endpoint) do server = TCPServer.new('127.0.0.1', 0) port = server.addr[1] server.close - "druby://localhost:#{port}" + "druby://0.0.0.0:#{port}" end let! :formatter do double( 'Formatter', - ready: nil, - summarize: nil, + finished: nil, jobs: nil, progress: nil, - finished: nil, + ready: nil, + summarize: nil, summarize_remaining: nil - ) + ).tap do |formatter| + allow(formatter).to receive(:summarize_remaining, &method(:puts)) + end end let :defaults do @@ -28,21 +31,52 @@ } end - context 'when I have work to do, but am interupted' do - it 'exits' do - job = double 'job', id: 'int.feature' + def connect + Timeout.timeout(2) do + sleep 0.1 + DRbObject.new_with_uri(sink_endpoint) + rescue DRb::DRbConnError + retry + end + end + + def fork_server(&block) + IO.popen('-') do |f| + if f + connect.ready(1) + block.call(f) + Process.waitall + else + described_class.start_server(**defaults, jobs: [job]) + end + end + end + + context 'when I have work to do' do + let(:job) { Flatware::Job.new('int.feature') } + + context 'but a worker dies' do + it 'explains and exits non-zero' do + fork_server do |server| + Process.kill 'CLD', server.pid + + expect(server.read).to match(/A worker died/).and(match(/int\.feature/)) + + Process.wait(server.pid) + expect(Process.last_status).to_not be_success + end + end + end - IO.popen('-') do |f| - if f - sleep 1 - Process.kill 'INT', f.pid + context 'but am interupted' do + it 'explains and exits non-zero' do + fork_server do |server| + Process.kill 'INT', server.pid - expect(f.read).to match(/Interrupted/) + expect(server.read).to match(/Interrupted/).and(match(/int\.feature/)) - child_pids = Flatware.pids { |cpid| cpid.ppid == Process.pid } - expect(child_pids).to_not include f.pid - else - described_class.start_server(**defaults, jobs: [job]) + Process.wait(server.pid) + expect(Process.last_status).to_not be_success end end end