首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >hsync()不适用于SequenceFile编写器

hsync()不适用于SequenceFile编写器
EN

Stack Overflow用户
提问于 2015-03-09 23:53:46
回答 1查看 626关注 0票数 2

我有一个小程序,它每秒向HDFS上的块压缩同步写入10条记录,然后每5分钟运行一次SequenceFile (),以确保超过5分钟的所有内容都可用于处理。

因为我的代码只有几行,所以我只提取了重要的部分:

代码语言:javascript
复制
// initialize

Configuration hdfsConfig = new Configuration();

CompressionCodecFactory codecFactory = new CompressionCodecFactory(hdfsConfig);
CompressionCodec compressionCodec = codecFactory.getCodecByName("default");

SequenceFile.Writer writer = SequenceFile.createWriter(
    hdfsConfig,
    SequenceFile.Writer.file(path),
    SequenceFile.Writer.keyClass(LongWritable.class),
    SequenceFile.Writer.valueClass(Text.class),
    SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK;, compressionCodec)
);

// ...


// append

LongWritable key = new LongWritable((new Date).getTime());
Text val = new Text("Some value");
writer.append(key, val);

// ...

// then every 5 minutes...

logger.info("about to sync...");
writer.hsync();
logger.info("synced!");

仅从日志来看,同步操作似乎就像预期的那样工作,但是,HDFS上的文件仍然很小。一段时间后,可能会添加一些头部和一些事件,但频率甚至接近我的hsync()。一旦文件关闭,所有内容都会立即刷新。

在每次预期的同步之后,还会尝试手动检查文件的内容以查看数据是否存在,但是,文件在此处也显示为空: hdfs dfs -text filename

是否有任何已知的原因导致writer.hsync()无法工作,如果是这样,是否有解决方法?

此问题的进一步测试用例:

代码语言:javascript
复制
import java.util.HashMap;
import java.util.Map;
import java.util.Date;
import java.util.Calendar;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.io.IOException;

import java.text.SimpleDateFormat;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;

public class WriteTest {
    private static final Logger LOG = LoggerFactory.getLogger(WriteTest.class);

    public static void main(String[] args) throws Exception {

        SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
        CompressionCodec compressionCodec;
        String compressionCodecStr = "default";
        CompressionCodecFactory codecFactory;
        Configuration hdfsConfig = new Configuration();

        codecFactory = new CompressionCodecFactory(hdfsConfig);
        compressionCodec = codecFactory.getCodecByName(compressionCodecStr);

        String hdfsURL = "hdfs://10.0.0.1/writetest/";

        Date date = new Date();

        Path path = new Path(
            hdfsURL,
            "testfile" + date.getTime()
        );

        SequenceFile.Writer writer = SequenceFile.createWriter(
            hdfsConfig,
            SequenceFile.Writer.keyClass(LongWritable.class),
            SequenceFile.Writer.valueClass(Text.class),
            SequenceFile.Writer.compression(compressionType, compressionCodec),
            SequenceFile.Writer.file(path)
        );

        for(int i=0;i<10000000;i++) {

            Text value = new Text("New value!");
            LongWritable key = new LongWritable(date.getTime());

            writer.append(key, value);
            writer.hsync();

            Thread.sleep(1000);
        }

        writer.close();
    }
}

结果是在开始写入sequencefile时有一个fsync,然后没有更多的fsync。文件关闭后,内容将写入光盘。

EN

回答 1

Stack Overflow用户

发布于 2016-06-07 02:10:42

这里有多个问题。

  1. 块压缩

当您对序列文件使用块压缩时,这意味着许多条目将在内存中缓冲,然后在达到限制或手动调用sync时以块压缩的形式写入。

当您在编写器上调用hsync时,它会在其底层FSDataOutputStream上调用hsync。但是,这不会写入位于内存压缩缓冲区中的数据。因此,要将数据可靠地发送到Datanode,必须先调用sync,然后再调用hsync

请注意,这样做意味着发送到Datanode的块压缩部分包含的条目比通常要少。这会对压缩质量产生负面影响,并可能导致更多的光盘使用。(我猜这就是hsync不在内部调用sync的原因。)

报告给Namenode的

  1. 文件大小

调用fsync会将数据发送到数据节点,但不会向名称节点报告新的文件大小。有关这方面的技术讨论可以在herehere上找到。显然,每次更新长度都不利于性能。有一个特殊版本的hsync,它允许更新名称节点信息,但它不会被SequenceFile.Writer公开。

代码语言:javascript
复制
    * @param syncFlags
    *          Indicate the semantic of the sync. Currently used to specify
    *          whether or not to update the block length in NameNode.
    */
    public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
        flushOrSync(true, syncFlags);
    }

另一方面,对于压缩类型RecordNone,SequenceFile.Reader中存在一个错误。对于这些压缩类型,阅读器使用长度信息来确定要读取的距离。由于此长度信息不会由hsync更新,因此即使数据实际上可用,它也会错误地停止读取。Block压缩阅读显然不使用长度信息,也不会受到此错误的影响。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28946308

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档