Skip to content

Commit

Permalink
in_tail: use inode for key of TailWatchers when follow_inodes
Browse files Browse the repository at this point in the history
This may improve the maintainability.

Note: Regardless of `follow_inodes`, both path or inode is acceptable
for the key of `tails`.
It is not wrong that the current logic uses `path` for the key.

Perhaps we don't need to use different keys depending on
`follow_inodes`. We can always use inode for the key. (Future work)

Signed-off-by: Katuya Kawakami <[email protected]>
Signed-off-by: Masaki Hatada <[email protected]>
Co-authored-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
kattz-kawa and daipom committed Jul 11, 2023
1 parent 77d7229 commit 45b193f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 23 deletions.
48 changes: 36 additions & 12 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,13 @@ def expand_paths

def existence_path
hash = {}
@tails.each {|path, tw|
@tails.each_value do |tw|
if @follow_inodes
hash[tw.ino] = TargetInfo.new(tw.path, tw.ino)
else
hash[tw.path] = TargetInfo.new(tw.path, tw.ino)
end
}
end
hash
end

Expand Down Expand Up @@ -443,7 +443,12 @@ def construct_watcher(target_info)
return
end

@tails[path] = tw
if @follow_inodes
@tails[target_info.ino] = tw
else
@tails[path] = tw
end

tw.on_notify
end

Expand All @@ -459,9 +464,17 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
remove_path_from_group_watcher(target_info.path)

if remove_watcher
tw = @tails.delete(target_info.path)
if @follow_inodes
tw = @tails.delete(target_info.ino)
else
tw = @tails.delete(target_info.path)
end
else
tw = @tails[target_info.path]
if @follow_inodes
tw = @tails[target_info.ino]
else
tw = @tails[target_info.path]
end
end
if tw
tw.unwatched = unwatched
Expand All @@ -475,8 +488,8 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
end

def close_watcher_handles
@tails.keys.each do |path|
tw = @tails.delete(path)
@tails.keys.each do |key|
tw = @tails.delete(key)
if tw
tw.close
end
Expand All @@ -502,16 +515,27 @@ def update_watcher(tail_watcher, pe, new_inode)
new_target_info = TargetInfo.new(path, new_inode)

if @follow_inodes
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
@tails.delete(tail_watcher.ino)

# TODO: This can cause log duplication. We need to fix this.
# (This problem exists from the start of implementation of follow_inodes)
#
# The old inode can still exist.
# In that case, we can't unwatch it because it causes log duplication.
# (`refresh_watcher` will find the inode as a new inode and read them again.)
#
# However, if the old inode already has disappeared, it must be unwatched here.
# It is because the old inode is removed from `@tails`, so `refresh_watcher` can't
# recognize that it has disappeared and need to be unwatched.
# So simply removing this line causes leak of unwatch.
tail_watcher.unwatched = true

new_position_entry = @pf[new_target_info]
# If `refresh_watcher` find the new file before, this will not be zero.
# In this case, only we have to do is detaching the current tail_watcher.
# In this case, we don't need to create a new TailWatcher.
if new_position_entry.read_inode == 0
@tails[path] = setup_watcher(new_target_info, new_position_entry)
@tails[path].on_notify
@tails[new_inode] = setup_watcher(new_target_info, new_position_entry)
@tails[new_inode].on_notify
end
else
@tails[path] = setup_watcher(new_target_info, pe)
Expand Down
49 changes: 38 additions & 11 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2156,13 +2156,13 @@ def test_should_close_watcher_after_rotate_wait
target_info = create_target_info("#{@tmp_dir}/tail.txt")
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything, anything).once
d.run(shutdown: false)
assert d.instance.instance_variable_get(:@tails)[target_info.path]
assert d.instance.instance_variable_get(:@tails)[target_info.ino]

Timecop.travel(now + 10) do
d.instance.instance_eval do
sleep 0.1 until @tails[target_info.path] == nil
sleep 0.1 until @tails[target_info.ino] == nil
end
assert_nil d.instance.instance_variable_get(:@tails)[target_info.path]
assert_nil d.instance.instance_variable_get(:@tails)[target_info.ino]
end
d.instance_shutdown
end
Expand Down Expand Up @@ -2679,12 +2679,20 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
# @tails:
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# }
# The old TailWatcher(path: "tail.txt", inode: inode_0) is detached here since `rotate_wait` is just `1s`.
sleep 3

# This reproduces the following situation:
# Rotation => update_watcher => refresh_watchers
# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# inode_0: TailWatcher(path: "tail.txt1", inode: inode_0),
# }
d.instance.refresh_watchers

# Append to the new current log file.
Expand Down Expand Up @@ -2769,15 +2777,23 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# @tails:
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# }
sleep 2

# This reproduces the following situation:
# Rotation => update_watcher => refresh_watchers
# Rotation => update_watcher => refresh_watchers
# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# inode_0: TailWatcher(path: "tail.txt1", inode: inode_0),
# }
d.instance.refresh_watchers

# The old TailWathcer is detached here since `rotate_wait` is `4s`.
# The old TailWatcher(path: "tail.txt", inode: inode_0) is detached here since `rotate_wait` is `4s`.
sleep 3

# Append to the new current log file.
Expand Down Expand Up @@ -2856,17 +2872,28 @@ def test_updateTW_after_refreshTW
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# This reproduces the following situation:
# Rotation => refresh_watchers => update_watcher
# Rotation => refresh_watchers => update_watcher
# This add a new TailWatcher: TailWatcher(path: "tail.txt", inode: inode_1)
# This overwrites `@tails["tail.txt"]`
# @tails => {
# inode_0: TailWatcher(path: "tail.txt", inode: inode_0),
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# }
d.instance.refresh_watchers

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to update the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
# However, it is already added in `refresh_watcher`, so this only remove and detach the old TailWatcher.
# It is detached here since `rotate_wait` is just `1s`.
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# }
sleep 3

# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
# @tails => {
# inode_1: TailWatcher(path: "tail.txt", inode: inode_1),
# inode_0: TailWatcher(path: "tail.txt1", inode: inode_0),
# }
d.instance.refresh_watchers

# Append to the new current log file.
Expand Down

0 comments on commit 45b193f

Please sign in to comment.