首先,对于Cowboy,特别是Websockets来说,确实缺乏文档,但是总的来说,一旦它被删除,使用起来就很好了。那么,从Erlang获得这一信息是另一步。由于有了这篇文章由7根螺柱制成,我能够获得一个功能良好的websocket来进行测试,但我无法让它同时收听并同时发送消息。我认为这是因为接收阻塞了需要发送的线程,这是与websocket连接的内在链接,所以它不能在等待接收时发送。也许这种理解是有缺陷的。我很想被纠正。我已经尝试了产卵,但没有效果,这就是为什么我认为接收是阻塞的websocket线程。
def ws do
localhost = 'localhost'
path = '/ws/app/1'
port = 5000
{:ok, _} = :application.ensure_all_started(:gun)
connect_opts = %{
connect_timeout: :timer.minutes(1),
retry: 10,
retry_timeout: 100
}
{:ok, conn_pid} = :gun.open(localhost, port, connect_opts)
IO.inspect(conn_pid, label: "conn_pid")
{:ok, protocol} = :gun.await_up(conn_pid)
IO.inspect(protocol, label: "protocol")
# Set custom header with cookie for device id
stream_ref = :gun.ws_upgrade(conn_pid, path, [{"cookie", "device_id=1235"}])
IO.inspect(stream_ref, label: "stream_ref")
receive do
{:gun_upgrade, ^conn_pid, ^stream_ref, ["websocket"], headers} ->
upgrade_success(conn_pid, headers, stream_ref)
{:gun_response, ^conn_pid, _, _, status, headers} ->
exit({:ws_upgrade_failed, status, headers})
{:gun_error, _conn_pid, _stream_ref, reason} ->
exit({:ws_upgrade_failed, reason})
whatever ->
IO.inspect(whatever, label: "Whatever")
# More clauses here as needed.
after 5000 ->
IO.puts "Took too long!"
:erlang.exit("barf!")
end
:ok
end
def upgrade_success(conn_pid, headers, stream_ref) do
IO.puts("Upgraded #{inspect(conn_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")
IO.inspect(self(), label: "upgrade self")
# This one runs and message is received
run_test(conn_pid)
# This should spawn and therefore not block
listen(conn_pid, stream_ref)
# This never runs
run_test(conn_pid)
end
def listen(conn_pid, stream_ref) do
spawn receive_messages(conn_pid, stream_ref)
end
def receive_messages(conn_pid, stream_ref) do
IO.inspect conn_pid, label: "conn_pid!"
IO.inspect stream_ref, label: "stream_ref!"
IO.inspect(self(), label: "self pid")
receive do
{:gun_ws, ^conn_pid, ^stream_ref, {:text, msg} } ->
IO.inspect(msg, label: "Message from websocket server:")
other_messages ->
IO.inspect(other_messages, label: "Other messages")
after 5000 ->
IO.puts "Receive timed out"
end
receive_messages(conn_pid, stream_ref)
end
def send_message(message, conn_pid) do
:gun.ws_send(conn_pid, {:text, message})
end
def run_test(conn_pid) do
IO.puts "Running test"
message = "{\"type\":\"init\",\"body\":{\"device_id\":1234}}"
send_message(message, conn_pid)
end
def stop(conn_pid) do
:gun.shutdown(conn_pid)
end发布于 2019-06-25 13:37:19
感谢示例代码和编辑的7 7stud,这些代码和编辑反映如下:
下面是我对gun的解释,给出一个基本的WebSocket客户端:
defmodule WebsocketTester.Application do
use Application
def start(_type, _args) do
path = '/ws/app/1'
port = 5000
host = 'localhost'
args = %{path: path, port: port, host: host}
children = [
{ WebSocket.Client, args }
]
Supervisor.start_link(children, strategy: :one_for_one, name: WebsocketTester.Supervisor)
end
end
defmodule WebSocket.Client do
use GenServer
def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
type: :worker,
restart: :permanent,
shutdown: 500
}
end
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
# GenServer callbacks
def init(args) do
# Set up the websocket connection
# get > upgrade
# Initial state with gun_pid and stream_ref
# %{gun_pid: gun_pid, stream_ref: stream_ref} = ws(args)
{:ok, init_ws(args)}
end
# Give back gun_pid from state
def handle_call(:get_conn, from, %{gun_pid: gun_pid, stream_ref: stream_ref}) do
IO.inspect(gun_pid, label: "handle call gun pid")
{:reply, %{gun_pid: gun_pid, stream_ref: stream_ref}, %{from: from, gun_pid: gun_pid} }
end
# Everything else
def handle_call(other, from, state) do
IO.inspect(other, label: "other call")
IO.inspect(from, label: "from")
{:ok, state}
end
# Client sends message to server.
def handle_cast({:websocket_request, message}, %{gun_pid: gun_pid} = state) do
IO.puts message
IO.inspect(gun_pid, label: "gun_pid")
:gun.ws_send(gun_pid, {:text, message})
{:noreply, state}
end
def handle_info(message, %{from: from} = state) do
IO.inspect(message, label: "Inside handle_info(): ")
GenServer.reply(from, message)
{:noreply, state}
end
def terminate(reason, _state) do
IO.puts "Terminated due to #{reason}."
:ok
end
def code_change(_old_version, state, _extra) do
{:ok, state}
end
## Client functions
# Used for getting gun_pid from state
def send_sync(request) do
GenServer.call(__MODULE__, request)
end
# Send a message async
def send_async(request) do
GenServer.cast(__MODULE__, {:websocket_request, request})
end
# Receive a single message
def get_message(stream_ref, gun_pid) do
receive do
{^stream_ref, {:gun_ws, ^gun_pid, {:text, message} }} ->
IO.puts("Client received gun message: #{message}")
other ->
IO.inspect(other, label: "Client received other message")
end
end
# Receive all messages recursively
def receive_loop(stream_ref, gun_pid) do
IO.puts "Listening"
get_message(stream_ref, gun_pid)
receive_loop(stream_ref, gun_pid)
end
def go() do
# Get the gun_pid from state
%{gun_pid: gun_pid, stream_ref: stream_ref} = send_sync(:get_gun_pid)
IO.inspect(gun_pid, label: "Inside go(): gun_pid=")
# Send messages manually
:ok = send_async(Jason.encode!(%{type: "info", greet: "yo"}))
# Or to send just text
# :ok = send_async("yo")
# Receive messages manually
get_message(stream_ref, gun_pid)
# Start sending loop
spawn sender 1
# Start listening
receive_loop(stream_ref, gun_pid)
end
# Send messages to handle_info() every 3 secs
def sender(count) do
send_async("count is #{count}")
:timer.sleep(3000)
sender(count+1)
end
## End of client functions
# Initialize the websocket connection
def init_ws(args) do
%{ path: path, port: port, host: host} = args
{:ok, _} = :application.ensure_all_started(:gun)
connect_opts = %{
connect_timeout: :timer.minutes(1),
retry: 10,
retry_timeout: 100
}
{:ok, gun_pid} = :gun.open(host, port, connect_opts)
{:ok, _protocol} = :gun.await_up(gun_pid)
# Set custom header with cookie for device id - set_headers can be left out if you don't want custom headers
stream_ref = :gun.ws_upgrade(gun_pid, path, set_headers("I like cookies"))
receive do
{:gun_upgrade, ^gun_pid, ^stream_ref, ["websocket"], headers} ->
upgrade_success(gun_pid, headers, stream_ref)
{:gun_response, ^gun_pid, _, _, status, headers} ->
exit({:ws_upgrade_failed, status, headers})
{:gun_error, _gun_pid, _stream_ref, reason} ->
exit({:ws_upgrade_failed, reason})
whatever ->
IO.inspect(whatever, label: "Whatever")
# More clauses here as needed.
after 5000 ->
IO.puts "Took too long!"
:erlang.exit("barf!")
end
# stop(gun_pid)
end
def set_headers(cookie_value) do
[{"cookie", "my_cookie=#{cookie_value}"}]
end
# This just returns the gun_pid for further reference which gets stored in the GenServer state.
def upgrade_success(gun_pid, headers, stream_ref) do
IO.puts("Upgraded #{inspect(gun_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")
%{stream_ref: stream_ref, gun_pid: gun_pid}
end
# To stop gun
def stop(gun_pid) do
:gun.shutdown(gun_pid)
end
end要使用此方法:
iex -S mix
iex> WebSocket.Client.go发布于 2019-06-22 02:58:31
来自枪械医生
接收数据 Gun为接收到的每个Websocket消息向所有者进程发送一条Erlang消息。
连接 ..。 炮连接 ..。 Gun连接是一个Erlang进程,它管理到远程端点的套接字。此枪支连接由名为连接所有者的用户进程拥有,并由枪支应用程序的监视树管理。 所有者进程通过从模块枪调用函数与枪支连接进行通信。所有函数都异步执行各自的操作。Gun连接将在需要时向所有者进程发送Erlang消息。
虽然在文档中没有具体提到它,但我确信所有者进程是调用gun:open()的进程。我的尝试还显示,所有者进程必须调用gun:ws_send()。换句话说,所有者进程必须既向服务器发送消息,又从服务器接收消息。
下面的代码使用gen_server操作gun,使gen_server既向服务器发送消息,又接收来自服务器的消息。
当gun从牛仔http服务器接收到消息时,gun将消息(即Pid ! Msg )发送给所有者进程。在下面的代码中,gen_server在init/1回调中创建了连接,这意味着枪会爆炸(!)从gen_server的牛仔那里收到的信息。gen_server处理直接用handle_info()发送到其邮箱的邮件。
在handle_cast(),gen_server用枪向牛仔发送请求。因为handle_cast()是异步的,这意味着您可以向牛仔发送异步消息。当枪收到牛仔的消息时,枪就会发出(!)发送给gen_server的消息和gen_server的handle_info()函数处理消息。在handle_info()内部,调用gen_server:reply/2将消息转发到gen_server客户端。因此,每当gen_server客户机想要检查从gun发送的服务器消息时,它就可以跳入接收子句。
-module(client).
-behavior(gen_server).
-export([start_server/0, send_sync/1, send_async/1, get_message/2, go/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]). %%% client functions
-export([sender/1]).
%%% client functions
%%%
start_server() ->
gen_server:start({local, ?MODULE}, ?MODULE, [], []).
send_sync(Requ) ->
gen_server:call(?MODULE, Requ).
send_async(Requ) ->
gen_server:cast(?MODULE, {websocket_request, Requ}).
get_message(WebSocketPid, ClientRef) ->
receive
{ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
io:format("Inside get_message(): Ref = ~w~n", [ClientRef]),
io:format("Client received gun message: ~s~n", [Msg]);
Other ->
io:format("Client received other message: ~w~n", [Other])
end.
receive_loop(WebSocketPid, ClientRef) ->
receive
{ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
io:format("Client received Gun message: ~s~n", [Msg]);
Other ->
io:format("Client received other message: ~w~n", [Other])
end,
receive_loop(WebSocketPid, ClientRef).
go() ->
{ok, GenServerPid} = start_server(),
io:format("[ME]: Inside go(): GenServerPid=~w~n", [GenServerPid]),
[{conn_pid, ConnPid}, {ref, ClientRef}] = send_sync(get_conn_pid),
io:format("[ME]: Inside go(): ConnPid=~w~n", [ConnPid]),
ok = send_async("ABCD"),
get_message(ConnPid, ClientRef),
spawn(?MODULE, sender, [1]),
ok = send_async("XYZ"),
get_message(ConnPid, ClientRef),
receive_loop(ConnPid, ClientRef).
sender(Count) -> %Send messages to handle_info() every 3 secs
send_async(lists:concat(["Hello", Count])),
timer:sleep(3000),
sender(Count+1).
%%%%%% gen_server callbacks
%%%
init(_Arg) ->
{ok, {no_client, ws()}}.
handle_call(get_conn_pid, From={_ClientPid, ClientRef}, _State={_Client, WebSocketPid}) ->
io:format("[ME]: Inside handle_call(): From = ~w~n", [From]),
{reply, [{conn_pid, WebSocketPid}, {ref, ClientRef}], _NewState={From, WebSocketPid} };
handle_call(stop, _From, State) ->
{stop, normal, shutdown_ok, State}; %Calls terminate()
handle_call(_Other, _From, State) ->
{ok, State}.
handle_cast({websocket_request, Msg}, State={_From, WebSocketPid}) ->
gun:ws_send(WebSocketPid, {text, Msg}), %{text, "It's raining!"}),
{noreply, State}.
handle_info(Msg, State={From, _WebSocketPid}) ->
io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
gen_server:reply(From, Msg),
{noreply, State}.
terminate(_Reason, _State={_From, WebSocketPid}) ->
gun:shutdown(WebSocketPid).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%% private functions
%%%
ws() ->
{ok, _} = application:ensure_all_started(gun),
{ok, ConnPid} = gun:open("localhost", 8080),
{ok, _Protocol} = gun:await_up(ConnPid),
gun:ws_upgrade(ConnPid, "/please_upgrade_to_websocket"),
receive
{gun_ws_upgrade, ConnPid, ok, Headers} ->
io:format("[ME]: Inside gun_ws_upgrade receive clause: ~w~n",
[ConnPid]),
upgrade_success_handler(ConnPid, Headers);
{gun_response, ConnPid, _, _, Status, Headers} ->
exit({ws_upgrade_failed, Status, Headers});
{gun_error, _ConnPid, _StreamRef, Reason} ->
exit({ws_upgrade_failed, Reason})
after 1000 ->
exit(timeout)
end.
upgrade_success_handler(ConnPid, _Headers) ->
io:format("[ME]: Inside upgrade_success_handler(): ~w~n", [ConnPid]),
ConnPid.=======
下面的答案显示了如何让服务器将数据推送到客户端.。
好吧,我知道了--在erlang。这个例子有点痛苦。你需要做几件事:
1)您需要获得运行websocket_*函数的进程的pid,这与请求的pid不一样:
升级后初始化 牛仔有单独的进程来处理连接和请求。因为Websocket接管了连接,所以Websocket协议处理发生在与请求处理不同的进程中。 这反映在不同的回调Websocket处理程序中。从临时请求进程调用init/2回调,从连接进程调用websocket_回调。 这意味着某些初始化不能从init/2中完成。任何需要当前pid或绑定到当前pid的东西都不能按预期工作。可选的websocket_init/1可用于获取运行websocket_回调的进程的pid:
下面是我使用的代码:
init(Req, State) ->
{cowboy_websocket, Req, State}. %Perform websocket setup
websocket_init(State) ->
io:format("[ME]: Inside websocket_init"),
spawn(?MODULE, push, [self(), "Hi, there"]),
{ok, State}.
push(WebSocketHandleProcess, Greeting) ->
timer:sleep(4000),
WebSocketHandleProcess ! {text, Greeting}.
websocket_handle({text, Msg}, State) ->
timer:sleep(10000), %Don't respond to client request just yet.
{
reply,
{text, io_lib:format("Server received: ~s", [Msg]) },
State
};
websocket_handle(_Other, State) -> %Ignore
{ok, State}.这将在客户端等待对客户端先前发送到服务器的请求的答复时将消息推送给客户端。
2)如果向运行websocket_*函数的进程发送消息:
Pid ! {text, Msg}然后,该消息将由websocket_info()函数处理--而不是websocket_handle()函数:
websocket_info({text, Text}, State) ->
{reply, {text, Text}, State};
websocket_info(_Other, State) ->
{ok, State}.websocket_info()函数的返回值与websocket_handle()函数的返回值一样工作。
因为您的枪支客户端现在正在接收多条消息,所以持枪客户端需要在一个循环中接收:
upgrade_success_handler(ConnPid, Headers) ->
io:format("Upgraded ~w. Success!~nHeaders:~n~p~n",
[ConnPid, Headers]),
gun:ws_send(ConnPid, {text, "It's raining!"}),
get_messages(ConnPid). %Move the receive clause into a recursive function
get_messages(ConnPid) ->
receive
{gun_ws, ConnPid, {text, "Greeting: " ++ Greeting} } ->
io:format("~s~n", [Greeting]),
get_messages(ConnPid);
{gun_ws, ConnPid, {text, Msg} } ->
io:format("~s~n", [Msg]),
get_messages(ConnPid)
end.https://stackoverflow.com/questions/56700693
复制相似问题