首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何让一个参与者在一个进程上运行,将一条消息发送给另一个运行在单独进程上的参与者?

如何让一个参与者在一个进程上运行,将一条消息发送给另一个运行在单独进程上的参与者?
EN

Stack Overflow用户
提问于 2017-03-15 21:22:04
回答 1查看 486关注 0票数 2

我希望让运行在不同进程(或节点)上的参与者向其他运行在不同进程(或节点)之外的参与者发送消息,同时保持容错和负载平衡。我目前正在尝试使用Akka.群集的切分功能来完成这一任务。

但是,我不知道怎么做到这一点.

我有以下反映我的种子节点的代码:

代码语言:javascript
复制
let configurePort port =
    let config = Configuration.parse ("""
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
              }
              serialization-bindings {
                "System.Object" = hyperion
              }
            }
          remote {
            helios.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = """ + port.ToString() + """
            }
          }
          cluster {
            auto-down-unreachable-after = 5s
            seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
          }
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }
        }
        """)
    config.WithFallback(ClusterSingletonManager.DefaultConfig())

let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored

// spawn two separate systems with shard regions on each of them
let system1 = System.create "cluster-system" (configurePort 2551)
let shardRegion1 = spawnSharded id system1 "shardRegion1" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)

let system2 = System.create "cluster-system" (configurePort 2552)
let shardRegion2 = spawnSharded id system2 "shardRegion2" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)

let system3 = System.create "cluster-system" (configurePort 2553)
let shardRegion3 = spawnSharded id system3 "shardRegion3" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(3000)


// NOTE: Even thou we sent all messages through single shard region,
//       some of them will be executed on the second and third one thanks to shard balancing
System.Threading.Thread.Sleep(3000)
shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-1", "entity-2", "hello world 2")
shardRegion1 <! ("shard-2", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")

System.Threading.Thread.Sleep(1000)

let printShards shardRegion =
    async {
        let! (reply:AskResult<ShardRegionStats>) = (retype shardRegion) <? GetShardRegionStats.Instance
        let (stats: ShardRegionStats) = reply.Value
        for kv in stats.Stats do
            printfn "\tShard '%s' has %d entities on it" kv.Key kv.Value
    } |> Async.RunSynchronously

let printNodes() =
    printfn "\nShards active on node 'localhost:2551':"
    printShards shardRegion1
    printfn "\nShards active on node 'localhost:2552':"
    printShards shardRegion2
    printfn "\nShards active on node 'localhost:2553':"
    printShards shardRegion3

printNodes()

输出如下所示:

代码语言:javascript
复制
Shards active on node 'localhost:2551':
    Shard 'shard-1' has 2 entities on it
    Shard 'shard-2' has 2 entities on it

节点‘localhost:2552’上活动的碎片:

然后我有一个单独的进程,它执行以下代码:

代码语言:javascript
复制
let configurePort port =
    let config = Configuration.parse ("""
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
              }
              serialization-bindings {
                "System.Object" = hyperion
              }
            }
          remote {
            helios.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = "0"
            }
          }
          cluster {
            auto-down-unreachable-after = 5s
            seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
          }
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }
        }
        """)
    config.WithFallback(ClusterSingletonManager.DefaultConfig())

let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored

// spawn two separate systems with shard regions on each of them
let system1 = System.create "cluster-system" (configurePort 2554)
let shardRegion1 = spawnSharded id system1 "printer" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)

let system2 = System.create "cluster-system" (configurePort 2555)
let shardRegion2 = spawnSharded id system2 "printer" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)

let system3 = System.create "cluster-system" (configurePort 2556)
let shardRegion3 = spawnSharded id system3 "printer" <| props (actorOf2 consumer)

我的集群系统(运行在单独的进程上)识别正在加入的新节点:

代码语言:javascript
复制
> [INFO][3/15/2017 9:12:13 PM][Thread 0054][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52953] is JOINING, roles []
[INFO][3/15/2017 9:12:14 PM][Thread 0006][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52956] is JOINING, roles []
[INFO][3/15/2017 9:12:15 PM][Thread 0054][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52961] is JOINING, roles []
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52953] to [Up]
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52956] to [Up]
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52961] to [Up]

结论:

总之,我希望让运行在不同进程(或节点)上的参与者向其他运行在不同进程(或节点)之外的参与者发送消息,同时保持容错和负载平衡。我目前正在尝试使用Akka.群集的切分功能来完成这一任务。

附录:

代码语言:javascript
复制
open System
open System.IO
#if INTERACTIVE
let cd = Path.Combine(__SOURCE_DIRECTORY__, "../src/Akkling.Cluster.Sharding/bin/Debug")
System.IO.Directory.SetCurrentDirectory(cd)
#endif

#r "../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Newtonsoft.Json.dll"
#r @"C:\Users\Snimrod\Documents\Visual Studio 2015\Projects\Temp\packages\Akka.FSharp.1.1.3\lib\net45\Akka.FSharp.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FSharp.PowerPack.Linq.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Helios.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FsPickler.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.Serialization.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Remote.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Tools.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Sharding.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Serialization.Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Cluster.Sharding.dll"


open Akka.Actor
open Akka.Configuration
open Akka.Cluster
open Akka.Cluster.Tools.Singleton
open Akka.Cluster.Sharding
open Akka.Persistence

open Akkling
open Akkling.Persistence
open Akkling.Cluster
open Akkling.Cluster.Sharding
open Hyperion
EN

回答 1

Stack Overflow用户

发布于 2017-03-15 22:03:42

为了维护碎片及其位置的一致视图,Akka.Cluster.Sharding持久后端必须指向对所有进程都可见的数据库。在您的配置中,您使用的是akka.persistence.journal.inmem,它是内存中的数据存储(只用于测试和开发)。从另一个进程中看不到它。

您需要配置一个持久后端,以便在驻留在不同机器/进程上的节点之间可以看到碎片。您可以这样做,即通过使用Akka.Persistence.SqlServer或任何其他插件。这是仅通过分片使用的持久性后端最基本的配置:

代码语言:javascript
复制
akka.persistence {
    journal {
        plugin = "akka.persistence.journal.sql-server"
        sql-server {
            connection-string = "<connection-string>"
            auto-initialize = on
        }
    }
    snapshot-store {
        plugin = "akka.persistence.snapshot-store.sql-server"
        sql-server {
            connection-string = "<connection-string>"
            auto-initialize = on
        }
    }
}

要了解更实际的内容,请参考这篇文章

还请记住,Akka.Cluster.Sharding和Akka.Persistence插件都只能在预发布模式下使用(因此您需要安装带有-pre标志的包)。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42820905

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档