首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >以连续方式将数据从自定义源写入flink

以连续方式将数据从自定义源写入flink
EN

Stack Overflow用户
提问于 2017-07-31 14:48:37
回答 2查看 4.9K关注 0票数 1

这是我第一次使用Apache Flink (1.3.1)并有问题。更详细地说,我正在使用flink-core、flink-cep和flink-streaming库。我的应用程序是一个Akka ActorSystem,它使用来自RabbitMQ的消息,不同的参与者处理这些消息。在一些参与者中,我想从Flink实例化一个StreamExecutionEnvironment并处理传入的消息。因此,我编写了一个自定义源类来扩展RichSourceFunction类。一切正常,除了一件事:我不知道如何发送数据到我的Flink扩展。下面是我的设置:

代码语言:javascript
复制
public class FlinkExtension {

    private static StreamExecutionEnvironment environment;
    private DataStream<ValueEvent> input;
    private CustomSourceFunction function;

    public FlinkExtension(){

        environment = StreamExecutionEnvironment.getExecutionEnvironment();

        function = new CustomSourceFunction();
        input = environment.addSource(function);

        PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());

        DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
            @Override
            public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
                return null; //TODO
            }
        });

        warnings.print();

        try {
            environment.execute();
        } catch(Exception e){
            e.printStackTrace();
        }

    }

    private Pattern<ValueEvent, ?> _pattern(){

        return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
            @Override
            public boolean filter(ValueEvent value) throws Exception {
                return value.getValue() > 10;
            }
        });
    }

    public void sendData(ValueEvent value){
        function.sendData(value);
    }
}

这是我的自定义源函数:

代码语言:javascript
复制
public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {

    private volatile boolean run = false;
    private SourceContext<ValueEvent> context;

    @Override
    public void open(Configuration parameters){
        run = true;
    }

    @Override
    public void run(SourceContext<ValueEvent> ctx) throws Exception {
        this.context = ctx;

        while (run){

        }
    }

    public void sendData(ValueEvent value){
        this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
    }

    @Override
    public void cancel() {
        run = false;
    }
}

所以我想从外部调用FlinkExtension类中的sendData方法,以连续的方式将数据写入我的FlinkExtension。下面是我的JUnit测试,它应该将数据发送到扩展,然后将数据写入SourceContext

代码语言:javascript
复制
@Test
public void testSendData(){
    FlinkExtension extension = new FlinkExtension();
    extension.sendData(new ValueEvent(30));
}

但是如果我运行测试,什么也没有发生,应用程序在CustomSourceFunction的run方法中挂起。我还尝试在CustomSourceFunction run方法中创建一个新的无限线程。

总而言之:有人知道如何以连续的方式将数据从应用程序写入Flink实例吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-07-31 23:44:47

Flink源连接器通过让它们的run()方法在while(run)循环内调用collect() (或collectWithTimestamp())来发出连续的数据流。如果您想研究一个示例,那么Apache的NiFi源代码并不像大多数here's its run method那么复杂。

票数 2
EN

Stack Overflow用户

发布于 2018-11-13 13:05:55

问题是run方法和sendData方法使用了不同的CustomSourceFunction对象实例。因此,context对象不会在方法之间共享,添加新的ValueEvent也不起作用。

要解决此问题,请将run方法使用的对象实例存储为CustomSourceFunction类的静态成员变量。当您需要创建一个新的ValueEvent时,在这个相同的对象实例上调用sendData方法。

请参阅下面的示例代码

代码语言:javascript
复制
package RuleSources;

import Rules.Rule;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.util.ArrayList;

public class DynamicRuleSource extends AlertingRuleSource {
    private static DynamicRuleSource sourceObj;

    private SourceContext<Rule> ctx;

    public static DynamicRuleSource getSourceObject() {
        return sourceObj;
    }

    public void run(SourceContext<Rule> ctx) throws Exception {
        this.ctx = ctx;
        sourceObj = this;
        while(true) {
            Thread.sleep(100);
        }
    }

    public void addRule(Rule rule) {
        ctx.collect(rule);
    }

    @Override
    public void cancel() {
    }
}

添加新规则的步骤

代码语言:javascript
复制
 public static void addRule(Rule rule) throws Exception {
        AlertingRuleSource sourceObject = DynamicRuleSource.getSourceObject();
        sourceObject.addRule(rule);
    }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45409101

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档