Skip to content

Commit

Permalink
fixes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiuszkierat committed Jan 16, 2025
1 parent b543190 commit c8f0be9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//> using dep org.apache.pekko::pekko-stream:1.1.2
//> using dep org.typelevel::cats-effect:3.5.7
//> using dep com.softwaremill.sttp.client3::core:3.10.2
//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.1
//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.2

package sttp.tapir.examples.streaming

Expand All @@ -28,14 +28,22 @@ import scala.concurrent.duration.FiniteDuration
object longLastingClient extends IOApp:
implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient")

private val givenLength: Long = 10000
private val chunkSize = 100
private val noChunks = givenLength / chunkSize

private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] =
val stream: Source[ByteString, Any] = Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(10)('A').map(_.toByte))).map { elem =>
println(s"$elem ${java.time.LocalTime.now()}"); elem
}
val stream: Source[ByteString, Any] =
Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(chunkSize)('A').map(_.toByte)))
.zipWithIndex
.take(noChunks)
.map { case (chunk, idx) =>
println(s"Chunk ${idx + 1} sent ${java.time.LocalTime.now()}"); chunk
}

basicRequest
.post(uri"http://localhost:9000/chunks")
.header(Header(HeaderNames.ContentLength, "10000"))
.header(Header(HeaderNames.ContentLength, givenLength.toString))
.streamBody(PekkoStreams)(stream)
.send(backend)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import sttp.tapir.*
import scala.concurrent.{ExecutionContext, Future}
import scala.util.*
import org.apache.pekko
import pekko.stream.scaladsl.{Flow, Source}
import pekko.stream.scaladsl.{Flow, Source, Sink}
import pekko.util.ByteString
import sttp.tapir.server.play.PlayServerOptions

Expand All @@ -41,18 +41,26 @@ def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] =
Success(Left(e.getMessage))
}

def logic(s: (Long, Source[ByteString, Any])): Future[(Long, Source[ByteString, Any])] = {
def logic(s: (Long, Source[ByteString, Any])): Future[String] = {
val (length, stream) = s
println(s"Received $length bytes, ${stream.map(_.length)} bytes in total")
Future.successful((length, stream))
println(s"Transmitting $length bytes...")
val result = stream
.runFold(List.empty[ByteString])((acc, byteS) => acc :+ byteS)
.map(_.reduce(_ ++ _).decodeString("UTF-8"))
result.onComplete {
case Failure(ex) =>
println(s"Stream failed with exception: $ex" )
case Success(s) =>
println(s"Stream finished: ${s.length}/$length transmitted")
}
result
}

val e = endpoint.post
.in("chunks")
.in(header[Long](HeaderNames.ContentLength))
.in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain()))
.out(header[Long](HeaderNames.ContentLength))
.out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain()))
.out(stringBody)
.errorOut(plainBody[ErrorInfo])
.serverLogic(logic.andThen(handleErrors))

Expand Down

0 comments on commit c8f0be9

Please sign in to comment.