Skip to main content

ZIO Kafka

Trace4Cats ZIO Extras provides tracing for both ZIO Kafka Producers and ZIO Kafka Consumers

ZIO Kafka Producer

Kafka records that are produced are augmented with tracing headers. The following example shows how to produce a Kafka record with tracing headers:

import io.kaizensolutions.trace4cats.zio.extras.*
import io.kaizensolutions.trace4cats.zio.extras.ziokafka.*
import zio.*
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde.Serde

val producerSettings = ProducerSettings(List("localhost:9092"))
val tracedProducerLayer = ZLayer.scoped[Any](Producer.make(producerSettings)) >>> KafkaProducerTracer.layer

val program: ZIO[ZTracer, Throwable, Unit] =
ZIO.foreachDiscard(1 to 10)(i =>
Producer
.produce("test-topic", s"key-$i", s"value-$i", Serde.string, Serde.string)
.provideSome[ZTracer](tracedProducerLayer)
)

Here is an example of the trace generated for one of the messages produced:

image

ZIO Kafka Consumer

Kafka records that are consumed will automatically continue the trace given that the associated trace header is present for the Kafka record. The following example shows how to consume a Kafka record with tracing headers:

import io.kaizensolutions.trace4cats.zio.extras.*
import io.kaizensolutions.trace4cats.zio.extras.ziokafka.*
import zio.kafka.consumer.{Consumer, ConsumerSettings, Subscription}
import zio.kafka.serde.Serde
import zio.*

val consumerSettings = ConsumerSettings(List(s"localhost:9092"))
.withGroupId("example-traced-group")
.withOffsetRetrieval(Consumer.OffsetRetrieval.Auto(Consumer.AutoOffsetStrategy.Earliest))

val consume: ZIO[ZTracer & Consumer, Throwable, Unit] =
ZIO.serviceWithZIO[ZTracer](tracer =>
KafkaConsumerTracer
.traceConsumerStream(
tracer,
Consumer
.plainStream(Subscription.topics("test-topic"), Serde.string, Serde.string)
)
.tapWithTracer(tracer, "internal") { record =>
val event = s"${record.record.topic}-${record.record.key}-${record.record.value}"
ZIO.log(s"handled an event $event")
}
.endTracingEachElement
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.tap(_.commit)
.runDrain
)

Here is an example of the trace generated for one of the messages consumed:

image