diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 866dc91b367..e7484aa3fe6 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -4,6 +4,15 @@ env: VAULT_PATH: "kv/ci-shared/observability-ingest/cloud/gcp" DOCKER_REGISTRY: "docker.elastic.co" steps: + - label: "check-ci" + key: "check-ci" + command: ".buildkite/scripts/steps/check-ci.sh" + agents: + provider: "gcp" + image: "family/core-ubuntu-2204" + retry: + manual: + allowed: true - group: "Unit tests" key: "unit-tests" steps: @@ -219,6 +228,18 @@ steps: provider: "gcp" machineType: "n1-standard-8" + - label: "Serverless Beats Tests" + key: "serverless-beats-integration-tests" + command: ".buildkite/scripts/steps/beats_tests.sh" + if: "build.env('CRON') == 'yes'" + agents: + provider: "gcp" + machineType: "n1-standard-8" + retry: + manual: + allowed: true + + - wait: ~ continue_on_failure: true - label: "Processing test results" @@ -227,3 +248,23 @@ steps: plugins: - junit-annotate#v2.4.1: artifacts: build/TEST-go-integration*.xml + + # Triggers a dynamic step: Sync K8s + # Runs only on main and if k8s files are changed + - label: "Trigger k8s sync" + branches: main + plugins: + - monebag/monorepo-diff#v2.5.9: + diff: "git diff --name-only HEAD~1" + watch: + - path: + - deploy/kubernetes/* + - version/docs/version.asciidoc + config: + label: "Sync K8s" + command: ".buildkite/scripts/steps/sync-k8s.sh" + agents: + provider: "gcp" + image: "family/core-ubuntu-2204" + env: + - GH_VERSION=2.4.0 diff --git a/.buildkite/scripts/common.sh b/.buildkite/scripts/common.sh index 16d32fb77ab..86c370ae360 100644 --- a/.buildkite/scripts/common.sh +++ b/.buildkite/scripts/common.sh @@ -53,7 +53,7 @@ getOSOptions() { # Wrapper function for executing mage mage() { go version - if ! [ -x "$(type -p mage | sed 's/mage is //g')" ]; + if ! [ -x "$(type -P mage | sed 's/mage is //g')" ]; then echo "installing mage ${SETUP_MAGE_VERSION}" make mage @@ -68,7 +68,7 @@ mage() { # Wrapper function for executing go go(){ # Search for the go in the Path - if ! [ -x "$(type -p go | sed 's/go is //g')" ]; + if ! [ -x "$(type -P go | sed 's/go is //g')" ]; then getOSOptions echo "installing golang "${GO_VERSION}" for "${AGENT_OS_NAME}/${AGENT_OS_ARCH}" " diff --git a/.buildkite/scripts/install-gh.sh b/.buildkite/scripts/install-gh.sh new file mode 100644 index 00000000000..ff52687d02f --- /dev/null +++ b/.buildkite/scripts/install-gh.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash + +# Required environment variables: +# - GH_VERSION - the version of gh to install +set -exuo pipefail + +echo "--- Install gh cli" + +MSG="environment variable missing." +DEFAULT_HOME="/usr/local" +GH_VERSION=${GH_VERSION:-$MSG} +HOME=${HOME:-$DEFAULT_HOME} +GH_CMD="${HOME}/bin/gh" + +if command -v gh +then + set +e + echo "Found GH. Checking version.." + FOUND_GH_VERSION=$(gh --version 2>&1 >/dev/null | awk '{print $3}') + if [ "$FOUND_GH_VERSION" == "$GH_VERSION" ] + then + echo "GH Versions match: $GH_VERSION. No need to install gh. Exiting." + exit 0 + else + echo "GH Version mismatch. Desired version: $GH_VERSION, found version: $FOUND_GH_VERSION. Installing new version." + fi + set -e +fi + +source .buildkite/scripts/common.sh + +OS=$(uname -s| tr '[:upper:]' '[:lower:]') +ARCH=$(uname -m| tr '[:upper:]' '[:lower:]') +if [ "${ARCH}" == "aarch64" ] ; then + ARCH_SUFFIX=arm64 +else + ARCH_SUFFIX=amd64 +fi + +echo "Downloading gh : ${GH_VERSION}..." 
+TMP_DIR=$(mktemp -d) +if retry 5 curl -sL "https://github.com/cli/cli/releases/download/v${GH_VERSION}/gh_${GH_VERSION}_linux_amd64.tar.gz" | tar xz -C $TMP_DIR ; then + mkdir -p "${HOME}/bin" + mv "${TMP_DIR}/gh_${GH_VERSION}_linux_amd64/bin/gh" "${GH_CMD}" + rm -rf "${TMP_DIR}" +else + echo "Something bad with the download, deleting the binary" + if [ -e "${GH_CMD}" ] ; then + rm "${GH_CMD}" + fi + exit 1 +fi + + diff --git a/.buildkite/scripts/install-kind.sh b/.buildkite/scripts/install-kind.sh index 171480d7685..ee0e0039719 100644 --- a/.buildkite/scripts/install-kind.sh +++ b/.buildkite/scripts/install-kind.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -set -exuo pipefail +set -euo pipefail echo "--- Install Kind" diff --git a/.buildkite/scripts/steps/beats_tests.sh b/.buildkite/scripts/steps/beats_tests.sh new file mode 100755 index 00000000000..b62bc947b6e --- /dev/null +++ b/.buildkite/scripts/steps/beats_tests.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash +set -euo pipefail + +#========================= +# NOTE: This entire script is a temporary hack until we have buildkite set up on the beats repo. +# until then, we need some kind of serverless integration tests, hence this script, which just clones the beats repo, +# and runs the serverless integration suite against different beats +# After buildkite is set up on beats, this file/PR should be reverted. +#========================== + +source .buildkite/scripts/common.sh +STACK_PROVISIONER="${1:-"serverless"}" + +run_test_for_beat(){ + local beat_name=$1 + + #build + export WORKSPACE="build/beats/x-pack/${beat_name}" + SNAPSHOT=true PLATFORMS=linux/amd64 PACKAGES=tar.gz,zip mage package + + #run + export AGENT_BUILD_DIR="build/beats/x-pack/${beat_name}/build/distributions" + export WORKSPACE=$(pwd) + + set +e + TEST_INTEG_CLEAN_ON_EXIT=true TEST_PLATFORMS="linux/amd64" STACK_PROVISIONER="$STACK_PROVISIONER" SNAPSHOT=true mage integration:testBeatServerless $beat_name + TESTS_EXIT_STATUS=$? + set -e + + return $TESTS_EXIT_STATUS +} +#run mage before setup, since this will install go and mage +#the setup scripts will do a few things that assume we're running out of elastic-agent and will break things for beats, so run before we do actual setup +mage -l + +mkdir -p build +cd build + +git clone --filter=tree:0 git@github.com:elastic/beats.git +cd .. + +# export WORKSPACE=beats/x-pack/metricbeat + +# SNAPSHOT=true PLATFORMS=linux/amd64,windows/amd64 PACKAGES=tar.gz,zip mage package + + +# cd .. + +# export AGENT_BUILD_DIR=build/beats/x-pack/metricbeat/build/distributions +# export WORKSPACE=$(pwd) + +# set +e +# TEST_INTEG_CLEAN_ON_EXIT=true TEST_PLATFORMS="linux/amd64" STACK_PROVISIONER="$STACK_PROVISIONER" SNAPSHOT=true mage integration:testBeatServerless metricbeat +# TESTS_EXIT_STATUS=$? +# set -e + +# exit $TESTS_EXIT_STATUS + +echo "testing metricbeat..." +run_test_for_beat metricbeat + + + +echo "testing filebeat..." +run_test_for_beat filebeat + + + +echo "testing auditbeat..." 
+run_test_for_beat auditbeat diff --git a/.buildkite/scripts/steps/check-ci.sh b/.buildkite/scripts/steps/check-ci.sh new file mode 100755 index 00000000000..46923d1b7ef --- /dev/null +++ b/.buildkite/scripts/steps/check-ci.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -euo pipefail + +source .buildkite/scripts/common.sh + +echo "--- Check CI" +go version +mage --version +BEAT_VERSION=$(make get-version) +echo "Beat version: $BEAT_VERSION" +make check-ci \ No newline at end of file diff --git a/.buildkite/scripts/steps/sync-k8s.sh b/.buildkite/scripts/steps/sync-k8s.sh new file mode 100644 index 00000000000..ecd64e815a4 --- /dev/null +++ b/.buildkite/scripts/steps/sync-k8s.sh @@ -0,0 +1,22 @@ +#!/bin/bash +set -euo pipefail + +export PATH=$HOME/bin:${PATH} + +source .buildkite/scripts/install-gh.sh +source .buildkite/scripts/common.sh + +export GITHUB_TOKEN=$(retry 5 vault kv get -field token kv/ci-shared/platform-ingest/github_token) + +cd deploy/kubernetes + +echo "--- [File Creation] Create-Needed-Manifest" +WITHOUTCONFIG=true make generate-k8s +./creator_k8s_manifest.sh . + +echo "--- [Clone] Kibana-Repository" +make ci-clone-kibana-repository +cp Makefile ./kibana +cd kibana +echo "--- Create Kibana PR" +make ci-create-kubernetes-templates-pull-request \ No newline at end of file diff --git a/.ci/Jenkinsfile b/.ci/Jenkinsfile deleted file mode 100644 index 23f3ca1e707..00000000000 --- a/.ci/Jenkinsfile +++ /dev/null @@ -1,316 +0,0 @@ -#!/usr/bin/env groovy - -@Library('apm@current') _ - -pipeline { - agent { label 'ubuntu-22 && immutable' } - environment { - REPO = "elastic-agent" - BASE_DIR = "src/github.com/elastic/${env.REPO}" - JOB_GIT_CREDENTIALS = "f6c7695a-671e-4f4f-a331-acdce44ff9ba" - PIPELINE_LOG_LEVEL = 'INFO' - SNAPSHOT = true - JOB_GCS_CREDENTIALS = 'fleet-ci-gcs-plugin' // Support stash/unstash v2 - JOB_GCS_BUCKET = 'fleet-ci-temp' // Support stash/unstash v2 - JOB_GCS_EXT_BUCKET = 'fleet-ci-artifacts' // Support uploadPackagesToGoogleBucket - JOB_GCS_EXT_CREDENTIALS = 'fleet-ci-gcs-plugin-file-credentials' // Support uploadPackagesToGoogleBucket - DOCKER_ELASTIC_SECRET = 'secret/observability-team/ci/docker-registry/prod' - DOCKER_REGISTRY = 'docker.elastic.co' - DEVELOPER_MODE=true - } - options { - timeout(time: 3, unit: 'HOURS') - buildDiscarder(logRotator(numToKeepStr: '20', artifactNumToKeepStr: '20', daysToKeepStr: '30')) - timestamps() - ansiColor('xterm') - disableResume() - durabilityHint('PERFORMANCE_OPTIMIZED') - rateLimitBuilds(throttle: [count: 60, durationName: 'hour', userBoost: true]) - quietPeriod(10) - } - triggers { - issueCommentTrigger("(${obltGitHubComments()}|^run (integration|end-to-end) tests|/package)") - } - parameters { - // disabled by default, but required for merge, there are two GH checks: - // opt-in with 'ci:integration' - booleanParam(name: 'integration_tests_ci', defaultValue: false, description: 'Enable Integration tests') - - // disabled by default, but required for merge: - // opt-in with 'ci:end-to-end' tag on PR - booleanParam(name: 'end_to_end_tests_ci', defaultValue: false, description: 'Enable End-to-End tests') - - // disabled by default, but required for merge: - // opt-in with 'ci:extended-windows' tag on PR - booleanParam(name: 'extended_windows_ci', defaultValue: false, description: 'Enable Extended Windows tests') - - // disabled by default, but required for merge: - // opt-in with 'ci:extended-m1' tag on PR - booleanParam(name: 'extended_m1_ci', defaultValue: false, description: 'Enable M1 tests') - } - stages { - 
stage('Checkout') { - steps { - pipelineManager([ cancelPreviousRunningBuilds: [ when: 'PR' ] ]) - deleteDir() - gitCheckout(basedir: "${BASE_DIR}", githubNotifyFirstTimeContributor: true) - stashV2(name: 'source', bucket: "${JOB_GCS_BUCKET}", credentialsId: "${JOB_GCS_CREDENTIALS}") - dir("${BASE_DIR}"){ - setEnvVar('ONLY_DOCS', isGitRegionMatch(patterns: [ '.*\\.(asciidoc|md)' ], shouldMatchAll: true).toString()) - setEnvVar('PACKAGING_CHANGES', isGitRegionMatch(patterns: [ '(^dev-tools/packaging/.*|.ci/Jenkinsfile|.go-version|Dockerfile)' ], shouldMatchAll: false).toString()) - setEnvVar('K8S_CHANGES', isGitRegionMatch(patterns: [ '(^deploy/kubernetes/.*|^version/docs/version.asciidoc|.ci/Jenkinsfile)' ], shouldMatchAll: false).toString()) - setEnvVar('EXT_WINDOWS_CHANGES', isGitRegionMatch(patterns: [ '.ci/Jenkinsfile' ], shouldMatchAll: false).toString()) - setEnvVar('EXT_M1_CHANGES', isGitRegionMatch(patterns: [ '.ci/Jenkinsfile' ], shouldMatchAll: false).toString()) - // set the GO_VERSION env variable with the go version to be used in withMageEnv - setEnvVar('GO_VERSION', readFile(file: '.go-version')?.trim()) - } - } - } - stage('Check'){ - steps { - withGithubNotify(context: "Check") { - withMageEnv(){ - dir("${BASE_DIR}"){ - setEnvVar('BEAT_VERSION', sh(label: 'Get beat version', script: 'make get-version', returnStdout: true)?.trim()) - log(level: 'INFO', text: "env.BEAT_VERSION=${env.BEAT_VERSION}") - cmd(label: 'check', script: 'make check-ci') - } - } - } - } - } - stage('Test') { - when { - beforeAgent true - expression { return env.ONLY_DOCS == "false" } - } - failFast false - matrix { - agent {label "${PLATFORM}"} - options { skipDefaultCheckout() } - axes { - axis { - name 'PLATFORM' - // Orka workers are not healthy (memory and connectivity issues) - values 'ubuntu-22 && immutable', 'aws && aarch64 && gobld/diskSizeGb:200', 'windows-2016 && windows-immutable', 'windows-2022 && windows-immutable' //, 'macos12 && x86_64' - } - } - stages { - stage('Package') { - when { - beforeAgent true - allOf { - anyOf { - expression { return isE2eEnabled() } - expression { return isPackageEnabled() } - not { changeRequest() } - } - // Run packaging only for the linux specific arch - expression { return (PLATFORM.contains('ubuntu') || PLATFORM.contains('aarch64')) } - } - } - environment { - ARCH = "${PLATFORM.contains('aarch64') ? 
'arm64' : 'amd64'}" - DEV = true - EXTERNAL = true - } - steps { - withGithubNotify(context: "Package ${PLATFORM}") { - deleteDir() - unstashV2(name: 'source', bucket: "${JOB_GCS_BUCKET}", credentialsId: "${JOB_GCS_CREDENTIALS}") - withMageEnv(){ - dir("${BASE_DIR}"){ - withPackageEnv("${PLATFORM}") { - cmd(label: 'Go package', script: 'mage package ironbank') - uploadPackagesToGoogleBucket( - credentialsId: env.JOB_GCS_EXT_CREDENTIALS, - repo: env.REPO, - bucket: env.JOB_GCS_EXT_BUCKET, - pattern: "build/distributions/**/*") - pushDockerImages( - registry: env.DOCKER_REGISTRY, - secret: env.DOCKER_ELASTIC_SECRET, - snapshot: env.SNAPSHOT, - version: env.BEAT_VERSION, - images: [ - [ source: "beats/elastic-agent", arch: env.ARCH, target: "observability-ci/elastic-agent"], - [ source: "beats/elastic-agent-oss", arch: env.ARCH, target: "observability-ci/elastic-agent-oss"], - [ source: "beats/elastic-agent-ubi9", arch: env.ARCH, target: "observability-ci/elastic-agent-ubi9"], - [ source: "beats/elastic-agent-complete", arch: env.ARCH, target: "observability-ci/elastic-agent-complete"], - [ source: "beats-ci/elastic-agent-cloud", arch: env.ARCH, target: "observability-ci/elastic-agent-cloud"] - ] - ) - } - } - } - } - } - } - } - } - } - stage('Sync K8s') { //This stage opens a PR to kibana Repository in order to sync k8s manifests - when { - // Only on main branch - // Enable if k8s related changes. - allOf { - branch 'main' // Only runs for branch main - expression { return env.K8S_CHANGES == "true" } // If k8s changes - } - } - failFast false - agent {label 'ubuntu-22 && immutable'} - options { skipDefaultCheckout() } - stages { - stage('OpenKibanaPR') { - steps { - withGhEnv(version: '2.4.0') { - deleteDir() - unstashV2(name: 'source', bucket: "${JOB_GCS_BUCKET}", credentialsId: "${JOB_GCS_CREDENTIALS}") - dir("${BASE_DIR}/deploy/kubernetes"){ - sh(label: '[File Creation] Create-Needed-Manifest', script: """ - WITHOUTCONFIG=true make generate-k8s - ./creator_k8s_manifest.sh . """) - sh(label: '[Clone] Kibana-Repository', script: """ - make ci-clone-kibana-repository - cp Makefile ./kibana - cd kibana - make ci-create-kubernetes-templates-pull-request """) - } - } - } - post { - always { - junit(allowEmptyResults: true, keepLongStdio: true, testResults: "${BASE_DIR}/build/TEST-*.xml") - } - } - } - } - } - } - post { - cleanup { - notifyBuildResult(prComment: true, - analyzeFlakey: !isTag(), jobName: getFlakyJobName(withBranch: (isPR() ? env.CHANGE_TARGET : env.BRANCH_NAME)), - githubIssue: false, // Disable creating gh issues for build failures while the E2E tests are stabilized. - githubLabels: 'Team:Elastic-Agent-Control-Plane') - } - } -} - -// As agreed let's report the code coverage for Linux but no ARM only. -def isCodeCoverageEnabled() { - return (isUnix() && !isArm()) -} - -def withPackageEnv(platform, Closure body) { - if (isUnix()) { - if (isDarwin()) { - withPackageDarwinEnv() { - body() - } - } else { - if (isArm()) { - withPackageArmEnv() { - body() - } - } else { - withPackageLinuxEnv() { - body() - } - } - } - } else { - error 'withPackageEnv: unsupported platform' - } -} - -def withPackageLinuxEnv(Closure body) { - // Copied from https://github.com/elastic/beats/blob/e6e65aa92fe355c95789691ebf5a3bcecaf5b4ea/.ci/packaging.groovy#L126-L142 - def PLATFORMS = [ '+all', - 'linux/amd64', - 'linux/386', - 'linux/arm64', - // armv7 packaging isn't working, and we don't currently - // need it for release. 
Do not re-enable it without - // confirming it is fixed, you will break the packaging - // pipeline! - //'linux/armv7', - // The platforms above are disabled temporarly as crossbuild images are - // not available. See: https://github.com/elastic/golang-crossbuild/issues/71 - //'linux/ppc64le', - //'linux/mips64', - //'linux/s390x', - 'windows/amd64', - 'windows/386' - ].join(' ') - withEnv([ - "PLATFORMS=${PLATFORMS}" - ]) { - body() - } -} - -def withPackageArmEnv(Closure body) { - // Copied from https://github.com/elastic/beats/blob/e6e65aa92fe355c95789691ebf5a3bcecaf5b4ea/.ci/packaging.groovy#L126-L142 - def PLATFORMS = [ 'linux/arm64' ].join(' ') - withEnv([ - "PLATFORMS=${PLATFORMS}", - "PACKAGES=docker" - ]) { - body() - } -} - -def withPackageDarwinEnv(Closure body) { - // Copied from https://github.com/elastic/beats/blob/e6e65aa92fe355c95789691ebf5a3bcecaf5b4ea/.ci/packaging.groovy#L126-L142 - def PLATFORMS = [ '+all', - 'darwin/amd64', - 'darwin/arm64', - ].join(' ') - withEnv([ - "PLATFORMS=${PLATFORMS}" - ]) { - body() - } -} - -def runK8s(Map args=[:]) { - withGithubNotify(context: args.context) { - withMageEnv(){ - withKindEnv(args) { - dir("${BASE_DIR}"){ - sh(label: "Deploy to kubernetes",script: "make -C deploy/kubernetes test") - } - } - } - } -} - -/** -* Wrapper to know if the build should enalbe the e2e stage -*/ -def isE2eEnabled() { - return params.end_to_end_tests_ci || env.GITHUB_COMMENT?.contains('e2e tests') || matchesPrLabel(label: 'ci:end-to-end') -} - -/** -* Wrapper to know if the build should enalbe the package stage -*/ -def isPackageEnabled() { - return env.PACKAGING_CHANGES == "true" || env.GITHUB_COMMENT?.contains('package') || matchesPrLabel(label: 'ci:package') -} - -/** -* Wrapper to know if the build should enable the windows extended support -*/ -def isExtendedWindowsEnabled() { - return env.EXT_WINDOWS_CHANGES == "true" || params.extended_windows_ci || env.GITHUB_COMMENT?.contains('extended windows') || matchesPrLabel(label: 'ci:extended-windows') -} - -/** -* Wrapper to know if the build should enable the M1 extended support -*/ -def isExtendedM1Enabled() { - return env.EXT_M1_CHANGES == "true" || params.extended_m1_ci || env.GITHUB_COMMENT?.contains('extended m1') || matchesPrLabel(label: 'ci:extended-m1') -} diff --git a/.ci/jobs/defaults.yml b/.ci/jobs/defaults.yml deleted file mode 100644 index a2bcaae57e1..00000000000 --- a/.ci/jobs/defaults.yml +++ /dev/null @@ -1,19 +0,0 @@ - ---- - -##### GLOBAL METADATA - -- meta: - cluster: fleet-ci - -##### JOB DEFAULTS - -- job: - logrotate: - numToKeep: 20 - node: linux - concurrent: true - publishers: - - email: - recipients: infra-root+build@elastic.co - prune-dead-branches: true diff --git a/.ci/jobs/elastic-agent-mbp.yml b/.ci/jobs/elastic-agent-mbp.yml deleted file mode 100644 index 8947d15880a..00000000000 --- a/.ci/jobs/elastic-agent-mbp.yml +++ /dev/null @@ -1,43 +0,0 @@ ---- -- job: - name: "elastic-agent/elastic-agent-mbp" - display-name: elastic-agent - description: "Elastic agent" - project-type: multibranch - script-path: .ci/Jenkinsfile - scm: - - github: - branch-discovery: no-pr - discover-pr-forks-strategy: merge-current - discover-pr-forks-trust: permission - discover-pr-origin: merge-current - discover-tags: true - head-filter-regex: '(main|7\.17|8\.\d+|PR-.*|v\d+\.\d+\.\d+)' - notification-context: 'fleet-ci' - repo: elastic-agent - repo-owner: elastic - credentials-id: 2a9602aa-ab9f-4e52-baf3-b71ca88469c7-UserAndToken - ssh-checkout: - credentials: 
f6c7695a-671e-4f4f-a331-acdce44ff9ba - build-strategies: - - tags: - ignore-tags-older-than: -1 - ignore-tags-newer-than: -1 - - regular-branches: true - - change-request: - ignore-target-only-changes: true - clean: - after: true - before: true - prune: true - shallow-clone: true - depth: 4 - do-not-fetch-tags: true - submodule: - disable: false - recursive: true - parent-credentials: true - timeout: 100 - timeout: '15' - use-author: true - wipe-workspace: true diff --git a/.ci/jobs/elastic-agent-schedule-daily.yml b/.ci/jobs/elastic-agent-schedule-daily.yml deleted file mode 100644 index 2fd728a9e8c..00000000000 --- a/.ci/jobs/elastic-agent-schedule-daily.yml +++ /dev/null @@ -1,26 +0,0 @@ ---- -- job: - name: "elastic-agent/elastic-agent-schedule-daily" - display-name: Jobs scheduled daily (weekdays) - description: Jobs scheduled daily (weekdays) - view: Beats - project-type: pipeline - parameters: - - string: - name: branch_specifier - default: main - description: the Git branch specifier to build - pipeline-scm: - script-path: .ci/schedule-daily.groovy - scm: - - git: - url: git@github.com:elastic/elastic-agent.git - refspec: +refs/heads/*:refs/remotes/origin/* - wipe-workspace: 'True' - name: origin - shallow-clone: true - credentials-id: f6c7695a-671e-4f4f-a331-acdce44ff9ba - branches: - - $branch_specifier - triggers: - - timed: 'H H(2-3) * * 1-5' diff --git a/.ci/jobs/folders.yml b/.ci/jobs/folders.yml deleted file mode 100644 index cf309e259a3..00000000000 --- a/.ci/jobs/folders.yml +++ /dev/null @@ -1,5 +0,0 @@ ---- -- job: - name: "elastic-agent" - description: "Elastic Agent" - project-type: folder diff --git a/.ci/schedule-daily.groovy b/.ci/schedule-daily.groovy deleted file mode 100644 index 70ad931ecda..00000000000 --- a/.ci/schedule-daily.groovy +++ /dev/null @@ -1,48 +0,0 @@ -@Library('apm@current') _ - -pipeline { - agent none - environment { - NOTIFY_TO = credentials('notify-to') - PIPELINE_LOG_LEVEL = 'INFO' - } - options { - timeout(time: 1, unit: 'HOURS') - buildDiscarder(logRotator(numToKeepStr: '20', artifactNumToKeepStr: '20')) - timestamps() - ansiColor('xterm') - disableResume() - durabilityHint('PERFORMANCE_OPTIMIZED') - } - triggers { - cron('H H(2-3) * * 1-5') - } - stages { - stage('Nighly beats builds') { - steps { - runBuilds(quietPeriodFactor: 2000, branches: ['main', '8.', '8.']) - } - } - } - post { - cleanup { - notifyBuildResult(prComment: false) - } - } -} - -def runBuilds(Map args = [:]) { - def branches = getBranchesFromAliases(aliases: args.branches) - def quietPeriod = 0 - branches.each { branch -> - build(quietPeriod: quietPeriod, - job: "elastic-agent/elastic-agent-mbp/${branch}", - parameters: [ - booleanParam(name: 'integration_tests_ci', value: true), - // Disable running e2e until we fix the 2e2 testing suite - booleanParam(name: 'end_to_end_tests_ci', value: false) - ], - wait: false, propagate: false) - quietPeriod += args.quietPeriodFactor - } -} diff --git a/.ci/scripts/install-go.bat b/.ci/scripts/install-go.bat deleted file mode 100755 index 29448bd4f63..00000000000 --- a/.ci/scripts/install-go.bat +++ /dev/null @@ -1,57 +0,0 @@ -set GOPATH=%WORKSPACE% -set MAGEFILE_CACHE=%WORKSPACE%\.magefile - -set PATH=%WORKSPACE%\bin;C:\ProgramData\chocolatey\bin;%PATH% - -curl --version >nul 2>&1 && ( - echo found curl -) || ( - choco install curl -y --no-progress --skipdownloadcache -) - -mkdir %WORKSPACE%\bin - -IF EXIST "%PROGRAMFILES(X86)%" ( - REM Force the gvm installation. 
- SET GVM_BIN=gvm.exe - curl -L -o %WORKSPACE%\bin\gvm.exe https://github.com/andrewkroh/gvm/releases/download/v0.3.0/gvm-windows-amd64.exe - IF ERRORLEVEL 1 ( - REM gvm installation has failed. - del bin\gvm.exe /s /f /q - exit /b 1 - ) -) ELSE ( - REM Windows 7 workers got a broken gvm installation. - curl -L -o %WORKSPACE%\bin\gvm.exe https://github.com/andrewkroh/gvm/releases/download/v0.3.0/gvm-windows-386.exe - IF ERRORLEVEL 1 ( - REM gvm installation has failed. - del bin\gvm.exe /s /f /q - exit /b 1 - ) -) - -SET GVM_BIN=gvm.exe -WHERE /q %GVM_BIN% -%GVM_BIN% version - -REM Install the given go version -%GVM_BIN% --debug install %GO_VERSION% - -REM Configure the given go version -FOR /f "tokens=*" %%i IN ('"%GVM_BIN%" use %GO_VERSION% --format=batch') DO %%i - -go env -IF ERRORLEVEL 1 ( - REM go is not configured correctly. - rmdir %WORKSPACE%\.gvm /s /q - exit /b 1 -) - -where mage -mage -version -IF ERRORLEVEL 1 ( - go get github.com/magefile/mage - IF ERRORLEVEL 1 ( - exit /b 1 - ) -) diff --git a/.ci/scripts/install-go.sh b/.ci/scripts/install-go.sh deleted file mode 100755 index 31566c08726..00000000000 --- a/.ci/scripts/install-go.sh +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env bash -set -exuo pipefail - -MSG="environment variable missing" -GO_VERSION=${GO_VERSION:?$MSG} -PROPERTIES_FILE=${PROPERTIES_FILE:-"go_env.properties"} -HOME=${HOME:?$MSG} -OS=$(uname -s| tr '[:upper:]' '[:lower:]') -ARCH=$(uname -m| tr '[:upper:]' '[:lower:]') -GVM_CMD="${HOME}/bin/gvm" - -if command -v go -then - set +e - echo "Found Go. Checking version.." - FOUND_GO_VERSION=$(go version|awk '{print $3}'|sed s/go//) - if [ "$FOUND_GO_VERSION" == "$GO_VERSION" ] - then - echo "Versions match. No need to install Go. Exiting." - exit 0 - fi - set -e -fi - -if [ "${ARCH}" == "aarch64" ] ; then - GVM_ARCH_SUFFIX=arm64 -elif [ "${ARCH}" == "x86_64" ] ; then - GVM_ARCH_SUFFIX=amd64 -elif [ "${ARCH}" == "i686" ] ; then - GVM_ARCH_SUFFIX=386 -else - GVM_ARCH_SUFFIX=arm -fi - -echo "UNMET DEP: Installing Go" -mkdir -p "${HOME}/bin" - -curl -sSLo "${GVM_CMD}" "https://github.com/andrewkroh/gvm/releases/download/v0.3.0/gvm-${OS}-${GVM_ARCH_SUFFIX}" -chmod +x "${GVM_CMD}" - -${GVM_CMD} "${GO_VERSION}" |cut -d ' ' -f 2|tr -d '\"' > ${PROPERTIES_FILE} - -eval "$("${GVM_CMD}" "${GO_VERSION}")" diff --git a/.ci/bump-golang.yml b/.github/updatecli-bump-golang.yml similarity index 98% rename from .ci/bump-golang.yml rename to .github/updatecli-bump-golang.yml index 470c6f4c8d5..df5d8104048 100644 --- a/.ci/bump-golang.yml +++ b/.github/updatecli-bump-golang.yml @@ -1,3 +1,4 @@ +# update-cli configuration for automated go updates --- name: Bump golang-version to latest version diff --git a/.github/workflows/bump-golang.yml b/.github/workflows/bump-golang.yml index 30b8fbd89b3..34d488b9d72 100644 --- a/.github/workflows/bump-golang.yml +++ b/.github/workflows/bump-golang.yml @@ -4,7 +4,7 @@ name: bump-golang on: workflow_dispatch: schedule: - - cron: '0 20 * * 6' + - cron: "0 20 * * 6" permissions: contents: read @@ -13,7 +13,6 @@ jobs: bump: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - uses: elastic/apm-pipeline-library/.github/actions/updatecli@current @@ -21,4 +20,4 @@ jobs: vaultUrl: ${{ secrets.VAULT_ADDR }} vaultRoleId: ${{ secrets.VAULT_ROLE_ID }} vaultSecretId: ${{ secrets.VAULT_SECRET_ID }} - pipeline: ./.ci/bump-golang.yml + pipeline: ./.github/updatecli-bump-golang.yml diff --git a/deploy/kubernetes/Makefile b/deploy/kubernetes/Makefile index 6247c9461e1..d02fc6eeb38 100644 --- 
a/deploy/kubernetes/Makefile +++ b/deploy/kubernetes/Makefile @@ -81,7 +81,7 @@ else echo "INFO: create pull request" @gh pr create \ --title "Update kubernetes templates for elastic-agent" \ - --body "Automated by ${BUILD_URL}" \ + --body "Automated by ${BUILDKITE_BUILD_URL}" \ --label automation \ --label release_note:skip \ --base main \ diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index c33cc700729..1129ef96e32 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -100,10 +100,7 @@ type RuntimeManager interface { Runner // Update updates the current components model. - Update(model component.Model) error - - // State returns the current components model state. - State() []runtime.ComponentComponentState + Update(model component.Model) // PerformAction executes an action on a unit. PerformAction(ctx context.Context, comp component.Component, unit component.Unit, name string, params map[string]interface{}) (map[string]interface{}, error) @@ -241,17 +238,19 @@ type Coordinator struct { // into the reported state before broadcasting -- State() will report // agentclient.Failed if one of these is set, even if the underlying // coordinator state is agentclient.Healthy. - runtimeMgrErr error // Currently unused - configMgrErr error - actionsErr error - varsMgrErr error + // Errors from the runtime manager report policy update failures and are + // stored in runtimeUpdateErr below. + configMgrErr error + actionsErr error + varsMgrErr error // Errors resulting from different possible failure modes when setting a // new policy. Right now there are three different stages where a policy // update can fail: // - in generateAST, converting the policy to an AST // - in process, converting the AST and vars into a full component model - // - while sending the final component model to the runtime manager + // - while applying the final component model in the runtime manager + // (reported asynchronously via the runtime manager error channel) // // The plan is to improve our preprocessing so we can always detect // failures immediately https://github.com/elastic/elastic-agent/issues/2887. @@ -920,7 +919,13 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { return case runtimeErr := <-c.managerChans.runtimeManagerError: - c.setRuntimeManagerError(runtimeErr) + // runtime manager errors report the result of a policy update. + // Coordinator transitions from starting to healthy when a policy update + // is successful. 
+ c.setRuntimeUpdateError(runtimeErr) + if runtimeErr == nil { + c.setCoordinatorState(agentclient.Healthy, "Running") + } case configErr := <-c.managerChans.configManagerError: if c.isManaged { @@ -1153,12 +1158,7 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) { c.logger.Info("Updating running component model") c.logger.With("components", model.Components).Debug("Updating running component model") - err = c.runtimeMgr.Update(model) - c.setRuntimeUpdateError(err) - if err != nil { - return fmt.Errorf("updating runtime: %w", err) - } - c.setCoordinatorState(agentclient.Healthy, "Running") + c.runtimeMgr.Update(model) return nil } diff --git a/internal/pkg/agent/application/coordinator/coordinator_state.go b/internal/pkg/agent/application/coordinator/coordinator_state.go index 6e645c3a06b..22394ee52e0 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_state.go +++ b/internal/pkg/agent/application/coordinator/coordinator_state.go @@ -63,10 +63,10 @@ func (c *Coordinator) SetUpgradeDetails(upgradeDetails *details.Details) { c.upgradeDetailsChan <- upgradeDetails } -// setRuntimeManagerError updates the error state for the runtime manager. +// setRuntimeUpdateError reports a failed policy update in the runtime manager. // Called on the main Coordinator goroutine. -func (c *Coordinator) setRuntimeManagerError(err error) { - c.runtimeMgrErr = err +func (c *Coordinator) setRuntimeUpdateError(err error) { + c.runtimeUpdateErr = err c.stateNeedsRefresh = true } @@ -107,14 +107,6 @@ func (c *Coordinator) setComponentGenError(err error) { c.stateNeedsRefresh = true } -// setRuntimeUpdateError updates the error state for sending a component model -// update to the runtime manager. -// Called on the main Coordinator goroutine. -func (c *Coordinator) setRuntimeUpdateError(err error) { - c.runtimeUpdateErr = err - c.stateNeedsRefresh = true -} - // setOverrideState is the internal helper to set the override state and // set stateNeedsRefresh. // Must be called on the main Coordinator goroutine. 
@@ -201,9 +193,6 @@ func (c *Coordinator) generateReportableState() (s State) { } else if c.runtimeUpdateErr != nil { s.State = agentclient.Failed s.Message = fmt.Sprintf("Runtime update failed: %s", c.runtimeUpdateErr.Error()) - } else if c.runtimeMgrErr != nil { - s.State = agentclient.Failed - s.Message = fmt.Sprintf("Runtime manager: %s", c.runtimeMgrErr.Error()) } else if c.configMgrErr != nil { s.State = agentclient.Failed s.Message = fmt.Sprintf("Config manager: %s", c.configMgrErr.Error()) diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 676906bbde0..49770bc1e97 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -787,6 +787,8 @@ func (f *fakeVarsManager) Vars(ctx context.Context, vars []*transpiler.Vars) { type fakeRuntimeManager struct { state []runtime.ComponentComponentState updateCallback func([]component.Component) error + result error + errChan chan error } func (r *fakeRuntimeManager) Run(ctx context.Context) error { @@ -796,11 +798,15 @@ func (r *fakeRuntimeManager) Run(ctx context.Context) error { func (r *fakeRuntimeManager) Errors() <-chan error { return nil } -func (r *fakeRuntimeManager) Update(model component.Model) error { +func (r *fakeRuntimeManager) Update(model component.Model) { + r.result = nil if r.updateCallback != nil { - return r.updateCallback(model.Components) + r.result = r.updateCallback(model.Components) + } + if r.errChan != nil { + // If a reporting channel is set, send the result to it + r.errChan <- r.result } - return nil } // State returns the current components model state. diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 956eb33b8c0..89869e48d5e 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -754,6 +754,7 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { logger := logp.NewLogger("testing") configChan := make(chan ConfigChange, 1) + updateErrChan := make(chan error, 1) const errorStr = "update failed for testing reasons" // Create a mocked runtime manager that always reports an error @@ -761,6 +762,7 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { updateCallback: func(comp []component.Component) error { return fmt.Errorf(errorStr) }, + errChan: updateErrChan, } coord := &Coordinator{ @@ -769,6 +771,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { stateBroadcaster: broadcaster.New(State{}, 0, 0), managerChans: managerChans{ configManagerUpdate: configChan, + // Give coordinator the same error channel we set on the runtime + // manager, so it receives the update result. 
+ runtimeManagerError: updateErrChan, }, runtimeMgr: runtimeManager, vars: emptyVars(t), @@ -781,16 +786,19 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { configChan <- configChange coord.runLoopIteration(ctx) - // Make sure the failure was reported to the config manager - assert.True(t, configChange.failed, "Config change should report failure if the runtime manager returns an error") - require.Error(t, configChange.err, "Config change should get an error if runtime manager update fails") - assert.Contains(t, configChange.err.Error(), errorStr) + // Make sure the config change was acknowledged to the config manager + // (the failure is not reported here since it happens asynchronously; it + // will appear in the coordinator state afterwards.) + assert.True(t, configChange.acked, "Config change should be acknowledged to the config manager") + assert.NoError(t, configChange.err, "Config change with async error should succeed") - // Make sure the error is saved in Coordinator.runtimeUpdateErr + // Now do another run loop iteration to let the update error propagate, + // and make sure it is reported correctly. + coord.runLoopIteration(ctx) require.Error(t, coord.runtimeUpdateErr, "Runtime update failure should be saved in runtimeUpdateErr") assert.Equal(t, errorStr, coord.runtimeUpdateErr.Error(), "runtimeUpdateErr should match the error reported by the runtime manager") - // Make sure the error is reported in Coordinator state. + // Make sure the error appears in the Coordinator state. state := coord.State() assert.Equal(t, agentclient.Failed, state.State, "Failed policy update should cause failed Coordinator") assert.Contains(t, state.Message, errorStr, "Failed policy update should be reported in Coordinator state message") diff --git a/internal/pkg/agent/application/dispatcher/dispatcher.go b/internal/pkg/agent/application/dispatcher/dispatcher.go index eda450d6a0b..63d00aeeea3 100644 --- a/internal/pkg/agent/application/dispatcher/dispatcher.go +++ b/internal/pkg/agent/application/dispatcher/dispatcher.go @@ -321,7 +321,7 @@ func reportNextScheduledUpgrade(input []fleetapi.Action, detailsSetter details.O upgradeDetails := details.NewDetails(nextUpgrade.Version, details.StateScheduled, nextUpgrade.ID()) startTime, _ := nextUpgrade.StartTime() - upgradeDetails.Metadata.ScheduledAt = startTime + upgradeDetails.Metadata.ScheduledAt = &startTime detailsSetter(upgradeDetails) } diff --git a/internal/pkg/agent/application/dispatcher/dispatcher_test.go b/internal/pkg/agent/application/dispatcher/dispatcher_test.go index 212c50a3068..bf69c76ce74 100644 --- a/internal/pkg/agent/application/dispatcher/dispatcher_test.go +++ b/internal/pkg/agent/application/dispatcher/dispatcher_test.go @@ -493,6 +493,7 @@ func Test_ActionDispatcher_scheduleRetry(t *testing.T) { func TestReportNextScheduledUpgrade(t *testing.T) { now := time.Now().UTC() later := now.Add(3 * time.Hour) + laterTruncate := later.Truncate(time.Second) muchLater := later.Add(3 * time.Hour) cases := map[string]struct { @@ -522,7 +523,7 @@ func TestReportNextScheduledUpgrade(t *testing.T) { State: details.StateScheduled, ActionID: "action2", Metadata: details.Metadata{ - ScheduledAt: later.Truncate(time.Second), + ScheduledAt: &laterTruncate, }, }, }, @@ -544,7 +545,7 @@ func TestReportNextScheduledUpgrade(t *testing.T) { State: details.StateScheduled, ActionID: "action4", Metadata: details.Metadata{ - ScheduledAt: later.Truncate(time.Second), + ScheduledAt: &laterTruncate, }, }, }, diff --git 
a/internal/pkg/agent/application/upgrade/details/details.go b/internal/pkg/agent/application/upgrade/details/details.go index dcc990d33cd..dae27923ecf 100644 --- a/internal/pkg/agent/application/upgrade/details/details.go +++ b/internal/pkg/agent/application/upgrade/details/details.go @@ -167,13 +167,24 @@ func (d *Details) notifyObserver(observer Observer) { } func (m Metadata) Equals(otherM Metadata) bool { - return m.ScheduledAt.Equal(otherM.ScheduledAt) && + return equalTimePointers(m.ScheduledAt, otherM.ScheduledAt) && m.FailedState == otherM.FailedState && m.ErrorMsg == otherM.ErrorMsg && m.DownloadPercent == otherM.DownloadPercent && m.DownloadRate == otherM.DownloadRate } +func equalTimePointers(t, otherT *time.Time) bool { + if t == otherT { + return true + } + if t == nil || otherT == nil { + return false + } + + return t.Equal(*otherT) +} + func (dr *downloadRate) MarshalJSON() ([]byte, error) { downloadRateBytesPerSecond := float64(*dr) if math.IsInf(downloadRateBytesPerSecond, 0) { diff --git a/magefile.go b/magefile.go index 6f29cd68991..fe320754b91 100644 --- a/magefile.go +++ b/magefile.go @@ -1567,6 +1567,33 @@ func (Integration) PrepareOnRemote() { mg.Deps(mage.InstallGoTestTools) } +// Run beat serverless tests +func (Integration) TestBeatServerless(ctx context.Context, beatname string) error { + beatBuildPath := filepath.Join("..", "beats", "x-pack", beatname, "build", "distributions") + if os.Getenv("AGENT_BUILD_DIR") == "" { + err := os.Setenv("AGENT_BUILD_DIR", beatBuildPath) + if err != nil { + return fmt.Errorf("error setting build dir: %s", err) + } + } + + // a bit of bypass logic; run as serverless by default + if os.Getenv("STACK_PROVISIONER") == "" { + err := os.Setenv("STACK_PROVISIONER", "serverless") + if err != nil { + return fmt.Errorf("error setting serverless stack var: %w", err) + } + } else if os.Getenv("STACK_PROVISIONER") == "stateful" { + fmt.Printf(">>> Warning: running TestBeatServerless as stateful\n") + } + + err := os.Setenv("TEST_BINARY_NAME", beatname) + if err != nil { + return fmt.Errorf("error setting binary name: %w", err) + } + return integRunner(ctx, false, "TestBeatsServerless") +} + // TestOnRemote shouldn't be called locally (called on remote host to perform testing) func (Integration) TestOnRemote(ctx context.Context) error { mg.Deps(Build.TestBinaries) diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index 8462ac3c17e..850349406d6 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -93,22 +93,36 @@ type Manager struct { baseLogger *logger.Logger ca *authority.CertificateAuthority listenAddr string + listenPort int agentInfo *info.AgentInfo tracer *apm.Tracer monitor MonitoringManager grpcConfig *configuration.GRPCConfig - // netMx synchronizes the access to listener and server only - netMx sync.RWMutex - listener net.Listener - server *grpc.Server - // Set when the RPC server is ready to receive requests, for use by tests. serverReady *atomic.Bool - // updateMx protects the call to update to ensure that - // only one call to update occurs at a time - updateMx sync.Mutex + // updateChan forwards component model updates from the public Update method + // to the internal run loop. + updateChan chan component.Model + + // Component model update is run asynchronously and pings this channel when + // finished, so the runtime manager loop knows it's safe to advance to the + // next update without ever having to block on the result. 
+ updateDoneChan chan struct{} + + // Next component model update that will be applied, in case we get one + // while a previous update is still in progress. If we get more than one, + // keep only the most recent. + // Only access from the main runtime manager goroutine. + nextUpdate *component.Model + + // Whether we're already waiting on the results of an update call. + // If this is true when the run loop finishes, we need to wait for the + // final update result before shutting down, otherwise the shutdown's + // update call will conflict. + // Only access from the main runtime manager goroutine. + updateInProgress bool // currentMx protects access to the current map only currentMx sync.RWMutex @@ -123,10 +137,9 @@ type Manager struct { errCh chan error - // upon creation the Manager is neither running not shutting down, thus both - // flags are needed. - running atomic.Bool - shuttingDown atomic.Bool + // doneChan is closed when Manager is shutting down to signal that any + // pending requests should be canceled. + doneChan chan struct{} } // NewManager creates a new manager. @@ -144,19 +157,22 @@ func NewManager( return nil, err } m := &Manager{ - logger: logger, - baseLogger: baseLogger, - ca: ca, - listenAddr: listenAddr, - agentInfo: agentInfo, - tracer: tracer, - current: make(map[string]*componentRuntimeState), - shipperConns: make(map[string]*shipperConn), - subscriptions: make(map[string][]*Subscription), - errCh: make(chan error), - monitor: monitor, - grpcConfig: grpcConfig, - serverReady: atomic.NewBool(false), + logger: logger, + baseLogger: baseLogger, + ca: ca, + listenAddr: listenAddr, + agentInfo: agentInfo, + tracer: tracer, + current: make(map[string]*componentRuntimeState), + shipperConns: make(map[string]*shipperConn), + subscriptions: make(map[string][]*Subscription), + updateChan: make(chan component.Model), + updateDoneChan: make(chan struct{}), + errCh: make(chan error), + monitor: monitor, + grpcConfig: grpcConfig, + serverReady: atomic.NewBool(false), + doneChan: make(chan struct{}), } return m, nil } @@ -169,16 +185,11 @@ func NewManager( // // Blocks until the context is done. func (m *Manager) Run(ctx context.Context) error { - m.running.Store(true) - m.shuttingDown.Store(false) - - lis, err := net.Listen("tcp", m.listenAddr) + listener, err := net.Listen("tcp", m.listenAddr) if err != nil { return fmt.Errorf("error starting tcp listener for runtime manager: %w", err) } - m.netMx.Lock() - m.listener = lis - m.netMx.Unlock() + m.listenPort = listener.Addr().(*net.TCPAddr).Port certPool := x509.NewCertPool() if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok { @@ -205,87 +216,123 @@ func (m *Manager) Run(ctx context.Context) error { grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize), ) } - m.netMx.Lock() - m.server = server - m.netMx.Unlock() - proto.RegisterElasticAgentServer(m.server, m) + proto.RegisterElasticAgentServer(server, m) // start serving GRPC connections - var wg sync.WaitGroup - wg.Add(1) + var wgServer sync.WaitGroup + wgServer.Add(1) go func() { - defer wg.Done() - m.serverReady.Store(true) - for { - err := server.Serve(lis) - if err != nil { - m.logger.Errorf("control protocol failed: %s", err) - } - if ctx.Err() != nil { - // context has an error don't start again - return - } - } + defer wgServer.Done() + go m.serverLoop(ctx, listener, server) }() - <-ctx.Done() - m.running.Store(false) - m.shuttingDown.Store(true) + // Start the run loop, which continues on the main goroutine + // until the context is canceled. 
+ m.runLoop(ctx) + + // Notify components to shutdown and wait for their response m.shutdown() + // Close the rpc listener and wait for serverLoop to return + listener.Close() + wgServer.Wait() + + // Cancel any remaining connections server.Stop() - wg.Wait() - m.netMx.Lock() - m.listener = nil - m.server = nil - m.netMx.Unlock() return ctx.Err() } +// The main run loop for the runtime manager, whose responsibilities are: +// - Accept component model updates from the Coordinator +// - Apply those updates safely without ever blocking, because a block here +// propagates to a block in the Coordinator +// - Close doneChan when the loop ends, so the Coordinator knows not to send +// any more updates +func (m *Manager) runLoop(ctx context.Context) { +LOOP: + for ctx.Err() == nil { + select { + case <-ctx.Done(): + break LOOP + case model := <-m.updateChan: + // We got a new component model from m.Update(), mark it as the + // next update to apply, overwriting any previous pending value. + m.nextUpdate = &model + case <-m.updateDoneChan: + // An update call has finished, we can initiate another when available. + m.updateInProgress = false + } + + // After each select call, check if there's a pending update that + // can be applied. + if m.nextUpdate != nil && !m.updateInProgress { + // There is a component model update available, apply it. + go func(model component.Model) { + // Run the update with tearDown set to true since this is coming + // from a user-initiated policy update + err := m.update(model, true) + + // When update is done, send its result back to the coordinator, + // unless we're shutting down. + select { + case m.errCh <- err: + case <-ctx.Done(): + } + // Signal the runtime manager that we're finished. Note that + // we don't select on ctx.Done() in this case because the runtime + // manager always reads the results of an update once initiated, + // even if it is shutting down. + m.updateDoneChan <- struct{}{} + }(*m.nextUpdate) + m.updateInProgress = true + m.nextUpdate = nil + } + } + // Signal that the run loop is ended to unblock any incoming messages. + // We need to do this before waiting on the final update result, otherwise + // it might be stuck trying to send the result to errCh. + close(m.doneChan) + + if m.updateInProgress { + // Wait for the existing update to finish before shutting down, + // otherwise the new update call closing everything will + // conflict. + <-m.updateDoneChan + m.updateInProgress = false + } +} + +func (m *Manager) serverLoop(ctx context.Context, listener net.Listener, server *grpc.Server) { + m.serverReady.Store(true) + for ctx.Err() == nil { + err := server.Serve(listener) + if err != nil && ctx.Err() == nil { + // Only log an error if we aren't shutting down, otherwise we'll spam + // the logs with "use of closed network connection" for a connection that + // was closed on purpose. + m.logger.Errorf("control protocol listener failed: %s", err) + } + } +} + // Errors returns channel that errors are reported on. func (m *Manager) Errors() <-chan error { return m.errCh } -// Update updates the currComp state of the running components. +// Update forwards a new component model to Manager's run loop. +// When it has been processed, a result will be sent on Manager's +// error channel. // Called from the main Coordinator goroutine. // -// This returns as soon as possible, the work is performed in the background. 
-func (m *Manager) Update(model component.Model) error { - shuttingDown := m.shuttingDown.Load() - if shuttingDown { - // ignore any updates once shutdown started - return nil - } - // teardown is true because the public `Update` method would be coming directly from - // policy so if a component was removed it needs to be torn down. - return m.update(model, true) -} - -// State returns the current component states. -func (m *Manager) State() []ComponentComponentState { - m.currentMx.RLock() - defer m.currentMx.RUnlock() - states := make([]ComponentComponentState, 0, len(m.current)) - for _, crs := range m.current { - var legacyPID string - if crs.runtime != nil { - if commandRuntime, ok := crs.runtime.(*commandRuntime); ok { - if commandRuntime != nil { - procInfo := commandRuntime.proc - if procInfo != nil { - legacyPID = fmt.Sprint(commandRuntime.proc.PID) - } - } - } - } - states = append(states, ComponentComponentState{ - Component: crs.getCurrent(), - State: crs.getLatest(), - LegacyPID: legacyPID, - }) +// If calling from a test, you should read from errCh afterwards to avoid +// blocking Manager's main loop. +func (m *Manager) Update(model component.Model) { + select { + case m.updateChan <- model: + case <-m.doneChan: + // Manager is shutting down, ignore the update } - return states } // PerformAction executes an action on a unit. @@ -658,13 +705,10 @@ func (m *Manager) Actions(server proto.ElasticAgent_ActionsServer) error { } // update updates the current state of the running components. +// It is only called by the main runtime manager goroutine in Manager.Run. // // This returns as soon as possible, work is performed in the background. func (m *Manager) update(model component.Model, teardown bool) error { - // ensure that only one `update` can occur at the same time - m.updateMx.Lock() - defer m.updateMx.Unlock() - // prepare the components to add consistent shipper connection information between // the connected components in the model err := m.connectShippers(model.Components) @@ -891,13 +935,7 @@ func (m *Manager) getRuntimeFromComponent(comp component.Component) *componentRu func (m *Manager) getListenAddr() string { addr := strings.SplitN(m.listenAddr, ":", 2) if len(addr) == 2 && addr[1] == "0" { - m.netMx.RLock() - lis := m.listener - m.netMx.RUnlock() - if lis != nil { - port := lis.Addr().(*net.TCPAddr).Port - return fmt.Sprintf("%s:%d", addr[0], port) - } + return fmt.Sprintf("%s:%d", addr[0], m.listenPort) } return m.listenAddr } diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go index bb83de6fd04..51bb941bca6 100644 --- a/pkg/component/runtime/manager_test.go +++ b/pkg/component/runtime/manager_test.go @@ -145,7 +145,8 @@ func TestManager_SimpleComponentErr(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -237,7 +238,8 @@ func TestManager_FakeInput_StartStop(t *testing.T) { subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) } else if unit.State == client.UnitStateHealthy { // remove the component which will stop it - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -260,7 +262,8 @@ func TestManager_FakeInput_StartStop(t *testing.T) 
{ defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -384,7 +387,8 @@ func TestManager_FakeInput_Features(t *testing.T) { Fqdn: &proto.FQDNFeature{Enabled: true}, } - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthIteration, err) @@ -439,7 +443,8 @@ func TestManager_FakeInput_Features(t *testing.T) { "message": "Fake Healthy", }) - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { t.Logf("error updating component state to health: %v", err) @@ -459,7 +464,8 @@ func TestManager_FakeInput_Features(t *testing.T) { defer drainErrChan(managerErrCh) defer drainErrChan(subscriptionErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) timeout := 30 * time.Second @@ -609,7 +615,8 @@ func TestManager_FakeInput_APM(t *testing.T) { comp.Component = &proto.Component{ ApmConfig: initialAPMConfig, } - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthIteration, err) @@ -653,7 +660,8 @@ func TestManager_FakeInput_APM(t *testing.T) { comp.Component = &proto.Component{ ApmConfig: modifiedAPMConfig, } - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthIteration, err) @@ -692,7 +700,8 @@ func TestManager_FakeInput_APM(t *testing.T) { comp.Component = &proto.Component{ ApmConfig: nil, } - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthIteration, err) @@ -741,7 +750,8 @@ func TestManager_FakeInput_APM(t *testing.T) { "message": "Fake Healthy", }) - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { t.Logf("error updating component state to health: %v", err) @@ -761,7 +771,8 @@ func TestManager_FakeInput_APM(t *testing.T) { defer drainErrChan(managerErrCh) defer drainErrChan(subscriptionErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) timeout := 30 * time.Second @@ -902,9 +913,10 @@ func TestManager_FakeInput_Limits(t *testing.T) { GoMaxProcs: 101, }, } - err := m.Update(component.Model{ + m.Update(component.Model{ Components: []component.Component{comp}, }) + err := <-m.errCh if err != nil { subscriptionErrCh <- 
fmt.Errorf("[case %d]: failed to update component: %w", healthyIteration, err) @@ -917,9 +929,10 @@ func TestManager_FakeInput_Limits(t *testing.T) { assert.Equal(t, uint64(101), componentState.Component.Limits.GoMaxProcs) comp.Component = nil - err := m.Update(component.Model{ + m.Update(component.Model{ Components: []component.Component{comp}, }) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthyIteration, err) @@ -945,7 +958,8 @@ func TestManager_FakeInput_Limits(t *testing.T) { defer drainErrChan(managerErrCh) defer drainErrChan(subscriptionErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) timeout := 30 * time.Second @@ -1063,9 +1077,10 @@ func TestManager_FakeShipper_Limits(t *testing.T) { GoMaxProcs: 101, }, } - err := m.Update(component.Model{ + m.Update(component.Model{ Components: []component.Component{comp}, }) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthyIteration, err) @@ -1078,9 +1093,10 @@ func TestManager_FakeShipper_Limits(t *testing.T) { assert.Equal(t, uint64(101), componentState.Component.Limits.GoMaxProcs) comp.Component = nil - err := m.Update(component.Model{ + m.Update(component.Model{ Components: []component.Component{comp}, }) + err := <-m.errCh if err != nil { subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", healthyIteration, err) @@ -1106,7 +1122,8 @@ func TestManager_FakeShipper_Limits(t *testing.T) { defer drainErrChan(managerErrCh) defer drainErrChan(subscriptionErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) timeout := 30 * time.Second @@ -1222,7 +1239,8 @@ func TestManager_FakeInput_BadUnitToGood(t *testing.T) { } unitBad = false - err := m.Update(component.Model{Components: []component.Component{updatedComp}}) + m.Update(component.Model{Components: []component.Component{updatedComp}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -1250,7 +1268,8 @@ func TestManager_FakeInput_BadUnitToGood(t *testing.T) { } } else if unit.State == client.UnitStateHealthy { // bad unit is now healthy; stop the component - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -1274,7 +1293,8 @@ func TestManager_FakeInput_BadUnitToGood(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -1365,7 +1385,8 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) { endTimer := time.NewTimer(30 * time.Second) defer endTimer.Stop() - err = m.Update(component.Model{Components: []component.Component{healthyComp}}) + m.Update(component.Model{Components: []component.Component{healthyComp}}) + err = <-m.errCh require.NoError(t, err) // nextState tracks the stage of the test. 
We expect the sequence @@ -1395,7 +1416,8 @@ LOOP: if unit.State == client.UnitStateHealthy { // good unit is healthy; now make it bad t.Logf("marking good-input as having a hard-error for config") - err := m.Update(component.Model{Components: []component.Component{unhealthyComp}}) + m.Update(component.Model{Components: []component.Component{unhealthyComp}}) + err := <-m.errCh require.NoError(t, err, "Component model update should succeed") // We next expect to transition to Failed @@ -1414,7 +1436,8 @@ LOOP: if unit.State == client.UnitStateFailed { // Reached the expected state, now send an empty component model // to stop everything. - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err := <-m.errCh require.NoError(t, err, "Component model update should succeed") nextState = client.UnitStateStopped } else { @@ -1445,12 +1468,10 @@ LOOP: } func TestManager_FakeInput_NoDeadlock(t *testing.T) { - /* - NOTE: This is a long-running test that spams the runtime managers `Update` function to try and - trigger a deadlock. This test takes 2 minutes to run trying to re-produce issue: + // NOTE: This is a long-running test that spams the runtime managers `Update` function to try and + // trigger a deadlock. This test takes 2 minutes to run trying to re-produce issue: + // https://github.com/elastic/elastic-agent/issues/2691 - https://github.com/elastic/elastic-agent/issues/2691 - */ testPaths(t) ctx, cancel := context.WithCancel(context.Background()) @@ -1523,7 +1544,8 @@ func TestManager_FakeInput_NoDeadlock(t *testing.T) { } i += 1 comp = updatedComp - err := m.Update(component.Model{Components: []component.Component{updatedComp}}) + m.Update(component.Model{Components: []component.Component{updatedComp}}) + err := <-m.errCh if err != nil { updatedErr <- err return @@ -1564,7 +1586,8 @@ LOOP: case <-endTimer.C: // no deadlock after timeout (all good stop the component) updatedCancel() - _ = m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + <-m.errCh // Don't care about the result of Update, just that it runs break LOOP case err := <-errCh: require.NoError(t, err) @@ -1656,7 +1679,8 @@ func TestManager_FakeInput_Configure(t *testing.T) { "state": int(client.UnitStateDegraded), "message": "Fake Degraded", }) - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -1679,7 +1703,8 @@ func TestManager_FakeInput_Configure(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -1797,7 +1822,8 @@ func TestManager_FakeInput_RemoveUnit(t *testing.T) { } else if unit1.State == client.UnitStateHealthy { // unit1 is healthy lets remove it from the component comp.Units = comp.Units[0:1] - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh if err != nil { subErrCh <- err } @@ -1832,7 +1858,8 @@ func TestManager_FakeInput_RemoveUnit(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: 
[]component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -1956,7 +1983,8 @@ func TestManager_FakeInput_ActionState(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -2091,7 +2119,8 @@ func TestManager_FakeInput_Restarts(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -2208,7 +2237,8 @@ func TestManager_FakeInput_Restarts_ConfigKill(t *testing.T) { "message": "Fake Healthy", "kill": rp[1], }) - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -2233,7 +2263,8 @@ func TestManager_FakeInput_Restarts_ConfigKill(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(1 * time.Minute) @@ -2347,7 +2378,8 @@ func TestManager_FakeInput_KeepsRestarting(t *testing.T) { "message": fmt.Sprintf("Fake Healthy %d", lastStoppedCount), "kill_on_interval": true, }) - err := m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -2375,7 +2407,8 @@ func TestManager_FakeInput_KeepsRestarting(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(1 * time.Minute) @@ -2493,7 +2526,8 @@ func TestManager_FakeInput_RestartsOnMissedCheckins(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -2610,7 +2644,8 @@ func TestManager_FakeInput_InvalidAction(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: []component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -2807,7 +2842,8 @@ func TestManager_FakeInput_MultiComponent(t *testing.T) { defer drainErrChan(subErrCh1) defer drainErrChan(subErrCh2) - err = m.Update(component.Model{Components: components}) + m.Update(component.Model{Components: components}) + err = <-m.errCh require.NoError(t, err) count := 0 @@ -2963,7 +2999,8 @@ func TestManager_FakeInput_LogLevel(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: 
[]component.Component{comp}}) + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh require.NoError(t, err) endTimer := time.NewTimer(30 * time.Second) @@ -3177,7 +3214,8 @@ func TestManager_FakeShipper(t *testing.T) { subErrCh <- err } else { // successful; turn it all off - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err = <-m.errCh if err != nil { subErrCh <- err } @@ -3206,7 +3244,8 @@ func TestManager_FakeShipper(t *testing.T) { subErrCh <- err } else { // successful; turn it all off - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -3241,7 +3280,8 @@ func TestManager_FakeShipper(t *testing.T) { subErrCh <- err } else { // successful; turn it all off - err := m.Update(component.Model{Components: []component.Component{}}) + m.Update(component.Model{Components: []component.Component{}}) + err := <-m.errCh if err != nil { subErrCh <- err } @@ -3266,7 +3306,8 @@ func TestManager_FakeShipper(t *testing.T) { defer drainErrChan(errCh) defer drainErrChan(subErrCh) - err = m.Update(component.Model{Components: comps}) + m.Update(component.Model{Components: comps}) + err = <-m.errCh require.NoError(t, err) timeout := 2 * time.Minute @@ -3458,11 +3499,8 @@ func TestManager_FakeInput_OutputChange(t *testing.T) { stateProgressionWG.Done() }() - // Wait manager start running, then check if any error happened - assert.Eventually(t, - func() bool { return m.running.Load() }, - 500*time.Millisecond, - 10*time.Millisecond) + err = waitForReady(waitCtx, m) + require.NoError(t, err, "Manager must finish initializing") select { case err := <-errCh: @@ -3471,7 +3509,8 @@ func TestManager_FakeInput_OutputChange(t *testing.T) { } time.Sleep(100 * time.Millisecond) - err = m.Update(component.Model{Components: components}) + m.Update(component.Model{Components: components}) + err = <-m.errCh require.NoError(t, err) updateSleep := 300 * time.Millisecond @@ -3480,7 +3519,8 @@ func TestManager_FakeInput_OutputChange(t *testing.T) { updateSleep = time.Second } time.Sleep(updateSleep) - err = m.Update(component.Model{Components: components2}) + m.Update(component.Model{Components: components2}) + err = <-m.errCh require.NoError(t, err) count := 0 diff --git a/pkg/testing/tools/estools/elasticsearch.go b/pkg/testing/tools/estools/elasticsearch.go index 304e917d7ee..1c85ed788f3 100644 --- a/pkg/testing/tools/estools/elasticsearch.go +++ b/pkg/testing/tools/estools/elasticsearch.go @@ -212,7 +212,7 @@ func GetLatestDocumentMatchingQuery(ctx context.Context, client elastictransport queryRaw := map[string]interface{}{ "query": query, "sort": map[string]interface{}{ - "timestamp": "desc", + "@timestamp": "desc", }, "size": 1, } diff --git a/testing/integration/beats_serverless_test.go b/testing/integration/beats_serverless_test.go new file mode 100644 index 00000000000..57123e9142e --- /dev/null +++ b/testing/integration/beats_serverless_test.go @@ -0,0 +1,628 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
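The manager test hunks above all replace the synchronous `err := m.Update(...)` call with `m.Update(...)` followed by a read from the manager's error channel, so the outcome of an update is observed asynchronously. Below is a minimal sketch of that calling pattern; the `errCh` field and the toy validation are illustrative assumptions, not the runtime manager's real API.

// Sketch of the asynchronous update pattern used by the tests above.
// Assumption: the manager exposes an error channel that delivers exactly one
// result per Update call; names here are illustrative only.
package main

import (
	"errors"
	"fmt"
)

type model struct{ components []string }

type manager struct {
	errCh chan error
}

// Update queues the new model and reports the outcome on errCh instead of
// returning an error directly.
func (m *manager) Update(mod model) {
	go func() {
		if len(mod.components) == 0 {
			m.errCh <- errors.New("empty model")
			return
		}
		m.errCh <- nil
	}()
}

func main() {
	m := &manager{errCh: make(chan error)}

	// The test pattern: fire the update, then block on the error channel to
	// learn whether it was accepted.
	m.Update(model{components: []string{"fake-input"}})
	if err := <-m.errCh; err != nil {
		fmt.Println("update failed:", err)
		return
	}
	fmt.Println("update applied")
}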
+ +//go:build integration + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/url" + "os" + "path/filepath" + "strings" + "testing" + "text/template" + "time" + + "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/elastic/elastic-agent-libs/mapstr" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools" + "github.com/elastic/elastic-agent/pkg/testing/tools/estools" +) + +type BeatRunner struct { + suite.Suite + requirementsInfo *define.Info + agentFixture *atesting.Fixture + + // connection info + ESHost string + user string + pass string + kibHost string + + testUuid string + testbeatName string + + skipCleanup bool +} + +func TestBeatsServerless(t *testing.T) { + info := define.Require(t, define.Requirements{ + OS: []define.OS{ + {Type: define.Linux}, + }, + Stack: &define.Stack{}, + Local: false, + Sudo: true, + }) + + suite.Run(t, &BeatRunner{requirementsInfo: info}) +} + +func (runner *BeatRunner) SetupSuite() { + runner.skipCleanup = false + + runner.testbeatName = os.Getenv("TEST_BINARY_NAME") + if runner.testbeatName == "" { + runner.T().Fatalf("TEST_BINARY_NAME must be set") + } + if runner.testbeatName == "elastic-agent" { + runner.T().Skipf("tests must be run against a beat, not elastic-agent") + } + + if runner.testbeatName != "filebeat" && runner.testbeatName != "metricbeat" && runner.testbeatName != "auditbeat" && runner.testbeatName != "packetbeat" { + runner.T().Skip("test only supports metricbeat or filebeat") + } + runner.T().Logf("running serverless tests with %s", runner.testbeatName) + + agentFixture, err := define.NewFixtureWithBinary(runner.T(), define.Version(), runner.testbeatName, "/home/ubuntu", atesting.WithRunLength(time.Minute*3), atesting.WithAdditionalArgs([]string{"-E", "output.elasticsearch.allow_older_versions=true"})) + runner.agentFixture = agentFixture + require.NoError(runner.T(), err) + + // the require.* code will fail without these, so assume the values are non-nil + runner.ESHost = os.Getenv("ELASTICSEARCH_HOST") + runner.user = os.Getenv("ELASTICSEARCH_USERNAME") + runner.pass = os.Getenv("ELASTICSEARCH_PASSWORD") + runner.kibHost = os.Getenv("KIBANA_HOST") + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + beatOutConfig := ` +output.elasticsearch: + hosts: ["{{.es_host}}"] + api_key: "{{.key_user}}:{{.key_pass}}" +setup.kibana: + host: {{.kb_host}} +processors: + - add_fields: + target: host + fields: + test-id: {{.test_id}} +{{.beat_cfg}} +` + + mbCfg := ` +metricbeat.config.modules: + path: ${path.config}/modules.d/*.yml +` + + fbCfg := ` +filebeat.modules: + - module: system + syslog: + enabled: true + auth: + enabled: true +filebeat.config.modules: + - modules: system + syslog: + enabled: true + auth: + enabled: true +` + auditbeatCfg := ` +auditbeat.modules: + +- module: file_integrity + paths: + - /bin + - /usr/bin + - /sbin + - /usr/sbin + - /etc +` + + packetbeatCfg := ` +` + + tmpl, err := template.New("config").Parse(beatOutConfig) + require.NoError(runner.T(), err) + + apiResp, err := estools.CreateAPIKey(ctx, runner.requirementsInfo.ESClient, estools.APIKeyRequest{Name: "test-api-key", Expiration: "1d"}) + require.NoError(runner.T(), err) + + // beats likes to add standard ports to URLs that don't have them, and ESS will sometimes return 
a URL without a port, assuming :443 + // so try to fix that here + fixedKibanaHost := runner.kibHost + parsedKibana, err := url.Parse(runner.kibHost) + require.NoError(runner.T(), err) + if parsedKibana.Port() == "" { + fixedKibanaHost = fmt.Sprintf("%s:443", fixedKibanaHost) + } + + fixedESHost := runner.ESHost + parsedES, err := url.Parse(runner.ESHost) + require.NoError(runner.T(), err) + if parsedES.Port() == "" { + fixedESHost = fmt.Sprintf("%s:443", fixedESHost) + } + + runner.T().Logf("configuring beats with %s / %s", fixedESHost, fixedKibanaHost) + + testUuid, err := uuid.NewV4() + require.NoError(runner.T(), err) + runner.testUuid = testUuid.String() + + additionalCfg := mbCfg + if runner.testbeatName == "filebeat" { + additionalCfg = fbCfg + } else if runner.testbeatName == "auditbeat" { + additionalCfg = auditbeatCfg + } else if runner.testbeatName == "packetbeat" { + additionalCfg = packetbeatCfg + } + + tmpl_map := map[string]string{"es_host": fixedESHost, "key_user": apiResp.Id, "key_pass": apiResp.APIKey, "kb_host": fixedKibanaHost, "test_id": testUuid.String(), "beat_cfg": additionalCfg} + parsedCfg := bytes.Buffer{} + err = tmpl.Execute(&parsedCfg, tmpl_map) + require.NoError(runner.T(), err) + + err = runner.agentFixture.WriteFileToWorkDir(ctx, parsedCfg.String(), fmt.Sprintf("%s.yml", runner.testbeatName)) + require.NoError(runner.T(), err) +} + +// run the beat with default metricsets, ensure no errors in logs + data is ingested +func (runner *BeatRunner) TestRunAndCheckData() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*4) + defer cancel() + + // in case there's already a running template, delete it, forcing the beat to re-install + runner.CleanupTemplates(ctx) + + err := runner.agentFixture.RunBeat(ctx) + require.NoError(runner.T(), err) + + docs, err := estools.GetLatestDocumentMatchingQuery(ctx, runner.requirementsInfo.ESClient, map[string]interface{}{ + "match": map[string]interface{}{ + "host.test-id": runner.testUuid, + }, + }, fmt.Sprintf("*%s*", runner.testbeatName)) + require.NoError(runner.T(), err) + require.NotEmpty(runner.T(), docs.Hits.Hits) +} + +// tests the [beat] setup --dashboards command +func (runner *BeatRunner) TestSetupDashboards() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*3) //dashboards seem to take a while + defer cancel() + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), "setup", "--dashboards"}) + assert.NoError(runner.T(), err) + runner.T().Logf("got response from dashboard setup: %s", string(resp)) + require.True(runner.T(), strings.Contains(string(resp), "Loaded dashboards")) + + dashList, err := tools.GetDashboards(ctx, runner.requirementsInfo.KibanaClient) + require.NoError(runner.T(), err) + + // interesting hack in cases where we don't have a clean environment + // check to see if any of the dashboards were created recently + found := false + for _, dash := range dashList { + if time.Since(dash.UpdatedAt) < time.Minute*5 { + found = true + break + } + } + require.True(runner.T(), found, fmt.Sprintf("could not find dashboard newer than 5 minutes, out of %d dashboards", len(dashList))) + + runner.Run("export dashboards", runner.SubtestExportDashboards) + // cleanup + if !runner.skipCleanup { + for _, dash := range dashList { + err = tools.DeleteDashboard(ctx, runner.requirementsInfo.KibanaClient, dash.ID) + if err != nil { + runner.T().Logf("WARNING: could not delete dashboards after test: %s", err) + break + } + } + } +} + 
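SetupSuite above works around beats appending default ports to host URLs by checking whether the provisioned Elasticsearch and Kibana endpoints carry an explicit port and, if not, appending ":443" before rendering the config template. A standalone sketch of that normalization, with a hypothetical helper name:

// Sketch of the host normalization performed in SetupSuite: append :443 when
// the endpoint URL has no explicit port. Helper name is illustrative.
package main

import (
	"fmt"
	"net/url"
)

func withDefaultHTTPSPort(host string) (string, error) {
	parsed, err := url.Parse(host)
	if err != nil {
		return "", err
	}
	if parsed.Port() == "" {
		return fmt.Sprintf("%s:443", host), nil
	}
	return host, nil
}

func main() {
	for _, h := range []string{
		"https://example.elastic.cloud",      // no port: :443 is appended
		"https://example.elastic.cloud:9243", // explicit port: left as-is
	} {
		fixed, err := withDefaultHTTPSPort(h)
		if err != nil {
			panic(err)
		}
		fmt.Println(fixed)
	}
}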
+// tests the [beat] export dashboard command +func (runner *BeatRunner) SubtestExportDashboards() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + outDir := runner.T().TempDir() + + dashlist, err := tools.GetDashboards(ctx, runner.requirementsInfo.KibanaClient) + require.NoError(runner.T(), err) + require.NotEmpty(runner.T(), dashlist) + + exportOut, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "export", + "dashboard", "--folder", outDir, "--id", dashlist[0].ID}) + + runner.T().Logf("got output: %s", exportOut) + assert.NoError(runner.T(), err) + + inFolder, err := os.ReadDir(filepath.Join(outDir, "/_meta/kibana/8/dashboard")) + require.NoError(runner.T(), err) + runner.T().Logf("got log contents: %#v", inFolder) + require.NotEmpty(runner.T(), inFolder) +} + +// NOTE for the below tests: the testing framework doesn't guarantee a new stack instance each time, +// which means we might be running against a stack where a previous test has already done setup. +// perhaps CI should run `mage integration:clean` first? + +// tests the [beat] setup --pipelines command +func (runner *BeatRunner) TestSetupPipelines() { + if runner.testbeatName != "filebeat" { + runner.T().Skip("pipelines only available on filebeat") + } + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + defer func() { + // cleanup + if !runner.skipCleanup { + err := estools.DeletePipelines(ctx, runner.requirementsInfo.ESClient, "*filebeat*") + if err != nil { + runner.T().Logf("WARNING: could not clean up pipelines: %s", err) + } + } + + }() + + // need to actually enable something that has pipelines + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), + "setup", "--pipelines", "--modules", "apache", "-M", "apache.error.enabled=true", "-M", "apache.access.enabled=true"}) + assert.NoError(runner.T(), err) + + runner.T().Logf("got response from pipeline setup: %s", string(resp)) + + pipelines, err := estools.GetPipelines(ctx, runner.requirementsInfo.ESClient, "*filebeat*") + require.NoError(runner.T(), err) + require.NotEmpty(runner.T(), pipelines) + +} + +// test beat setup --index-management with ILM disabled +func (runner *BeatRunner) TestIndexManagementNoILM() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + defer func() { + runner.CleanupTemplates(ctx) + }() + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.ilm.enabled=false"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + assert.NoError(runner.T(), err) + // we should not print a warning if we've explicitly disabled ILM + assert.NotContains(runner.T(), string(resp), "not supported") + + tmpls, err := estools.GetIndexTemplatesForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName)) + require.NoError(runner.T(), err) + for _, tmpl := range tmpls.IndexTemplates { + runner.T().Logf("got template: %s", tmpl.Name) + } + require.NotEmpty(runner.T(), tmpls.IndexTemplates) + + runner.Run("export templates", runner.SubtestExportTemplates) + runner.Run("export index patterns", runner.SubtestExportIndexPatterns) + +} + +// tests setup with all default settings +func (runner *BeatRunner) TestWithAllDefaults() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + 
defer func() { + runner.CleanupTemplates(ctx) + }() + + // pre-delete in case something else missed cleanup + runner.CleanupTemplates(ctx) + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.NoError(runner.T(), err) + + streams, err := estools.GetDataStreamsForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("%s*", runner.testbeatName)) + require.NoError(runner.T(), err) + + require.NotEmpty(runner.T(), streams.DataStreams) + +} + +// test the setup process with mismatching template and DSL names +func (runner *BeatRunner) TestCustomBadNames() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + defer func() { + runner.CleanupTemplates(ctx) + }() + + resp, err := runner.agentFixture.Exec(ctx, []string{"-e", "--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.dsl.enabled=true", "--E=setup.dsl.data_stream_pattern='custom-bad-name'", "--E=setup.template.name='custom-name'", "--E=setup.template.pattern='custom-name'"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.NoError(runner.T(), err) + + require.True(runner.T(), strings.Contains(string(resp), "Additional updates & overwrites to this config will not work.")) + +} + +func (runner *BeatRunner) TestOverwriteWithCustomName() { + //an updated policy that has a different value than the default of 7d + updatedPolicy := mapstr.M{ + "data_retention": "1d", + } + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + defer func() { + runner.CleanupTemplates(ctx) + }() + + lctemp := runner.T().TempDir() + raw, err := json.MarshalIndent(updatedPolicy, "", " ") + require.NoError(runner.T(), err) + + lifecyclePath := filepath.Join(lctemp, "dsl_policy.json") + + err = os.WriteFile(lifecyclePath, raw, 0o744) + require.NoError(runner.T(), err) + + runner.CleanupTemplates(ctx) + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.dsl.enabled=true", "--E=setup.dsl.data_stream_pattern='custom-name'", "--E=setup.template.name='custom-name'", "--E=setup.template.pattern='custom-name'"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.NoError(runner.T(), err) + + runner.CheckDSLPolicy(ctx, "*custom-name*", "7d") + + resp, err = runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.dsl.enabled=true", "--E=setup.dsl.overwrite=true", "--E=setup.dsl.data_stream_pattern='custom-name'", + "--E=setup.template.name='custom-name'", "--E=setup.template.pattern='custom-name'", fmt.Sprintf("--E=setup.dsl.policy_file=%s", lifecyclePath)}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.NoError(runner.T(), err) + + runner.CheckDSLPolicy(ctx, "*custom-name*", "1d") + +} + +// TestWithCustomLifecyclePolicy uploads a custom DSL policy +func (runner *BeatRunner) TestWithCustomLifecyclePolicy() { + //create a custom policy file + dslPolicy := mapstr.M{ + "data_retention": "1d", + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + defer func() { + runner.CleanupTemplates(ctx) + }() + + lctemp := runner.T().TempDir() + raw, err := json.MarshalIndent(dslPolicy, 
"", " ") + require.NoError(runner.T(), err) + + lifecyclePath := filepath.Join(lctemp, "dsl_policy.json") + + err = os.WriteFile(lifecyclePath, raw, 0o744) + require.NoError(runner.T(), err) + + runner.CleanupTemplates(ctx) + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.dsl.enabled=true", fmt.Sprintf("--E=setup.dsl.policy_file=%s", lifecyclePath)}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.NoError(runner.T(), err) + + runner.CheckDSLPolicy(ctx, fmt.Sprintf("%s*", runner.testbeatName), "1d") + +} + +// tests beat setup --index-management with ILM explicitly set +// On serverless, this should fail. +func (runner *BeatRunner) TestIndexManagementILMEnabledFailure() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + info, err := estools.GetPing(ctx, runner.requirementsInfo.ESClient) + require.NoError(runner.T(), err) + + if info.Version.BuildFlavor != "serverless" { + runner.T().Skip("must run on serverless") + } + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.ilm.enabled=true", "--E=setup.ilm.overwrite=true"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.Error(runner.T(), err) + assert.Contains(runner.T(), string(resp), "error creating") +} + +// tests setup with both ILM and DSL enabled, should fail +func (runner *BeatRunner) TestBothLifecyclesEnabled() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.ilm.enabled=true", "--E=setup.dsl.enabled=true"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.Error(runner.T(), err) +} + +// disable all lifecycle management, ensure it's actually disabled +func (runner *BeatRunner) TestAllLifecyclesDisabled() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + defer func() { + runner.CleanupTemplates(ctx) + }() + + runner.CleanupTemplates(ctx) + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.ilm.enabled=false", "--E=setup.dsl.enabled=false"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.NoError(runner.T(), err) + + // make sure we have data streams, but there's no lifecycles + streams, err := estools.GetDataStreamsForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName)) + require.NoError(runner.T(), err) + + require.NotEmpty(runner.T(), streams.DataStreams, "found no datastreams") + foundPolicy := false + for _, stream := range streams.DataStreams { + if stream.Lifecycle.DataRetention != "" { + foundPolicy = true + break + } + } + require.False(runner.T(), foundPolicy, "Found a lifecycle policy despite disabling lifecycles. 
Found: %#v", streams) +} + +// the export command doesn't actually make a network connection, +// so this won't fail +func (runner *BeatRunner) TestExport() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + info, err := estools.GetPing(ctx, runner.requirementsInfo.ESClient) + require.NoError(runner.T(), err) + + if info.Version.BuildFlavor != "serverless" { + runner.T().Skip("must run on serverless") + } + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "export", "ilm-policy", "--E=setup.ilm.enabled=true"}) + runner.T().Logf("got response from export: %s", string(resp)) + assert.NoError(runner.T(), err) + // check to see if we got a valid output + policy := map[string]interface{}{} + err = json.Unmarshal(resp, &policy) + require.NoError(runner.T(), err) + + require.NotEmpty(runner.T(), policy["policy"]) +} + +// tests beat export with DSL +func (runner *BeatRunner) TestExportDSL() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "export", "ilm-policy", "--E=setup.dsl.enabled=true"}) + runner.T().Logf("got response from export: %s", string(resp)) + assert.NoError(runner.T(), err) + // check to see if we got a valid output + policy := map[string]interface{}{} + err = json.Unmarshal(resp, &policy) + require.NoError(runner.T(), err) + + require.NotEmpty(runner.T(), policy["data_retention"]) +} + +func (runner *BeatRunner) SubtestExportTemplates() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + outDir := runner.T().TempDir() + + _, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "export", + "template", "--dir", outDir}) + assert.NoError(runner.T(), err) + + inFolder, err := os.ReadDir(filepath.Join(outDir, "/template")) + require.NoError(runner.T(), err) + runner.T().Logf("got log contents: %#v", inFolder) + require.NotEmpty(runner.T(), inFolder) +} + +func (runner *BeatRunner) SubtestExportIndexPatterns() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + + rawPattern, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "export", + "index-pattern"}) + assert.NoError(runner.T(), err) + + idxPattern := map[string]interface{}{} + + err = json.Unmarshal(rawPattern, &idxPattern) + require.NoError(runner.T(), err) + require.NotNil(runner.T(), idxPattern["attributes"]) +} + +// CheckDSLPolicy checks if we have a match for the given DSL policy given a template name and policy data_retention +func (runner *BeatRunner) CheckDSLPolicy(ctx context.Context, tmpl string, policy string) { + streams, err := estools.GetDataStreamsForPattern(ctx, runner.requirementsInfo.ESClient, tmpl) + require.NoError(runner.T(), err) + + foundCustom := false + for _, stream := range streams.DataStreams { + if stream.Lifecycle.DataRetention == policy { + foundCustom = true + break + } + } + + require.True(runner.T(), foundCustom, "did not find our lifecycle policy. 
Found: %#v", streams) +} + +// CleanupTemplates removes any existing index +func (runner *BeatRunner) CleanupTemplates(ctx context.Context) { + if !runner.skipCleanup { + _ = estools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("%s*", runner.testbeatName)) + _ = estools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, "*custom-name*") + } +}