首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >c#并行写入Azure数据湖文件

c#并行写入Azure数据湖文件
EN

Stack Overflow用户
提问于 2018-08-22 16:28:56
回答 1查看 913关注 0票数 1

在我们的Azure数据湖,我们有记录事件的每日文件和这些事件的坐标。我们需要获取这些坐标,并查找这些坐标属于哪个州、县、镇和区段。我尝试过这段代码的几个版本。

  • 我试图在U中实现这一点。我甚至上传了一个实现Microsoft.SqlServer.Types.SqlGeography方法的自定义程序集,结果发现ADLA没有设置为执行逐行操作(如地理编码)。
  • 我将所有行提取到Server中,将坐标转换为SQLGeography,并构建了执行状态、县等查找的the代码。经过多次优化后,我把这个过程降到了大约700 to /行。(有了1.33亿行的待办事项,每天增加16k行,我们将在近3年内赶上。所以我把the并行化了,事情变得更好了,但还不够。
  • 我将took代码作为控制台应用程序构建,因为SqlGeography库实际上是一个.Net库,而不是原生Server产品。我能够使单线程处理降低t0 ~500 to。添加.NET的并行性(parallel.ForEach)并将我的机器的10/20内核扔到它上做了很多工作,但仍然不够。
  • 我试图将此代码重写为Azure函数,并逐个处理数据湖文件中的文件。大多数文件超时,因为它们需要超过10分钟的处理时间。因此,我更新了代码,以读取文件,并将这些行放入Azure队列存储中。然后,我有第二个Azure函数,它为队列中的每一行触发。其思想是,Azure函数比任何一台机器都能扩展得更大。

这就是我被困的地方。我无法可靠地将行写入ADLS中的文件。这是我现在有的代码。

代码语言:javascript
复制
public static void WriteGeocodedOutput(string Contents, String outputFileName, ILogger log) {

        AdlsClient client = AdlsClient.CreateClient(ADlSAccountName, adlCreds);
        //if the file doesn't exist write the header first
        try {
            if (!client.CheckExists(outputFileName)) {
                using (var stream = client.CreateFile(outputFileName, IfExists.Fail)) {
                    byte[] headerByteArray = Encoding.UTF8.GetBytes("EventDate, Longitude, Latitude, RadarSiteID, CellID, RangeNauticalMiles, Azimuth, SevereProbability, Probability, MaxSizeinInchesInUS, StateCode, CountyCode, TownshipCode, RangeCode\r\n");
                    //stream.Write(headerByteArray, 0, headerByteArray.Length);
                    client.ConcurrentAppend(outputFileName, true, headerByteArray, 0, headerByteArray.Length);
                }
            }
        } catch (Exception e) {
            log.LogInformation("multiple attempts to create the file. Ignoring this error, since the file was created.");
        }

        //the write the data
        byte[] textByteArray = Encoding.UTF8.GetBytes(Contents);
        for (int attempt = 0; attempt < 5; attempt++) {
            try {
                log.LogInformation("prior to write, the outputfile size is: " + client.GetDirectoryEntry(outputFileName).Length);
                var offset = client.GetDirectoryEntry(outputFileName).Length;
                client.ConcurrentAppend(outputFileName, false, textByteArray, 0, textByteArray.Length);
                log.LogInformation("AFTER write, the outputfile size is: " + client.GetDirectoryEntry(outputFileName).Length);
                //if successful, stop trying to write this row
                attempt = 6;                    
            }
            catch (Exception e){
                log.LogInformation($"exception on adls write: {e}");
            }
            Random rnd = new Random();
            Thread.Sleep(rnd.Next(attempt * 60));
        }
    }

该文件将在需要时创建,但我在日志中确实收到了几条消息,其中有几个线程试图创建它。我并不总是把标题行写下来。

我也不再只获得任何数据行:

代码语言:javascript
复制
"BadRequest ( IllegalArgumentException  concurrentappend failed with error 0xffffffff83090a6f 
(Bad request. The target file does not support this particular type of append operation. 
If the concurrent append operation has been used with this file in the past, you need to append to this file using the concurrent append operation.
If the append operation with offset has been used in the past, you need to append to this file using the append operation with offset. 
On the same file, it is not possible to use both of these operations.). []

我觉得我错过了一些基本的设计理念。代码应该尝试将一行写入文件。如果该文件尚不存在,请创建它并将标题行放入其中。然后,放进这一排。

实现这种写作场景的最佳实践方法是什么?

对于如何在ADLS中处理这种并行写入工作负载,还有其他建议吗?

EN

回答 1

Stack Overflow用户

发布于 2019-01-28 21:08:23

我有点晚了,但我想问题之一可能是因为在同一个文件流中使用了"Create“和"ConcurrentAppend”?ADLS文档提到不能在同一个文件上使用它们。也许,尝试将" create“命令更改为"ConcurrentAppend”,因为如果文件不存在,可以使用后者来创建文件。

另外,如果您找到了更好的方法,请在这里张贴您的解决方案。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51971326

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档