首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ObjectInputStream返回ObjectStreamClass

ObjectInputStream返回ObjectStreamClass
EN

Stack Overflow用户
提问于 2014-11-15 19:15:29
回答 1查看 605关注 0票数 0

我有点麻烦。下面的行在客户端第一次读取时执行得很好,但是在第二次读取时失败。

WorkerNode.java:72 Message task = (Message) in.readObject();

in是一个私有的ObjectInputStream。收到的例外情况如下

代码语言:javascript
复制
Exception in thread "main" java.lang.ClassCastException: java.io.ObjectStreamClass cannot be cast to parallelprogramming.Message
    at parallelprogramming.WorkerNode.receiveTask(WorkerNode.java:72)
    at parallelprogramming.WorkerNode.computeTillEndOfWork(WorkerNode.java:139)
    at parallelprogramming.Worker.main(Worker.java:24)
Nov 15, 2014 11:07:15 PM parallelprogramming.WorkerNode receiveTask
SEVERE: null
java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at parallelprogramming.WorkerNode.receiveTask(WorkerNode.java:72)
    at parallelprogramming.WorkerNode.lambda$startListeningForWork$0(WorkerNode.java:59)
    at parallelprogramming.WorkerNode$$Lambda$1/798154996.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

Exception in thread "WorkListener" java.lang.ClassCastException: parallelprogramming.MatMulTask cannot be cast to parallelprogramming.Message
    at parallelprogramming.WorkerNode.receiveTask(WorkerNode.java:72)
    at parallelprogramming.WorkerNode.lambda$startListeningForWork$0(WorkerNode.java:59)
    at parallelprogramming.WorkerNode$$Lambda$1/798154996.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)
Java Result: 1

消息具有以下结构。

代码语言:javascript
复制
public class Message implements IMessage{
    private final MessageType type;
    private final Object payload;

IMessage扩展了可序列化。我在客户端和服务器端都使用相同的ObjectInputStream和ObjectOutputStream。我试着四处寻找,但没有运气。还有人发现类似的东西吗?

EDIT2:向工作人员发送消息的代码:

代码语言:javascript
复制
private final Map<String, WorkerConn> nodes;
//nodes initialized in constructor
private void sendTaskToNode(ITask task, String node) {
        if(task == null){
            return;
        }        
        try{   
            Message msg = new Message(MessageType.task, task);
            nodes.get(node).sendObject(msg);

            nodes.get(node).incrementworkCount();

            System.out.println("Sent work to "+node);
        } catch (IOException ex) {
            Logger.getLogger(MasterNode.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

在WorkerConn中发送对象:

代码语言:javascript
复制
public WorkerConn(Socket socket, String name) throws IOException {
        this.socket = socket;
        this.out = new ObjectOutputStream(socket.getOutputStream());
        this.in = new ObjectInputStream(socket.getInputStream());
        this.name = name;
        workCount = 0;
    }

void sendObject(Message msg) throws IOException {
        out.writeObject(msg);
    }

接收员工信息的部分:

代码语言:javascript
复制
public void receiveTask() throws NotConnectedToMasterException {
        try {
            Message task = (Message) in.readObject();
            if(task.getMessageType() == MessageType.task){
                tasks.add((ITask) task.getPayload());
                System.out.println("Received Task");
            }else if(task.getMessageType() == MessageType.endOfWork){
                ITask t = new AbstractTask() {

                    @Override
                    public Object call() throws Exception {
                        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
                    }
                };
                t.setDeathPill();
                tasks.add(t);
                System.out.println("added deathpill");
            }else{
                System.out.println("Received "+task.getMessageType());
            }
        } catch(EOFException ex) {
            return;
        } catch (IOException ex) {
            Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex);
            return;
        } catch (ClassNotFoundException ex) {
            Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

编辑:发现我的问题了。我创建了一个线程来侦听传入的任务,读取输入流,当任务队列为空时,我也从主线程读取相同的输入流。在员工节点中:

代码语言:javascript
复制
private void startListeningForWork(){
        workListener = new Thread(() -> {
            while(!master.isClosed()){
                try {
                    receiveTask();
                } catch (NotConnectedToMasterException ex) {
                    break;
                }
            }
        });
        workListener.setName("WorkListener");
        workListener.start();
    }

代码语言:javascript
复制
while(!task.isDeathPill()){
            try {                
                results.addToResult(task.call());
                sendACKtoMaster();                
                task = tasks.remove();
            } catch (Exception ex) {
                try {
                    receiveTask();
                } catch (NotConnectedToMasterException ex1) {
                    Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex1);
                    break;
                }
            }   
        }

这导致其中一个线程在另一个线程之前读取对象,并将事情搞砸。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2014-11-15 23:17:39

发现了我的问题。我创建了一个线程来侦听传入的任务,读取输入流,当任务队列为空时,我也从主线程读取相同的输入流。在员工节点中:

代码语言:javascript
复制
private void startListeningForWork(){
        workListener = new Thread(() -> {
            while(!master.isClosed()){
                try {
                    receiveTask();
                } catch (NotConnectedToMasterException ex) {
                    break;
                }
            }
        });
        workListener.setName("WorkListener");
        workListener.start();
    }

代码语言:javascript
复制
while(!task.isDeathPill()){
            try {                
                results.addToResult(task.call());
                sendACKtoMaster();                
                task = tasks.remove();
            } catch (Exception ex) {
                try {
                    receiveTask();
                } catch (NotConnectedToMasterException ex1) {
                    Logger.getLogger(WorkerNode.class.getName()).log(Level.SEVERE, null, ex1);
                    break;
                }
            }   
        }

这导致其中一个线程在另一个线程之前读取对象,并将事情搞砸。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/26949684

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档