在将数据划分到多个节点的分布式计算中,我如何计算大型向量(序列)的算术平均值。我不想使用map reduce范例。除了在每个节点上简单地计算单个和,然后在主节点上带来结果并除以向量(级数)的大小之外,是否有任何分布式算法来有效地计算平均值。
发布于 2017-03-03 09:10:52
分布式平均共识是另一种选择。
使用master的map-reduce的简单方法的问题是,如果你有一个庞大的数据集,本质上是让所有东西相互依赖,那么计算数据可能需要非常长的时间,到那时信息已经非常过时,因此是错误的,除非你锁定整个数据集-对于大量的分布式数据集来说是不切实际的。使用分布式平均共识(相同的方法适用于替代算法),您可以在不锁定数据的情况下实时获得更新、更好的对平均值的当前值的猜测。这里有一个关于它的论文的链接,但它的数学含量很高:http://web.stanford.edu/~boyd/papers/pdf/lms_consensus.pdf你可以在谷歌上搜索到许多关于它的论文。
一般概念是这样的:假设在每个节点上都有一个套接字侦听器。您计算本地和和平均值,然后将其发布到其他节点。每个节点监听其他节点,并在有意义的时间尺度上接收它们的总和和平均值。然后,您可以通过(sumForAllNodes(storedAveragenode * storedCountnode) /(sumForAllNodes(StoredCountnode)评估一个好的总平均值。如果您有一个非常大的数据集,您可以只监听新值,因为它们存储在节点中,并修改本地计数和平均值,然后发布它们。
如果这花费的时间太长,您可以对每个节点中的随机数据子集进行平均。
这里有一些c#代码给你一个想法(使用fleck在比windows -10更多的windows版本上运行-只有microsoft websockets实现)。在两个节点上运行此程序,其中一个使用
<appSettings>
<add key="thisNodeName" value="UK" />
</appSettings>在app.config中使用"EU-North“,在另一个中使用”EU-North“。下面是一些示例代码。这两个实例的交换方式是使用websockets。您只需要添加数据库的后端枚举。
using Fleck;
namespace WebSocketServer
{
class Program
{
static List<IWebSocketConnection> _allSockets;
static Dictionary<string,decimal> _allMeans;
static Dictionary<string,decimal> _allCounts;
private static decimal _localMean;
private static decimal _localCount;
private static decimal _localAggregate_count;
private static decimal _localAggregate_average;
static void Main(string[] args)
{
_allSockets = new List<IWebSocketConnection>();
_allMeans = new Dictionary<string, decimal>();
_allCounts = new Dictionary<string, decimal>();
var serverAddresses = new Dictionary<string,string>();
//serverAddresses.Add("USA-WestCoast", "ws://127.0.0.1:58951");
//serverAddresses.Add("USA-EastCoast", "ws://127.0.0.1:58952");
serverAddresses.Add("UK", "ws://127.0.0.1:58953");
serverAddresses.Add("EU-North", "ws://127.0.0.1:58954");
//serverAddresses.Add("EU-South", "ws://127.0.0.1:58955");
foreach (var serverAddress in serverAddresses)
{
_allMeans.Add(serverAddress.Key, 0m);
_allCounts.Add(serverAddress.Key, 0m);
}
var thisNodeName = ConfigurationSettings.AppSettings["thisNodeName"]; //for example "UK"
var serverSocketAddress = serverAddresses.First(x=>x.Key==thisNodeName);
serverAddresses.Remove(thisNodeName);
var websocketServer = new Fleck.WebSocketServer(serverSocketAddress.Value);
websocketServer.Start(socket =>
{
socket.OnOpen = () =>
{
Console.WriteLine("Open!");
_allSockets.Add(socket);
};
socket.OnClose = () =>
{
Console.WriteLine("Close!");
_allSockets.Remove(socket);
};
socket.OnMessage = message =>
{
Console.WriteLine(message + " received");
var parameters = message.Split('~');
var remoteHost = parameters[0];
var remoteMean = decimal.Parse(parameters[1]);
var remoteCount = decimal.Parse(parameters[2]);
_allMeans[remoteHost] = remoteMean;
_allCounts[remoteHost] = remoteCount;
};
});
while (true)
{
//evaluate my local average and count
Random rand = new Random(DateTime.Now.Millisecond);
_localMean = 234.00m + (rand.Next(0, 100) - 50)/10.0m;
_localCount = 222m + rand.Next(0, 100);
//evaluate my local aggregate average using means and counts sent from all other nodes
//could publish aggregate averages to other nodes, if you wanted to monitor disagreement between nodes
var total_mean_times_count = 0m;
var total_count = 0m;
foreach (var server in serverAddresses)
{
total_mean_times_count += _allCounts[server.Key]*_allMeans[server.Key];
total_count += _allCounts[server.Key];
}
//add on local mean and count which were removed from the server list earlier, so won't be processed
total_mean_times_count += (_localMean * _localCount);
total_count = total_count + _localCount;
_localAggregate_average = (total_mean_times_count/total_count);
_localAggregate_count = total_count;
Console.WriteLine("local aggregate average = {0}", _localAggregate_average);
System.Threading.Thread.Sleep(10000);
foreach (var serverAddress in serverAddresses)
{
using (var wscli = new ClientWebSocket())
{
var tokSrc = new CancellationTokenSource();
using (var task = wscli.ConnectAsync(new Uri(serverAddress.Value), tokSrc.Token))
{
task.Wait();
}
using (var task = wscli.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(thisNodeName+"~"+_localMean + "~"+_localCount)),
WebSocketMessageType.Text,
false,
tokSrc.Token
))
{
task.Wait();
}
}
}
}
}
}
}不要忘记添加静态锁或通过在给定时间同步来分离活动。(为简单起见,未显示)
发布于 2017-03-05 18:34:56
您可以使用两种简单的方法。
一种是,正如您正确指出的,计算每个节点上的总和,然后组合总和并除以总数据量:
avg = (sum1+sum2+sum3)/(cnt1+cnt2+cnt3)另一种可能是计算每个节点的平均值,然后使用加权平均值:
avg = (avg1*cnt1 + avg2*cnt2 + avg3*cnt3) / (cnt1+cnt2+cnt3)
= avg1*cnt1/(cnt1+cnt2+cnt3) + avg2*cnt2/(cnt1+cnt2+cnt3) + avg3*cnt3/(cnt1+cnt2+cnt3)我不认为这些琐碎的方法有任何问题,我想知道为什么您会想要使用不同的方法。
https://stackoverflow.com/questions/42428424
复制相似问题