From 78c72130dda3fa9c7e717fe010429ad3af8acac6 Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Tue, 26 Mar 2024 13:41:23 +0100 Subject: [PATCH] add ITR::Coverage::Writer which encapsulates a background worker thread with a buffer to collect and send out code coverage events --- lib/datadog/ci/itr/coverage/writer.rb | 100 ++++++++++++++++++ lib/datadog/ci/itr/runner.rb | 8 ++ sig/datadog/ci/itr/coverage/writer.rbs | 44 ++++++++ sig/datadog/ci/transport/http.rbs | 2 + .../ddtrace/0/datadog/core/buffer/cruby.rbs | 10 ++ .../ddtrace/0/datadog/core/buffer/random.rbs | 8 ++ .../0/datadog/core/buffer/thread_safe.rbs | 8 ++ .../ddtrace/0/datadog/core/workers/async.rbs | 69 ++++++++++++ .../0/datadog/core/workers/interval_loop.rbs | 52 +++++++++ .../0/datadog/core/workers/polling.rbs | 21 ++++ .../ddtrace/0/datadog/core/workers/queue.rbs | 21 ++++ 11 files changed, 343 insertions(+) create mode 100644 lib/datadog/ci/itr/coverage/writer.rb create mode 100644 sig/datadog/ci/itr/coverage/writer.rbs create mode 100644 vendor/rbs/ddtrace/0/datadog/core/buffer/cruby.rbs create mode 100644 vendor/rbs/ddtrace/0/datadog/core/buffer/random.rbs create mode 100644 vendor/rbs/ddtrace/0/datadog/core/buffer/thread_safe.rbs create mode 100644 vendor/rbs/ddtrace/0/datadog/core/workers/async.rbs create mode 100644 vendor/rbs/ddtrace/0/datadog/core/workers/interval_loop.rbs create mode 100644 vendor/rbs/ddtrace/0/datadog/core/workers/polling.rbs create mode 100644 vendor/rbs/ddtrace/0/datadog/core/workers/queue.rbs diff --git a/lib/datadog/ci/itr/coverage/writer.rb b/lib/datadog/ci/itr/coverage/writer.rb new file mode 100644 index 00000000..682b18fa --- /dev/null +++ b/lib/datadog/ci/itr/coverage/writer.rb @@ -0,0 +1,100 @@ +# frozen_string_literal: true + +require "datadog/core/workers/async" +require "datadog/core/workers/queue" +require "datadog/core/workers/polling" + +require "datadog/core/buffer/cruby" +require "datadog/core/buffer/thread_safe" + +require "datadog/core/environment/ext" + +module Datadog + module CI + module ITR + module Coverage + class Writer + include Core::Workers::Queue + include Core::Workers::Polling + + attr_reader :transport + + DEFAULT_BUFFER_MAX_SIZE = 10_000 + DEFAULT_SHUTDOWN_TIMEOUT = 60 + + def initialize(transport:, options: {}) + @transport = transport + + # Workers::Polling settings + self.enabled = options.fetch(:enabled, true) + + # Workers::Async::Thread settings + self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART + + # Workers::IntervalLoop settings + self.loop_base_interval = options[:interval] if options.key?(:interval) + self.loop_back_off_ratio = options[:back_off_ratio] if options.key?(:back_off_ratio) + self.loop_back_off_max = options[:back_off_max] if options.key?(:back_off_max) + + @buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE) + + self.buffer = buffer_klass.new(@buffer_size) + + @shutdown_timeout = options.fetch(:shutdown_timeout, DEFAULT_SHUTDOWN_TIMEOUT) + end + + def write(event) + # Start worker thread. If the process has forked, it will trigger #after_fork to + # reconfigure the worker accordingly. + perform + + enqueue(event) + end + + def perform(*events) + responses = transport.send_events(events) + + loop_back_off! if responses.find(&:server_error?) + + nil + end + + def stop(force_stop = false, timeout = @shutdown_timeout) + buffer.close if running? + + super + end + + def enqueue(event) + buffer.push(event) + end + + def dequeue + # Wrap results in Array because they are + # splatted as args against Coverage::Writer#perform. + [buffer.pop] + end + + def async? + true + end + + def after_fork + # In multiprocess environments, forks will share the same buffer until its written to. + # A.K.A. copy-on-write. We don't want forks to write events generated from another process. + # Instead, we reset it after the fork. (Make sure any enqueue operations happen after this.) + self.buffer = buffer_klass.new(@buffer_size) + end + + def buffer_klass + if Core::Environment::Ext::RUBY_ENGINE == "ruby" + Core::Buffer::CRuby + else + Core::Buffer::ThreadSafe + end + end + end + end + end + end +end diff --git a/lib/datadog/ci/itr/runner.rb b/lib/datadog/ci/itr/runner.rb index bf3ea489..2f276982 100644 --- a/lib/datadog/ci/itr/runner.rb +++ b/lib/datadog/ci/itr/runner.rb @@ -70,6 +70,14 @@ def stop_coverage(_test) return if !enabled? || !code_coverage? coverage_collector&.stop + # event = Coverage::Event.new( + # test_id: test.id, + # test_suite_id: + # test.test_suite_id, + # test_session_id: test.test_session_id, + # coverage: coverage + # ) + # @coverage_writer.write(event) end private diff --git a/sig/datadog/ci/itr/coverage/writer.rbs b/sig/datadog/ci/itr/coverage/writer.rbs new file mode 100644 index 00000000..3e1f269f --- /dev/null +++ b/sig/datadog/ci/itr/coverage/writer.rbs @@ -0,0 +1,44 @@ +module Datadog + module CI + module ITR + module Coverage + class Writer + include Datadog::Core::Workers::Async::Thread + include Datadog::Core::Workers::Polling + include Datadog::Core::Workers::Queue + include Datadog::Core::Workers::IntervalLoop + + @transport: Datadog::CI::ITR::Coverage::Transport + + @buffer_size: Integer + + @shutdown_timeout: Integer + + attr_reader transport: Datadog::CI::ITR::Coverage::Transport + + DEFAULT_BUFFER_MAX_SIZE: 10000 + + DEFAULT_SHUTDOWN_TIMEOUT: 60 + + def initialize: (transport: Datadog::CI::ITR::Coverage::Transport, ?options: ::Hash[untyped, untyped]) -> void + + def write: (Datadog::CI::ITR::Coverage::Event event) -> untyped + + def perform: (*Datadog::CI::ITR::Coverage::Event events) -> nil + + def stop: (?bool force_stop, ?Integer timeout) -> untyped + + def enqueue: (Datadog::CI::ITR::Coverage::Event event) -> untyped + + def dequeue: () -> ::Array[Datadog::CI::ITR::Coverage::Event] + + def async?: () -> true + + def after_fork: () -> untyped + + def buffer_klass: () -> untyped + end + end + end + end +end diff --git a/sig/datadog/ci/transport/http.rbs b/sig/datadog/ci/transport/http.rbs index cf0110bd..9b02cebb 100644 --- a/sig/datadog/ci/transport/http.rbs +++ b/sig/datadog/ci/transport/http.rbs @@ -40,6 +40,8 @@ module Datadog end class ResponseDecorator < ::SimpleDelegator + include Datadog::Core::Transport::Response + def initialize: (untyped anything) -> void def trace_count: () -> Integer end diff --git a/vendor/rbs/ddtrace/0/datadog/core/buffer/cruby.rbs b/vendor/rbs/ddtrace/0/datadog/core/buffer/cruby.rbs new file mode 100644 index 00000000..05b7430f --- /dev/null +++ b/vendor/rbs/ddtrace/0/datadog/core/buffer/cruby.rbs @@ -0,0 +1,10 @@ +module Datadog + module Core + module Buffer + class CRuby < Random + FIXNUM_MAX: untyped + def replace!: (untyped item) -> untyped + end + end + end +end diff --git a/vendor/rbs/ddtrace/0/datadog/core/buffer/random.rbs b/vendor/rbs/ddtrace/0/datadog/core/buffer/random.rbs new file mode 100644 index 00000000..72a00f85 --- /dev/null +++ b/vendor/rbs/ddtrace/0/datadog/core/buffer/random.rbs @@ -0,0 +1,8 @@ +module Datadog + module Core + module Buffer + class Random + end + end + end +end diff --git a/vendor/rbs/ddtrace/0/datadog/core/buffer/thread_safe.rbs b/vendor/rbs/ddtrace/0/datadog/core/buffer/thread_safe.rbs new file mode 100644 index 00000000..52f83646 --- /dev/null +++ b/vendor/rbs/ddtrace/0/datadog/core/buffer/thread_safe.rbs @@ -0,0 +1,8 @@ +module Datadog + module Core + module Buffer + class ThreadSafe < Datadog::Core::Buffer::Random + end + end + end +end diff --git a/vendor/rbs/ddtrace/0/datadog/core/workers/async.rbs b/vendor/rbs/ddtrace/0/datadog/core/workers/async.rbs new file mode 100644 index 00000000..cc381ca0 --- /dev/null +++ b/vendor/rbs/ddtrace/0/datadog/core/workers/async.rbs @@ -0,0 +1,69 @@ +module Datadog + module Core + module Workers + module Async + module Thread + FORK_POLICY_STOP: :stop + + FORK_POLICY_RESTART: :restart + + SHUTDOWN_TIMEOUT: 1 + MUTEX_INIT: untyped + + def self.included: (untyped base) -> untyped + module PrependedMethods + def perform: (*untyped args) -> (untyped | nil) + end + + attr_reader error: untyped + + attr_reader result: untyped + + attr_writer fork_policy: untyped + + def join: (?untyped? timeout) -> (true | untyped) + + def terminate: () -> (false | true) + + def run_async?: () -> (false | untyped) + + def started?: () -> untyped + + def running?: () -> untyped + + def error?: () -> (false | untyped) + + def completed?: () -> untyped + + def failed?: () -> untyped + + def forked?: () -> untyped + + def fork_policy: () -> untyped + + attr_writer result: untyped + + def mutex: () -> untyped + + def after_fork: () -> nil + + private + + attr_reader pid: untyped + + def mutex_after_fork: () -> untyped + + def worker: () -> untyped + + def start_async: () ?{ () -> untyped } -> untyped + + def start_worker: () { () -> untyped } -> nil + + def stop_fork: () -> untyped + + def restart_after_fork: () ?{ () -> untyped } -> untyped + end + end + end + end +end diff --git a/vendor/rbs/ddtrace/0/datadog/core/workers/interval_loop.rbs b/vendor/rbs/ddtrace/0/datadog/core/workers/interval_loop.rbs new file mode 100644 index 00000000..21e58087 --- /dev/null +++ b/vendor/rbs/ddtrace/0/datadog/core/workers/interval_loop.rbs @@ -0,0 +1,52 @@ +module Datadog + module Core + module Workers + module IntervalLoop + BACK_OFF_RATIO: ::Float + + BACK_OFF_MAX: 5 + + BASE_INTERVAL: 1 + MUTEX_INIT: untyped + + def self.included: (untyped base) -> untyped + module PrependedMethods + def perform: (*untyped args) -> untyped + end + + def stop_loop: () -> (false | true) + + def work_pending?: () -> untyped + + def run_loop?: () -> (false | untyped) + + def loop_base_interval: () -> untyped + + def loop_back_off_ratio: () -> untyped + + def loop_back_off_max: () -> untyped + + def loop_wait_time: () -> untyped + + def loop_wait_time=: (untyped value) -> untyped + + def loop_back_off!: () -> untyped + def loop_wait_before_first_iteration?: () -> false + + attr_writer loop_back_off_max: untyped + + attr_writer loop_back_off_ratio: untyped + + attr_writer loop_base_interval: untyped + + def mutex: () -> untyped + + private + + def perform_loop: () { () -> untyped } -> (nil | untyped) + + def shutdown: () -> untyped + end + end + end +end diff --git a/vendor/rbs/ddtrace/0/datadog/core/workers/polling.rbs b/vendor/rbs/ddtrace/0/datadog/core/workers/polling.rbs new file mode 100644 index 00000000..7f4d8f9c --- /dev/null +++ b/vendor/rbs/ddtrace/0/datadog/core/workers/polling.rbs @@ -0,0 +1,21 @@ +module Datadog + module Core + module Workers + module Polling + SHUTDOWN_TIMEOUT: 1 + + def self.included: (Class | Module base) -> void + + module PrependedMethods + def perform: (*untyped args) -> untyped + end + + def stop: (?bool force_stop, ?::Integer timeout) -> untyped + + def enabled?: () -> bool + + def enabled=: (bool value) -> bool + end + end + end +end diff --git a/vendor/rbs/ddtrace/0/datadog/core/workers/queue.rbs b/vendor/rbs/ddtrace/0/datadog/core/workers/queue.rbs new file mode 100644 index 00000000..e538c89e --- /dev/null +++ b/vendor/rbs/ddtrace/0/datadog/core/workers/queue.rbs @@ -0,0 +1,21 @@ +module Datadog + module Core + module Workers + module Queue + def self.included: (untyped base) -> untyped + module PrependedMethods + def perform: (*untyped args) -> (untyped | nil) + end + + def buffer: () -> untyped + + def enqueue: (*untyped args) -> untyped + + def dequeue: () -> untyped + def work_pending?: () -> untyped + + attr_writer buffer: untyped + end + end + end +end