Skip to content

Commit

Permalink
add ITR::Coverage::Writer which encapsulates a background worker thre…
Browse files Browse the repository at this point in the history
…ad with a buffer to collect and send out code coverage events
  • Loading branch information
anmarchenko committed Mar 26, 2024
1 parent e73065c commit 78c7213
Show file tree
Hide file tree
Showing 11 changed files with 343 additions and 0 deletions.
100 changes: 100 additions & 0 deletions lib/datadog/ci/itr/coverage/writer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# frozen_string_literal: true

require "datadog/core/workers/async"
require "datadog/core/workers/queue"
require "datadog/core/workers/polling"

require "datadog/core/buffer/cruby"
require "datadog/core/buffer/thread_safe"

require "datadog/core/environment/ext"

module Datadog
module CI
module ITR
module Coverage
class Writer
include Core::Workers::Queue
include Core::Workers::Polling

attr_reader :transport

DEFAULT_BUFFER_MAX_SIZE = 10_000
DEFAULT_SHUTDOWN_TIMEOUT = 60

def initialize(transport:, options: {})
@transport = transport

# Workers::Polling settings
self.enabled = options.fetch(:enabled, true)

# Workers::Async::Thread settings
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART

# Workers::IntervalLoop settings
self.loop_base_interval = options[:interval] if options.key?(:interval)
self.loop_back_off_ratio = options[:back_off_ratio] if options.key?(:back_off_ratio)
self.loop_back_off_max = options[:back_off_max] if options.key?(:back_off_max)

@buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)

self.buffer = buffer_klass.new(@buffer_size)

@shutdown_timeout = options.fetch(:shutdown_timeout, DEFAULT_SHUTDOWN_TIMEOUT)
end

def write(event)
# Start worker thread. If the process has forked, it will trigger #after_fork to
# reconfigure the worker accordingly.
perform

enqueue(event)
end

def perform(*events)
responses = transport.send_events(events)

loop_back_off! if responses.find(&:server_error?)

nil
end

def stop(force_stop = false, timeout = @shutdown_timeout)
buffer.close if running?

super
end

def enqueue(event)
buffer.push(event)
end

def dequeue
# Wrap results in Array because they are
# splatted as args against Coverage::Writer#perform.
[buffer.pop]
end

def async?
true
end

def after_fork
# In multiprocess environments, forks will share the same buffer until its written to.
# A.K.A. copy-on-write. We don't want forks to write events generated from another process.
# Instead, we reset it after the fork. (Make sure any enqueue operations happen after this.)
self.buffer = buffer_klass.new(@buffer_size)
end

def buffer_klass
if Core::Environment::Ext::RUBY_ENGINE == "ruby"
Core::Buffer::CRuby
else
Core::Buffer::ThreadSafe
end
end
end
end
end
end
end
8 changes: 8 additions & 0 deletions lib/datadog/ci/itr/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ def stop_coverage(_test)
return if !enabled? || !code_coverage?

coverage_collector&.stop
# event = Coverage::Event.new(
# test_id: test.id,
# test_suite_id:
# test.test_suite_id,
# test_session_id: test.test_session_id,
# coverage: coverage
# )
# @coverage_writer.write(event)
end

private
Expand Down
44 changes: 44 additions & 0 deletions sig/datadog/ci/itr/coverage/writer.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
module Datadog
module CI
module ITR
module Coverage
class Writer
include Datadog::Core::Workers::Async::Thread
include Datadog::Core::Workers::Polling
include Datadog::Core::Workers::Queue
include Datadog::Core::Workers::IntervalLoop

@transport: Datadog::CI::ITR::Coverage::Transport

@buffer_size: Integer

@shutdown_timeout: Integer

attr_reader transport: Datadog::CI::ITR::Coverage::Transport

DEFAULT_BUFFER_MAX_SIZE: 10000

DEFAULT_SHUTDOWN_TIMEOUT: 60

def initialize: (transport: Datadog::CI::ITR::Coverage::Transport, ?options: ::Hash[untyped, untyped]) -> void

def write: (Datadog::CI::ITR::Coverage::Event event) -> untyped

def perform: (*Datadog::CI::ITR::Coverage::Event events) -> nil

def stop: (?bool force_stop, ?Integer timeout) -> untyped

def enqueue: (Datadog::CI::ITR::Coverage::Event event) -> untyped

def dequeue: () -> ::Array[Datadog::CI::ITR::Coverage::Event]

def async?: () -> true

def after_fork: () -> untyped

