下面是飞鹰协议的一个相当幼稚的实现,它使用记分库进行二进制编码和解码。我将假设如下SBT设置:
scalaVersion := "2.10.4"
resolvers += Resolver.sonatypeRepo("snapshots")
libraryDependencies ++= Seq(
"com.twitter" %% "finagle-core" % "6.20.0",
"org.typelevel" %% "scodec-core" % "1.3.0-SNAPSHOT"
)首先,我们需要一个计分卡的导入,贯穿整个过程:
import scodec.Codec接下来是从scodec的Codec到奈蒂编码器和解码器的转换:
trait CodecConversions {
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.oneone.{OneToOneDecoder, OneToOneEncoder}
import scodec.bits.BitVector
/**
* Converts an scodec codec into a Netty encoder.
*/
protected def encoder[A: Codec] = new OneToOneEncoder {
override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: Object) =
ChannelBuffers.wrappedBuffer(
Codec.encodeValid(msg.asInstanceOf[A]).toByteBuffer
)
}
/**
* Converts an scodec codec into a Netty decoder.
*/
protected def decoder[A: Codec] = new OneToOneDecoder {
override def decode(ctx: ChannelHandlerContext, channel: Channel, msg: Object) =
msg match {
case cb: ChannelBuffer =>
Codec.decodeValidValue[A](BitVector(cb.toByteBuffer)).asInstanceOf[Object]
case other => other
}
}
}然后是通道管道和编解码工厂:
trait Factories { this: CodecConversions =>
import com.twitter.finagle.{Codec => FinagleCodec, CodecFactory}
import org.jboss.netty.channel.{ChannelPipelineFactory, Channels}
/**
* Creates a Netty channel pipeline factory given input and output types.
*/
private[this] def pipeline[I: Codec, O: Codec] = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("encoder", encoder[I])
pipeline.addLast("decoder", decoder[O])
pipeline
}
}
/**
* Creates a Finagle codec factory given input and output types.
*/
protected def codecFactory[I: Codec, O: Codec] = new CodecFactory[I, O] {
def server = Function.const {
new FinagleCodec[I, O] { def pipelineFactory = pipeline[O, I] }
}
def client = Function.const {
new FinagleCodec[I, O] { def pipelineFactory = pipeline[I, O] }
}
}
}然后是创建Finagle客户端和服务器的部分:
object Finagler extends Factories with CodecConversions {
import com.twitter.conversions.time._
import com.twitter.finagle.Service
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
import com.twitter.util.{Duration, Future}
import java.net.InetSocketAddress
/**
* Creates a Finagle server from a function given that we have scodec codecs
* for both the input and output types.
*/
def server[I, O](port: Int)(f: I => Future[O])(implicit ic: Codec[I], oc: Codec[O]) =
ServerBuilder()
.name("server")
.codec(codecFactory[I, O])
.bindTo(new InetSocketAddress(port))
.build(new Service[I, O] { def apply(i: I) = f(i) })
/**
* Creates a Finagle client given input and output types with scodec codecs.
*/
def client[I, O](host: String, timeout: Duration = 1.second)
(implicit ic: Codec[I], oc: Codec[O]) =
ClientBuilder()
.name("client")
.codec(codecFactory[I, O])
.hosts(host)
.hostConnectionLimit(1)
.timeout(timeout)
.build()
}最后的用法如下所示:
import scodec._, codecs._
import com.twitter.util.Future
case class Point(x: Double, y: Double)
implicit val pointCodec = (double :: double).as[Point]
implicit val pointsCodec = list(pointCodec)
def center(points: List[Point]) = {
val Point(x, y) = points.reduce[Point] {
case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
}
Point(x / points.size, y / points.size)
}
val server = Finagler.server(9000)(center _ andThen Future.value _)然后我们可以创建一个客户端:
val client = Finagler.client[List[Point], Point]("localhost:9000")并称之为:
client(List(Point(0, 1), Point(1, 1), Point(1, 0), Point(0, 0))).onSuccess(println)它将印出我们所期望的:
scala> Point(0.5,0.5)因此,它似乎适用于这个简单的用例,但我不是Netty专家,我很好奇这种天真的方法(简单地通过一个OneToOneEncoder,等等)。会给我带来麻烦的。
发布于 2014-08-27 05:35:18
在解码消息(即将接收到的字节流转换为对象)时,您需要记住,单个套接字写入并不总是转换为单个套接字读取。
例如,假设您在一次写尝试中编写了4个字节。在读取器方面,在每个缓冲区包含小于-4字节数据的情况下,可以通过2次读取尝试来读取。
因此,当服务器开始处理更多的负载时,使用OneToOneDecoder将无法工作。您需要使用其他解码器类,如FrameDecoder和ReplayingDecoder。
有关更多信息,请阅读用户指南中的“处理基于流的传输”部分。
https://codereview.stackexchange.com/questions/60546
复制相似问题