首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SocketChannel问题

SocketChannel问题
EN

Stack Overflow用户
提问于 2015-01-27 15:33:07
回答 1查看 1.4K关注 0票数 0

我编写了一个应用程序,它通过TCP与SocketChannel连接到服务器,但我有两个问题:

  1. 第一种是次要的--有时出于某种未知的原因,我会发送连接的消息和
  2. 第二个是关键--应用程序定期停止发送/接收消息。

知道是怎么回事吗?

代码语言:javascript
复制
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketSelectorWorker extends Thread {
  private static final transient Logger log = LoggerFactory.getLogger(SocketSelectorWorker.class);
  private ExecutorService executorService = Executors.newFixedThreadPool(3);
  private final Queue<byte[]> messages;
  private Selector selector;

  public SocketSelectorWorker(Queue messages, Selector selector) {
    super();
    this.selector = selector;
    this.session = session;
    this.messages = messages;
  }

  @Override
  public void run() {
    super.run();
    while (isConnectionAlive()) {
      try {
        // Wait for an event
        selector.select();
      } catch (IOException e) {
        log.error("Selector error: {}", e.toString());
        log.debug("Stacktrace: ", e);
        session.closeConnection();
        break;
      }
      handleSelectorkeys(selector.selectedKeys());
    }
    executorService.shutdown();
    log.debug("worker stopped");
  }

  private void handleSelectorkeys(Set<SelectionKey> selectedKeys) {
    for (SelectionKey selKey : selector.selectedKeys()) {
      selector.selectedKeys().remove(selKey);
      try {
        processSelectionKey(selKey);
      } catch (IOException e) {
        // Handle error with channel and unregister
        selKey.cancel();
        log.error("Selector error: {}", e.toString());
        log.debug("Stacktrace: ", e);
      }
    }
  }

  public void processSelectionKey(SelectionKey selKey) throws IOException {

    // Since the ready operations are cumulative,
    // need to check readiness for each operation
    if (selKey.isValid() && selKey.isConnectable()) {
      log.debug("connectable");
      // Get channel with connection request
      SocketChannel sChannel = (SocketChannel) selKey.channel();

      boolean success = sChannel.finishConnect();
      if (!success) {
        // An error occurred; handle it
        log.error("Error on finish");
        // Unregister the channel with this selector
        selKey.cancel();
      }
    }

    if (selKey.isValid() && selKey.isReadable()) {
      log.debug("readable");
      readMessage(selKey);
    }

    if (selKey.isValid() && selKey.isWritable()) {
      log.debug("writable");
      writeMessage(selKey);
    }

    if (selKey.isValid() && selKey.isAcceptable()) {
      log.debug("Acceptable");
    }

  }

  private void writeMessage(SelectionKey selKey) throws IOException {
    byte[] message = messages.poll();
    if (message == null) {
      return;
    }

    // Get channel that's ready for more bytes
    SocketChannel socketChannel = (SocketChannel) selKey.channel();

    // See Writing to a SocketChannel
    // Create a direct buffer to get bytes from socket.
    // Direct buffers should be long-lived and be reused as much as
    // possible.

    ByteBuffer buf = ByteBuffer.allocateDirect(1024);// .allocateDirect(toSend.getBytes().length);

    // try {
    // Fill the buffer with the bytes to write;
    // see Putting Bytes into a ByteBuffer
    // buf.put((byte)0xFF);

    buf.clear();
    buf.put(new byte[] { 0x02 });
    buf.put(message);
    buf.put(new byte[] { 0x03 });
    // Prepare the buffer for reading by the socket
    buf.flip();

    // Write bytes
    int numBytesWritten = socketChannel.write(buf);
    log.debug("Written: {}", numBytesWritten);

    while (buf.hasRemaining()) {
      numBytesWritten = socketChannel.write(buf);
      log.debug("Written remining: {}", numBytesWritten);
    }
  }

  private void readMessage(SelectionKey selKey) throws IOException {
    // Get channel with bytes to read
    SocketChannel socketChannel = (SocketChannel) selKey.channel();

    // See Reading from a SocketChannel
    // Create a direct buffer to get bytes from socket.
    // Direct buffers should be long-lived and be reused as much as
    // possible.
    ByteBuffer buf = ByteBuffer.allocateDirect(2048);
    Charset charset = Charset.forName("UTF-8");// Charset.forName("ISO-8859-1");
    CharsetDecoder decoder = charset.newDecoder();

    // try {
    // Clear the buffer and read bytes from socket
    buf.clear();
    int numBytesRead = socketChannel.read(buf);

    if (numBytesRead == -1) {
      // No more bytes can be read from the channel
      // socketChannel.close();
      return;
    }
    log.debug("Read bytes: {}", numBytesRead);
    // To read the bytes, flip the buffer
    buf.flip();

    String result = decoder.decode(buf).toString();

    log.debug("Read string: {}", result);
    //processMessage(result.getBytes());
  }

}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-01-27 15:42:00

  1. TCP没有消息边界。它是一个字节流协议.任何信息的界限都取决于你。
  2. 您没有正确地处理选择键。您必须在迭代时通过迭代器,而不是通过集合删除。这意味着你不能使用增强的for-循环。可能你在跳钥匙。
  3. 当您从read()获得-1时,您必须关闭频道.
  4. 当您得到一个IOException时,只取消键是不够的。您应该关闭通道,其中NB取消他们自动键。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28174128

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档