首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从Elixir中的文件读取数据时恢复Enum.each操作

从Elixir中的文件读取数据时恢复Enum.each操作
EN

Stack Overflow用户
提问于 2018-09-29 13:40:58
回答 1查看 71关注 0票数 0

我让这种Enum.each经历了很多次。

代码语言:javascript
复制
exids
|> Enum.each(fn (exid) ->
  request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/", "Directories", "Name")
  |> Enum.sort |> Enum.each(fn (year) ->
    request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/", "Directories", "Name")
    |> Enum.sort |> Enum.each(fn (month) ->
      request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/", "Directories", "Name")
      |> Enum.sort |> Enum.each(fn (day) ->
        request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/", "Directories", "Name")
        |> Enum.sort |> Enum.each(fn (hour) ->
          request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/?limit=3600", "Files", "name")
          |> Enum.sort |> Enum.each(fn (file) ->
            exist_on_seaweed?("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
            |> copy_or_skip("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
            save_current_directory(exid, year, month, day, hour, file)
          end)
        end)
      end)
    end)
  end)
  next_exid_index = Enum.find_index(exids, fn(x) -> x == exid end)
  File.write!("#{@root_dir}/moving_old_data", "#{Enum.at(exids, next_exid_index + 1)}")
end)

这是一个很长时间的运行循环,但它不处理任何停止和恢复逻辑。

我尝试将当前数据保存到一个文件中,并在重新启动时将其恢复为

代码语言:javascript
复制
exids
|> clean_already_completed(0)
|> Enum.each(fn (exid) ->
  request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/", "Directories", "Name")
  |> Enum.sort |> clean_already_completed(1) |> Enum.each(fn (year) ->
    request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/", "Directories", "Name")
    |> Enum.sort |> clean_already_completed(2) |> Enum.each(fn (month) ->
      request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/", "Directories", "Name")
      |> Enum.sort |> clean_already_completed(3) |> Enum.each(fn (day) ->
        request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/", "Directories", "Name")
        |> Enum.sort |> clean_already_completed(4) |> Enum.each(fn (hour) ->
          request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/?limit=3600", "Files", "name")
          |> Enum.sort |> clean_already_completed(5) |> Enum.each(fn (file) ->
            exist_on_seaweed?("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
            |> copy_or_skip("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
            save_current_directory(exid, year, month, day, hour, file)
          end)
        end)
      end)
    end)
  end)
  next_exid_index = Enum.find_index(exids, fn(x) -> x == exid end)
  File.write!("#{@root_dir}/moving_old_data", "#{Enum.at(exids, next_exid_index + 1)}")
end)

clean_already_completed的工作原理是这样的。

代码语言:javascript
复制
  defp clean_already_completed(list, index), do: get_recent_value(index) |> dont_reduce?(list)

  defp dont_reduce?(nil, list), do: list
  defp dont_reduce?(last, list), do: Enum.drop_while(list, fn el -> el != last end)

  defp get_recent_value(index), do: read_recent_file() |> Enum.at(index)

  defp read_recent_file, do: File.read("#{@root_dir}/moving_old_data") |> file_is_present()

  defp file_is_present({:error, :enoent}), do: []
  defp file_is_present({:ok, ""}), do: []
  defp file_is_present({:ok, data}), do: data |> String.split(" ")

但当我停止并重新启动它时,它并没有像预期的那样工作,甚至在运行的时候也是如此。它跳过接下来的一年、几天和几个月。

在保存的文件中,我有这样的数据

代码语言:javascript
复制
1-granby-row 2017 03 18 05 43_49_000.jpg

例如,在失败时,我只想从这些值恢复每个循环。上面的代码行被解释为exid year month day hour file

有没有更好的方法呢?如果整个循环再次开始,从文件信息中恢复吗?

EN

回答 1

Stack Overflow用户

发布于 2018-09-29 14:52:44

一个更好的方法是将这个过程分成更小的步骤:

  1. 运行嵌套的each语句以构建需要移动的所有文件的清单。
  2. 将这些文件的完整路径存储到永久数据存储中,如Redis、Postgres或Mnesia。
  3. 现在会遍历所有这些路径,移动文件并在传输文件时将其标记为“完成”,无论出于何种原因,当应用程序崩溃或进程在中途停止时,您可以重新启动应用程序,并从您停止的地方继续。由于您已经有了所有文件、其路径和状态的已解析列表,因此您只需选择标记为“未完成”的下一个路径,然后继续移动它。

正如我在前面提到的in my other answer,这一切都可以使用作业处理库来完成;您所需要做的就是解析所有路径,并将它们添加为需要处理的作业。在出现错误、崩溃和其他失败的情况下,它们可以重新启动已失败的“作业”,并恢复尚未完成的作业。

如果您决定在没有作业处理库的情况下执行此操作,则代码如下:

代码语言:javascript
复制
# Steps 1 and 2
def build_manifest do
  all_files =
    exids
    |> Enum.map(fn exid -> "#{@seaweedfs}/#{exid}/snapshots/recordings/" end)
    |> Enum.flat_map(&get_dir_paths/1)         # Get Year directories
    |> Enum.flat_map(&get_dir_paths/1)         # Get Month directories
    |> Enum.flat_map(&get_dir_paths/1)         # Get Day directories
    |> Enum.flat_map(&get_dir_paths/1)         # Get Hour directories
    |> Enum.flat_map(&get_file_paths/1)        # Get files in the hour directories

  # Now store these files to a datastore
end


# Steps 3 & 4
def copy_all_files do
  case get_next_incomplete_file() do
    nil ->
      IO.puts("All files have been copied")

    path ->
      if exist_on_seaweed?(path)
        copy_file(path)
      end

      copy_all_files()
  end
end


defp get_dir_paths(path) do
  path
  |> request_from_seaweedfs("Directories", "Name")
  |> Enum.sort
  |> Enum.map(&Path.join(path, &1))
end

defp get_file_paths(path) do
  (path <> "?limit=3600")
  |> request_from_seaweedfs("Files", "Name")
  |> Enum.sort
  |> Enum.map(&Path.join(path, &1))
end

defp get_next_incomplete_file do
  # query your datastore to get the next file that is
  # marked "not done"
end

defp copy_file(path) do
  # copy the file
  # and then mark it "done" in the datastore
end
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52565477

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档