如何处理函数中的“Lwt对象”?
我的代码是
(* For every connection in [peers], send a RequestVote RPC built from this
   node's persistent state (its id, its current term, and the index/term of
   the last entry of [logs]) and process the reply inside an Lwt bind. *)
Array.map (fun conn -> let* resp = (call_server conn
(RequestVoteArg({
candidateNumber = myState.myPersistentState.id;
term = myState.myPersistentState.currentTerm;
(* Both fields below read the last element of [logs].
   NOTE(review): [Array.get] raises [Invalid_argument] if [logs] is empty. *)
lastlogIndex = (Array.get myState.myPersistentState.logs ((Array.length myState.myPersistentState.logs) - 1)).index;
lastlogTerm = (Array.get myState.myPersistentState.logs ((Array.length myState.myPersistentState.logs) - 1)).term
}))) in (match resp with
(* The RPC itself failed: only log the reason. *)
| Error(s) -> Printf.printf "requestVote: connection failed: %s" s
| Ok(repl, s) ->
(match repl with
| RequestVoteRet(repl) ->
(* Count the vote when the peer granted it. *)
if repl.voteGranted then current_vote := !current_vote + 1;
(* [-1l] is the placeholder term [call_server] produces when the response
   carried no payload; any other term overwrites [currentTerm]. *)
if not (repl.term = (-1l)) then myState.myPersistentState.currentTerm <- repl.term;
Printf.printf "requestVote: status: %s" s
| _ -> failwith "Should not reach here")); conn) peers
但是编译时报错:这个表达式(末尾的 conn)的类型是 H2_lwt_unix.Client.t,但此处期望的表达式类型是 'weak702 Lwt.t。
peers 是一个连接数组(元素类型:H2_lwt_unix.Client.t)。
call_server的定义是:
val call_server: H2_lwt_unix.Client.t -> protobufArg -> (Types.protobufRet * Grpc.Status.t, Grpc.Status.t) result Lwt.t
相关的实现如下:
(** [build_connection addr port] resolves [addr]:[port] to an IPv4 address,
    connects a TCP stream socket to the first address returned and wraps
    that socket in an HTTP/2 client connection.

    The returned promise is rejected with [Failure] when name resolution
    yields no address. If [Lwt_unix.connect] fails, the socket is closed
    before the original exception is propagated. *)
let build_connection addr port =
  let* addrs =
    Lwt_unix.getaddrinfo addr (string_of_int port)
      [ Unix.(AI_FAMILY PF_INET) ]
  in
  match addrs with
  | [] ->
    (* Previously [List.hd] failed here with the uninformative "hd". *)
    Lwt.fail_with
      (Printf.sprintf "build_connection: no address found for %s:%d" addr port)
  | { Unix.ai_addr; _ } :: _ ->
    let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
    let* () =
      Lwt.catch
        (fun () -> Lwt_unix.connect socket ai_addr)
        (fun exn ->
          (* Release the descriptor on a failed connect. Errors raised by
             the close itself are deliberately dropped so that the original
             connect error is the one reported to the caller. *)
          let* () =
            Lwt.catch
              (fun () -> Lwt_unix.close socket)
              (fun _ -> Lwt.return_unit)
          in
          Lwt.fail exn)
    in
    let error_handler _ = print_endline "error" in
    H2_lwt_unix.Client.create_connection ~error_handler socket
(** [call_server connection req] encodes [req] as a protobuf message and
    issues the matching unary RPC of the "raft.Proto" service over
    [connection].

    - [RequestVoteArg] calls the "RequestVote" rpc and decodes the response
      into a [RequestVoteRet].
    - [AppendEntriesArg] calls the "AppendEntries" rpc and decodes the
      response into an [AppendEntriesRet].

    When the response carries no payload ([None]), the reply is built with
    [term = -1l] and its boolean field set to [false]. *)
let call_server connection req =
(* One encoder is created up front and filled by whichever branch runs. *)
let enc = Pbrt.Encoder.create() in
match req with
| RequestVoteArg(s) ->
(* Copy the request fields into the generated protobuf record. *)
let proto_s =
Proto.Proto_types.default_request_vote_arg ~candidate_number:s.candidateNumber ~term:s.term ~lastlog_term:s.lastlogTerm ~lastlog_index:s.lastlogIndex ()
in
Proto.Proto_pb.encode_request_vote_arg proto_s enc;
Client.call ~service:"raft.Proto" ~rpc:"RequestVote"
~do_request:(H2_lwt_unix.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.unary (Pbrt.Encoder.to_string enc) ~f:(fun decoder ->
(let+ decoder = decoder in
(match decoder with
| Some (decoder) ->
(* A payload was received: decode it into the reply record. *)
let decoder = Pbrt.Decoder.of_string decoder in
let reply = Proto.Proto_pb.decode_request_vote_reply decoder in
RequestVoteRet( { term=reply.term; voteGranted=reply.vote_granted } )
(* No payload: return the placeholder reply (term -1, vote not granted). *)
| None -> RequestVoteRet( { term=(-1l); voteGranted=false } ))))) ()
| AppendEntriesArg(s) ->
(* Copy the request fields, converting the [entries] array to a list of
   protobuf log records. *)
let proto_s =
Proto.Proto_types.default_append_entries_arg ~term: s.term ~leader_id:s.leaderID ~next_log_index:s.nextLogIndex ~next_log_term:s.nextLogTerm ~entries:(List.map (fun lg -> Proto.Proto_types.default_log ~command: lg.command ~term:lg.term ~index: lg.index ()) (Array.to_list (s.entries)) ) ()
in
Proto.Proto_pb.encode_append_entries_arg proto_s enc;
Client.call ~service:"raft.Proto" ~rpc:"AppendEntries"
~do_request:(H2_lwt_unix.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.unary (Pbrt.Encoder.to_string enc) ~f:(fun decoder ->
let+ decoder = decoder in
match decoder with
| Some(decoder) ->
(* A payload was received: decode it into the reply record. *)
let decoder = Pbrt.Decoder.of_string decoder in
let reply = Proto.Proto_pb.decode_append_entries_reply decoder in
AppendEntriesRet( { term=reply.term; success=reply.success } )
| None -> AppendEntriesRet( { term=(-1l); success=false } ))) ()

发布于 2022-11-16 13:36:37
您的代码可以简化为:
let f (conn:H2_lwt_unix.Client.t) =
let* resp = ... in
...; conn
它之所以无法通过类型检查,是因为 let* 要求其后的函数体返回一个 Lwt promise,而 conn 只是一个连接本身,并不是一个会返回连接的 promise。
最简单的解决方法是把这个值包装进 Lwt promise,下面两种写法任选其一:
let f (conn:H2_lwt_unix.Client.t) =
let* resp = ... in
...; Lwt.return conn
或者
let f (conn:H2_lwt_unix.Client.t) =
let+ resp = ... in
...; conn

https://stackoverflow.com/questions/74454270
复制相似问题