FS2 Streams
In addition to supporting ZStream
where each element of the stream has its own span, we also support fs2.Stream
in the same way. For example:
import fs2.Stream
import zio.*
import zio.interop.catz.*
import trace4cats.*
import io.kaizensolutions.trace4cats.zio.extras.fs2.*
import io.kaizensolutions.trace4cats.zio.extras.*
type Effect[A] = RIO[ZTracer, A]
val tracedStream: Stream[Effect, Spanned[Int]] =
Stream
.range(1, 100)
.covary[Effect]
.evalMap(i => ZTracer.withSpan(s"name-$i")(span => ZIO.succeed((i, span.extractHeaders(ToHeaders.standard)))))
.traceEachElement("in-begin") { case (_, headers) => headers }
.mapThrough(_._1)
.evalMapTraced("Plus 1")(e =>
ZTracer.span(s"plus 1 for $e")(ZIO.succeed(println(s"Adding ${e} + 1 = ${e + 1}")) *> ZIO.succeed(e + 1))
)
.parEvalMapTraced("Plus 2")(8)(e =>
ZTracer.span(s"plus 2 for $e")(
ZIO
.succeed(println(s"Adding ${e} + 2 = ${e + 2}"))
.delay(500.millis) *>
ZIO.succeed(e + 2)
)
)
.parEvalMapTraced("Plus 4")(3)(e =>
ZTracer.span(s"plus 4 for $e")(
ZTracer.spanSource()(
ZIO
.succeed(println(s"Adding ${e} + 4 = ${e + 4}"))
.delay(1.second)
) *> ZIO.succeed(e + 2)
)
)
val tracedWorkflow: RIO[ZTracer, Unit] = tracedStream.compile.drain
This generates the following set of spans: