首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >发送并行请求Erlang

发送并行请求Erlang
EN

Stack Overflow用户
提问于 2018-04-13 07:04:26
回答 1查看 342关注 0票数 1

我正在用Erlang实现一个类似Twitter的应用程序。我有它的分布式和非分布式实现。我正在做一个基准测试,但我似乎找不到一种方法来向分布式实现的每个用户进程发送并行请求。我使用一个列表:foreach函数来发送"get tweet“到一个列表的客户端processes.My理解是,列表:foreach函数一次一个地步入列表的每个元素,实现一种顺序行为,这最终使我的分布式实现与非分布式实现的执行时间相等。是否可以将"get tweets“请求同时发送到不同的客户端进程?在我看来,这似乎是一个相当具体的案例,在StackOverflow内部和外部寻找解决方案都很困难。

代码语言:javascript
复制
test_get_tweets_Bench() ->
{ServerPid, UserInfos} = initializeForBench_server(),
run_benchmark("timeline",
    fun () ->
        lists:foreach(fun (_) ->
            UserChoice = pick_random(UserInfos),
            server:get_tweets(element(2, UserChoice), element(1, UserChoice), 1)
        end,
        lists:seq(1, 10000))
    end,
    30).

pick_random(List) ->
lists:nth(rand:uniform(length(List)), List).

Client_process是以下形式的列表:{userId,userinfos },...

在尝试了rpc:pmap而不是list:foreach之后,我的基准测试变得大约慢了3倍。更改如下:

代码语言:javascript
复制
test_get_tweets_Bench2() ->
{ServerPid, UserInfos} = initializeForBench_server(),
run_benchmark("get_tweets 2",
    fun () ->
        rpc:pmap({?MODULE,do_apply},
                 [fun (_) ->
            UserChoice = pick_random(UserInfos),
            server:get_tweets(element(2, UserChoice), element(1, UserChoice), 1)
        end],
                    lists:seq(1, 10000))
    end,
    30).


pick_random(List) ->
    lists:nth(rand:uniform(length(List)), List).

do_apply(X,F)->
    F(X).

我认为rpc:pmap会让我的基准测试更快,因为它会并行发送get_tweet请求。

下面是我的服务器模块,它是我的基准测试和类似Twitter的应用程序之间的API。API将来自我的基准测试的请求发送到我的类似Twitter的应用程序。

代码语言:javascript
复制
    %% This module provides the protocol that is used to interact with an
%% implementation of a microblogging service.
%%
%% The interface is design to be synchrounous: it waits for the reply of the
%% system.
%%
%% This module defines the public API that is supposed to be used for
%% experiments. The semantics of the API here should remain unchanged.
-module(server).

-export([register_user/1,
         subscribe/3,
         get_timeline/3,
         get_tweets/3,
         tweet/3]).

%%
%% Server API
%%

% Register a new user. Returns its id and a pid that should be used for
% subsequent requests by this client.
-spec register_user(pid()) -> {integer(), pid()}.
register_user(ServerPid) ->
    ServerPid ! {self(), register_user},
    receive
        {ResponsePid, registered_user, UserId} -> {UserId, ResponsePid}
    end.

% Subscribe/follow another user.
-spec subscribe(pid(), integer(), integer()) -> ok.
subscribe(ServerPid, UserId, UserIdToSubscribeTo) ->
    ServerPid ! {self(), subscribe, UserId, UserIdToSubscribeTo},
    receive
        {_ResponsePid, subscribed, UserId, UserIdToSubscribeTo} -> ok
    end.

% Request a page of the timeline of a particular user.
% Request results can be 'paginated' to reduce the amount of data to be sent in
% a single response. This is up to the server.
-spec get_timeline(pid(), integer(), integer()) -> [{tweet, integer(), erlang:timestamp(), string()}].
get_timeline(ServerPid, UserId, Page) ->
    ServerPid ! {self(), get_timeline, UserId, Page},
    receive
        {_ResponsePid, timeline, UserId, Page, Timeline} ->
            Timeline
    end.

% Request a page of tweets of a particular user.
% Request results can be 'paginated' to reduce the amount of data to be sent in
% a single response. This is up to the server.
-spec get_tweets(pid(), integer(), integer()) -> [{tweet, integer(), erlang:timestamp(), string()}].
get_tweets(ServerPid, UserId, Page) ->
    ServerPid ! {self(), get_tweets, UserId, Page},
    receive
        {_ResponsePid, tweets, UserId, Page, Tweets} ->
            Tweets
    end.

% Submit a tweet for a user.
% (Authorization/security are not regarded in any way.)
-spec tweet(pid(), integer(), string()) -> erlang:timestamp(). 
tweet(ServerPid, UserId, Tweet) ->
    ServerPid ! {self(), tweet, UserId, Tweet},
    receive
        {_ResponsePid, tweet_accepted, UserId, Timestamp} ->
            Timestamp
    end.
EN

回答 1

Stack Overflow用户

发布于 2018-04-13 22:37:54

在Erlang中,消息从进程A交换到进程B。没有像广播或选择性广播这样的可用特性。在您的应用程序中,我看到了3个步骤:

  1. 发送请求以获取用户的tweet,
  2. 用户进程准备答案并将其发送回请求者
  3. 初始进程收集答案

向用户进程发送请求并收集tweet(步骤1和3)不能使用并行性。当然,您可以使用多个进程发送请求并收集答案,每个用户最多1个,但我猜这不是您问题的主题。

可行的是确保每个用户进程的3个步骤不是按顺序完成的,而是并行完成的。我猜server:get_tweets函数负责发送请求并收集答案。如果我是正确的(我不知道,因为你没有提供代码,你忽略了返回值),你可以通过将这个函数分成2部分来使用并行性,第一个函数发送请求,第二个函数收集答案。(这里是一个代码示例,我没有尝试过,甚至没有编译过,所以请仔细考虑:o)

代码语言:javascript
复制
test_get_tweets_Bench() ->
{ServerPid, UserInfos} = initializeForBench_server(),
run_benchmark("timeline",
    fun () ->
        % send the requests
        List = lists:map(fun (_) ->
            {UserId,Pid} = pick_random(UserInfos),
            Ref = server:request_tweets(Pid,UserId),
            {Ref,UserId}
            end,
            lists:seq(1, 10000)),
        % collects the answers
        collect(L,[])
    end,
    30).

collect([],Result) -> {ok,Result};
collect(List,ResultSoFar) ->
    receive
        {Ref,UserId,Tweets} ->
            {ok,NewList} = remove_pending_request(Ref,UserId,List),
            collect(Newlist,[{UserId,Tweets}|ResultSoFar])
    after ?TIMEOUT
        {error,timeout,List,ResultSoFar}
    end.

remove_pending_request(Ref,UserId,List) ->
    {value,{Ref,UserId},NewList} = lists:keytake(Ref,1,List),
    {ok,NewList}. 

pick_random(List) ->
lists:nth(rand:uniform(length(List)), List).

这是我实现并行基准测试的另一次尝试,但没有实现任何加速。

代码语言:javascript
复制
get_tweets(Sender, UserId, Node) ->
server:get_tweets(Node, UserId, 0),
Sender ! done_get_tweets.

test_get_tweets3() ->
    {_ServerId, UserInfos} = initializeForBench_server(),
    run_benchmark("parallel get_tweet", 
        fun () ->
            lists:foreach(
                fun (_) ->
                    {UserId,Pid} = pick_random(UserInfos),
                    spawn(?MODULE, get_tweets, [self(), UserId, Pid])
                end,
                lists:seq(1, ?NUMBER_OF_REQUESTS)),
            lists:foreach(fun (_) -> receive done_get_tweets -> ok end end, lists:seq(1, ?NUMBER_OF_REQUESTS))
        end,
        ?RUNS).
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49807065

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档