ZIO Kafka
Trace4Cats ZIO Extras provides tracing for both ZIO Kafka producers and ZIO Kafka consumers.
ZIO Kafka Producer
Kafka records that are produced are augmented with tracing headers. The following example shows how to produce a Kafka record with tracing headers:
import io.kaizensolutions.trace4cats.zio.extras.*
import io.kaizensolutions.trace4cats.zio.extras.ziokafka.*
import zio.*
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde.Serde
// Connection settings for a broker assumed to be reachable at localhost:9092
val producerSettings = ProducerSettings(List("localhost:9092"))

// Wrap the plain Producer so that produced records are augmented with tracing headers
val tracedProducerLayer = ZLayer.scoped[Any](Producer.make(producerSettings)) >>> KafkaProducerTracer.layer

val program: ZIO[ZTracer, Throwable, Unit] =
  ZIO.foreachDiscard(1 to 10)(i =>
    Producer
      .produce("test-topic", s"key-$i", s"value-$i", Serde.string, Serde.string)
      .provideSome[ZTracer](tracedProducerLayer)
  )
[Figure: example of the trace generated for one of the messages produced]
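To make "augmented with tracing headers" more concrete, here is a minimal, dependency-free sketch of header injection. It is not the library's implementation: `SpanContext`, `Record`, and `HeaderInjection` are hypothetical names introduced for illustration, and the W3C `traceparent` format is assumed only as an example. The headers actually written depend on how the tracer is configured.

```scala
// Hypothetical model types for illustration only; these are not Trace4Cats or ZIO Kafka types.
final case class SpanContext(traceId: String, spanId: String, sampled: Boolean)

final case class Record(
  topic: String,
  key: String,
  value: String,
  headers: Map[String, String] = Map.empty
)

object HeaderInjection {
  // Encode the context as a W3C traceparent value: version-traceId-parentId-flags
  def toTraceparent(ctx: SpanContext): String = {
    val flags = if (ctx.sampled) "01" else "00"
    s"00-${ctx.traceId}-${ctx.spanId}-$flags"
  }

  // Return a copy of the record with the tracing header added; existing headers are kept
  def inject(ctx: SpanContext, record: Record): Record =
    record.copy(headers = record.headers + ("traceparent" -> toTraceparent(ctx)))
}
```

In this sketch, injecting a context into `Record("test-topic", "key-1", "value-1")` leaves the key and value untouched and only adds one header entry, which is the property that lets a downstream consumer pick the trace back up.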
ZIO Kafka Consumer
Kafka records that are consumed automatically continue the trace, provided that the record carries the associated trace header. The following example shows how to consume Kafka records that carry tracing headers:
import io.kaizensolutions.trace4cats.zio.extras.*
import io.kaizensolutions.trace4cats.zio.extras.ziokafka.*
import zio.kafka.consumer.{Consumer, ConsumerSettings, Subscription}
import zio.kafka.serde.Serde
import zio.*
val consumerSettings = ConsumerSettings(List("localhost:9092"))
  .withGroupId("example-traced-group")
  .withOffsetRetrieval(Consumer.OffsetRetrieval.Auto(Consumer.AutoOffsetStrategy.Earliest))

val consume: ZIO[ZTracer & Consumer, Throwable, Unit] =
  ZIO.serviceWithZIO[ZTracer](tracer =>
    // Wrap the plain record stream so that tracing continues from each record's headers
    KafkaConsumerTracer
      .traceConsumerStream(
        tracer,
        Consumer
          .plainStream(Subscription.topics("test-topic"), Serde.string, Serde.string)
      )
      // Handle each record within a span named "internal"
      .tapWithTracer(tracer, "internal") { record =>
        val event = s"${record.record.topic}-${record.record.key}-${record.record.value}"
        ZIO.log(s"handled an event $event")
      }
      // Finish tracing for each element before committing offsets
      .endTracingEachElement
      .map(_.offset)
      // Batch the offsets and commit each batch
      .aggregateAsync(Consumer.offsetBatches)
      .tap(_.commit)
      .runDrain
  )
[Figure: example of the trace generated for one of the messages consumed]
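The consumer side can be pictured as the reverse step: read the tracing header from the record and, if it is present and well formed, use it as the parent of the new span. The sketch below is a dependency-free illustration, not the library's code. `SpanContext` and `HeaderExtraction` are hypothetical names, the W3C `traceparent` format is an assumption, and falling back to a new root span when the header is missing is also an assumption, since the text above only describes the case where the header is present.

```scala
// Hypothetical model type for illustration only; not a Trace4Cats type.
final case class SpanContext(traceId: String, spanId: String, sampled: Boolean)

object HeaderExtraction {
  // W3C traceparent layout: version "00", 32 hex trace id, 16 hex parent id, 2 hex flags
  private val Traceparent = "^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$".r

  // Parse the parent context from record headers; None if absent or malformed
  def extract(headers: Map[String, String]): Option[SpanContext] =
    headers.get("traceparent").collect {
      case Traceparent(traceId, parentId, flags) =>
        // The lowest bit of the flags byte is the "sampled" flag
        SpanContext(traceId, parentId, sampled = (Integer.parseInt(flags, 16) & 1) == 1)
    }

  // Assumption: continue the trace when a parent is found, otherwise start a new root
  def describe(headers: Map[String, String]): String =
    extract(headers) match {
      case Some(parent) => s"child of ${parent.spanId} in trace ${parent.traceId}"
      case None         => "new root span"
    }
}
```

Returning an `Option` keeps the "header present" and "header absent" cases explicit, so the decision about what to do without a parent stays in one place.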