首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >2022年Flink的KafkaSource在Scala中的应用

2022年Flink的KafkaSource在Scala中的应用
EN

Stack Overflow用户
提问于 2022-10-06 09:22:52
回答 1查看 132关注 0票数 1

我检查了类似但有7年历史的问题,但它不适用于较新的Flink版本。

我正在尝试运行一个简单的Flink Kafka作业,并尝试了不同的版本,为每个版本获取不同的编译错误。我使用sbt来管理我的依赖关系:

代码语言:javascript
复制
val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion,
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)

尝试过的版本:

scala 2.11.12和2.12.15

flink 1.14.6

我试图编译的代码(相关的位元):

代码语言:javascript
复制
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource

...

  val env = ExecutionEnvironment.getExecutionEnvironment
  val kafkaConsumer = new KafkaSource.builder[String]
    .setBootstrapservers("localhost:9092")
    .setGroupId("flink")
    .setTopics("test")
    .build()

  val text = env.fromSource(kafkaConsumer)

我没有找到一个官方的例子,说明我们应该如何使用KafkaSource,但是我找到了这个设置,这里这里。对于我仍然非常新的Java来说,这看起来与API文档是一致的。但这两种Scala版本都无法让它正常工作:

代码语言:javascript
复制
[error] somepathwithmyfile: type builder is not a member of object org.apache.flink.connector.kafka.source.KafkaSource
[error]     val kafkaConsumer = new KafkaSource.builder[String]
[error]                                         ^
[error] somepathwithmyfile: value fromSource is not a member of org.apache.flink.api.scala.ExecutionEnvironment
[error]     val text = env.fromSource(kafkaConsumer)
[error]                    ^
[error] two errors found
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-06 10:26:19

对于第一个问题,删除new

代码语言:javascript
复制
 val kafkaConsumer = KafkaSource.builder[String]
   ...

对于第二个问题,fromSource需要三个参数:

代码语言:javascript
复制
  /** Create a DataStream using a [[Source]]. */
  @Experimental
  def fromSource[T: TypeInformation](
      source: Source[T, _ <: SourceSplit, _],
      watermarkStrategy: WatermarkStrategy[T],
      sourceName: String): DataStream[T] = {

    val typeInfo = implicitly[TypeInformation[T]]
    asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
  }

另外,请注意Flink不支持Scala2.12.15。见https://issues.apache.org/jira/browse/FLINK-20969。但是,如果不包括Flink的内置Scala支持,则可以将Flink 1.15用于Scala的较新版本(包括scala 3)。有关此问题的更多信息,请参见https://flink.apache.org/2022/02/22/scala-free.html

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73971448

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档