首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何用sttp读取响应作为Observable[String]

如何用sttp读取响应作为Observable[String]
EN

Stack Overflow用户
提问于 2019-02-04 14:44:42
回答 1查看 562关注 0票数 4

我用的是sttp客户端。我希望将响应作为字符串除以行(如Observable[String] )。

在这里,sttp流api:

代码语言:javascript
复制
import java.nio.ByteBuffer

import com.softwaremill.sttp._
import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend
import monix.eval.Task
import monix.reactive.Observable

implicit val sttpBackend = OkHttpMonixBackend()

val res: Task[Response[Observable[ByteBuffer]]] = sttp
  .post(uri"someUri")
  .response(asStream[Observable[ByteBuffer]])
  .send()

那我怎么能得到Observable[String]呢?

这里有一些想法:

1.有一个简单的方法可以通过线观察到split吗?

2.,或者也许我可以从响应中获得原始的InputStream,这样我就可以轻松地拆分它,但是我无法找到使用类似asStream[InputStream]的方法。

3.还是只使用http后端witout sttp层?

EN

回答 1

Stack Overflow用户

发布于 2019-02-05 12:20:37

您的基本问题是如何将Observable[ByteBuffer]转换为Observable[String],其中每个String都是一行,对吗?

您可以使用bufferWithSelector(selector: Observable[S]): Observable[Seq[A]]方法。这种方法将缓冲可观测到的,直到选择器可观测发射一个元素。

我用Int做了一个小例子:

代码语言:javascript
复制
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString)) // This folds the Seq[Int] into a String for display purposes

buffered.foreach(println)

试试看!

当然,这有一个主要的缺点:底层可观察到的source将被评估两次。您可以通过修改上面的示例来看到这一点:

代码语言:javascript
复制
// Start writing your ScalaFiddle code here

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map {x => println(x); x}  // <------------------

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))

buffered.foreach(println)

这个会把每个数字打印两次。

要解决这个问题,您必须将可观察到的source转换为可观察的热点:

代码语言:javascript
复制
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map {x => println(x); x}
  .publish // <-----------------------------

// source is now a ConnectableObservable and will start emitting elements
// once you call source.connect()

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))

buffered.foreach(println)

source.connect() // <---------------------------

试试看!

唯一需要做的是修改选择器,使其仅在遇到行提要时才发出项。

我建议先将Observable[ByteBuffer]分解为一个Observable[Byte] (使用flatMap)以避免头痛。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54518580

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档