我们使用的是SeaweedFS,它是一个文件系统来存储(图像)文件,它作为一个rest应用程序接口工作。我们正在尝试将数据从一台服务器移动到另一台服务器。
有几个级别的数据目录。存储图像的基本模式是
http://{server}:8888/ballymore-project-wave/snapshots/recordings/{year}/{month}/{day}/{hour}/00_00_000.jpg目录的每一级都有自己的返回,返回形式为JSON,例如
{
"Path": "/ballymore-project-wave/snapshots/recordings/",
"Files": null,
"Directories": [
{
"Name": "2016",
"Id": 91874
},
{
"Name": "2017",
"Id": 1538395
}
],
"Limit": 100,
"LastFileName": "",
"ShouldDisplayLoadMore": false
}当您尝试获取录制的年份时,上面的响应是针对月、日和小时的。在获取单个小时时会有细微的变化,因为
{
"Path": "/ballymore-project-wave/snapshots/recordings/2016/11/02/01/",
"Files": [
{
"name": "00_00_000.jpg",
"fid": "29515,744a5a496b97ff98"
},
{
"name": "00_01_000.jpg",
"fid": "29514,744a5aa52ea3cf3d"
}
],
"Directories": null,
"Limit": 100,
"LastFileName": "02_15_000.jpg",
"ShouldDisplayLoadMore": true
}现在我们需要将所有这些数据从一台服务器移动到另一台服务器。我为它写了一个脚本,如下
defp move_snapshots(exids) do
exids
|> Enum.each(fn (exid) ->
request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/", "Directories", "Name")
|> Enum.sort |> Enum.each(fn (year) ->
request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/", "Directories", "Name")
|> Enum.sort |> Enum.each(fn (month) ->
request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/", "Directories", "Name")
|> Enum.sort |> Enum.each(fn (day) ->
request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/", "Directories", "Name")
|> Enum.sort |> Enum.each(fn (hour) ->
request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/?limit=3600", "Files", "name")
|> Enum.sort |> Enum.each(fn (file) ->
exist_on_seaweed?("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
|> copy_or_skip("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
end)
end)
end)
end)
end)
end)
end这是主函数,exids意味着所有摄像头的字符串类型标识,对于上面的例子,它是ballymore-project-wave。
在上面的脚本中,我检查每一级,如果有东西存在,我会更深入,直到最后,我检查它是否是一个有效的图像,如下所示
defp exist_on_seaweed?(url) do
hackney = [pool: :seaweedfs_download_pool, recv_timeout: 30_000_000]
case HTTPoison.get("#{@seaweedfs}#{url}", ["Accept": "application/json"], hackney: hackney) do
{:ok, %HTTPoison.Response{status_code: 200, body: data}} -> {:ok, data}
_error ->
:not_found
end
end
defp copy_or_skip(:not_found, _path), do: :noop
defp copy_or_skip({:ok, data}, path) do
hackney = [pool: :seaweedfs_upload_pool]
case HTTPoison.post("#{@seaweedfs_new}#{path}", {:multipart, [{path, data, []}]}, [], hackney: hackney) do
{:ok, _response} -> Logger.info "[seaweedfs_save]"
{:error, error} -> Logger.info "[seaweedfs_save] [#{inspect error}]"
end
end这一切都工作得很好,但我有一个轻微的问题,当它被崩溃或由于某些原因损坏时,我需要指导/想法。正如你可以看到,如果相机exids是200,它在100或更少的时候坏了,它会恢复,但从一开始,我们不能在移动后删除旧服务器上的东西,直到完全移动,任何帮助都将不胜感激。此外,如果您认为代码中可以有一些改进,这将是有帮助的。
发布于 2018-09-26 13:51:27
除非你发布了实际的堆栈跟踪或遇到的错误的详细信息,否则不可能准确地找出问题所在。但对于初学者来说,这里有一些可能会有所帮助的建议:
move_snapshots方法分解成更容易理解的东西,也许可以使用像Enum.reduce/3这样的递归,并调用你的copy_or_skip方法作为基例。copy_or_skip方法实现包装在一个try/rescue中,拯救任何异常,记录它们并继续下一个异常。defp copy_or_skip(args,path) do # Your Implementation rescue -> Logger.error("Exception could on #{inspect(path)}\n#{inspect( error )}") end
Que或Toniq等作业处理库中添加一些"Worker“的有效路径。该库将执行所有移动操作,并将其标记为成功或失败。然后,您可以返回查看失败的操作并找出原因,或者自动重新启动失败的操作。关于提高代码可靠性和性能的更多提示:
Stream或更好的Flow来划分任务,并在单独的Task进程(理想情况下由Supervisor管理)中处理实际移动操作。(可选使用池)。https://stackoverflow.com/questions/52510360
复制相似问题