首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >编写一个新的Finagle协议

编写一个新的Finagle协议
EN

Code Review用户
提问于 2014-08-20 04:31:48
回答 1查看 744关注 0票数 4

下面是飞鹰协议的一个相当幼稚的实现,它使用记分库进行二进制编码和解码。我将假设如下SBT设置:

代码语言:javascript
复制
scalaVersion := "2.10.4"

resolvers += Resolver.sonatypeRepo("snapshots")

libraryDependencies ++= Seq(
  "com.twitter" %% "finagle-core" % "6.20.0",
  "org.typelevel" %% "scodec-core" % "1.3.0-SNAPSHOT"
)

首先,我们需要一个计分卡的导入,贯穿整个过程:

代码语言:javascript
复制
import scodec.Codec

接下来是从scodec的Codec奈蒂编码器和解码器的转换:

代码语言:javascript
复制
trait CodecConversions {
  import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
  import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
  import org.jboss.netty.handler.codec.oneone.{OneToOneDecoder, OneToOneEncoder}
  import scodec.bits.BitVector

  /**
   * Converts an scodec codec into a Netty encoder.
   */
  protected def encoder[A: Codec] = new OneToOneEncoder {
    override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: Object) =
      ChannelBuffers.wrappedBuffer(
        Codec.encodeValid(msg.asInstanceOf[A]).toByteBuffer
      )
  }

  /**
   * Converts an scodec codec into a Netty decoder.
   */
  protected def decoder[A: Codec] = new OneToOneDecoder {
    override def decode(ctx: ChannelHandlerContext, channel: Channel, msg: Object) =
      msg match {
        case cb: ChannelBuffer =>
          Codec.decodeValidValue[A](BitVector(cb.toByteBuffer)).asInstanceOf[Object]
        case other => other
      }
  }
}

然后是通道管道和编解码工厂:

代码语言:javascript
复制
trait Factories { this: CodecConversions =>
  import com.twitter.finagle.{Codec => FinagleCodec, CodecFactory}
  import org.jboss.netty.channel.{ChannelPipelineFactory, Channels}

  /**
   * Creates a Netty channel pipeline factory given input and output types.
   */
  private[this] def pipeline[I: Codec, O: Codec] = new ChannelPipelineFactory {
    def getPipeline = {
      val pipeline = Channels.pipeline()
      pipeline.addLast("encoder", encoder[I])
      pipeline.addLast("decoder", decoder[O])
      pipeline
    }
  }

  /**
   * Creates a Finagle codec factory given input and output types.
   */
  protected def codecFactory[I: Codec, O: Codec] = new CodecFactory[I, O] {
    def server = Function.const {
      new FinagleCodec[I, O] { def pipelineFactory = pipeline[O, I] }
    }

    def client = Function.const {
      new FinagleCodec[I, O] { def pipelineFactory = pipeline[I, O] }
    }
  }
}

然后是创建Finagle客户端和服务器的部分:

代码语言:javascript
复制
object Finagler extends Factories with CodecConversions {
  import com.twitter.conversions.time._
  import com.twitter.finagle.Service
  import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
  import com.twitter.util.{Duration, Future}
  import java.net.InetSocketAddress

  /**
   * Creates a Finagle server from a function given that we have scodec codecs
   * for both the input and output types.
   */
  def server[I, O](port: Int)(f: I => Future[O])(implicit ic: Codec[I], oc: Codec[O]) =
    ServerBuilder()
      .name("server")
      .codec(codecFactory[I, O])
      .bindTo(new InetSocketAddress(port))
      .build(new Service[I, O] { def apply(i: I) = f(i) })

  /**
   * Creates a Finagle client given input and output types with scodec codecs.
   */
  def client[I, O](host: String, timeout: Duration = 1.second)
    (implicit ic: Codec[I], oc: Codec[O]) =
      ClientBuilder()
        .name("client")
        .codec(codecFactory[I, O])
        .hosts(host)
        .hostConnectionLimit(1)
        .timeout(timeout)
        .build()
}

最后的用法如下所示:

代码语言:javascript
复制
import scodec._, codecs._
import com.twitter.util.Future

case class Point(x: Double, y: Double)

implicit val pointCodec = (double :: double).as[Point]
implicit val pointsCodec = list(pointCodec)

def center(points: List[Point]) = {
  val Point(x, y) = points.reduce[Point] {
    case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
  }

  Point(x / points.size, y / points.size)
}

val server = Finagler.server(9000)(center _ andThen Future.value _)

然后我们可以创建一个客户端:

代码语言:javascript
复制
val client = Finagler.client[List[Point], Point]("localhost:9000")

并称之为:

代码语言:javascript
复制
client(List(Point(0, 1), Point(1, 1), Point(1, 0), Point(0, 0))).onSuccess(println)

它将印出我们所期望的:

代码语言:javascript
复制
scala> Point(0.5,0.5)

因此,它似乎适用于这个简单的用例,但我不是Netty专家,我很好奇这种天真的方法(简单地通过一个OneToOneEncoder,等等)。会给我带来麻烦的。

EN

回答 1

Code Review用户

回答已采纳

发布于 2014-08-27 05:35:18

在解码消息(即将接收到的字节流转换为对象)时,您需要记住,单个套接字写入并不总是转换为单个套接字读取。

例如,假设您在一次写尝试中编写了4个字节。在读取器方面,在每个缓冲区包含小于-4字节数据的情况下,可以通过2次读取尝试来读取。

因此,当服务器开始处理更多的负载时,使用OneToOneDecoder将无法工作。您需要使用其他解码器类,如FrameDecoderReplayingDecoder

有关更多信息,请阅读用户指南中的“处理基于流的传输”部分。

票数 1
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/60546

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档