首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RetryPolicy不能与协程一起工作

RetryPolicy不能与协程一起工作
EN

Stack Overflow用户
提问于 2021-07-27 02:09:49
回答 1查看 131关注 0票数 1

我用协程用Kotlin做了一个简单的gRPC服务器,用Java做了一个客户机。在客户端中,我启用并配置了重试策略,但它不起作用。我花了很多时间来寻找解决方案,我相信我的客户端坏了,但问题出在服务器上。我会给你看代码。

这是我的原型文件:

代码语言:javascript
复制
syntax = "proto3";
option java_multiple_files = true;
option java_package = "br.com.will.protoclasses";
option java_outer_classname = "NotificationProto";

package notification;

service Notification {
  rpc SendPush (SendPushNotificationRequest) returns (SendPushNotificationResponse);
}

message SendPushNotificationRequest {
  string title = 1;
  string message = 2;
  string customer_id = 3;
}

message SendPushNotificationResponse {
  string message = 1;
}

这是客户端:

代码语言:javascript
复制
open class NotificationClient(private val channel: ManagedChannel) {
    private val stub: NotificationGrpcKt.NotificationCoroutineStub =
        NotificationGrpcKt.NotificationCoroutineStub(channel)

    suspend fun send() {
        val request =
            SendPushNotificationRequest.newBuilder().setCustomerId(UUID.randomUUID().toString()).setMessage("test")
                .setTitle("test").build()
        val response =  stub.sendPush(request)
        println("Received: ${response.message}")
    }

}

suspend fun main(args: Array<String>) {
    val port = System.getenv("PORT")?.toInt() ?: 50051

    val retryPolicy: MutableMap<String, Any> = HashMap()
    retryPolicy["maxAttempts"] = 5.0
    retryPolicy["initialBackoff"] = "10s"
    retryPolicy["maxBackoff"] = "30s"
    retryPolicy["backoffMultiplier"] = 2.0
    retryPolicy["retryableStatusCodes"] = listOf<Any>("INTERNAL")

    val methodConfig: MutableMap<String, Any> = HashMap()

    val name: MutableMap<String, Any> = HashMap()
    name["service"] = "notification.Notification"
    name["method"] = "SendPush"
    methodConfig["name"] = listOf<Any>(name)
    methodConfig["retryPolicy"] = retryPolicy

    val serviceConfig: MutableMap<String, Any> = HashMap()
    serviceConfig["methodConfig"] = listOf<Any>(methodConfig)

    print(serviceConfig)

    val channel = ManagedChannelBuilder.forAddress("localhost", port)
        .usePlaintext()
        .defaultServiceConfig(serviceConfig)
        .enableRetry()
        .build()

    val client = NotificationClient(channel)

    client.send()
}

这是我的gRPC服务的一部分,我在这里测试重试策略(客户端上的重试策略不适用于此实现):

代码语言:javascript
复制
override suspend fun sendPush(request: SendPushNotificationRequest): SendPushNotificationResponse {
    val count: Int = retryCounter.incrementAndGet()
    log.info("Received a call on method sendPushNotification with payload -> $request")

    if (random.nextFloat() < UNAVAILABLE_PERCENTAGE) {
        log.info("Returning stubbed INTERNAL error. count: $count")
        throw Status.INTERNAL.withDescription("error").asRuntimeException()
    }

    log.info("Returning successful Hello response, count: $count")
    return SendPushNotificationResponse.newBuilder().setMessage("success").build()

}

另一个实现,但现在使用StreamObserver (此实现可以很好地工作):

代码语言:javascript
复制
override fun sendPush(
        request: SendPushNotificationRequest?,
        responseObserver: StreamObserver<SendPushNotificationResponse>?
    ) {
        log.info("Received a call on method sendPushNotification with payload -> $request")

        val count: Int = retryCounter.incrementAndGet()
        if (random.nextFloat() < UNAVAILABLE_PERCENTAGE) {
            log.info("Returning stubbed UNAVAILABLE error. count: $count")
            responseObserver!!.onError(
                Status.UNAVAILABLE.withDescription("error").asRuntimeException()
            )
        } else {
            log.info("Returning successful Hello response, count: $count")

            responseObserver!!.onNext(SendPushNotificationResponse.newBuilder().setMessage("success").build())
            return responseObserver.onCompleted()
        }
    }

问题是,哪里出了问题?有人能帮我吗?

EN

回答 1

Stack Overflow用户

发布于 2021-07-27 09:41:09

以下代码是由gRPC生成的吗:

代码语言:javascript
复制
sendPush(request: SendPushNotificationRequest): SendPushNotificationResponse

gRPC依赖StreamObserver在调用responseObserver.onCompleted()responseObserver.onError后向客户端发送响应,请确保您的代码能够正常工作。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68534635

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档