首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Iron and Hyper中重用hyper::client和tokio_core

在Iron and Hyper中重用hyper::client和tokio_core
EN

Stack Overflow用户
提问于 2017-07-15 05:21:59
回答 1查看 425关注 0票数 1

我在一个Iron处理程序中发出一个客户端请求。如何重用Tokio的Core和Hyper的Client?我使用的是hyper 0.11.0和tokio-core 0.1。

代码语言:javascript
复制
fn get_result(req: &mut Request) -> IronResult<Response> {
    let mut payload = String::new();
    req.body.read_to_string(&mut payload).unwrap();

    // can we re-use core and client somehow. Making then global with lazy_static!() does not work.
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let client = Client::new(&core.handle());

    let uri = "http://host:port/getResult".parse().unwrap();
    let mut req: hyper::Request = hyper::Request::new(hyper::Method::Post, uri);
    req.headers_mut().set(ContentType::json());
    req.headers_mut().set(ContentLength(payload.len() as u64));
    req.set_body(payload);

    let mut results: Vec<RequestFormat> = Vec::new();
    let work = client.request(req).and_then(|res| {
        res.body().for_each(|chunk| {
            let re: ResultFormat = serde_json::from_slice(&chunk).unwrap();
            results.push(re);
            Ok(())
        })
    });

    Ok(Response::with(
        (iron::status::Ok, serde_json::to_string(&results).unwrap()),
    ))
}
EN

回答 1

Stack Overflow用户

发布于 2017-07-18 01:14:58

我创建了一个包装客户端和核心的Downloader类。下面是代码片段。

代码语言:javascript
复制
use hyper;
use tokio_core;
use std::sync::{mpsc};
use std::thread;
use futures::Future;
use futures::stream::Stream;
use std::time::Duration;
use std::io::{self, Write};
use time::precise_time_ns;
use hyper::Client;

pub struct Downloader {
    sender : mpsc::Sender<(hyper::Request, mpsc::Sender<hyper::Chunk>)>,
    #[allow(dead_code)]
    tr : thread::JoinHandle<hyper::Request>,
}
impl Downloader {
    pub fn new() -> Downloader {
        let (sender, receiver) = mpsc::channel::<(hyper::Request,mpsc::Sender<hyper::Chunk>)>();
        let tr = thread::spawn(move||{
            let mut core = tokio_core::reactor::Core::new().unwrap();
            let client =  Client::new(&core.handle());
            loop {
                let (req , sender) = receiver.recv().unwrap();
                let begin = precise_time_ns();
                let work = client.request(req)   
                .and_then(|res| {
                    res.body().for_each(|chunk| {

                        sender.send(chunk)
                        .map_err(|e|{
                            //io::sink().write(&chunk).unwrap();
                            io::Error::new(io::ErrorKind::Other, e)
                        })?;
                        Ok(())
                    })
                    //sender.close();
                //res.body().concat2()
                });
            core.run(work).map_err(|e|{println!("Error Is {:?}", e);});
           //This time prints same as all request processing time. 
            debug!("Time taken In Download {:?} ms", (precise_time_ns() - begin) / 1000000);
            }
        });
        Downloader{sender,
                tr,
        }
    }

    pub fn download(&self, req : hyper::Request, results:  mpsc::Sender<Vec<u8>>){
        self.sender.send((req, results)).unwrap();
    }
}

现在这个类的客户端可以有一个静态变量了。

代码语言:javascript
复制
lazy_static!{
    static ref DOWNLOADER : Mutex<downloader::Downloader> = 
Mutex::new(downloader::Downloader::new());
}
let (sender, receiver) = mpsc::channel();
DOWNLOADER.lock().unwrap().download(payload, sender);

然后通过接收通道读取。可能需要使用sender.drop()关闭发送方通道

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45111783

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档