From 2f0f52431155694b70a62f4e64f70fbc1f3a25b5 Mon Sep 17 00:00:00 2001 From: Michael Nedokushev Date: Sat, 19 Aug 2023 12:11:39 +0100 Subject: [PATCH] Initial commit --- .../context/ContextStorage.scala | 110 +++++++------- .../opentelemetry/logging/Logging.scala | 64 +++++++++ .../opentelemetry/logging/LoggingTest.scala | 136 ++++++++++++++++++ .../opentelemetry/tracing/TracingTest.scala | 12 +- 4 files changed, 262 insertions(+), 60 deletions(-) create mode 100644 opentelemetry/src/main/scala/zio/telemetry/opentelemetry/logging/Logging.scala create mode 100644 opentelemetry/src/test/scala/zio/telemetry/opentelemetry/logging/LoggingTest.scala diff --git a/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/context/ContextStorage.scala b/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/context/ContextStorage.scala index 9cbb13ae..2507a55c 100644 --- a/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/context/ContextStorage.scala +++ b/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/context/ContextStorage.scala @@ -3,7 +3,7 @@ package zio.telemetry.opentelemetry.context import io.opentelemetry.context.Context import zio._ -trait ContextStorage { +sealed trait ContextStorage { def get(implicit trace: Trace): UIO[Context] @@ -20,6 +20,59 @@ trait ContextStorage { object ContextStorage { + final class FiberRefContextStorage(private[zio] val ref: FiberRef[Context]) extends ContextStorage { + + override def get(implicit trace: Trace): UIO[Context] = + ref.get + + override def set(context: Context)(implicit trace: Trace): UIO[Unit] = + ref.set(context) + + override def getAndSet(context: Context)(implicit trace: Trace): UIO[Context] = + ref.getAndSet(context) + + override def updateAndGet(f: Context => Context)(implicit trace: Trace): UIO[Context] = + ref.updateAndGet(f) + + override def locally[R, E, A](context: Context)(zio: ZIO[R, E, A])(implicit + trace: Trace + ): ZIO[R, E, A] = + ref.locally(context)(zio) + + override def locallyScoped(context: Context)(implicit trace: Trace): ZIO[Scope, Nothing, Unit] = + ref.locallyScoped(context) + } + + final class OpenTelemetryContextStorage extends ContextStorage { + + override def get(implicit trace: Trace): UIO[Context] = + ZIO.succeed(Context.current()) + + override def set(context: Context)(implicit trace: Trace): UIO[Unit] = + ZIO.succeed(context.makeCurrent()).unit + + override def getAndSet(context: Context)(implicit trace: Trace): UIO[Context] = + ZIO.succeed { + val old = Context.current() + val _ = context.makeCurrent() + old + }.uninterruptible + + override def updateAndGet(f: Context => Context)(implicit trace: Trace): UIO[Context] = + ZIO.succeed { + val updated = f(Context.current()) + val _ = updated.makeCurrent() + updated + }.uninterruptible + + override def locally[R, E, A](context: Context)(zio: ZIO[R, E, A])(implicit trace: Trace): ZIO[R, E, A] = + ZIO.acquireReleaseWith(get <* set(context))(set)(_ => zio) + + override def locallyScoped(context: Context)(implicit trace: Trace): ZIO[Scope, Nothing, Unit] = + ZIO.acquireRelease(get <* set(context))(set).unit + + } + /** * The main one. Uses [[FiberRef]] as a [[ContextStorage]]. */ @@ -28,29 +81,7 @@ object ContextStorage { FiberRef .make[Context](Context.root()) .flatMap { ref => - ZIO.succeed { - new ContextStorage { - override def get(implicit trace: Trace): UIO[Context] = - ref.get - - override def set(context: Context)(implicit trace: Trace): UIO[Unit] = - ref.set(context) - - override def getAndSet(context: Context)(implicit trace: Trace): UIO[Context] = - ref.getAndSet(context) - - override def updateAndGet(f: Context => Context)(implicit trace: Trace): UIO[Context] = - ref.updateAndGet(f) - - override def locally[R, E, A](context: Context)(zio: ZIO[R, E, A])(implicit - trace: Trace - ): ZIO[R, E, A] = - ref.locally(context)(zio) - - override def locallyScoped(context: Context)(implicit trace: Trace): ZIO[Scope, Nothing, Unit] = - ref.locallyScoped(context) - } - } + ZIO.succeed(new FiberRefContextStorage(ref)) } ) @@ -59,35 +90,6 @@ object ContextStorage { * [[https://github.com/open-telemetry/opentelemetry-java-instrumentation OTEL instrumentation agent]] is used. */ val openTelemetryContext: ULayer[ContextStorage] = - ZLayer.succeed { - new ContextStorage { - override def get(implicit trace: Trace): UIO[Context] = - ZIO.succeed(Context.current()) - - override def set(context: Context)(implicit trace: Trace): UIO[Unit] = - ZIO.succeed(context.makeCurrent()).unit - - override def getAndSet(context: Context)(implicit trace: Trace): UIO[Context] = - ZIO.succeed { - val old = Context.current() - val _ = context.makeCurrent() - old - }.uninterruptible - - override def updateAndGet(f: Context => Context)(implicit trace: Trace): UIO[Context] = - ZIO.succeed { - val updated = f(Context.current()) - val _ = updated.makeCurrent() - updated - }.uninterruptible - - override def locally[R, E, A](context: Context)(zio: ZIO[R, E, A])(implicit trace: Trace): ZIO[R, E, A] = - ZIO.acquireReleaseWith(get <* set(context))(set)(_ => zio) - - override def locallyScoped(context: Context)(implicit trace: Trace): ZIO[Scope, Nothing, Unit] = - ZIO.acquireRelease(get <* set(context))(set).unit - - } - } + ZLayer.succeed(new OpenTelemetryContextStorage) } diff --git a/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/logging/Logging.scala b/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/logging/Logging.scala new file mode 100644 index 00000000..cf8150b7 --- /dev/null +++ b/opentelemetry/src/main/scala/zio/telemetry/opentelemetry/logging/Logging.scala @@ -0,0 +1,64 @@ +package zio.telemetry.opentelemetry.logging + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.logs.{Logger, LoggerProvider} +import io.opentelemetry.context.Context +import zio._ +import zio.telemetry.opentelemetry.context.ContextStorage + +object Logging { + + def live( + instrumentationScopeName: String, + logLevel: LogLevel = LogLevel.Info + ): ZLayer[ContextStorage with LoggerProvider, Nothing, Unit] = + ZLayer + .fromZIO( + for { + loggerProvider <- ZIO.service[LoggerProvider] + contextStorage <- ZIO.service[ContextStorage] + logger <- ZIO.succeed( + zioLogger(instrumentationScopeName)(contextStorage, loggerProvider) + .filterLogLevel(l => l >= logLevel) + ) + } yield logger + ) + .flatMap(env => Runtime.addLogger(env.get)) + + private def zioLogger(instrumentationScopeName: String)( + contextStorage: ContextStorage, + loggerProvider: LoggerProvider + ): ZLogger[String, Unit] = + new ZLogger[String, Unit] { + + val logger: Logger = loggerProvider.get(instrumentationScopeName) + + override def apply( + trace: Trace, + fiberId: FiberId, + logLevel: LogLevel, + message: () => String, + cause: Cause[Any], + context: FiberRefs, + spans: List[LogSpan], + annotations: Map[String, String] + ): Unit = { + val builder = logger.logRecordBuilder() + + builder.setBody(message()) + builder.setSeverityText(logLevel.label) + annotations.foreach { case (k, v) => builder.setAttribute(AttributeKey.stringKey(k), v) } + + contextStorage match { + case cs: ContextStorage.FiberRefContextStorage => + context.get(cs.ref).foreach(builder.setContext) + case _: ContextStorage.OpenTelemetryContextStorage => + builder.setContext(Context.current()) + } + + builder.emit() + } + + } + +} diff --git a/opentelemetry/src/test/scala/zio/telemetry/opentelemetry/logging/LoggingTest.scala b/opentelemetry/src/test/scala/zio/telemetry/opentelemetry/logging/LoggingTest.scala new file mode 100644 index 00000000..bcbe49dd --- /dev/null +++ b/opentelemetry/src/test/scala/zio/telemetry/opentelemetry/logging/LoggingTest.scala @@ -0,0 +1,136 @@ +package zio.telemetry.opentelemetry.logging + +import io.opentelemetry.api.logs.LoggerProvider +import io.opentelemetry.sdk.logs.SdkLoggerProvider +import io.opentelemetry.sdk.logs.`export`.SimpleLogRecordProcessor +import io.opentelemetry.sdk.logs.data.LogRecordData +import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter +import zio._ +import zio.telemetry.opentelemetry.context.ContextStorage +import zio.telemetry.opentelemetry.tracing.{Tracing, TracingTest} +import zio.test._ +import zio.test.Assertion._ + +import scala.jdk.CollectionConverters._ + +object LoggingTest extends ZIOSpecDefault { + + val inMemoryLogLoggerProvider = + for { + logRecordExporter <- ZIO.succeed(InMemoryLogRecordExporter.create()) + logRecordProcessor <- ZIO.succeed(SimpleLogRecordProcessor.create(logRecordExporter)) + loggerProvider <- ZIO.succeed(SdkLoggerProvider.builder().addLogRecordProcessor(logRecordProcessor).build()) + } yield (logRecordExporter, loggerProvider) + + val inMemoryLoggerProviderLayer: ULayer[InMemoryLogRecordExporter with LoggerProvider] = + ZLayer.fromZIOEnvironment(inMemoryLogLoggerProvider.map { case (inMemoryLogRecordExporter, loggerProvider) => + ZEnvironment(inMemoryLogRecordExporter).add(loggerProvider) + }) + + def loggingMockLayer( + instrumentationScopeName: String + ): URLayer[ContextStorage, InMemoryLogRecordExporter with LoggerProvider] = + inMemoryLoggerProviderLayer >>> ( + Logging.live(instrumentationScopeName) ++ inMemoryLoggerProviderLayer + ) + + def getFinishedLogRecords: ZIO[InMemoryLogRecordExporter, Nothing, List[LogRecordData]] = + ZIO.service[InMemoryLogRecordExporter].map(_.getFinishedLogRecordItems.asScala.toList) + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("zio opentelemetry")( + suite("Logging")( + test("works with empty tracing context") { + for { + _ <- ZIO.logInfo("test") + logRecords <- getFinishedLogRecords + } yield { + val r = logRecords.head + val body = r.getBody.asString() + val severityNumber = r.getSeverity.getSeverityNumber + val severityText = r.getSeverityText + val instrumentationScopeName = r.getInstrumentationScopeInfo.getName + val attributes = r.getAttributes.asMap().asScala.toMap.map { case (k, v) => k.getKey -> v.toString } + val traceId = r.getSpanContext.getTraceId + val spanId = r.getSpanContext.getSpanId + + assert(logRecords.length)(equalTo(1)) && + assert(body)(equalTo("test")) && + assert(severityNumber)(equalTo(0)) && // TODO: set it + assert(severityText)(equalTo("INFO")) && + assert(instrumentationScopeName)(equalTo("test1")) && + assert(attributes)(equalTo(Map.empty[String, String])) && + assert(traceId)(equalTo("00000000000000000000000000000000")) && + assert(spanId)(equalTo("0000000000000000")) + } + }.provide(Runtime.removeDefaultLoggers >>> loggingMockLayer("test1"), ContextStorage.fiberRef), + test("works in a tracing context (fiberRef)") { + ZIO.serviceWithZIO[Tracing] { tracing => + tracing.root("ROOT")( + for { + spanCtx <- tracing.getCurrentSpanContextUnsafe + _ <- ZIO.logInfo("test") + logRecords <- getFinishedLogRecords + } yield { + val r = logRecords.head + val body = r.getBody.asString() + val severityNumber = r.getSeverity.getSeverityNumber + val severityText = r.getSeverityText + val instrumentationScopeName = r.getInstrumentationScopeInfo.getName + val attributes = r.getAttributes.asMap().asScala.toMap.map { case (k, v) => k.getKey -> v.toString } + val traceId = r.getSpanContext.getTraceId + val spanId = r.getSpanContext.getSpanId + + assert(logRecords.length)(equalTo(1)) && + assert(body)(equalTo("test")) && + assert(severityNumber)(equalTo(0)) && // TODO: set it + assert(severityText)(equalTo("INFO")) && + assert(instrumentationScopeName)(equalTo("test2")) && + assert(attributes)(equalTo(Map.empty[String, String])) && + assert(traceId)(equalTo(spanCtx.getTraceId)) && + assert(spanId)(equalTo(spanCtx.getSpanId)) + } + ) + } + }.provide( + Runtime.removeDefaultLoggers >>> loggingMockLayer("test2"), + TracingTest.tracingMockLayer, + ContextStorage.fiberRef + ), + test("works in a tracing context (openTelemtryContext") { + ZIO.serviceWithZIO[Tracing] { tracing => + tracing.root("ROOT")( + for { + spanCtx <- tracing.getCurrentSpanContextUnsafe + _ <- ZIO.logInfo("test") + logRecords <- getFinishedLogRecords + } yield { + val r = logRecords.head + val body = r.getBody.asString() + val severityNumber = r.getSeverity.getSeverityNumber + val severityText = r.getSeverityText + val instrumentationScopeName = r.getInstrumentationScopeInfo.getName + val attributes = r.getAttributes.asMap().asScala.toMap.map { case (k, v) => k.getKey -> v.toString } + val traceId = r.getSpanContext.getTraceId + val spanId = r.getSpanContext.getSpanId + + assert(logRecords.length)(equalTo(1)) && + assert(body)(equalTo("test")) && + assert(severityNumber)(equalTo(0)) && // TODO: set it + assert(severityText)(equalTo("INFO")) && + assert(instrumentationScopeName)(equalTo("test3")) && + assert(attributes)(equalTo(Map.empty[String, String])) && + assert(traceId)(equalTo(spanCtx.getTraceId)) && + assert(spanId)(equalTo(spanCtx.getSpanId)) + } + ) + } + }.provide( + Runtime.removeDefaultLoggers >>> loggingMockLayer("test3"), + TracingTest.tracingMockLayer, + ContextStorage.openTelemetryContext + ) + ) + ) + +} diff --git a/opentelemetry/src/test/scala/zio/telemetry/opentelemetry/tracing/TracingTest.scala b/opentelemetry/src/test/scala/zio/telemetry/opentelemetry/tracing/TracingTest.scala index 55fe299f..75cb5cd2 100644 --- a/opentelemetry/src/test/scala/zio/telemetry/opentelemetry/tracing/TracingTest.scala +++ b/opentelemetry/src/test/scala/zio/telemetry/opentelemetry/tracing/TracingTest.scala @@ -26,12 +26,12 @@ object TracingTest extends ZIOSpecDefault { tracer = tracerProvider.get("TracingTest") } yield (spanExporter, tracer) - val inMemoryTracerLayer: ULayer[InMemorySpanExporter with Tracer with ContextStorage] = + val inMemoryTracerLayer: ULayer[InMemorySpanExporter with Tracer] = ZLayer.fromZIOEnvironment(inMemoryTracer.map { case (inMemorySpanExporter, tracer) => ZEnvironment(inMemorySpanExporter).add(tracer) - }) ++ ContextStorage.fiberRef + }) - val tracingMockLayer: ULayer[Tracing with InMemorySpanExporter with Tracer] = + val tracingMockLayer: URLayer[ContextStorage, Tracing with InMemorySpanExporter with Tracer] = inMemoryTracerLayer >>> (Tracing.live ++ inMemoryTracerLayer) def getFinishedSpans: ZIO[InMemorySpanExporter, Nothing, List[SpanData]] = @@ -55,7 +55,7 @@ object TracingTest extends ZIOSpecDefault { _ <- ZIO.scoped(Tracing.live.build) finishedSpans <- getFinishedSpans } yield assert(finishedSpans)(hasSize(equalTo(0))) - }.provideLayer(inMemoryTracerLayer) + }.provide(inMemoryTracerLayer, ContextStorage.fiberRef) ) private val spansSpec = @@ -554,7 +554,7 @@ object TracingTest extends ZIOSpecDefault { } yield assert(ko)(isSome(failureAssertion)) && assert(ok)(isSome(successAssertion)) } } - ).provideLayer(tracingMockLayer) + ).provide(tracingMockLayer, ContextStorage.fiberRef) private val spanScopedSpec = suite("scoped spans")( @@ -656,5 +656,5 @@ object TracingTest extends ZIOSpecDefault { } yield assert(tags.get(AttributeKey.stringKey("string")))(equalTo("bar")) } } - ).provideLayer(tracingMockLayer) + ).provide(tracingMockLayer, ContextStorage.fiberRef) }