首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >对象FlinkKafkaConsumer010不是package org.apache.flink.streaming.connectors.kafka的成员

对象FlinkKafkaConsumer010不是package org.apache.flink.streaming.connectors.kafka的成员
EN

Stack Overflow用户
提问于 2020-06-01 02:37:52
回答 2查看 347关注 0票数 0

我正在尝试汇编一个小程序,以连接到卡夫卡主题使用apache flink。我需要使用FlinkKafkaConsumer010。

代码语言:javascript
复制
package uimp
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010}
import java.util.Properties

object Silocompro {
  def main(args: Array[String]): Unit = {
 // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val propertiesTopicDemographic = new Properties()
    propertiesTopicDemographic.setProperty("bootstrap.servers", "bigdata.dataspartan.com:19093")
    propertiesTopicDemographic.setProperty("group.id", "demographic")

    val myConsumerDemographic = new FlinkKafkaConsumer010[String]("topic_demographic", new 
    SimpleStringSchema(), propertiesTopicDemographic)

    val messageStreamDemographic = env
      .addSource(myConsumerDemographic)
      .print()


    env.execute("Flink Scala API Skeleton")

   }
 }

我的问题是,当试图用这个包汇编我的程序时,编译器返回一个错误“build.sbt FlinkKafkaConsumer010不是包org.apache.flink.streaming.connectors.kafka的成员”:

代码语言:javascript
复制
      ThisBuild / resolvers ++= Seq("Apache Development Snapshot Repository" at 
      "https://repository.apache.org/content/repositories/snapshots/",Resolver.mavenLocal)

      name := "silocompro"

      version := "1.0"

      organization := "uimp"

      ThisBuild / scalaVersion := "2.12.11"

      val flinkVersion = "1.9.0"

      val flinkDependencies = Seq(
         "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
         "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
         "org.apache.flink" %% "flink-core"% flinkVersion % "provided",
         "org.apache.flink" %% "flink-connector-kafka-base" % flinkVersion % "provided",
         "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
         "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "provided")

      lazy val root = (project in file(".")).
      settings( libraryDependencies ++= flinkDependencies)


      assembly / mainClass := Some("uimp.Silocompro")

      Compile / run  := Defaults.runTask(Compile / fullClasspath,
                               Compile / run / mainClass,
                               Compile / run / runner
                              ).evaluated

 
      Compile / run / fork := true
      Global / cancelable := true

      assembly / assemblyOption  := (assembly / assemblyOption).value.copy(includeScala = false)

这个依赖错误的原因是什么?

EN

回答 2

Stack Overflow用户

发布于 2020-06-01 04:11:49

连接器不是flink-binary的一部分,这意味着您需要在compile作用域中具有连接器,因此这基本上意味着您需要从这些依赖项中删除provided。在此设置中,应用程序将在集群上运行。

但是,如果你想在不启动集群的情况下在本地运行它,那么你应该在compile作用域中拥有所有的provided依赖项,即删除所有flink作用域声明。

票数 0
EN

Stack Overflow用户

发布于 2020-06-01 16:31:04

最后,我遇到了依赖关系的问题。我做了一些动作:

  1. 我已经添加了一个新的解析器https:/oss.sonatype.org/content/repositories
  2. 我已经从VS Code
  3. 卸载了插件金属(Scala)我已经添加了“org.apache.flink”%% "flink-connector-kafka-0.10"%flinkVersion - tomy

在这些操作之后,我解决了我的库依赖问题。谢谢

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62120851

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档