首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark Structured Kafka偏移管理

Spark Structured Kafka偏移管理
EN

Stack Overflow用户
提问于 2019-05-16 11:02:18
回答 2查看 1K关注 0票数 3

我正在研究在kafka中存储kafka偏移量用于Spark结构化流媒体,就像它对DStreams stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)一样,与我正在寻找的相同,但用于结构化流媒体。它是否支持结构化流媒体?如果是,我如何实现它?

我知道使用.option("checkpointLocation", checkpointLocation)的hdfs检查点,但我对内置的偏移量管理很感兴趣。

我期望kafka只在内部存储偏移量,而不使用spark hdfs检查点。

EN

回答 2

Stack Overflow用户

发布于 2019-06-13 10:50:14

我正在使用在某个地方找到的这段代码。

代码语言:javascript
复制
public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {

        try {

            FileWriter writer = new FileWriter(storageName(topic, partition), false);

            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return he last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {

        try {

            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));

            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;

        } catch (Exception e) {
            e.printStackTrace();
        }

        return 0;
    }

    private String storageName(String topic, int partition) {
        return "Offsets\\" + storagePrefix + "-" + topic + "-" + partition;
    }

}

在记录处理成功后调用SaveOffset...is,否则不存储偏移量。我使用Kafka主题作为源,所以我指定startingoffsets作为从ReadOffsets检索到的偏移量……

票数 0
EN

Stack Overflow用户

发布于 2021-01-22 00:23:12

“它是否支持结构化流?”

不,在结构化流中不支持将偏移量提交回Kafka,类似于使用火花流(DStreams)可以完成的操作。Kafka specific configurations上的Spark Structured + Kafka集成指南对此非常精确:

"Kafka源不提交任何偏移量。“

关于这个问题,我用How to manually set groupId and commit Kafka offsets in Spark Structured Streaming写了一个更全面的答案。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56160211

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档