
How to stream messages in Rust with tokio_proto and tokio_service

Stack Overflow user
Asked 2017-10-24 07:42:57
Answers: 1 · Views: 146 · Followers: 0 · Votes: 1

In my SmtpService, I want to send the response header immediately and send the body once processing is complete. This should follow the SMTP exchange:

C: DATA
S: 354 Start mail input
C: ... data ...
C: ... more ...
C: .
S: 250 Ok

This is what I have so far in the playground:

#[macro_use] 
extern crate log;
extern crate bytes;
extern crate tokio_proto;
extern crate tokio_service;
extern crate futures;

use std::io;
use bytes::Bytes;
use tokio_service::Service;
use tokio_proto::streaming::{Message, Body};
use futures::{future, Future, Stream};
use futures::sync::oneshot;
//use model::request::SmtpCommand;
//use model::response::SmtpReply;

#[derive(Eq, PartialEq, Debug)]
enum SmtpCommand {
    Data,    
}
#[derive(Eq, PartialEq, Debug)]
enum SmtpReply {
    OkInfo,
    StartMailInputChallenge,
    TransactionFailure,
    CommandNotImplementedFailure
}

pub struct SmtpService;

impl Service for SmtpService {
    // For non-streaming protocols, service errors are always io::Error
    type Error = io::Error;
    // These types must match the corresponding protocol types:
    type Request = Message<SmtpCommand, Body<Bytes, Self::Error>>;
    type Response = Message<SmtpReply, Body<SmtpReply, Self::Error>>;

    // The future for computing the response; box it for simplicity.
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    // Produce a future for computing a response from a request.
    fn call(&self, command: Self::Request) -> Self::Future {

        info!("Received {:?}", command);

        match command {
            Message::WithBody(cmd, cmd_body) => {
                match cmd {
                    SmtpCommand::Data => {
                        // start => SmtpReply::StartMailInputChallenge
                        // ok => SmtpReply::OkInfo
                        // err => SmtpReply::TransactionFailure

                        let (tx, rx) = oneshot::channel();

                        let fut = cmd_body
                                .inspect(|chunk| info!("data: {:?}", chunk))
                                .map(|_| tx.send(SmtpReply::OkInfo))
                                .map_err(|_| tx.send(SmtpReply::TransactionFailure))
                                .map(|_| Body::from(rx));

                        // ??? How to wire the fut future into the response message?

                        let msg = Message::WithBody(SmtpReply::StartMailInputChallenge, fut);

                        Box::new(future::ok(msg)) as Self::Future
                    }
                    _ => Box::new(future::ok(Message::WithoutBody(
                        SmtpReply::CommandNotImplementedFailure,
                    ))),
                }
            }
            Message::WithoutBody(cmd) => {
                Box::new(future::ok(Message::WithoutBody(match cmd {
                    _ => SmtpReply::CommandNotImplementedFailure,
                })))
            }
        }
    }
}

fn main() {
    println!("Hello, world!");
}

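I would like to know whether this is possible, or whether I need to produce two messages: one for DATA and another for the actual byte stream.

The error I get shows that the message structure does not match; the body/future evidently does not fit: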

error[E0271]: type mismatch resolving `<futures::FutureResult<tokio_proto::streaming::Message<SmtpReply, futures::stream::Map<futures::stream::MapErr<futures::stream::Map<futures::stream::Inspect<tokio_proto::streaming::Body<bytes::Bytes, std::io::Error>, [closure@src/main.rs:57:42: 57:76]>, [closure@src/main.rs:58:38: 58:68 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:59:42: 59:84 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:60:38: 60:56 rx:futures::Receiver<SmtpReply>]>>, std::io::Error> as futures::Future>::Item == tokio_proto::streaming::Message<SmtpReply, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>`
  --> src/main.rs:66:25
   |
66 |                         Box::new(future::ok(msg)) as Self::Future
   |                         ^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::stream::Map`, found struct `tokio_proto::streaming::Body`
   |
   = note: expected type `tokio_proto::streaming::Message<_, futures::stream::Map<futures::stream::MapErr<futures::stream::Map<futures::stream::Inspect<tokio_proto::streaming::Body<bytes::Bytes, std::io::Error>, [closure@src/main.rs:57:42: 57:76]>, [closure@src/main.rs:58:38: 58:68 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:59:42: 59:84 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:60:38: 60:56 rx:futures::Receiver<SmtpReply>]>>`
              found type `tokio_proto::streaming::Message<_, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>`
   = note: required for the cast to the object type `futures::Future<Error=std::io::Error, Item=tokio_proto::streaming::Message<SmtpReply, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>>`

1 Answer

Stack Overflow user

Answered 2017-10-25 18:09:32

The future returned by call is finished as soon as it yields the Response; you cannot "drive" any further work in that future.

This means you need to spawn a new task to produce the (streamed) body; for that you need a Handle from tokio_core.
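The same shape can be sketched with only the standard library, as a rough analogy rather than the tokio API: a thread stands in for the spawned task, and `std::sync::mpsc` stands in for the futures channel. The function `handle_data` and its signature are hypothetical, and the enum is trimmed to the variants used here. The header reply is returned at once, while the worker keeps running after the call has returned.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

#[derive(Debug, PartialEq)]
enum SmtpReply {
    OkInfo,
    StartMailInputChallenge,
    TransactionFailure,
}

// Hypothetical stand-in for `Service::call`: returns the header reply
// immediately, plus a receiver on which the final reply arrives later.
fn handle_data(
    chunks: Receiver<Result<Vec<u8>, String>>,
) -> (SmtpReply, Receiver<SmtpReply>) {
    let (tx, rx) = mpsc::channel();

    // The worker thread plays the role of the spawned task: it keeps
    // reading the request body after this function has returned.
    thread::spawn(move || {
        let mut outcome = SmtpReply::OkInfo;
        // Iteration ends when the sending side of `chunks` is dropped.
        for chunk in chunks {
            if chunk.is_err() {
                outcome = SmtpReply::TransactionFailure;
                break;
            }
        }
        // Ignore send errors: the receiver may already be gone.
        let _ = tx.send(outcome);
    });

    (SmtpReply::StartMailInputChallenge, rx)
}
```

A caller would read the header from the first tuple element right away and block on the receiver only when it wants the final reply.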

Also, you need to create the Body from an mpsc channel rather than from a oneshot; that way you can send many body chunks.
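To illustrate why a multi-item channel fits a streamed body, here is a minimal standard-library sketch, again an analogy and not the futures or tokio_proto API. The helpers `stream_body` and `collect_body` are hypothetical names. The producer sends any number of chunks and ends the stream simply by dropping its sender, which a single-value oneshot cannot express.

```rust
use std::io;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Hypothetical helper: streams every chunk over a bounded channel,
// then closes the channel by dropping the sender.
fn stream_body(chunks: Vec<String>) -> Receiver<io::Result<String>> {
    // A small bound keeps the producer from running far ahead of the
    // consumer; the answer's code also uses a bounded channel.
    let (tx, rx) = sync_channel(1);
    thread::spawn(move || {
        for chunk in chunks {
            if tx.send(Ok(chunk)).is_err() {
                break; // the receiver was dropped, stop producing
            }
        }
        // `tx` goes out of scope here, which ends the stream.
    });
    rx
}

// Drains the stream; the first `Err` item, if any, aborts the collection.
fn collect_body(rx: Receiver<io::Result<String>>) -> io::Result<Vec<String>> {
    rx.into_iter().collect()
}
```

Each item is an `io::Result`, matching the `mpsc::channel::<io::Result<SmtpReply>>` element type in the answer, so an error can be delivered in-band as one of the stream items.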

Playground

#[macro_use] 
extern crate log;
extern crate bytes;
extern crate tokio_core;
extern crate tokio_proto;
extern crate tokio_service;
extern crate futures;

use std::io;
use bytes::Bytes;
use tokio_service::Service;
use tokio_proto::streaming::{Message, Body};
use futures::{future, Future, Stream, Sink};
use futures::sync::mpsc;
//use model::request::SmtpCommand;
//use model::response::SmtpReply;

#[derive(Eq, PartialEq, Debug)]
pub enum SmtpCommand {
    Data,    
}
#[derive(Eq, PartialEq, Debug)]
pub enum SmtpReply {
    OkInfo,
    StartMailInputChallenge,
    TransactionFailure,
    CommandNotImplementedFailure
}

pub struct SmtpService {
    handle: tokio_core::reactor::Handle,
}

impl Service for SmtpService {
    // Error type shared by the service and both body streams
    type Error = io::Error;
    // These types must match the corresponding protocol types:
    type Request = Message<SmtpCommand, Body<Bytes, Self::Error>>;
    type Response = Message<SmtpReply, Body<SmtpReply, Self::Error>>;

    // The future for computing the response; box it for simplicity.
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    // Produce a future for computing a response from a request.
    fn call(&self, command: Self::Request) -> Self::Future {

        info!("Received {:?}", command);

        match command {
            Message::WithBody(cmd, cmd_body) => {
                match cmd {
                    SmtpCommand::Data => {
                        // start => SmtpReply::StartMailInputChallenge
                        // ok => SmtpReply::OkInfo
                        // err => SmtpReply::TransactionFailure

                        let (tx, rx) = mpsc::channel::<io::Result<SmtpReply>>(1);

                        let fut = cmd_body
                            // read cmd stream; for_each results in a Future,
                            // which completes when the stream is finished
                            .for_each(|chunk| {
                                info!("data: {:?}", chunk);
                                Ok(())
                            })
                            // now send the result body
                            .then(move |r| match r {
                                Ok(_) => tx.send(Ok(SmtpReply::OkInfo)),
                                Err(_) => tx.send(Ok(SmtpReply::TransactionFailure)),
                            })
                            // could send further body messages:
                            // .and_then(|tx| tx.send(...))
                            // ignore any send errors; spawn needs a future with
                            // Item=() and Error=().
                            .then(|_| Ok(()))
                        ;

                        self.handle.spawn(fut);

                        let body : Body<SmtpReply, Self::Error> = Body::from(rx);
                        let msg : Self::Response = Message::WithBody(SmtpReply::StartMailInputChallenge, body);

                        Box::new(future::ok(msg)) as Self::Future
                    }
                    _ => Box::new(future::ok(Message::WithoutBody(
                        SmtpReply::CommandNotImplementedFailure,
                    ))),
                }
            }
            Message::WithoutBody(cmd) => {
                Box::new(future::ok(Message::WithoutBody(match cmd {
                    _ => SmtpReply::CommandNotImplementedFailure,
                })))
            }
        }
    }
}

fn main() {
    println!("Hello, world!");
}
Votes: 1
Original page content provided by Stack Overflow; translation support provided by Tencent Cloud's Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/46900063
