
Flink Kafka: Expecting type to be a PojoTypeInfo

Stack Overflow user
Asked on 2021-08-18 03:02:20
2 answers · 348 views · 0 followers · Score 1

My Customer class was generated with the maven-avro plugin. When I try to run this program, I get Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo.

[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.example.Customer does not contain a setter for field first_name

[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class com.example.Customer cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

I am using Java 8.

My Customer class was generated by the avro-maven-plugin as a specific record type.
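For reference, a minimal Avro schema of the kind the plugin compiles into such a class might look like the fragment below. Only the first_name field is confirmed by the log above; the record name and namespace are taken from the error messages, and any further fields are assumptions:

```json
{
  "type": "record",
  "name": "Customer",
  "namespace": "com.example",
  "fields": [
    {"name": "first_name", "type": "string"}
  ]
}
```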

Please help me; I have already spent five days on this without solving it.

I tried three different approaches, which I refer to as Method 1, Method 2, and Method 3 below.

Code language: scala
package com.example
import com.typesafe.config.ConfigException.Generic
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecord
import org.apache.flink.api.scala.createTypeInformation
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema


import java.util.Properties
object flink_kafka_avro extends App  {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
    properties.put("group.id", "customer-consumer-group-v1")
    properties.put("auto.commit.enable", "false")
    properties.put("auto.offset.reset", "earliest")
    

    import org.apache.avro.Schema
    import org.apache.avro.reflect.ReflectData


    val schema = ReflectData.get.getSchema(classOf[Customer])
    // Method 1 not working
    //val ss = new FlinkKafkaConsumer[Customer]("customer-avro", AvroDeserializationSchema.forSpecific(classOf[Customer]),properties)
    val schemaRegistryUrl = "http://localhost:8081"
    //Method 2
    val userKafkaReaderResult = env.addSource(new FlinkKafkaConsumer[Customer]("customer-avro",
        ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[Customer],schemaRegistryUrl), properties).setStartFromEarliest())
    userKafkaReaderResult.print()
    //Method 3

    // I tried it like this as well, but it does not work either

    //val strenew = FlinkKafkaConsumer[Customer]("test_topic", AvroDeserializationSchema.forSpecific(classOf[Customer]), properties).setStartFromEarliest
    //env.addSource(ss).print()
    env.execute()


}

My POM file:

Code language: xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>org.example</groupId>
    <artifactId>kafkaavrov1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <modelVersion>4.0.0</modelVersion>



    <properties>
        <avro.version>1.8.2</avro.version>
        <kafka.version>0.11.0.1</kafka.version>
        <confluent.version>3.3.1</confluent.version>
    </properties>

    <!--necessary to resolve confluent dependencies-->
    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>


    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>

        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-confluent-registry -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-confluent-registry</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.13.2</version>
        </dependency>



        <!--Only dependency needed for the avro part-->
        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!--dependencies needed for the kafka part-->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-runner</artifactId>
            <version>1.2.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-params</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>



            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <stringType>String</stringType>
                            <createSetters>false</createSetters>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                            <fieldVisibility>private</fieldVisibility>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>target/generated-sources/avro</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

2 Answers

Stack Overflow user

Answered on 2022-02-01 10:24:31

Unless Customer extends org.apache.avro.specific.SpecificRecordBase, Flink will not treat it as an Avro type and will try to serialize it with its POJO serializer. If that fails (as it does here), it falls back to treating the class as a generic type and serializes it with Kryo.

Score: 1

Stack Overflow user

Answered on 2022-10-04 22:29:07

This is caused by one line in the POM: the avro-maven-plugin configuration sets createSetters to false. Flink's Avro/POJO type extraction checks each field of the class for get/set methods; when it cannot find them, it rejects the class as a POJO and this error is thrown.
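To illustrate the idea, here is a simplified, self-contained sketch of a setter check of the kind described above. It is not Flink's actual implementation, and the NoSetter/WithSetter classes and the hasSetter helper are hypothetical names invented for this example; they just mirror the generated-class situation from the log (a first_name field with no setter):

```scala
// Simplified sketch: a field only counts as a valid POJO field if the class
// exposes an accessible setter for it (among other conditions).
class NoSetter {
  private var first_name: String = ""
  def getFirst_name: String = first_name // getter only, as with createSetters=false
}

class WithSetter {
  private var first_name: String = ""
  def getFirst_name: String = first_name
  def setFirst_name(v: String): Unit = { first_name = v } // generated when createSetters=true
}

// Look for a public method named set<field> (case-insensitive), via reflection.
def hasSetter(c: Class[_], field: String): Boolean =
  c.getMethods.exists(_.getName.equalsIgnoreCase("set" + field))

println(hasSetter(classOf[NoSetter], "first_name"))   // false: triggers the POJO rejection
println(hasSetter(classOf[WithSetter], "first_name")) // true
```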

I fixed mine by setting createSetters to true.
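Concretely, applying that fix to the avro-maven-plugin configuration shown in the question's POM would look like this (a sketch; the other settings are kept as in the question):

```xml
<configuration>
    <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
    <stringType>String</stringType>
    <!-- was false; generating setters lets Flink's TypeExtractor accept the class as a POJO -->
    <createSetters>true</createSetters>
    <enableDecimalLogicalType>true</enableDecimalLogicalType>
    <fieldVisibility>private</fieldVisibility>
</configuration>
```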

Score: 0
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/68826065
