From ce898dd572d5745663abd456486169d772daa8aa Mon Sep 17 00:00:00 2001 From: Katuya Kawakami Date: Tue, 11 Jul 2023 12:44:02 +0900 Subject: [PATCH] in_tail: use inode for key of TailWatchers This allows unwatching to be completely handled in `refresh_watchers`. `update_watcher` needed to unwatch because the key was path, but it caused log duplication in some cases. This key change fixes that problem. Note about the key of TailWatchers: Regardless of `follow_inodes`, either path or inode is acceptable for the key of `tails`. It is not wrong that the current logic uses `path` for the key. It just means that using inode as the key can solve the unwatching problem easily. Perhaps we don't need to use different keys depending on `follow_inodes`. We can always use inode for the key. (Future work) Signed-off-by: Katuya Kawakami Signed-off-by: Masaki Hatada Co-authored-by: Daijiro Fukuda --- lib/fluent/plugin/in_tail.rb | 37 +++++++++++------ test/plugin/test_in_tail.rb | 78 +++++++++++++++++++++--------------- 2 files changed, 70 insertions(+), 45 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 88556ca6ab..7e9f2d5e1c 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -352,13 +352,13 @@ def expand_paths def existence_path hash = {} - @tails.each {|path, tw| + @tails.each_value do |tw| if @follow_inodes hash[tw.ino] = TargetInfo.new(tw.path, tw.ino) else hash[tw.path] = TargetInfo.new(tw.path, tw.ino) end - } + end hash end @@ -443,7 +443,12 @@ def construct_watcher(target_info) return end - @tails[path] = tw + if @follow_inodes + @tails[target_info.ino] = tw + else + @tails[path] = tw + end + tw.on_notify end @@ -459,9 +464,17 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch remove_path_from_group_watcher(target_info.path) if remove_watcher - tw = @tails.delete(target_info.path) + if @follow_inodes + tw = @tails.delete(target_info.ino) + else + tw = 
@tails.delete(target_info.path) + end else - tw = @tails[target_info.path] + if @follow_inodes + tw = @tails[target_info.ino] + else + tw = @tails[target_info.path] + end end if tw tw.unwatched = unwatched @@ -475,8 +488,8 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch end def close_watcher_handles - @tails.keys.each do |path| - tw = @tails.delete(path) + @tails.keys.each do |key| + tw = @tails.delete(key) if tw tw.close end @@ -502,16 +515,14 @@ def update_watcher(tail_watcher, pe, new_inode) new_target_info = TargetInfo.new(path, new_inode) if @follow_inodes - # When follow_inodes is true, it's not cleaned up by refresh_watcher. - # So it should be unwatched here explicitly. - tail_watcher.unwatched = true + @tails.delete(tail_watcher.ino) new_position_entry = @pf[new_target_info] # If `refresh_watcher` find the new file before, this will not be zero. - # In this case, only we have to do is detaching the current tail_watcher. + # In this case, we don't need to create a new TailWatcher. 
if new_position_entry.read_inode == 0 - @tails[path] = setup_watcher(new_target_info, new_position_entry) - @tails[path].on_notify + @tails[new_inode] = setup_watcher(new_target_info, new_position_entry) + @tails[new_inode].on_notify end else @tails[path] = setup_watcher(new_target_info, pe) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 81bf0dfa7d..0ac25bc97c 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2156,13 +2156,13 @@ def test_should_close_watcher_after_rotate_wait target_info = create_target_info("#{@tmp_dir}/tail.txt") mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything, anything).once d.run(shutdown: false) - assert d.instance.instance_variable_get(:@tails)[target_info.path] + assert d.instance.instance_variable_get(:@tails)[target_info.ino] Timecop.travel(now + 10) do d.instance.instance_eval do - sleep 0.1 until @tails[target_info.path] == nil + sleep 0.1 until @tails[target_info.ino] == nil end - assert_nil d.instance.instance_variable_get(:@tails)[target_info.path] + assert_nil d.instance.instance_variable_get(:@tails)[target_info.ino] end d.instance_shutdown end @@ -2679,12 +2679,20 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) - # The old TailWathcer is detached here since `rotate_wait` is just `1s`. + # @tails: + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # } + # The old TailWatcher(path: "tail.txt", inode: inode_0) is detached here since `rotate_wait` is just `1s`. 
sleep 3 # This reproduces the following situation: # Rotation => update_watcher => refresh_watchers # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # inode_0: TailWatcher(path: "tail.txt1", inode: inode_0), + # } d.instance.refresh_watchers # Append to the new current log file. @@ -2704,22 +2712,15 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW assert_equal( { - # TODO: This is BUG!! We need to fix it and replace this with the next. - record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"], - # record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], tail_watcher_inodes: [inode_0, inode_1, inode_0], tail_watcher_io_handler_opened_statuses: [false, false, false], - # TODO: This is BUG!! We need to fix it and replace this with the next. position_entries: [ - ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + # The recorded path is old, but it is no problem. The path is not used when using follow_inodes. 
+ ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0], ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], - ["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0], ], - # position_entries: [ - # ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], - # ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], - # ], }, { record_values: record_values, @@ -2769,15 +2770,23 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: - # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # @tails: + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # } sleep 2 # This reproduces the following situation: - # Rotation => update_watcher => refresh_watchers + # Rotation => update_watcher => refresh_watchers # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # inode_0: TailWatcher(path: "tail.txt1", inode: inode_0), + # } d.instance.refresh_watchers - # The old TailWathcer is detached here since `rotate_wait` is `4s`. + # The old TailWatcher(path: "tail.txt", inode: inode_0) is detached here since `rotate_wait` is `4s`. sleep 3 # Append to the new current log file. @@ -2802,7 +2811,8 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW tail_watcher_inodes: [inode_0, inode_1, inode_0], tail_watcher_io_handler_opened_statuses: [false, false, false], position_entries: [ - ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + # The recorded path is old, but it is no problem. The path is not used when using follow_inodes. 
+ ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0], ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], ], }, @@ -2856,17 +2866,28 @@ def test_updateTW_after_refreshTW Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} # This reproduces the following situation: - # Rotation => refresh_watchers => update_watcher + # Rotation => refresh_watchers => update_watcher # This add a new TailWatcher: TailWatcher(path: "tail.txt", inode: inode_1) - # This overwrites `@tails["tail.txt"]` + # @tails => { + # inode_0: TailWatcher(path: "tail.txt", inode: inode_0), + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # } d.instance.refresh_watchers - # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` tries to update the TailWatcher: # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) - # The old TailWathcer is detached here since `rotate_wait` is just `1s`. + # However, it is already added in `refresh_watchers`, so this only removes and detaches the old TailWatcher. + # It is detached here since `rotate_wait` is just `1s`. + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # } sleep 3 # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # inode_0: TailWatcher(path: "tail.txt1", inode: inode_0), + # } d.instance.refresh_watchers # Append to the new current log file. @@ -2886,22 +2907,15 @@ def test_updateTW_after_refreshTW assert_equal( { - # TODO: This is BUG!! We need to fix it and replace this with the next. 
- record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"], - # record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], tail_watcher_inodes: [inode_0, inode_1, inode_0], tail_watcher_io_handler_opened_statuses: [false, false, false], - # TODO: This is BUG!! We need to fix it and replace this with the next. position_entries: [ - ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + # The recorded path is old, but it is no problem. The path is not used when using follow_inodes. + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_0], ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], - ["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0], ], - # position_entries: [ - # ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], - # ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], - # ], }, { record_values: record_values,