def buffer_klass: () -> untyped
end
end
end
end
end
2 changes: 2 additions & 0 deletions sig/datadog/ci/transport/http.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ module Datadog
end

class ResponseDecorator < ::SimpleDelegator
include Datadog::Core::Transport::Response

def initialize: (untyped anything) -> void
def trace_count: () -> Integer
end
Expand Down
10 changes: 10 additions & 0 deletions vendor/rbs/ddtrace/0/datadog/core/buffer/cruby.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module Datadog
module Core
module Buffer
class CRuby < Random
FIXNUM_MAX: untyped
def replace!: (untyped item) -> untyped
end
end
end
end
8 changes: 8 additions & 0 deletions vendor/rbs/ddtrace/0/datadog/core/buffer/random.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module Datadog
module Core
module Buffer
class Random
end
end
end
end
8 changes: 8 additions & 0 deletions vendor/rbs/ddtrace/0/datadog/core/buffer/thread_safe.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module Datadog
module Core
module Buffer
class ThreadSafe < Datadog::Core::Buffer::Random
end
end
end
end
69 changes: 69 additions & 0 deletions vendor/rbs/ddtrace/0/datadog/core/workers/async.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
module Datadog
module Core
module Workers
module Async
module Thread
FORK_POLICY_STOP: :stop

FORK_POLICY_RESTART: :restart

SHUTDOWN_TIMEOUT: 1
MUTEX_INIT: untyped

def self.included: (untyped base) -> untyped
module PrependedMethods
def perform: (*untyped args) -> (untyped | nil)
end

attr_reader error: untyped

attr_reader result: untyped

attr_writer fork_policy: untyped

def join: (?untyped? timeout) -> (true | untyped)

def terminate: () -> (false | true)

def run_async?: () -> (false | untyped)

def started?: () -> untyped

def running?: () -> untyped

def error?: () -> (false | untyped)

def completed?: () -> untyped

def failed?: () -> untyped

def forked?: () -> untyped

def fork_policy: () -> untyped

attr_writer result: untyped

def mutex: () -> untyped

def after_fork: () -> nil

private

attr_reader pid: untyped

def mutex_after_fork: () -> untyped

def worker: () -> untyped

def start_async: () ?{ () -> untyped } -> untyped

def start_worker: () { () -> untyped } -> nil

def stop_fork: () -> untyped

def restart_after_fork: () ?{ () -> untyped } -> untyped
end
end
end
end
end
52 changes: 52 additions & 0 deletions vendor/rbs/ddtrace/0/datadog/core/workers/interval_loop.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
module Datadog
module Core
module Workers
module IntervalLoop
BACK_OFF_RATIO: ::Float

BACK_OFF_MAX: 5

BASE_INTERVAL: 1
MUTEX_INIT: untyped

def self.included: (untyped base) -> untyped
module PrependedMethods
def perform: (*untyped args) -> untyped
end

def stop_loop: () -> (false | true)

def work_pending?: () -> untyped

def run_loop?: () -> (false | untyped)

def loop_base_interval: () -> untyped

def loop_back_off_ratio: () -> untyped

def loop_back_off_max: () -> untyped

def loop_wait_time: () -> untyped

def loop_wait_time=: (untyped value) -> untyped

def loop_back_off!: () -> untyped
def loop_wait_before_first_iteration?: () -> false

attr_writer loop_back_off_max: untyped

attr_writer loop_back_off_ratio: untyped

attr_writer loop_base_interval: untyped

def mutex: () -> untyped

private

def perform_loop: () { () -> untyped } -> (nil | untyped)

def shutdown: () -> untyped
end
end
end
end
21 changes: 21 additions & 0 deletions vendor/rbs/ddtrace/0/datadog/core/workers/polling.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module Datadog
module Core
module Workers
module Polling
SHUTDOWN_TIMEOUT: 1

def self.included: (Class | Module base) -> void

module PrependedMethods
def perform: (*untyped args) -> untyped
end

def stop: (?bool force_stop, ?::Integer timeout) -> untyped

def enabled?: () -> bool

def enabled=: (bool value) -> bool
end
end
end
end
21 changes: 21 additions & 0 deletions vendor/rbs/ddtrace/0/datadog/core/workers/queue.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module Datadog
module Core
module Workers
module Queue
def self.included: (untyped base) -> untyped
module PrependedMethods
def perform: (*untyped args) -> (untyped | nil)
end

def buffer: () -> untyped

def enqueue: (*untyped args) -> untyped

def dequeue: () -> untyped
def work_pending?: () -> untyped

attr_writer buffer: untyped
end
end
end
end

0 comments on commit 78c7213

Please sign in to comment.