我用协程用Kotlin做了一个简单的gRPC服务器,用Java做了一个客户机。在客户端中,我启用并配置了重试策略,但它不起作用。我花了很多时间来寻找解决方案,我相信我的客户端坏了,但问题出在服务器上。我会给你看代码。
这是我的原型文件:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "br.com.will.protoclasses";
option java_outer_classname = "NotificationProto";
package notification;
service Notification {
rpc SendPush (SendPushNotificationRequest) returns (SendPushNotificationResponse);
}
message SendPushNotificationRequest {
string title = 1;
string message = 2;
string customer_id = 3;
}
message SendPushNotificationResponse {
string message = 1;
}这是客户端:
open class NotificationClient(private val channel: ManagedChannel) {
private val stub: NotificationGrpcKt.NotificationCoroutineStub =
NotificationGrpcKt.NotificationCoroutineStub(channel)
suspend fun send() {
val request =
SendPushNotificationRequest.newBuilder().setCustomerId(UUID.randomUUID().toString()).setMessage("test")
.setTitle("test").build()
val response = stub.sendPush(request)
println("Received: ${response.message}")
}
}
suspend fun main(args: Array<String>) {
val port = System.getenv("PORT")?.toInt() ?: 50051
val retryPolicy: MutableMap<String, Any> = HashMap()
retryPolicy["maxAttempts"] = 5.0
retryPolicy["initialBackoff"] = "10s"
retryPolicy["maxBackoff"] = "30s"
retryPolicy["backoffMultiplier"] = 2.0
retryPolicy["retryableStatusCodes"] = listOf<Any>("INTERNAL")
val methodConfig: MutableMap<String, Any> = HashMap()
val name: MutableMap<String, Any> = HashMap()
name["service"] = "notification.Notification"
name["method"] = "SendPush"
methodConfig["name"] = listOf<Any>(name)
methodConfig["retryPolicy"] = retryPolicy
val serviceConfig: MutableMap<String, Any> = HashMap()
serviceConfig["methodConfig"] = listOf<Any>(methodConfig)
print(serviceConfig)
val channel = ManagedChannelBuilder.forAddress("localhost", port)
.usePlaintext()
.defaultServiceConfig(serviceConfig)
.enableRetry()
.build()
val client = NotificationClient(channel)
client.send()
}这是我的gRPC服务的一部分,我在这里测试重试策略(客户端上的重试策略不适用于此实现):
override suspend fun sendPush(request: SendPushNotificationRequest): SendPushNotificationResponse {
val count: Int = retryCounter.incrementAndGet()
log.info("Received a call on method sendPushNotification with payload -> $request")
if (random.nextFloat() < UNAVAILABLE_PERCENTAGE) {
log.info("Returning stubbed INTERNAL error. count: $count")
throw Status.INTERNAL.withDescription("error").asRuntimeException()
}
log.info("Returning successful Hello response, count: $count")
return SendPushNotificationResponse.newBuilder().setMessage("success").build()
}另一个实现,但现在使用StreamObserver (此实现可以很好地工作):
override fun sendPush(
request: SendPushNotificationRequest?,
responseObserver: StreamObserver<SendPushNotificationResponse>?
) {
log.info("Received a call on method sendPushNotification with payload -> $request")
val count: Int = retryCounter.incrementAndGet()
if (random.nextFloat() < UNAVAILABLE_PERCENTAGE) {
log.info("Returning stubbed UNAVAILABLE error. count: $count")
responseObserver!!.onError(
Status.UNAVAILABLE.withDescription("error").asRuntimeException()
)
} else {
log.info("Returning successful Hello response, count: $count")
responseObserver!!.onNext(SendPushNotificationResponse.newBuilder().setMessage("success").build())
return responseObserver.onCompleted()
}
}问题是,哪里出了问题?有人能帮我吗?
发布于 2021-07-27 09:41:09
以下代码是由gRPC生成的吗:
sendPush(request: SendPushNotificationRequest): SendPushNotificationResponsegRPC依赖StreamObserver在调用responseObserver.onCompleted()或responseObserver.onError后向客户端发送响应,请确保您的代码能够正常工作。
https://stackoverflow.com/questions/68534635
复制相似问题