在我们的Azure数据湖,我们有记录事件的每日文件和这些事件的坐标。我们需要获取这些坐标,并查找这些坐标属于哪个州、县、镇和区段。我尝试过这段代码的几个版本。
这就是我被困的地方。我无法可靠地将行写入ADLS中的文件。这是我现在有的代码。
public static void WriteGeocodedOutput(string Contents, String outputFileName, ILogger log) {
AdlsClient client = AdlsClient.CreateClient(ADlSAccountName, adlCreds);
//if the file doesn't exist write the header first
try {
if (!client.CheckExists(outputFileName)) {
using (var stream = client.CreateFile(outputFileName, IfExists.Fail)) {
byte[] headerByteArray = Encoding.UTF8.GetBytes("EventDate, Longitude, Latitude, RadarSiteID, CellID, RangeNauticalMiles, Azimuth, SevereProbability, Probability, MaxSizeinInchesInUS, StateCode, CountyCode, TownshipCode, RangeCode\r\n");
//stream.Write(headerByteArray, 0, headerByteArray.Length);
client.ConcurrentAppend(outputFileName, true, headerByteArray, 0, headerByteArray.Length);
}
}
} catch (Exception e) {
log.LogInformation("multiple attempts to create the file. Ignoring this error, since the file was created.");
}
//the write the data
byte[] textByteArray = Encoding.UTF8.GetBytes(Contents);
for (int attempt = 0; attempt < 5; attempt++) {
try {
log.LogInformation("prior to write, the outputfile size is: " + client.GetDirectoryEntry(outputFileName).Length);
var offset = client.GetDirectoryEntry(outputFileName).Length;
client.ConcurrentAppend(outputFileName, false, textByteArray, 0, textByteArray.Length);
log.LogInformation("AFTER write, the outputfile size is: " + client.GetDirectoryEntry(outputFileName).Length);
//if successful, stop trying to write this row
attempt = 6;
}
catch (Exception e){
log.LogInformation($"exception on adls write: {e}");
}
Random rnd = new Random();
Thread.Sleep(rnd.Next(attempt * 60));
}
}该文件将在需要时创建,但我在日志中确实收到了几条消息,其中有几个线程试图创建它。我并不总是把标题行写下来。
我也不再只获得任何数据行:
"BadRequest ( IllegalArgumentException concurrentappend failed with error 0xffffffff83090a6f
(Bad request. The target file does not support this particular type of append operation.
If the concurrent append operation has been used with this file in the past, you need to append to this file using the concurrent append operation.
If the append operation with offset has been used in the past, you need to append to this file using the append operation with offset.
On the same file, it is not possible to use both of these operations.). []我觉得我错过了一些基本的设计理念。代码应该尝试将一行写入文件。如果该文件尚不存在,请创建它并将标题行放入其中。然后,放进这一排。
实现这种写作场景的最佳实践方法是什么?
对于如何在ADLS中处理这种并行写入工作负载,还有其他建议吗?
发布于 2019-01-28 21:08:23
我有点晚了,但我想问题之一可能是因为在同一个文件流中使用了"Create“和"ConcurrentAppend”?ADLS文档提到不能在同一个文件上使用它们。也许,尝试将" create“命令更改为"ConcurrentAppend”,因为如果文件不存在,可以使用后者来创建文件。
另外,如果您找到了更好的方法,请在这里张贴您的解决方案。
https://stackoverflow.com/questions/51971326
复制相似问题