发布于 2019-06-10 11:09:06
发布于 2021-10-12 14:40:04
没有必要将Retrofit和SSE混为一谈。使用改造获得一个输入流,然后查找(或写入)一个inputstream解析器,该解析器可以对SSE事件进行分块。
在改造中我有这样的想法:
public interface NotificationAPI {
@GET("notifications/sse")
Call<InputStream> getNotificationsStream(@retrofit2.http.Query("Last-Event-ID") String lastEventId);
}我为InputStream编写了一个快速转换器工厂
public class InputStreamConverterFactory extends Converter.Factory {
private static class InputStreamConverter implements Converter<ResponseBody, InputStream> {
@Nullable
@Override
public InputStream convert(ResponseBody value) throws IOException {
return value.byteStream();
}
}
@Override
public @Nullable
Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {
if (type.equals(InputStream.class)) {
return new InputStreamConverter();
}
return null;
}
}我的客户代码如下所示:
var cinputStream = api.getNotificationsStream(null);
var inputStream = cinputStream.execute().body();
try(var sseStream = new MySSEStreamParser(inputStream)) {
//handle the stream here...
}有一个OkHttp SSE解析器,您可能可以使用它。然而:
发布于 2021-12-18 12:35:03
我知道这是个老问题。但是我没有找到完整的例子,现在我试着用我的代码来提供它。我们只使用retrofit 和 coroutines
1.在retrofit API interface中需要添加代码。注意我们使用的@Streaming和返回类型的Call<ResponseBody>
@POST("/v1/calc/group-prices")
@Streaming
fun calculateGroupPrices(@Body listOptions: List<GroupCalculatorOptions>): Call<ResponseBody>2.在您的repository类中需要添加以下代码。注意我们使用flow的内容,并阅读stream。要理解带有有效负载的消息已经到达,它必须以"data:"开头
fun loadGroupDeliveryRateInfos(listOptions: List<GroupCalculatorOptions>) = flow {
coroutineScope {
val response = restApi.calculateGroupPrices(listOptions).execute()
if (response.isSuccessful) {
val input = response.body()?.byteStream()?.bufferedReader() ?: throw Exception()
try {
while (isActive) {
val line = input.readLine() ?: continue
if (line.startsWith("data:")) {
try {
val groupDeliveryRateInfo = gson.fromJson(
line.substring(5).trim(),
GroupDeliveryRateInfo::class.java
)
emit(groupDeliveryRateInfo)
} catch (e: Exception) {
e.printStackTrace()
}
}
}
} catch (e: IOException) {
throw Exception(e)
} finally {
input.close()
}
} else {
throw HttpException(response)
}
}
}3.最后一步,我们需要用ViewModel收集数据。我们只需要从repository调用该方法
repository.loadGroupDeliveryRateInfos(it.values.toList())
.collect { info ->
handleGroupDeliveryRateInfo(info)
}仅此而已,不需要额外的库。
https://stackoverflow.com/questions/32126564
复制相似问题