Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: prevent wrongly unwatching with follow_inodes #4237

Merged
merged 1 commit into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -370,17 +370,30 @@ def existence_path
def refresh_watchers
target_paths_hash = expand_paths
existence_paths_hash = existence_path

log.debug {
target_paths_str = target_paths_hash.collect { |key, target_info| target_info.path }.join(",")
existence_paths_str = existence_paths_hash.collect { |key, target_info| target_info.path }.join(",")
"tailing paths: target = #{target_paths_str} | existing = #{existence_paths_str}"
}

unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
if !@follow_inodes
need_unwatch_in_stop_watchers = true
else
# When using @follow_inodes, need this to unwatch the rotated old inode when it disappears.
# After `update_watcher` detaches an old TailWatcher, the inode is lost from the `@tails`.
# So that inode can't be contained in `removed_hash`, and can't be unwatched by `stop_watchers`.
#
# This logic may work for `@follow_inodes false` too.
# Just limiting the case to supress the impact to existing logics.
@pf&.unwatch_removed_targets(target_paths_hash)
need_unwatch_in_stop_watchers = false
end

removed_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty?
stop_watchers(removed_hash, unwatched: need_unwatch_in_stop_watchers) unless removed_hash.empty?
start_watchers(added_hash) unless added_hash.empty?
@startup = false if @startup
end
Expand Down Expand Up @@ -502,10 +515,6 @@ def update_watcher(tail_watcher, pe, new_inode)
new_target_info = TargetInfo.new(path, new_inode)

if @follow_inodes
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
tail_watcher.unwatched = true

new_position_entry = @pf[new_target_info]
# If `refresh_watcher` find the new file before, this will not be zero.
# In this case, only we have to do is detaching the current tail_watcher.
Expand Down
18 changes: 15 additions & 3 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ def [](target_info)
}
end

def unwatch_removed_targets(existing_targets)
@map.reject { |key, entry|
existing_targets.key?(key)
}.each_key { |key|
unwatch_key(key)
}
end

def unwatch(target_info)
if (entry = @map.delete(@follow_inodes ? target_info.ino : target_info.path))
entry.update_pos(UNWATCHED_POSITION)
end
unwatch_key(@follow_inodes ? target_info.ino : target_info.path)
end

def load(existing_targets = nil)
Expand Down Expand Up @@ -118,6 +124,12 @@ def try_compact

private

def unwatch_key(key)
if (entry = @map.delete(key))
entry.update_pos(UNWATCHED_POSITION)
end
end

def compact(existing_targets = nil)
@file_mutex.synchronize do
entries = fetch_compacted_entries
Expand Down
32 changes: 31 additions & 1 deletion test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class IntailPositionFileTest < Test::Unit::TestCase
"valid_path" => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1),
"inode23bit" => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0),
}
TEST_CONTENT_INODES = {
1 => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1),
0 => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0),
}

def write_data(f, content)
f.write(content)
Expand Down Expand Up @@ -221,7 +225,7 @@ def follow_inodes_block
end

sub_test_case '#unwatch' do
test 'deletes entry by path' do
test 'unwatch entry by path' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, logger: $log)
inode1 = File.stat(@file).ino
Expand All @@ -239,6 +243,32 @@ def follow_inodes_block

assert_not_equal p1, p2
end

test 'unwatch entries by inode' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, true, TEST_CONTENT_INODES, logger: $log)

existing_targets = TEST_CONTENT_INODES.select do |inode, target_info|
inode == 1
end
pe_to_unwatch = pf[TEST_CONTENT_INODES[0]]

pf.unwatch_removed_targets(existing_targets)

assert_equal(
{
map_keys: [TEST_CONTENT_INODES[1].ino],
unwatched_pe_pos: Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION,
},
{
map_keys: pf.instance_variable_get(:@map).keys,
unwatched_pe_pos: pe_to_unwatch.read_pos,
}
)

unwatched_pe_retaken = pf[TEST_CONTENT_INODES[0]]
assert_not_equal pe_to_unwatch, unwatched_pe_retaken
end
end

sub_test_case 'FilePositionEntry' do
Expand Down
31 changes: 9 additions & 22 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2704,22 +2704,14 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
Expand Down Expand Up @@ -2802,7 +2794,8 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
Expand Down Expand Up @@ -2861,8 +2854,9 @@ def test_updateTW_after_refreshTW
# This overwrites `@tails["tail.txt"]`
d.instance.refresh_watchers

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to update the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# However, it is already added in `refresh_watcher`, so `update_watcher` doesn't create the new TailWatcher.
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
sleep 3

Expand All @@ -2886,22 +2880,15 @@ def test_updateTW_after_refreshTW

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
Expand Down