首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >实现hyper::client::Connect进行测试

实现hyper::client::Connect进行测试
EN

Stack Overflow用户
提问于 2017-08-16 19:19:32
回答 1查看 492关注 0票数 1

我正在尝试通过使用静态响应实现我自己的hyper::Client来测试一些使用hyper::client::Connect的代码。我已经弄清楚了类型,但是找不出运行时的问题,tokio-proto抱怨说是request / response mismatch。下面是我的代码的一个简化版本,它演示了这个失败:

代码语言:javascript
复制
extern crate futures;
extern crate hyper;
extern crate tokio_core;
extern crate tokio_io;

use futures::{future, Future, Stream};
use std::str::from_utf8;
use std::io::Cursor;

struct Client<'a, C: 'a> {
    client: &'a hyper::Client<C>,
    url: &'a str,
}

impl<'a, C: hyper::client::Connect> Client<'a, C> {
    fn get(&self) -> Box<Future<Item = String, Error = hyper::Error>> {
        Box::new(self.client.get(self.url.parse().unwrap()).and_then(|res| {
            let body = Vec::new();
            res.body()
                .fold(body, |mut acc, chunk| {
                    acc.extend_from_slice(chunk.as_ref());
                    Ok::<_, hyper::Error>(acc)
                })
                .and_then(move |value| Ok(String::from(from_utf8(&value).unwrap())))
        }))
    }
}

struct StaticConnector<'a> {
    body: &'a [u8],
}

impl<'a> StaticConnector<'a> {
    fn new(body: &'a [u8]) -> StaticConnector {
        StaticConnector { body: body }
    }
}

impl<'a> hyper::server::Service for StaticConnector<'a> {
    type Request = hyper::Uri;
    type Response = Cursor<Vec<u8>>;
    type Error = std::io::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, _: Self::Request) -> Self::Future {
        Box::new(future::ok(Cursor::new(self.body.to_vec())))
    }
}

fn main() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    // My StaticConnector for testing
    let hyper_client = hyper::Client::configure()
        .connector(StaticConnector::new(
            b"\
                 HTTP/1.1 200 OK\r\n\
                 Content-Length: 8\r\n\
                 \r\n\
                 Maldives\
                 ",
        ))
        .build(&handle);

    // Real Connector
    /*
    let hyper_client = hyper::Client::configure().build(&handle);
    */

    let client = Client {
        client: &hyper_client,
        url: "http://ifconfig.co/country",
    };
    let result = core.run(client.get()).unwrap();
    println!("{}", result);
}

Playground

我猜是我对IoCursor的使用在某种程度上是不完整的,但我无法进行调试并取得进展。一种想法是,hyper::Client对此Cursor的写入可能不会像预期的那样工作。也许我需要一个用于写入的sink和用于读取的静态内容的组合?所有的想法我都没能用来取得进展!

EN

回答 1

Stack Overflow用户

发布于 2017-08-25 01:00:06

原始代码不起作用的原因是因为阅读器端在客户端发送请求之前提供了响应,因此tokio-proto使用request / response mismatch出错。事实证明,修复方法并不简单,因为首先我们需要安排阅读器阻塞,或者更具体地说,使用std::io::ErrorKind::WouldBlock输出错误,以便向事件循环指示还没有任何内容,但不要将其视为EOF。此外,一旦我们得到指示请求已经发送的写操作,并且tokio-proto机器正在等待响应,我们就使用futures::task::current.notify来解除读操作的阻塞。下面是一个更新的实现,它可以像预期的那样工作:

代码语言:javascript
复制
extern crate futures;
extern crate hyper;
extern crate tokio_core;
extern crate tokio_io;

use futures::{future, Future, Stream, task, Poll};
use std::str::from_utf8;
use std::io::{self, Cursor, Read, Write};
use tokio_io::{AsyncRead, AsyncWrite};

struct Client<'a, C: 'a> {
    client: &'a hyper::Client<C>,
    url: &'a str,
}

impl<'a, C: hyper::client::Connect> Client<'a, C> {
    fn get(&self) -> Box<Future<Item = String, Error = hyper::Error>> {
        Box::new(self.client.get(self.url.parse().unwrap()).and_then(|res| {
            let body = Vec::new();
            res.body()
                .fold(body, |mut acc, chunk| {
                    acc.extend_from_slice(chunk.as_ref());
                    Ok::<_, hyper::Error>(acc)
                })
                .and_then(move |value| Ok(String::from(from_utf8(&value).unwrap())))
        }))
    }
}

struct StaticStream {
    wrote: bool,
    body: Cursor<Vec<u8>>,
}

impl Read for StaticStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.wrote {
            self.body.read(buf)
        } else {
            Err(io::ErrorKind::WouldBlock.into())
        }
    }
}

impl Write for StaticStream {
    fn write<'a>(&mut self, buf: &'a [u8]) -> io::Result<usize> {
        self.wrote = true;
        task::current().notify();
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl AsyncRead for StaticStream {}

impl AsyncWrite for StaticStream {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        Ok(().into())
    }
}

struct StaticConnector<'a> {
    body: &'a [u8],
}

impl<'a> StaticConnector<'a> {
    fn new(body: &'a [u8]) -> StaticConnector {
        StaticConnector { body: body }
    }
}

impl<'a> hyper::server::Service for StaticConnector<'a> {
    type Request = hyper::Uri;
    type Response = StaticStream;
    type Error = std::io::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, _: Self::Request) -> Self::Future {
        Box::new(future::ok(StaticStream {
            wrote: false,
            body: Cursor::new(self.body.to_vec()),
        }))
    }
}

fn main() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    // My StaticConnector for testing
    let hyper_client = hyper::Client::configure()
        .connector(StaticConnector::new(
            b"\
                 HTTP/1.1 200 OK\r\n\
                 Content-Length: 8\r\n\
                 \r\n\
                 Maldives\
                 ",
        ))
        .build(&handle);

    // Real Connector
    /*
    let hyper_client = hyper::Client::configure().build(&handle);
    */

    let client = Client {
        client: &hyper_client,
        url: "http://ifconfig.co/country",
    };
    let result = core.run(client.get()).unwrap();
    println!("{}", result);
}

Playground

注意:此实现适用于简单的情况,但我没有测试更复杂的情况。例如,我不确定的一件事是,当请求/响应可能涉及多个读/写调用时,它们的行为会有多大。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45712156

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档