Skip to content

Commit

Permalink
Merge pull request #82 from briandunn/example/catch-cld
Browse files Browse the repository at this point in the history
Fix: flatware hangs if a worker dies
  • Loading branch information
briandunn authored Mar 20, 2024
2 parents 0868585 + dab34e2 commit 85be8f5
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 41 deletions.
26 changes: 3 additions & 23 deletions lib/flatware/sink.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'drb/drb'
require 'set'
require 'flatware/sink/signal'

module Flatware
module Sink
Expand Down Expand Up @@ -31,7 +32,7 @@ def initialize(jobs:, formatter:, sink:, worker_count: 0, **)
end

def start
trap_interrupt
@signal = Signal.listen(&method(:summarize_remaining))
formatter.jobs jobs
DRb.start_service(sink, self, verbose: Flatware.verbose?)
DRb.thread.join
Expand Down Expand Up @@ -72,29 +73,8 @@ def respond_to_missing?(name, include_all)

private

# Sets up interrupt handling: creates the signal queue on the main thread,
# starts a background thread running handle_interrupt, and traps INT so that
# each interrupt pushes :int onto that queue.
def trap_interrupt
# The queue is stored as a thread-local on the main thread so other methods
# can reach it through Thread.main[:signals].
Thread.main[:signals] = Queue.new

# Started before the trap is installed; the thread's first action is a
# blocking pop on the queue created above.
Thread.new(&method(:handle_interrupt))

# The trap block only enqueues a token; reporting and cleanup happen in the
# handler thread.
trap 'INT' do
Thread.main[:signals] << :int
end
end

# Body of the interrupt-handling thread: waits for a queued signal, reports
# the interruption, summarizes the remaining work, waits for all child
# processes to exit, and then aborts the process.
def handle_interrupt
# Blocks until something is pushed onto the main thread's signal queue.
Thread.main[:signals].pop
puts 'Interrupted!'
summarize_remaining
puts "\n\nCleaning up. Please wait...\n"
# Wait for every child process before announcing completion.
Process.waitall
puts 'done.'
# Kernel#abort exits with a failure status.
abort
end

# Predicate consulted to find out whether a signal is pending.
# NOTE(review): this hunk appears to show lines from both sides of the diff.
# As displayed, the first two statements do not affect the result and the
# method returns @signal&.interruped? (nil when @signal has not been set).
# NOTE(review): the method name is spelled "interruped?" — confirm callers
# before renaming.
def interruped?
signals = Thread.main[:signals]
signals && !signals.empty?
@signal&.interruped?
end

def check_finished!
Expand Down
51 changes: 51 additions & 0 deletions lib/flatware/sink/signal.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
module Flatware
  module Sink
    # Turns INT and CLD signals into a cleanup-and-abort sequence.
    #
    # Trap handlers only push a token onto a queue stored on the main thread;
    # a background thread pops the token, prints why the run is stopping,
    # waits for child processes, invokes the callback given at construction,
    # and aborts the process.
    class Signal
      # Reason printed for each token the trap handlers can enqueue.
      MESSAGES = { cld: 'A worker died', int: 'Interrupted' }.freeze

      # Builds a listener and starts it in one step.
      #
      # @yield called (with no arguments) after child processes have been
      #   reaped and before the process aborts
      # @return [Flatware::Sink::Signal] the listening instance
      def self.listen(&block)
        new(&block).listen
      end

      # Creates a fresh signal queue on the main thread and remembers the
      # callback. Does not install any traps; see #listen.
      #
      # @yield optional callback run during cleanup
      def initialize(&on_interrupt)
        Thread.main[:signals] = Queue.new

        @on_interrupt = on_interrupt
      end

      # @return [Boolean] true while at least one signal token is queued
      # NOTE(review): the handler thread pops the token, so this is only true
      # between the trap firing and the handler picking it up — confirm that
      # callers do not expect it to stay true afterwards.
      def interrupted?
        !signals.empty?
      end

      # Original (misspelled) name, kept so existing callers keep working.
      alias interruped? interrupted?

      # Starts the handler thread and installs the INT and CLD traps.
      #
      # @return [self]
      def listen
        # Started first; it blocks on the queue until a trap enqueues a token.
        Thread.new(&method(:handle_signals))

        # ::Signal is Ruby's core module; the leading :: avoids resolving to
        # this class.
        ::Signal.trap('INT') { signals << :int }
        ::Signal.trap('CLD') { signals << :cld }

        self
      end

      private

      # Body of the handler thread: report, reap children, run the callback,
      # then abort.
      def handle_signals
        puts signal_message(signals.pop)
        Process.waitall
        # Safe navigation: without a block the original raised NoMethodError
        # here, killing this thread before abort could run.
        @on_interrupt&.call
        puts 'done.'
        abort
      end

      # @param signal [Symbol] :int or :cld
      # @return [String] two-line message naming the reason for the shutdown
      # @raise [KeyError] if the token is not one of the known signals
      def signal_message(signal)
        format(<<~MESSAGE, MESSAGES.fetch(signal))
          %s!
          Cleaning up. Please wait...
        MESSAGE
      end

      # The queue shared between the trap handlers and the handler thread.
      def signals
        Thread.main[:signals]
      end
    end
  end
