Skip to content

Commit

Permalink
Fix shutdowns in test servers (#3930)
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski authored Jul 14, 2024
1 parent 17df38f commit 5a51c11
Show file tree
Hide file tree
Showing 24 changed files with 103 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,14 @@ class AkkaHttpTestServerInterpreter(implicit actorSystem: ActorSystem)
AkkaHttpServerInterpreter(serverOptions).toRoute(es)
}

override def serverWithStop(
override def server(
routes: NonEmptyList[Route],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
): Resource[IO, Port] = {
val bind = IO.fromFuture(IO(Http().newServerAt("localhost", 0).bind(concat(routes.toList: _*))))

Resource
.make(
bind.map(b =>
(
b.localAddress.getPort,
IO.fromFuture(IO(b.terminate(gracefulShutdownTimeout.getOrElse(50.millis)))).void
)
)
) { case (_, release) =>
release
}
.make(bind)(server => IO.fromFuture(IO(server.terminate(gracefulShutdownTimeout.getOrElse(50.millis)))).void)
.map(_.localAddress.getPort)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import scala.concurrent.duration._

trait ArmeriaTestServerInterpreter[S <: Streams[S], F[_], OPTIONS] extends TestServerInterpreter[F, S, OPTIONS, TapirService[S, F]] {

override def serverWithStop(
override def server(
routes: NonEmptyList[TapirService[S, F]],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
): Resource[IO, Port] = {
val (quietPeriodMs, totalDeadlineMs) = gracefulShutdownTimeout
.map(t => (t.toMillis, t.toMillis + 50))
.getOrElse((0L, 0L))
Expand All @@ -31,19 +31,11 @@ trait ArmeriaTestServerInterpreter[S <: Streams[S], F[_], OPTIONS] extends TestS
server.start().thenApply[Server](_ => server)
}
)
// Ignore future returned by stop() for fast test iterations.
// Armeria server wait for 2 seconds by default to let the boss group gracefully finish all remaining
// tasks in the queue. Even if graceful shutdown timeouts are set to 0.
Resource
.make(
bind.map(b =>
(
b.activeLocalPort(),
// Ignore returned future for fast test iterations.
// Armeria server wait for 2 seconds by default to let the boss group gracefully finish all remaining
// tasks in the queue. Even if graceful shutdown timeouts are set to 0.
IO { val _ = b.stop() }
)
)
) { case (_, release) =>
release
}
.make(bind)(s => IO.blocking { val _ = s.stop() })
.map(_.activeLocalPort())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,11 @@ class FinatraTestServerInterpreter extends TestServerInterpreter[Future, Any, Fi
es.map(interpreter.toRoute).last
}

override def serverWithStop(
override def server(
routes: NonEmptyList[FinatraRoute],
gracefulShutdownTimeout: Option[FiniteDuration] = None
): Resource[IO, (Port, KillSwitch)] = FinatraTestServerInterpreter.serverWithStop(routes, gracefulShutdownTimeout)
}
): Resource[IO, Port] = {

object FinatraTestServerInterpreter {
def serverWithStop(
routes: NonEmptyList[FinatraRoute],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
def waitUntilHealthy(s: EmbeddedHttpServer, count: Int): IO[EmbeddedHttpServer] =
if (s.isHealthy) IO.pure(s)
else if (count > 1000) IO.raiseError(new IllegalStateException("Server unhealthy"))
Expand Down Expand Up @@ -65,15 +59,7 @@ object FinatraTestServerInterpreter {
}.flatMap(waitUntilHealthy(_, 0))

Resource
.make(
bind.map(server =>
(
server.httpExternalPort(),
IO { server.close(Duration.fromMilliseconds(gracefulShutdownTimeout.map(_.toMillis).getOrElse(50))) }
)
)
) { case (_, release) =>
release
}
.make(bind)(server => IO.blocking(server.close(Duration.fromMilliseconds(gracefulShutdownTimeout.map(_.toMillis).getOrElse(50)))))
.map(_.httpExternalPort())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,18 @@ class Http4sTestServerInterpreter extends TestServerInterpreter[IO, Fs2Streams[I
Http4sServerInterpreter(serverOptions).toWebSocketRoutes(es)
}

override def serverWithStop(
override def server(
routes: NonEmptyList[Routes],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
): Resource[IO, Port] = {
val service: WebSocketBuilder2[IO] => HttpApp[IO] =
wsb => routes.map(_.apply(wsb)).reduceK.orNotFound

Resource.make(
BlazeServerBuilder[IO]
.withExecutionContext(ExecutionContext.global)
.bindHttp(0, "localhost")
.withHttpWebSocketApp(service)
.resource
.allocated
.map { case (server, release) => // Blaze has no graceful shutdown support https://github.com/http4s/blaze/issues/676
(server.address.getPort(), release)
}
) { case (_, release) => release }
BlazeServerBuilder[IO]
.withExecutionContext(ExecutionContext.global)
.bindHttp(0, "localhost")
.withHttpWebSocketApp(service)
.resource
.map(_.address.getPort())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sttp.tapir.server.http4s.ztapir

import cats.data.NonEmptyList
import cats.effect.{IO, Resource}
import cats._
import cats.syntax.all._
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.server.websocket.WebSocketBuilder2
Expand All @@ -15,7 +16,6 @@ import sttp.tapir.tests._
import sttp.tapir.ztapir.ZServerEndpoint
import zio.{Runtime, Task, Unsafe}
import zio.interop.catz._
import zio.interop.catz.implicits._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
Expand All @@ -34,28 +34,22 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStream
ZHttp4sServerInterpreter(serverOptions).fromWebSocket(es).toRoutes
}

override def serverWithStop(
override def server(
routes: NonEmptyList[Routes],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
): Resource[IO, Port] = {
val service: WebSocketBuilder2[Task] => HttpApp[Task] =
wsb => routes.map(_.apply(wsb)).reduceK.orNotFound

val serverResource = BlazeServerBuilder[Task]
BlazeServerBuilder[Task]
.withExecutionContext(ExecutionContext.global)
.bindHttp(0, "localhost")
.withHttpWebSocketApp(service)
.resource
.map(_.address.getPort)

// Converting a zio.RIO-resource to an cats.IO-resource
val runtime = implicitly[zio.Runtime[Any]]
Resource
.eval(IO.fromFuture(IO(Unsafe.unsafe(implicit u => Runtime.default.unsafe.runToFuture(serverResource.allocated)))))
.flatMap { case (port, release) => // Blaze has no graceful shutdown support https://github.com/http4s/blaze/issues/676
Resource.make(IO.pure((port, IO.fromFuture(IO(Unsafe.unsafe(implicit u => Runtime.default.unsafe.runToFuture(release))))))) {
case (_, release) => release
}
}
.mapK(new ~>[Task, IO] {
// Converting a ZIO effect to an Cats Effect IO effect
def apply[B](fa: Task[B]): IO[B] = IO.fromFuture(Unsafe.unsafe(implicit u => IO(Runtime.default.unsafe.runToFuture(fa))))
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ class JdkHttpTestServerInterpreter() extends TestServerInterpreter[Identity, Any
JdkHttpServerInterpreter(serverOptions).toHandler(es)
}

override def serverWithStop(
override def server(
routes: NonEmptyList[HttpHandler],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
): Resource[IO, Port] = {
val server = IO.blocking {
val server = HttpServer.create(new InetSocketAddress(0), 0)

Expand All @@ -48,8 +48,7 @@ class JdkHttpTestServerInterpreter() extends TestServerInterpreter[Identity, Any
}

Resource
.make(server.map(s => (s.getAddress.getPort, IO.blocking(s.stop(gracefulShutdownTimeout.map(_.toSeconds.toInt).getOrElse(0)))))) {
case (_, release) => release
}
.make(server)(s => IO.blocking(s.stop(gracefulShutdownTimeout.map(_.toSeconds.toInt).getOrElse(0))))
.map(_.getAddress.getPort)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatch
NettyCatsServerInterpreter(serverOptions).toRoute(es)
}

override def serverWithStop(
override def server(
routes: NonEmptyList[Route[IO]],
gracefulShutdownTimeout: Option[FiniteDuration] = None
): Resource[IO, (Port, KillSwitch)] = {
): Resource[IO, Port] = {
val config = NettyConfig.default
.eventLoopGroup(eventLoopGroup)
.randomPort
Expand All @@ -36,6 +36,7 @@ class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatch
val bind: IO[NettyCatsServerBinding[IO]] = NettyCatsServer(options, customizedConfig).addRoutes(routes.toList).start()

Resource
.make(bind.map(b => (b.port, b.stop()))) { case (_, release) => release }
.make(bind)(_.stop())
.map(_.port)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implic
NettyFutureServerInterpreter(serverOptions).toRoute(es)
}

override def serverWithStop(
override def server(
routes: NonEmptyList[FutureRoute],
gracefulShutdownTimeout: Option[FiniteDuration] = None
): Resource[IO, (Port, KillSwitch)] = {
): Resource[IO, Port] = {
val config =
NettyConfig.default
.eventLoopGroup(eventLoopGroup)
Expand All @@ -34,6 +34,7 @@ class NettyFutureTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)(implic
val bind = IO.fromFuture(IO.delay(NettyFutureServer(options, customizedConfig).addRoutes(routes.toList).start()))

Resource
.make(bind.map(b => (b.port, IO.fromFuture(IO.delay(b.stop()))))) { case (_, release) => release }
.make(bind)(server => IO.fromFuture(IO.delay(server.stop())))
.map(_.port)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,17 @@ class NettySyncTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)
}
}

override def serverWithStop(
override def server(
routes: NonEmptyList[IdRoute],
gracefulShutdownTimeout: Option[FiniteDuration] = None
): Resource[IO, (Port, IO[Unit])] = {
): Resource[IO, Port] = {
val config =
NettyConfig.default.eventLoopGroup(eventLoopGroup).randomPort.withDontShutdownEventLoopGroupOnClose.noGracefulShutdown
val customizedConfig = gracefulShutdownTimeout.map(config.withGracefulShutdownTimeout).getOrElse(config)
val options = NettySyncServerOptions.default
val bind = IO.blocking(NettySyncServer(options, customizedConfig).start(routes.toList))

Resource
.make(bind.map(b => (b.port, IO.blocking(b.stop())))) { case (_, stop) => stop }
Resource.make(bind)(server => IO.blocking(server.stop())).map(_.port)
}

def scopedServerWithRoutesStop(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ class NettyZioTestServerInterpreter[R](eventLoopGroup: NioEventLoopGroup)
NettyZioServerInterpreter(serverOptions).toRoute(es)
}

override def serverWithStop(
override def server(
routes: NonEmptyList[Task[Route[Task]]],
gracefulShutdownTimeout: Option[FiniteDuration] = None
): Resource[IO, (Port, KillSwitch)] = {
): Resource[IO, Port] = {
val config = NettyConfig.default
.eventLoopGroup(eventLoopGroup)
.randomPort
Expand All @@ -46,8 +46,7 @@ class NettyZioTestServerInterpreter[R](eventLoopGroup: NioEventLoopGroup)
)

Resource
.make(bind.map(b => (b.port, IO.fromFuture[Unit](IO(Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(b.stop()))))))) {
case (_, release) => release
}
.make(bind)(server => IO.fromFuture[Unit](IO(Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(server.stop())))))
.map(_.port)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ class NimaTestServerInterpreter() extends TestServerInterpreter[Identity, Any, N
NimaServerInterpreter(serverOptions).toHandler(es)
}

override def serverWithStop(
override def server(
nimaRoutes: NonEmptyList[Handler],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
): Resource[IO, Port] = {
val bind = IO.blocking {
WebServer
.builder()
Expand All @@ -35,8 +35,7 @@ class NimaTestServerInterpreter() extends TestServerInterpreter[Identity, Any, N
}

Resource
.make(bind.map(b => (b.port, IO.blocking(b.stop()).map(_ => ())))) { case (_, release) =>
release
}
.make(bind)(server => IO.blocking { val _ = server.stop() })
.map(_.port)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,14 @@ class PekkoHttpTestServerInterpreter(implicit actorSystem: ActorSystem)
PekkoHttpServerInterpreter(serverOptions).toRoute(es)
}

override def serverWithStop(
override def server(
routes: NonEmptyList[Route],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
): Resource[IO, Port] = {
val bind = IO.fromFuture(IO(Http().newServerAt("localhost", 0).bind(concat(routes.toList: _*))))

Resource
.make(
bind.map(b =>
(
b.localAddress.getPort(),
IO.fromFuture(IO(b.terminate(gracefulShutdownTimeout.getOrElse(50.millis)))).void
)
)
) { case (_, release) =>
release
}
.make(bind)(server => IO.fromFuture(IO(server.terminate(gracefulShutdownTimeout.getOrElse(50.millis)))).void)
.map(_.localAddress.getPort)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ class PlayTestServerInterpreter(implicit actorSystem: ActorSystem)
PlayServerInterpreter(serverOptions).toRoutes(es)
}

import play.core.server.PekkoHttpServer

override def serverWithStop(
override def server(
routes: NonEmptyList[Routes],
gracefulShutdownTimeout: Option[FiniteDuration]
): Resource[IO, (Port, KillSwitch)] = {
val components = new DefaultPekkoHttpServerComponents {
): Resource[IO, Port] = {
lazy val components = new DefaultPekkoHttpServerComponents {
val initialServerConfig = ServerConfig(port = Some(0), address = "127.0.0.1", mode = Mode.Test)

val customConf =
Expand All @@ -59,9 +57,9 @@ class PlayTestServerInterpreter(implicit actorSystem: ActorSystem)
})
)
}
val bind = IO {
val bind = IO.blocking {
components.server
}
Resource.make(bind.map(s => (s.mainAddress.getPort, IO(s.stop())))) { case (_, release) => release }
Resource.make(bind)(s => IO.blocking(s.stop())).map(s => (s.mainAddress.getPort))
}
}
Loading

0 comments on commit 5a51c11

Please sign in to comment.