Skip to content

Commit

Permalink
in_tail: use inode for key of TailWatchers
Browse files Browse the repository at this point in the history
This allows unwatching to be handled entirely in `refresh_watchers`.

Previously, `update_watcher` had to unwatch explicitly because the key
was the path, but that caused log duplication in some cases.

This key change fixes that problem.

Note about the key of TailWatchers:
Regardless of `follow_inodes`, either path or inode is acceptable
as the key of `tails`.
It is not wrong that the current logic uses `path` for the key.
It just means that using inode as the key can solve the unwatching
problem easily.

Perhaps we don't need to use different keys depending on
`follow_inodes`; we could always use inode as the key. (Future work)

Signed-off-by: Katuya Kawakami <[email protected]>
Signed-off-by: Masaki Hatada <[email protected]>
Co-authored-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
kattz-kawa and daipom committed Jul 11, 2023
1 parent 77d7229 commit ce898dd
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 45 deletions.
37 changes: 24 additions & 13 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,13 @@ def expand_paths

def existence_path
hash = {}
@tails.each {|path, tw|
@tails.each_value do |tw|
if @follow_inodes
hash[tw.ino] = TargetInfo.new(tw.path, tw.ino)
else
hash[tw.path] = TargetInfo.new(tw.path, tw.ino)
end
}
end
hash
end

Expand Down Expand Up @@ -443,7 +443,12 @@ def construct_watcher(target_info)
return
end

@tails[path] = tw
if @follow_inodes
@tails[target_info.ino] = tw
else
@tails[path] = tw
end

tw.on_notify
end

Expand All @@ -459,9 +464,17 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
remove_path_from_group_watcher(target_info.path)

if remove_watcher
tw = @tails.delete(target_info.path)
if @follow_inodes
tw = @tails.delete(target_info.ino)
else
tw = @tails.delete(target_info.path)
end
else
tw = @tails[target_info.path]
if @follow_inodes
tw = @tails[target_info.ino]
else
tw = @tails[target_info.path]
end
end
if tw
tw.unwatched = unwatched
Expand All @@ -475,8 +488,8 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
end

def close_watcher_handles
@tails.keys.each do |path|
tw = @tails.delete(path)
@tails.keys.each do |key|
tw = @tails.delete(key)
if tw
tw.close
end
Expand All @@ -502,16 +515,14 @@ def update_watcher(tail_watcher, pe, new_inode)
new_target_info = TargetInfo.new(path, new_inode)

if @follow_inodes
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
tail_watcher.unwatched = true
@tails.delete(tail_watcher.ino)

new_position_entry = @pf[new_target_info]
# If `refresh_watchers` found the new file earlier, this will not be zero.
# In this case, only we have to do is detaching the current tail_watcher.
# In this case, we don't need to create a new TailWatcher.
if new_position_entry.read_inode == 0
@tails[path] = setup_watcher(new_target_info, new_position_entry)
@tails[path].on_notify
@tails[new_inode] = setup_watcher(new_target_info, new_position_entry)
@tails[new_inode].on_notify
end
else
@tails[path] = setup_watcher(new_target_info, pe)
Expand Down
78 changes: 46 additions & 32 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2156,13 +2156,13 @@ def test_should_close_watcher_after_rotate_wait
target_info = create_target_info("#{@tmp_dir}/tail.txt")
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything, anything).once
d.run(shutdown: false)
assert d.instance.instance_variable_get(:@tails)[target_info.path]
assert d.instance.instance_variable_get(:@tails)[target_info.ino]

Timecop.travel(now + 10) do
d.instance.instance_eval do
sleep 0.1 until @tails[target_info.path] == nil
sleep 0.1 until @tails[target_info.ino] == nil
end
assert_nil d.instance.instance_variable_get(:@tails)[target_info.path]
assert_nil d.instance.instance_variable_get(:@tails)[target_info.ino]
end
d.instance_shutdown
end
Expand Down Expand Up @@ -2679,12 +2679,20 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
# @tails:
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# }
# The old TailWatcher(path: "tail.txt", inode: inode_0) is detached here since `rotate_wait` is just `1s`.
sleep 3

# This reproduces the following situation:
# Rotation => update_watcher => refresh_watchers
# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# inode_0: TailWatcher(path: "tail.txt1", inode: inode_0),
# }
d.instance.refresh_watchers

# Append to the new current log file.
Expand All @@ -2704,22 +2712,15 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
Expand Down Expand Up @@ -2769,15 +2770,23 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# @tails:
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# }
sleep 2

# This reproduces the following situation:
# Rotation => update_watcher => refresh_watchers
# Rotation => update_watcher => refresh_watchers
# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# inode_0: TailWatcher(path: "tail.txt1", inode: inode_0),
# }
d.instance.refresh_watchers

# The old TailWathcer is detached here since `rotate_wait` is `4s`.
# The old TailWatcher(path: "tail.txt", inode: inode_0) is detached here since `rotate_wait` is `4s`.
sleep 3

# Append to the new current log file.
Expand All @@ -2802,7 +2811,8 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
Expand Down Expand Up @@ -2856,17 +2866,28 @@ def test_updateTW_after_refreshTW
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# This reproduces the following situation:
# Rotation => refresh_watchers => update_watcher
# Rotation => refresh_watchers => update_watcher
# This adds a new TailWatcher: TailWatcher(path: "tail.txt", inode: inode_1)
# This overwrites `@tails["tail.txt"]`
# @tails => {
# inode_0: TailWatcher(path: "tail.txt", inode: inode_0),
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# }
d.instance.refresh_watchers

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` tries to update the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
# However, it was already added in `refresh_watchers`, so this only removes and detaches the old TailWatcher.
# It is detached here since `rotate_wait` is just `1s`.
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# }
sleep 3

# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# inode_0: TailWatcher(path: "tail.txt1", inode: inode_0),
# }
d.instance.refresh_watchers

# Append to the new current log file.
Expand All @@ -2886,22 +2907,15 @@ def test_updateTW_after_refreshTW

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
Expand Down

0 comments on commit ce898dd

Please sign in to comment.