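I'm having a problem with TCP/IP socket communication in a web application: the server that the application talks to occasionally sends bursts of data that overflow the low-level stream buffer, causing data loss.
The solution I came up with is essentially a non-threaded form of StreamGobbler, which I've been calling GreedyBufferedInputStream. The basic idea is that any call to the class will include draining the source InputStream.
I'm fairly new to Scala, so please tell me how to improve this code. Also, is there anything in the code that could be done more efficiently? Performance is critical.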

    package edu.stsci.util

    import org.slf4j.Logger
    import java.io.InputStream
    import java.util

    class GreedyBufferedInputStream extends InputStream {
      private var logger: Logger = null
      private var source: InputStream = null
      private val data: util.LinkedList[DataBlock] = new util.LinkedList[DataBlock]()
      private var currentBlock: DataBlock = null

      def this(logger: Logger, source: InputStream) {
        this()
        this.logger = logger
        this.source = source
        drainSource()
      }

      def this(source: InputStream) {
        this(null, source)
      }

      def read() = {
        prepareToRead()
        if (currentBlock == null) -1
        else currentBlock.read()
      }

      override def read(destination: Array[Byte], offset: Int, length: Int): Int = {
        prepareToRead()
        if (currentBlock == null) return -1
        var bytesRead = currentBlock.read(offset, length, destination)
        while (bytesRead < length) {
          prepareToRead()
          if (currentBlock == null) { // EOF
            return bytesRead
          }
          else {
            bytesRead += currentBlock.read((offset + bytesRead), (length - bytesRead), destination)
          }
        }
        bytesRead
      }

      override def read(destination: Array[Byte]) = read(destination, 0, destination.length)

      override def skip(length: Long): Long = {
        prepareToRead()
        if (currentBlock == null) return -1
        var bytesSkipped = currentBlock.skip(length)
        while (bytesSkipped < length) {
          prepareToRead()
          if (currentBlock == null) { // EOF
            return bytesSkipped
          }
          else {
            bytesSkipped += currentBlock.skip(length - bytesSkipped)
          }
        }
        bytesSkipped
      }

      override def close() {
        super.close()
      }

      override def available() = {
        drainSource()
        var result = 0
        if (currentBlock != null) result += currentBlock.available
        val it = data.iterator()
        while (it.hasNext) {
          val next = it.next()
          result += next.available
        }
        result
      }

      private def drainSource() {
        if (source == null) return // EOF
        if (source.available() > 0) {
          val raw = new Array[Byte](source.available())
          val length = source.read(raw)
          val block = new DataBlock(raw, length)
          data.add(block)
        }
      }

      private def prepareToRead() {
        drainSource()
        if (currentBlock != null) {
          val done = currentBlock.isDone
          if (done) currentBlock = null
          else return // we have a current block
        }
        if (data.isEmpty) { // no choice but to block
          if (source == null) return // have reached EOF
          val raw = new Array[Byte](1024)
          val length = source.read(raw)
          if (length < 0) {
            source = null
            return
          }
          currentBlock = new DataBlock(raw, length)
        }
        else currentBlock = data.remove()
      }
    }

    class DataBlock(data: Array[Byte], length: Int) {
      var readPos = -1

      def isDone: Boolean = (readPos >= length)

      def available: Int = {
        if (readPos < 0) length
        else (length - readPos)
      }

      def read(): Int = {
        if (readPos < 0) readPos = 0
        val raw = data(readPos)
        readPos += 1
        raw & 0xff
      }

      def read(offset: Int, length: Int, destination: Array[Byte]): Int = {
        if (readPos < 0) readPos = 0
        var readCount = {
          if (available >= length) length
          else available
        }
        Array.copy(data, readPos, destination, offset, readCount)
        readPos += readCount
        readCount
      }

      def skip(length: Long): Long = {
        if (readPos < 0) readPos = 0
        var readCount = {
          if (available >= length) length
          else available
        }
        readPos += readCount.toInt
        readCount
      }
    }

Answered 2012-07-18 15:35:59

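I don't think the use of LinkedList is optimal. On every call to drainSource you potentially add a new DataBlock at the end of the linked list; this has performance O(n) (n being the length of the list). I would suggest using Vector instead of LinkedList.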
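
As a rough illustration of the Vector suggestion, here is a minimal sketch of a FIFO of pending blocks backed by an immutable Vector. The class name BlockQueue and its methods are hypothetical and do not appear in the question's code; the sketch only relies on Vector's append (`:+`), `headOption`, and `tail`, which are documented as effectively constant time.

```scala
// Hypothetical helper, not part of the question's code: a FIFO backed by an
// immutable Vector that is swapped out on every change.
final class BlockQueue[A] {
  private var blocks: Vector[A] = Vector.empty

  // Append at the end (effectively constant time for Vector).
  def enqueue(block: A): Unit = blocks = blocks :+ block

  // Remove and return the oldest element, or None if the queue is empty.
  def dequeue(): Option[A] = blocks.headOption.map { head =>
    blocks = blocks.tail
    head
  }

  def isEmpty: Boolean = blocks.isEmpty
}
```

For comparison, java.util.LinkedList.add appends at the tail in constant time, so whether swapping the collection makes a measurable difference here is something to confirm with a benchmark.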
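Another point (though not Scala-specific): why do you initialize the field readPos in class DataBlock with -1? That choice leaves you with a lot of special cases. I would implement the class DataBlock as follows: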

    class DataBlock(data: Array[Byte], length: Int) {
      require(length > 0)

      private var readPos = 0

      def isDone: Boolean = readPos >= length

      def available: Int = length - readPos

      def read(): Int = try { data(readPos) & 0xff } finally { readPos += 1 }

      def read(offset: Int, length: Int, destination: Array[Byte]): Int = {
        val readCount = length min available
        Array.copy(data, readPos, destination, offset, readCount)
        readPos += readCount
        readCount
      }

      def skip(length: Long): Long = {
        val readCount = length min available
        readPos += readCount.toInt
        readCount
      }
    }

Answered 2012-07-18 15:22:41

Just some random feedback, not really addressing the code as a whole.
First, I would handle construction of the class as follows:

    class GreedyBufferedInputStream(logger: Logger = null, initialSource: InputStream)
        extends InputStream {
      private var source: InputStream = initialSource
      private val data: util.LinkedList[DataBlock] = new util.LinkedList()
      private var currentBlock: DataBlock = null
      drainSource()

This eliminates a var and two auxiliary constructors.
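
One thing to watch with the signature above: because the defaulted logger parameter comes first, a caller who wants to omit it has to name the other argument, as in `new GreedyBufferedInputStream(initialSource = stream)`. A minimal sketch of an alternative ordering follows. Log and SourceHolder are hypothetical stand-ins (for org.slf4j.Logger and for the stream class respectively) so that the example compiles without dependencies; using Option instead of a null default is likewise an assumption, not something the question requires.

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Hypothetical stand-in for org.slf4j.Logger.
trait Log { def debug(msg: String): Unit }

// Required parameter first, optional logger last: the logger can be
// omitted positionally, and Option avoids a null default.
class SourceHolder(initialSource: InputStream, logger: Option[Log] = None) {
  // Still a var, because the question's code sets it to null at EOF.
  private var source: InputStream = initialSource

  logger.foreach(_.debug("stream created"))

  def hasSource: Boolean = source != null
  def markEof(): Unit = source = null
}
```

With this ordering both `new SourceHolder(stream)` and `new SourceHolder(stream, Some(log))` work without named arguments.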
I would rewrite the available() method as follows:

    override def available() = {
      drainSource()
      import collection.JavaConverters._
      data.asScala.map(_.available).sum +
        Option(currentBlock).map(_.available).getOrElse(0)
    }

I would replace this:

    var readCount = {
      if (available >= length) length
      else available
    }

with

    val readCount = length min available

Answered 2012-07-16 10:43:53

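To be precise, I don't have a real solution to your problem, but thinking about it, there should be a few ways to find one. First, you could consider throttling the data the application sends. Throttling could mean two-way communication and/or a send buffer on the application side. That way you can regulate the data being sent and possibly compress it for efficiency, and so on. A second solution could be a pool of multiple stream buffers that can be drawn on during a data burst to handle all the data that would otherwise be inaccessible. Since I'm a Python programmer, I can't tell you whether the Scala code can be improved. I hope this helps.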
https://codereview.stackexchange.com/questions/13536