Skip to content

Commit

Permalink
in_tail: prevent wrongly unwatching with follow_inodes
Browse files Browse the repository at this point in the history
We must not unwatch targets that still exist.
If unwatching an existing target, it causes log duplication.

In the existing implementation, `update_watcher` needs to unwatch
the old TailWatcher since `refresh_watcher` can't unwatch it when
`update_watcher` is called first.
It is because `update_watcher` discards the old TailWatcher and
`refresh_watcher` can't recognize the old inode is disappeared.

However, it can wrongly unwatch an existing inode because the
old inode may still exist.
(See the diff of test cases.)

Thus, we need a new mechanism to correctly unwatch targets when
follow_inodes.

This fix is based on the idea that we should unwatch based on
directly on PositionFile's data.

Signed-off-by: Daijiro Fukuda <[email protected]>
Co-authored-by: Takuro Ashie <[email protected]>
  • Loading branch information
daipom and ashie committed Jul 12, 2023
1 parent e120693 commit 51848da
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 33 deletions.
23 changes: 16 additions & 7 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -370,17 +370,30 @@ def existence_path
def refresh_watchers
target_paths_hash = expand_paths
existence_paths_hash = existence_path

log.debug {
target_paths_str = target_paths_hash.collect { |key, target_info| target_info.path }.join(",")
existence_paths_str = existence_paths_hash.collect { |key, target_info| target_info.path }.join(",")
"tailing paths: target = #{target_paths_str} | existing = #{existence_paths_str}"
}

unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
if !@follow_inodes
need_unwatch_in_stop_watchers = true
else
# When using @follow_inodes, need this to unwatch the rotated old inode when it disappears.
# After `update_watcher` detaches an old TailWatcher, the inode is lost from the `@tails`.
# So that inode can't be contained in `removed_hash`, and can't be unwatched by `stop_watchers`.
#
# This logic may work for `@follow_inodes false` too.
# Just limiting the case to supress the impact to existing logics.
@pf&.unwatch_removed_targets(target_paths_hash)
need_unwatch_in_stop_watchers = false
end

removed_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty?
stop_watchers(removed_hash, unwatched: need_unwatch_in_stop_watchers) unless removed_hash.empty?
start_watchers(added_hash) unless added_hash.empty?
@startup = false if @startup
end
Expand Down Expand Up @@ -502,10 +515,6 @@ def update_watcher(tail_watcher, pe, new_inode)
new_target_info = TargetInfo.new(path, new_inode)

if @follow_inodes
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
tail_watcher.unwatched = true

new_position_entry = @pf[new_target_info]
# If `refresh_watcher` find the new file before, this will not be zero.
# In this case, only we have to do is detaching the current tail_watcher.
Expand Down
18 changes: 15 additions & 3 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ def [](target_info)
}
end

def unwatch_removed_targets(existing_targets)
@map.reject { |key, entry|
existing_targets.key?(key)
}.each_key { |key|
unwatch_key(key)
}
end

def unwatch(target_info)
if (entry = @map.delete(@follow_inodes ? target_info.ino : target_info.path))
entry.update_pos(UNWATCHED_POSITION)
end
unwatch_key(@follow_inodes ? target_info.ino : target_info.path)
end

def load(existing_targets = nil)
Expand Down Expand Up @@ -118,6 +124,12 @@ def try_compact

private

def unwatch_key(key)
if (entry = @map.delete(key))
entry.update_pos(UNWATCHED_POSITION)
end
end

def compact(existing_targets = nil)
@file_mutex.synchronize do
entries = fetch_compacted_entries
Expand Down
32 changes: 31 additions & 1 deletion test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class IntailPositionFileTest < Test::Unit::TestCase
"valid_path" => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1),
"inode23bit" => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0),
}
TEST_CONTENT_INODES = {
1 => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1),
0 => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0),
}

def write_data(f, content)
f.write(content)
Expand Down Expand Up @@ -221,7 +225,7 @@ def follow_inodes_block
end

sub_test_case '#unwatch' do
test 'deletes entry by path' do
test 'unwatch entry by path' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, logger: $log)
inode1 = File.stat(@file).ino
Expand All @@ -239,6 +243,32 @@ def follow_inodes_block

assert_not_equal p1, p2
end

test 'unwatch entries by inode' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, true, TEST_CONTENT_INODES, logger: $log)

existing_targets = TEST_CONTENT_INODES.select do |inode, target_info|
inode == 1
end
pe_to_unwatch = pf[TEST_CONTENT_INODES[0]]

pf.unwatch_removed_targets(existing_targets)

assert_equal(
{
map_keys: [TEST_CONTENT_INODES[1].ino],
unwatched_pe_pos: Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION,
},
{
map_keys: pf.instance_variable_get(:@map).keys,
unwatched_pe_pos: pe_to_unwatch.read_pos,
}
)

unwatched_pe_retaken = pf[TEST_CONTENT_INODES[0]]
assert_not_equal pe_to_unwatch, unwatched_pe_retaken
end
end

sub_test_case 'FilePositionEntry' do
Expand Down
31 changes: 9 additions & 22 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2704,22 +2704,14 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
Expand Down Expand Up @@ -2802,7 +2794,8 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
Expand Down Expand Up @@ -2861,8 +2854,9 @@ def test_updateTW_after_refreshTW
# This overwrites `@tails["tail.txt"]`
d.instance.refresh_watchers

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to update the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# However, it is already added in `refresh_watcher`, so `update_watcher` doesn't create the new TailWatcher.
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
sleep 3

Expand All @@ -2886,22 +2880,15 @@ def test_updateTW_after_refreshTW

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
Expand Down

0 comments on commit 51848da

Please sign in to comment.