end
70 changes: 52 additions & 18 deletions spec/flatware/sink_spec.rb
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
require 'spec_helper'
require 'drb'
require 'timeout'

describe Flatware::Sink do
let(:sink_endpoint) do
let!(:sink_endpoint) do
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]
server.close
"druby://localhost:#{port}"
"druby://0.0.0.0:#{port}"
end

let! :formatter do
double(
'Formatter',
ready: nil,
summarize: nil,
finished: nil,
jobs: nil,
progress: nil,
finished: nil,
ready: nil,
summarize: nil,
summarize_remaining: nil
)
).tap do |formatter|
allow(formatter).to receive(:summarize_remaining, &method(:puts))
end
end

let :defaults do
Expand All @@ -28,21 +31,52 @@
}
end

context 'when I have work to do, but am interupted' do
it 'exits' do
job = double 'job', id: 'int.feature'
# Returns a DRb proxy for the sink endpoint, giving up after two seconds.
def connect
Timeout.timeout(2) do
# Short pause before each attempt — presumably to give the forked server
# time to start listening.
sleep 0.1
DRbObject.new_with_uri(sink_endpoint)
rescue DRb::DRbConnError
# Re-runs the block from the top (including the sleep); the surrounding
# timeout bounds the number of retries.
# NOTE(review): new_with_uri may only build the proxy without contacting
# the server, in which case this rescue never fires — confirm.
retry
end
end

# Forks a child process that runs the sink server, and in the parent yields
# the pipe connected to that child to the given block.
def fork_server(&block)
# IO.popen('-') forks: the parent's block receives an IO attached to the
# child, while the child's block receives nil.
IO.popen('-') do |f|
if f
# Parent: call ready(1) on the sink proxy before handing the pipe to the
# block — presumably this registers a worker with the server; confirm.
connect.ready(1)
block.call(f)
# Reap the child (and any other children) before leaving the block.
Process.waitall
else
# Child: run the server under test with the single job.
described_class.start_server(**defaults, jobs: [job])
end
end
end

context 'when I have work to do' do
let(:job) { Flatware::Job.new('int.feature') }

context 'but a worker dies' do
it 'explains and exits non-zero' do
fork_server do |server|
Process.kill 'CLD', server.pid

expect(server.read).to match(/A worker died/).and(match(/int\.feature/))

Process.wait(server.pid)
expect(Process.last_status).to_not be_success
end
end
end

IO.popen('-') do |f|
if f
sleep 1
Process.kill 'INT', f.pid
context 'but am interupted' do
it 'explains and exits non-zero' do
fork_server do |server|
Process.kill 'INT', server.pid

expect(f.read).to match(/Interrupted/)
expect(server.read).to match(/Interrupted/).and(match(/int\.feature/))

child_pids = Flatware.pids { |cpid| cpid.ppid == Process.pid }
expect(child_pids).to_not include f.pid
else
described_class.start_server(**defaults, jobs: [job])
Process.wait(server.pid)
expect(Process.last_status).to_not be_success
end
end
end
Expand Down

0 comments on commit 85be8f5

Please sign in to comment.