首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >药剂中的同步障碍?

药剂中的同步障碍?
EN

Stack Overflow用户
提问于 2015-11-28 13:53:42
回答 2查看 557关注 0票数 2

对于Elixir来说,(循环)屏障的最优雅实现是什么?一种要实现的算法(顶点着色)有一个循环,该循环具有生成进程的等待阶段(“并行执行.同步”,然后使用所有进程的结果检查终止条件),它是分布式计算原理中的算法5“6色”。1.

大多数引用都是针对.NET、p线程和其他与线程相关的计算,所以我不确定屏障是否是我所追求的正确模式。也许,还有更多的“灵丹妙药”方式。

我还没有任何代码(搜索模式),但下面是代码,实现了相同问题的“慢速”版本:https://codereview.stackexchange.com/questions/111487/coloring-trees-in-elixir

我的想法是让顶级进程(在每个图节点上产生一个进程)发送和接收消息,这将同步节点进程。必须指出的是,节点进程也相互通信:父进程在一个顶级循环迭代期间向子节点发送消息。然而,复杂的是,在顶层接收节点消息之后,以及在所有节点进行迭代之前,不应该继续任何进程(很可能我会使用尾递归)。这就是我为什么想到障碍机制的原因。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-12-02 19:38:35

我不确定这是否正是您所要寻找的,但是这里有一个基于java中的java.util.concurrent.CyclicBarrier类并发::ruby中的CyclicBarrier类的循环屏障。

代码语言:javascript
复制
defmodule CyclicBarrier do

  require Record
  Record.defrecordp :barrier, CyclicBarrier,
    pid: nil

  def start(parties, action \\ nil)
      when (is_integer(parties) and parties > 0)
      and (action === nil or is_function(action, 0)),
    do: barrier(pid: spawn(CyclicBarrier.Server, :init, [parties, action]))

  def stop(barrier(pid: pid)) do
    call(pid, :stop)
    true
  end

  def alive?(barrier(pid: pid)) do
    Process.alive?(pid)
  end

  def broken?(barrier(pid: pid)) do
    case call(pid, :status) do
      :waiting ->
        false
      _ ->
        true
    end
  end

  def number_waiting(barrier(pid: pid)) do
    case call(pid, :number_waiting) do
      n when is_integer(n) ->
        n
      _ ->
        false
    end
  end

  def parties(barrier(pid: pid)) do
    case call(pid, :parties) do
      n when is_integer(n) ->
        n
      _ ->
        false
    end
  end

  def reset(barrier(pid: pid)) do
    case call(pid, :reset) do
      :reset ->
        true
      :broken ->
        true
      _ ->
        false
    end
  end

  def wait(barrier = barrier()),
    do: wait(nil, barrier)

  def wait(timeout, barrier = barrier(pid: pid)) when timeout === nil or is_integer(timeout) do
    case call(pid, :wait, timeout) do
      :fulfilled ->
        true
      :broken ->
        false
      :timeout ->
        reset(barrier)
        false
      _ ->
        false
    end
  end

  defp call(pid, request, timeout \\ nil) do
    case Process.alive?(pid) do
      false ->
        {:EXIT, pid, :normal}
      true ->
        trap_exit = Process.flag(:trap_exit, true)
        Process.link(pid)
        ref = make_ref()
        send(pid, {ref, self(), request})
        case timeout do
          nil ->
            receive do
              {^ref, reply} ->
                Process.unlink(pid)
                Process.flag(:trap_exit, trap_exit)
                reply
              exited = {:EXIT, ^pid, _} ->
                Process.flag(:trap_exit, trap_exit)
                exited
            end
          _ ->
            receive do
              {^ref, reply} ->
                Process.unlink(pid)
                Process.flag(:trap_exit, trap_exit)
                reply
              exited = {:EXIT, ^pid, _} ->
                Process.flag(:trap_exit, trap_exit)
                exited
            after
              timeout ->
                Process.unlink(pid)
                Process.flag(:trap_exit, trap_exit)
                :timeout
            end
        end

    end
  end

  defmodule Server do

    require Record
    Record.defrecordp :state_data,
      waiting: 0,
      parties: nil,
      action:  nil,
      q:       :queue.new()

    def init(parties, action),
      do: loop(:waiting, state_data(parties: parties, action: action))

    defp loop(:waiting, sd = state_data(waiting: same, parties: same, action: action, q: q)),
      do: loop(done(:fulfilled, action, q), state_data(sd, waiting: 0, q: :queue.new()))
    defp loop(state_name, sd) do
      receive do
        {ref, pid, request} when is_reference(ref) and is_pid(pid) and is_atom(request) ->
          handle(state_name, request, {ref, pid}, sd)
      end
    end

    defp handle(:waiting, :wait, from, sd = state_data(waiting: w, q: q)),
      do: loop(:waiting, state_data(sd, waiting: w + 1, q: :queue.in(from, q)))
    defp handle(:waiting, :reset, from, sd = state_data(waiting: 0, q: q)),
      do: loop(done(:reset, nil, :queue.in(from, q)), sd)
    defp handle(:waiting, :reset, from, sd = state_data(q: q)),
      do: loop(done(:broken, nil, :queue.in(from, q), false), state_data(sd, waiting: 0, q: :queue.new()))
    defp handle(:broken, :reset, from, sd = state_data(q: q)),
      do: loop(done(:reset, nil, :queue.in(from, q)), sd)
    defp handle(:broken, :wait, from, sd) do
      cast(from, :broken)
      loop(:broken, sd)
    end
    defp handle(state_name, :number_waiting, from, sd = state_data(waiting: number_waiting)) do
      cast(from, number_waiting)
      loop(state_name, sd)
    end
    defp handle(state_name, :parties, from, sd = state_data(parties: parties)) do
      cast(from, parties)
      loop(state_name, sd)
    end
    defp handle(state_name, :status, from, sd) do
      cast(from, state_name)
      loop(state_name, sd)
    end
    defp handle(_state_name, :stop, _from, _sd) do
      exit(:normal)
    end

    defp broadcast(q, message),
      do: for from <- :queue.to_list(q),
        do: cast(from, message)

    defp cast({ref, pid}, message),
      do: send(pid, {ref, message})

    defp done(state, action, q, continue \\ true) do
      run(action)
      broadcast(q, state)
      case continue do
        true ->
          :waiting
        false ->
          state
      end
    end

    defp run(nil),
      do: nil
    defp run(action),
      do: action.()

  end

