Flink + Kafka + JSON - Java example

Stack Overflow user
Asked 2016-09-03 04:21:28
2 answers, 11.3K views, 0 followers, 3 votes

I am trying to read JSON from a Kafka topic with the following code:

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream messageStream = env.addSource(
                new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
                , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

        messageStream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        });

        env.execute();
    }
}

The problems are:

1) This program does not run, due to:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

The problem is at line: `messageStream.map(....`

2) The problem above is probably because the DataStream has no type parameter. But if I try:

DataStream<String> messageStream = env.addSource(...

the code no longer compiles, failing with "cannot resolve constructor FlinkKafkaConsumer09 ...".

pom.xml (the important part):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>

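I have been looking for code that uses a JSON DeserializationSchema in Flink, without success. All I found was the unit test for JSONKeyValueDeserializationSchema (linked in the original post).

Does anyone know how to do this properly?

Thanks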


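2 Answers

Stack Overflow user

Accepted answer

Answered 2016-09-03 14:02:21

Your error is on the line messageStream.map(new MapFunction<String, String>(). The MapFunction you defined expects an input of type String and produces an output of type String. But you are using JSONKeyValueDeserializationSchema, which converts each Kafka record into a com.fasterxml.jackson.databind.node.ObjectNode, so your MapFunction should expect an input of that same type, ObjectNode. Try the code below.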

messageStream.map(new MapFunction<ObjectNode, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(ObjectNode node) throws Exception {
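            // JSONKeyValueDeserializationSchema wraps each record as {"key": ..., "value": ...};
            // get(0) is an array-index lookup and returns null on an ObjectNode.
            return "Kafka and Flink says: " + node.get("value");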
        }
    });
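
The question's first snippet compiled even though the function type was wrong. Here is a plain-Java sketch of why, as far as I can tell. `TinyStream` is a hypothetical stand-in made up for this illustration, not a Flink class. Declaring the stream as a raw type (`DataStream` without `<...>`) erases the element type, so the compiler accepts a function written for `String` even when the elements are something else. In this sketch the mismatch surfaces as a `ClassCastException` at run time. Flink instead rejects the job earlier with the `InvalidTypesException` shown in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration only: TinyStream is a hypothetical stand-in, not a Flink class.
class RawTypeSketch {

    /** Minimal typed container with a map operation, loosely mimicking DataStream<T>. */
    static class TinyStream<T> {
        private final List<T> items;

        TinyStream(List<T> items) {
            this.items = items;
        }

        <R> TinyStream<R> map(Function<T, R> fn) {
            List<R> out = new ArrayList<>();
            for (T item : items) {
                out.add(fn.apply(item));
            }
            return new TinyStream<>(out);
        }

        List<T> items() {
            return items;
        }
    }

    /** Applies a String-only function through a raw reference and reports what happens. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String mapThroughRawReference(List<?> elements) {
        TinyStream raw = new TinyStream(elements);   // raw type: the element type is erased
        Function<String, String> greet = s -> "Kafka and Flink says: " + s;
        try {
            raw.map(greet);                          // compiles, with an unchecked warning only
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";             // the elements were not Strings after all
        }
    }

    public static void main(String[] args) {
        // Integers stand in for ObjectNode here: anything that is not a String.
        System.out.println(mapThroughRawReference(Arrays.asList(1, 2, 3)));
        System.out.println(mapThroughRawReference(Arrays.asList("a", "b")));

        // With the element type declared, the same mistake is caught at compile time:
        // TinyStream<Integer> typed = new TinyStream<>(Arrays.asList(1, 2, 3));
        // typed.map((String s) -> s);   // does not compile
    }
}
```

The same reasoning suggests declaring the source as DataStream<ObjectNode>, so that the compiler checks the MapFunction's input type for you.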
3 votes

Stack Overflow user

Answered 2016-09-05 22:34:52

I followed the accepted answer, but JSONKeyValueDeserializationSchema raises an exception during the JSON parsing step, even for a JSON document as simple as {"name":"John Doe"}.

The code that throws is:

DataStream<ObjectNode> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
    , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
}).print();

Output:

09/05/2016 11:16:02 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
    at java.lang.Thread.run(Thread.java:745)

I succeeded with a different deserialization schema, JSONDeserializationSchema:

DataStream<ObjectNode> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
    , new JSONDeserializationSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode value) throws Exception {
        // Here the ObjectNode is the parsed message itself, so this reads
        // a top-level field named "key" from the JSON payload.
        return "Kafka and Flink says: " + value.get("key").asText();
    }
}).print();
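
A possible explanation for the NullPointerException, which nobody in this thread confirmed: the trace shows it inside JsonFactory.createParser, reached from JSONKeyValueDeserializationSchema.deserialize. That is what one would expect if the schema passes Jackson a null byte array, for example when the Kafka record was produced without a key. JSONDeserializationSchema only parses the message value, which would fit with it working here. The plain-Java sketch below shows the kind of null guard a custom schema could apply. `wrap` is a hypothetical helper, the class is not part of Flink or Jackson, and it keeps the payloads as strings rather than parsing them.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only: this is not Flink's or Jackson's API.
class NullSafeKeyValueSketch {

    /**
     * Hypothetical helper that wraps a record as {"key": ..., "value": ...},
     * skipping whichever part is null instead of trying to decode it.
     */
    static Map<String, String> wrap(byte[] messageKey, byte[] message) {
        Map<String, String> node = new LinkedHashMap<>();
        if (messageKey != null) {
            node.put("key", new String(messageKey, StandardCharsets.UTF_8));
        }
        if (message != null) {
            node.put("value", new String(message, StandardCharsets.UTF_8));
        }
        return node;
    }

    public static void main(String[] args) {
        byte[] value = "{\"name\":\"John Doe\"}".getBytes(StandardCharsets.UTF_8);
        // Assumption: a record produced without a key arrives with messageKey == null.
        System.out.println(wrap(null, value));
    }
}
```

In a real job the same guard would belong in your own deserialization schema, with Jackson parsing the bytes that are present.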
6 votes
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/39300183
