Skip to content

Commit

Permalink
Merge pull request #89 from briandunn/fix/cld-aint-all-bad
Browse files Browse the repository at this point in the history
Fix: don't assume SIGCLD is bad. Check child statuses.
  • Loading branch information
briandunn authored May 17, 2024
2 parents 689dea6 + f9823dd commit 1e6dc28
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 69 deletions.
2 changes: 1 addition & 1 deletion lib/flatware/rspec/formatters/console.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def summarize(checkpoints)
end

def summarize_remaining(remaining)
progress_formatter.output.puts(colorizer.wrap(<<~MESSAGE, :detail))
out.puts(colorizer.wrap(<<~MESSAGE, :detail))
The following specs weren't run:
Expand Down
12 changes: 9 additions & 3 deletions lib/flatware/sink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@ def initialize(jobs:, formatter:, sink:, worker_count: 0, **)
@checkpoints = []
@completed_jobs = []
@formatter = formatter
@interrupted = false
@jobs = group_jobs(jobs, worker_count).freeze
@queue = @jobs.dup
@sink = sink
@workers = Set.new(worker_count.times.to_a)
end

def start
@signal = Signal.listen(&method(:summarize_remaining))
Signal.listen(formatter, &method(:on_interrupt))
formatter.jobs jobs
DRb.start_service(sink, self, verbose: Flatware.verbose?)
DRb.thread.join
!failures?
!(failures? || interrupted?)
end

def ready(worker)
Expand Down Expand Up @@ -73,8 +74,13 @@ def respond_to_missing?(name, include_all)

private

def on_interrupt
@interrupted = true
summarize_remaining
end

def interrupted?
@signal&.interrupted?
@interrupted
end

def check_finished!
Expand Down
59 changes: 45 additions & 14 deletions lib/flatware/sink/signal.rb
Original file line number Diff line number Diff line change
@@ -1,46 +1,77 @@
module Flatware
module Sink
class Signal
def initialize(&on_interrupt)
Message = Struct.new(:message)

attr_reader :formatter

def initialize(formatter, &on_interrupt)
@formatter = formatter
Thread.main[:signals] = Queue.new

@on_interrupt = on_interrupt
end

def interrupted?
!signals.empty?
end

def listen
Thread.new(&method(:handle_signals))

::Signal.trap('INT') { signals << :int }
::Signal.trap('CLD') { signals << :cld }
::Signal.trap('CLD') do
signals << :cld if child_failed?
end

self
end

def self.listen(&block)
new(&block).listen
def self.listen(formatter, &block)
new(formatter, &block).listen
end

private

def child_status
_worker_pid, status = begin
Process.wait2(-1, Process::WNOHANG)
rescue Errno::ECHILD
[]
end
status
end

def child_statuses
statuses = []
loop do
status = child_status
return statuses unless status

statuses << status
end
end

def child_failed?
child_statuses.any? { |status| !status.success? }
end

def handle_signals
puts signal_message(signals.pop)
Process.waitall
@on_interrupt.call
puts 'done.'
signal_message(signals.pop) do
Process.waitall
@on_interrupt.call
end

abort
end

def signal_message(signal)
format(<<~MESSAGE, { cld: 'A worker died', int: 'Interrupted' }.fetch(signal))
formatter.message(Message.new(format(<<~MESSAGE, { cld: 'A worker died', int: 'Interrupted' }.fetch(signal))))
%s!
Cleaning up. Please wait...
Waiting for workers to finish their current jobs...
MESSAGE

yield

formatter.message(Message.new('done.'))
end

def signals
Expand Down
105 changes: 105 additions & 0 deletions spec/flatware/sink/signal_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
require 'spec_helper'

describe Flatware::Sink::Signal do
let(:formatter_queue) { Queue.new }

let(:formatter) do
queue = formatter_queue

Class.new do
define_method(:message, &queue.method(:push))
end.new
end

let(:signal_blocks) { {} }

let(:on_interrupt) do
-> {}.tap do |block|
allow(block).to receive(:call)
end
end

before do
allow(Process).to receive(:waitall)

allow(Signal).to receive(:trap) do |signal, &block|
signal_blocks[signal] = block
end

@subject = described_class.listen(formatter, &on_interrupt).tap do |instance|
allow(instance).to receive(:abort)
end
end

attr_reader :subject

def send_signal(signal)
signal_blocks.fetch(signal).call
end

shared_examples_for 'a signal initiated shutdown' do |expected_message|
before do
@messages = 2.times.map do
Timeout.timeout(1, StandardError, 'formatter did not receive within 1 sec') do
formatter_queue.pop.message
end
end
end

attr_reader :messages

it 'aborts' do
expect(subject).to have_received(:abort)
end

it 'tells the formatter to emit the signal message' do
expect(messages).to match([include(expected_message), 'done.'])
end

it 'calls on_interrupt' do
expect(on_interrupt).to have_received(:call)
end

it 'waits for workers' do
expect(Process).to have_received(:waitall)
end
end

describe 'on SIGINT' do
before do
send_signal('INT')
end

it_should_behave_like 'a signal initiated shutdown', 'Interrupted'
end

describe 'on SIGCLD' do
context 'when a child failed' do
before do
allow(Process).to receive(:wait2).and_return(
[nil, double(success?: true)],
[nil, double(success?: false)],
nil
)

send_signal('CLD')
end

it_should_behave_like 'a signal initiated shutdown', 'A worker died'
end

context 'when a child has not failed' do
before do
allow(Process).to receive(:wait2).and_return nil

send_signal('CLD')
end

it 'does nothing' do
expect(on_interrupt).to_not have_received(:call)
expect(subject).to_not have_received(:abort)
expect(formatter_queue).to be_empty
end
end
end
end
51 changes: 0 additions & 51 deletions spec/flatware/sink_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,57 +31,6 @@
}
end

def connect
Timeout.timeout(2) do
sleep 0.1
DRbObject.new_with_uri(sink_endpoint)
rescue DRb::DRbConnError
retry
end
end

def fork_server(&block)
IO.popen('-') do |f|
if f
connect.ready(1)
block.call(f)
Process.waitall
else
described_class.start_server(**defaults, jobs: [job])
end
end
end

context 'when I have work to do' do
let(:job) { Flatware::Job.new('int.feature') }

context 'but a worker dies' do
it 'explains and exits non-zero' do
fork_server do |server|
Process.kill 'CLD', server.pid

expect(server.read).to match(/A worker died/).and(match(/int\.feature/))

Process.wait(server.pid)
expect(Process.last_status).to_not be_success
end
end
end

context 'but am interupted' do
it 'explains and exits non-zero' do
fork_server do |server|
Process.kill 'INT', server.pid

expect(server.read).to match(/Interrupted/).and(match(/int\.feature/))

Process.wait(server.pid)
expect(Process.last_status).to_not be_success
end
end
end
end

context 'there is no work' do
it 'sumarizes' do
server = described_class::Server.new jobs: [], **defaults
Expand Down

0 comments on commit 1e6dc28

Please sign in to comment.