首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >BufferedInputStream的交替形式

BufferedInputStream的交替形式
EN

Code Review用户
提问于 2012-07-11 14:04:48
回答 4查看 979关注 0票数 11

我在一个web应用程序中的TCP/IP套接字通信中遇到了问题,在这个web应用程序中,与应用程序交谈的服务器偶尔会发送突发的数据,使低级别流缓冲区溢出,导致数据丢失。

我想出的解决方案基本上是StreamGobbler的一种非线程形式,我一直称它为GreedyBufferedInputStream。基本思想是,对该类的任何调用都将包括耗尽源InputStream

我对Scala还很陌生,所以请告诉我如何改进这段代码。此外,在代码中有什么可以更有效地执行吗?性能是至关重要的。

代码语言:javascript
复制
package edu.stsci.util

import org.slf4j.Logger

import java.io.InputStream
import java.util

class GreedyBufferedInputStream extends InputStream {
  private var logger: Logger = null
  private var source: InputStream = null
  private val data: util.LinkedList[DataBlock] = new util.LinkedList[DataBlock]()
  private var currentBlock: DataBlock = null

  def this(logger: Logger, source: InputStream) {
    this()

    this.logger = logger
    this.source = source

    drainSource()
  }

  def this(source: InputStream) {
    this(null, source)
  }

  def read() = {
    prepareToRead()
    if (currentBlock == null) -1
    else currentBlock.read()
  }

  override def read(destination: Array[Byte], offset: Int, length: Int): Int = {
    prepareToRead()
    if (currentBlock == null) return -1

    var bytesRead = currentBlock.read(offset, length, destination)

    while (bytesRead < length) {
      prepareToRead()
      if (currentBlock == null) {// EOF
        return bytesRead
      }
      else {
        bytesRead += currentBlock.read((offset + bytesRead), (length - bytesRead), destination)
      }
    }

    bytesRead
  }

  override def read(destination: Array[Byte]) = read(destination, 0, destination.length)

  override def skip(length: Long): Long = {
    prepareToRead()
    if (currentBlock == null) return -1

    var bytesSkipped = currentBlock.skip(length)

    while (bytesSkipped < length) {
      prepareToRead()
      if (currentBlock == null) { // EOF
        return bytesSkipped
      }
      else {
        bytesSkipped += currentBlock.skip(length - bytesSkipped)
      }
    }

    bytesSkipped
  }

  override def close() {
    super.close()
  }

  override def available() = {
    drainSource()

    var result = 0

    if (currentBlock != null) result += currentBlock.available

    val it = data.iterator()
    while (it.hasNext) {
      val next = it.next()
      result += next.available
    }

    result
  }

  private def drainSource() {
    if (source == null) return // EOF

    if (source.available() > 0) {
      val raw = new Array[Byte](source.available())
      val length = source.read(raw)
      val block = new DataBlock(raw, length)
      data.add(block)
    }
  }

  private def prepareToRead() {
    drainSource()

    if (currentBlock != null) {
      val done = currentBlock.isDone
      if (done) currentBlock = null
      else return // we have a current block
    }

    if (data.isEmpty) { // no choice but to block
      if (source == null) return // have reached EOF

      val raw = new Array[Byte](1024)
      val length = source.read(raw)
      if (length < 0) {
        source = null
        return
      }
      currentBlock = new DataBlock(raw, length)

    }
    else currentBlock = data.remove()
  }
}

class DataBlock(data: Array[Byte], length: Int) {

  var readPos = -1

  def isDone: Boolean = (readPos >= length)

  def available: Int = {
    if (readPos < 0) length
    else (length - readPos)
  }

  def read(): Int = {
    if (readPos < 0) readPos = 0

    val raw = data(readPos)
    readPos += 1
    raw & 0xff
  }

  def read(offset: Int, length: Int, destination: Array[Byte]): Int = {
    if (readPos < 0) readPos = 0

    var readCount = {
      if (available >= length) length
      else available
    }

    Array.copy(data, readPos, destination, offset, readCount)
    readPos += readCount

    readCount
  }

  def skip(length: Long): Long = {
    if (readPos < 0) readPos = 0

    var readCount = {
      if (available >= length) length
      else available
    }

    readPos += readCount.toInt

    readCount
  }
}
EN

回答 4

Code Review用户

回答已采纳

发布于 2012-07-18 15:35:59

我认为LinkedList的使用不是最优的。在每次调用drainSource时,您都可能会添加一个新的DataBlock ant --链接列表的末尾;这具有性能O(n) (列表的长度)。我建议使用Vector而不是LinkedList

另一点(但这不是Scala特定的):为什么要用-1来初始化类readPos中的字段DataBlock?由于这一选择,你有很多特殊情况。我将按以下方式实现类DataBlock

代码语言:javascript
复制
class DataBlock(data: Array[Byte], length: Int) {
  require(length > 0)

  private var readPos = 0

  def isDone: Boolean = readPos >= length

  def available: Int = length - readPos

  def read(): Int = try { data(readPos) & 0xff } finally { readPos += 1 }

  def read(offset: Int, length: Int, destination: Array[Byte]): Int = {
    val readCount = length min available
    Array.copy(data, readPos, destination, offset, readCount)
    readPos += readCount
    readCount
  }

  def skip(length: Long): Long = {
    val readCount = length min available
    readPos += readCount.toInt
    readCount
  }
}
票数 3
EN

Code Review用户

发布于 2012-07-18 15:22:41

只是一些随机的反馈,实际上并不是作为一个整体来处理代码。

首先,我将按照以下方式处理这个类的构造:

代码语言:javascript
复制
class GreedyBufferedInputStream(logger: Logger = null, initialSource: InputStream)
extends InputStream {

  private var source: InputStream = initialSource
  private val data: util.LinkedList[DataBlock] = new util.LinkedList()
  private var currentBlock: DataBlock = null

  drainSource()

这消除了一个var和两个辅助构造函数。

我将重写available()方法如下:

代码语言:javascript
复制
  override def available() = {
    drainSource()
    import collection.JavaConverters._
    data.asScala.map(_.available).sum +
      Option(currentBlock).map(_.available).getOrElse(0)
  }

我会用它来代替:

代码语言:javascript
复制
var readCount = {
  if (available >= length) length
  else available
}

使用

代码语言:javascript
复制
val readCount = length min available
票数 3
EN

Code Review用户

发布于 2012-07-16 10:43:53

准确地说,我对您的问题没有真正的解决方案,但如果我考虑一下,应该有几种可能找到一个:首先,您可以考虑限制应用程序发送的数据。限制可能意味着应用程序端的双向通信和/或发送缓冲区。因此,您可以调整发送的数据,并可能压缩它以提高效率等。第二种解决方案可以是一个多流缓冲区池,在数据突发的情况下可以选择它来处理所有不可访问的数据。因为我是Python程序员,所以我不能告诉您Scala代码是否可以改进。我希望我能帮上忙。

票数 0
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/13536

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档