diff --git a/CHANGES b/CHANGES index f23575b2531..5b8831ad9cb 100644 --- a/CHANGES +++ b/CHANGES @@ -14,7 +14,15 @@ New Features as an address. ``RB_ID=758862`` * finagle-core: `c.t.f.service.Retries` now supports adding delay between each automatic retry. - This is configured via the Retries.Budget. ``RB_ID=768883`` + This is configured via the `Retries.Budget`. ``RB_ID=768883`` + + * finagle-core: FailureAccrualFactory now uses a FailureAccrualPolicy to determine when to + mark an endpoint dead. The default policy, FailureAccrualPolicy.consecutiveFailures(), + mimicks existing functionality, and FailureAccrualPolicy.successRate() operates on the + exponentially weighted average success rate over a window of requests.``RB_ID=756921`` + + * finagle-core: Introduce `c.t.f.transport.Transport.Options` to configure transport-level options + (i.e., socket options `TCP_NODELAY` and `SO_REUSEADDR`). ``RB_ID=773824`` * finagle-netty4: Hello World. Introduce a `Listener` for Netty 4.1. This is still considered beta. ``RB_ID=718688`` @@ -100,11 +108,6 @@ Breaking API Changes Bug Fixes ~~~~~~~~~ - * finagle-core: FailureAccrualFactory now uses a FailureAccrualPolicy to determine when to - mark an endpoint dead. The default policy, FailureAccrualPolicy.consecutiveFailures(), - mimicks existing functionality, and FailureAccrualPolicy.successRate() operates on the - exponentially weighted average success rate over a window of requests.``RB_ID=756921`` - * finagle-thrift: `c.t.f.ThriftRichClient` scoped stats label is now threaded properly through `newServiceIface` ``RB_ID=760157`` diff --git a/finagle-core/src/main/scala/com/twitter/finagle/netty3/Netty3Listener.scala b/finagle-core/src/main/scala/com/twitter/finagle/netty3/Netty3Listener.scala index 9fa57582b08..c7b6512185c 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/netty3/Netty3Listener.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/netty3/Netty3Listener.scala @@ -178,11 +178,12 @@ object Netty3Listener { case Transport.Verbose(true) => Some(ChannelSnooper(label)(logger.log(Level.INFO, _, _))) case _ => None } + val Transport.Options(noDelay, reuseAddr) = params[Transport.Options] val opts = new mutable.HashMap[String, Object]() opts += "soLinger" -> (0: java.lang.Integer) - opts += "reuseAddress" -> java.lang.Boolean.TRUE - opts += "child.tcpNoDelay" -> java.lang.Boolean.TRUE + opts += "reuseAddress" -> (reuseAddr: java.lang.Boolean) + opts += "child.tcpNoDelay" -> (noDelay: java.lang.Boolean) for (v <- backlog) opts += "backlog" -> (v: java.lang.Integer) for (v <- sendBufSize) opts += "child.sendBufferSize" -> (v: java.lang.Integer) for (v <- recvBufSize) opts += "child.receiveBufferSize" -> (v: java.lang.Integer) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/netty3/Netty3Transporter.scala b/finagle-core/src/main/scala/com/twitter/finagle/netty3/Netty3Transporter.scala index e9f41833a04..fa3f2cac581 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/netty3/Netty3Transporter.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/netty3/Netty3Transporter.scala @@ -143,11 +143,12 @@ object Netty3Transporter { case Transport.Verbose(true) => Some(ChannelSnooper(label)(logger.log(Level.INFO, _, _))) case _ => None } + val Transport.Options(noDelay, reuseAddr) = params[Transport.Options] val opts = new mutable.HashMap[String, Object]() opts += "connectTimeoutMillis" -> ((connectTimeout + compensation).inMilliseconds: java.lang.Long) - opts += "tcpNoDelay" -> java.lang.Boolean.TRUE - opts += "reuseAddress" -> java.lang.Boolean.TRUE + opts += "tcpNoDelay" -> (noDelay: java.lang.Boolean) + opts += "reuseAddress" -> (reuseAddr: java.lang.Boolean) for (v <- keepAlive) opts += "keepAlive" -> (v: java.lang.Boolean) for (s <- sendBufSize) opts += "sendBufferSize" -> (s: java.lang.Integer) for (s <- recvBufSize) opts += "receiveBufferSize" -> (s: java.lang.Integer) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/server/Listener.scala b/finagle-core/src/main/scala/com/twitter/finagle/server/Listener.scala index 436e10a5cdf..696febf11ca 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/server/Listener.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/server/Listener.scala @@ -58,20 +58,4 @@ object Listener { object TrafficClass { implicit val param = Stack.Param(TrafficClass(None)) } - - /** - * Configures TCP_NODELAY on the server socket - * @param b `true` disables Nagle's algorithm. - * @note This param only applies to netty's >= 4. - * Netty 3's socket options are still configured by - * the [[com.twitter.finagle.netty3.Netty3Listener]] instance. - */ - case class NoDelay(b: Boolean) { - def mk(): (NoDelay, Stack.Param[NoDelay]) = - (this, NoDelay.param) - } - - object NoDelay { - implicit val param = Stack.Param(NoDelay(true)) - } } diff --git a/finagle-core/src/main/scala/com/twitter/finagle/transport/Transport.scala b/finagle-core/src/main/scala/com/twitter/finagle/transport/Transport.scala index cfa0617091d..73c2e18151f 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/transport/Transport.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/transport/Transport.scala @@ -141,12 +141,12 @@ object Transport { * $param the verbosity of a `Transport`. Transport activity is * written to [[com.twitter.finagle.param.Logger]]. */ - case class Verbose(b: Boolean) { + case class Verbose(enabled: Boolean) { def mk(): (Verbose, Stack.Param[Verbose]) = (this, Verbose.param) } object Verbose { - implicit val param = Stack.Param(Verbose(false)) + implicit val param = Stack.Param(Verbose(enabled = false)) } /** @@ -171,12 +171,31 @@ object Transport { implicit val param = Stack.Param(TLSServerEngine(None)) } + /** + * $param the options (i.e., socket options) of a `Transport`. + * + * @param noDelay enables or disables `TCP_NODELAY` (Nagle's algorithm) + * option on a transport socket (`noDelay = true` means + * disabled). Default is `true` (disabled). + * + * @param reuseAddr enables or disables `SO_REUSEADDR` option on a + * transport socket. Default is `true`. + */ + case class Options(noDelay: Boolean, reuseAddr: Boolean) { + def mk(): (Options, Stack.Param[Options]) = (this, Options.param) + } + + object Options { + implicit val param: Stack.Param[Options] = + Stack.Param(Options(noDelay = true, reuseAddr = true)) + } + /** * Serializes the object stream from a `Transport` into a * [[com.twitter.io.Writer]]. * * The serialization function `f` can return `Future.None` to interrupt the - * stream to faciliate using the transport with multiple writers and vice + * stream to facilitate using the transport with multiple writers and vice * versa. * * Both transport and writer are unmanaged, the caller must close when diff --git a/finagle-core/src/test/scala/com/twitter/finagle/netty3/Netty3TransporterTest.scala b/finagle-core/src/test/scala/com/twitter/finagle/netty3/Netty3TransporterTest.scala index 21e1d11e625..938933177b1 100644 --- a/finagle-core/src/test/scala/com/twitter/finagle/netty3/Netty3TransporterTest.scala +++ b/finagle-core/src/test/scala/com/twitter/finagle/netty3/Netty3TransporterTest.scala @@ -64,7 +64,7 @@ class Netty3TransporterTest extends FunSpec with MockitoSugar { assert(transporter.channelOptions.get("connectTimeoutMillis").get == inputParams[Transporter.ConnectTimeout].howlong.inMilliseconds + inputParams[LatencyCompensation.Compensation].howlong.inMilliseconds) - assert(transporter.channelSnooper.nonEmpty == inputParams[Transport.Verbose].b) + assert(transporter.channelSnooper.nonEmpty == inputParams[Transport.Verbose].enabled) } it("newPipeline handles unresolved InetSocketAddresses") { diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/Netty4Listener.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/Netty4Listener.scala index 71ddff425b8..fd7d0ec5e6c 100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/Netty4Listener.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/Netty4Listener.scala @@ -56,11 +56,10 @@ private[finagle] case class Netty4Listener[In, Out]( // transport params private[this] val Transport.Liveness(_, _, keepAlive) = params[Transport.Liveness] private[this] val Transport.BufferSizes(sendBufSize, recvBufSize) = params[Transport.BufferSizes] + private[this] val Transport.Options(noDelay, reuseAddr) = params[Transport.Options] - // socket params + // listener params private[this] val Listener.Backlog(backlog) = params[Listener.Backlog] - private[this] val Listener.NoDelay(noDelay) = params[Listener.NoDelay] - /** * Listen for connections and apply the `serveTransport` callback on connected [[Transport transports]]. @@ -90,7 +89,7 @@ private[finagle] case class Netty4Listener[In, Out]( bootstrap.childOption[JBool](ChannelOption.TCP_NODELAY, noDelay) //bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //todo: investigate pooled allocator CSL-2089 //bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - bootstrap.option[JBool](ChannelOption.SO_REUSEADDR, true) + bootstrap.option[JBool](ChannelOption.SO_REUSEADDR, reuseAddr) bootstrap.option[JInt](ChannelOption.SO_LINGER, 0) backlog.foreach(bootstrap.option[JInt](ChannelOption.SO_BACKLOG, _)) sendBufSize.foreach(bootstrap.childOption[JInt](ChannelOption.SO_SNDBUF, _)) diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/channel/ChannelRequestStatsHandler.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/channel/ChannelRequestStatsHandler.scala index a99e532e215..a8103f6a96a 100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/channel/ChannelRequestStatsHandler.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/channel/ChannelRequestStatsHandler.scala @@ -14,7 +14,8 @@ private[finagle] object ChannelRequestStatsHandler { * A channel stats handler that keeps per-connection request * statistics. This handler should be after the request codec in the * stack as it assumes messages are POJOs with request/responses. - * @param statsReceiver + * + * @param statsReceiver the [[StatsReceiver]] to which stats are reported */ private[finagle] class ChannelRequestStatsHandler(statsReceiver: StatsReceiver) extends ChannelInboundHandlerAdapter diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/channel/Netty4ChannelInitializer.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/channel/Netty4ChannelInitializer.scala index 0025a9da753..699cc92c03f 100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/channel/Netty4ChannelInitializer.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/channel/Netty4ChannelInitializer.scala @@ -47,7 +47,7 @@ private[netty4] class Netty4ChannelInitializer( (None, None) val channelSnooper = - if (params[Transport.Verbose].b) + if (params[Transport.Verbose].enabled) Some(ChannelSnooper(label)(logger.log(Level.INFO, _, _))) else None