end

下面是在CyclicBarrier shell中使用IEx的一个例子:

代码语言:javascript
复制
iex> barrier = CyclicBarrier.start(5, fn -> IO.puts("done") end)
{CyclicBarrier, #PID<0.281.0>}
iex> for i <- 1..5, do: spawn(fn -> IO.puts("process #{i}: #{barrier.wait}") end)
done
process 5: true
process 1: true
process 3: true
process 2: true
process 4: true
[#PID<0.283.0>, #PID<0.284.0>, #PID<0.285.0>, #PID<0.286.0>, #PID<0.287.0>]

进程执行的确切顺序是不确定的。

CyclicBarrier上的其他函数示例如下:

代码语言:javascript
复制
iex> barrier = CyclicBarrier.start(2)
{CyclicBarrier, #PID<0.280.0>}
iex> barrier.alive?
true
iex> barrier.broken?
false
iex> barrier.number_waiting
0
iex> barrier.parties
2
iex> # let's spawn another process which will wait on the barrier
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.288.0>
iex> barrier.number_waiting
1
iex> # if we reset the barrier while another process is waiting
iex> # on the barrier, it will break
iex> barrier.reset
barrier returned: false
true
iex> barrier.broken?
true
iex> # however, the barrier can be reset again to its initial state
iex> barrier.reset
true
iex> barrier.broken?
false
iex> # if a timeout is exceeded while waiting for a barrier, it
iex> # will also break the barrier
iex> barrier.wait(100)
false
iex> barrier.broken?
true
iex> # let's reset the barrier, spawn another process to wait,
iex> # and wait with a timeout in the current process
iex> barrier.reset
true
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.289.0>
iex> barrier.wait(100)
barrier returned: true
true
iex> # if stop is called on the barrier, the barrier process will
iex> # exit and all future calls to the barrier will return false
iex> barrier.stop
true
iex> barrier.alive?
false
iex> barrier.reset
false
iex> barrier.wait
false
票数 3
EN

Stack Overflow用户

发布于 2019-11-07 17:39:54

我已经在Hex,原语上发表了一个库,它就是这样做的!GitHub上的源代码和广泛的单元测试(100%的测试覆盖率)。

看一看,让我知道,如果我可以在任何时候改进它!

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33972112

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档