I checked this similar but seven-year-old question, but it doesn't apply to newer Flink versions.
I'm trying to run a simple Flink Kafka job and have tried different versions, getting a different compile error for each. I manage my dependencies with sbt:
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-clients" % flinkVersion,
"org.apache.flink" %% "flink-scala" % flinkVersion,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
"org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)

Versions tried:
Scala 2.11.12 and 2.12.15
Flink 1.14.6
The code I'm trying to compile (the relevant bits):
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
...
val env = ExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer = new KafkaSource.builder[String]
.setBootstrapservers("localhost:9092")
.setGroupId("flink")
.setTopics("test")
.build()
val text = env.fromSource(kafkaConsumer)

I haven't found an official example of how KafkaSource is supposed to be used, but I found this setup here and here. To me, still very new to Java, it looks consistent with the API documentation. But I can't get it to work with either Scala version:
[error] somepathwithmyfile: type builder is not a member of object org.apache.flink.connector.kafka.source.KafkaSource
[error] val kafkaConsumer = new KafkaSource.builder[String]
[error] ^
[error] somepathwithmyfile: value fromSource is not a member of org.apache.flink.api.scala.ExecutionEnvironment
[error] val text = env.fromSource(kafkaConsumer)
[error] ^
[error] two errors found

Posted 2022-10-06 10:26:19
For the first error, remove the new:
val kafkaConsumer = KafkaSource.builder[String]
...

For the second error, fromSource takes three arguments:
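Putting the first fix together, here is a minimal sketch of the builder chain (topic, group id, and bootstrap servers are taken from the question; note the capitalization of setBootstrapServers, which differs from the question's setBootstrapservers, and that the builder also requires a deserializer before build() — the starting-offsets line is an assumption for illustration):

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

// No `new`: builder() is a static factory method on KafkaSource
val kafkaConsumer: KafkaSource[String] = KafkaSource.builder[String]()
  .setBootstrapServers("localhost:9092")              // capital S in Servers
  .setTopics("test")
  .setGroupId("flink")
  .setStartingOffsets(OffsetsInitializer.earliest())  // assumption: start position
  .setValueOnlyDeserializer(new SimpleStringSchema()) // required before build()
  .build()
```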
/** Create a DataStream using a [[Source]]. */
@Experimental
def fromSource[T: TypeInformation](
source: Source[T, _ <: SourceSplit, _],
watermarkStrategy: WatermarkStrategy[T],
sourceName: String): DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
}

Also note that Flink does not support Scala 2.12.15; see https://issues.apache.org/jira/browse/FLINK-20969. However, if you exclude Flink's built-in Scala support, you can use Flink 1.15 with newer versions of Scala (including Scala 3). For more on this, see https://flink.apache.org/2022/02/22/scala-free.html.
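With that signature in mind, here is a sketch of the corrected call. Note that fromSource is defined on the streaming environment, not on the batch ExecutionEnvironment used in the question; the source name "kafka-source" is arbitrary, and kafkaConsumer is the KafkaSource built above:

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala._

// fromSource lives on StreamExecutionEnvironment, not ExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Three arguments: the source, a watermark strategy, and a source name
val text: DataStream[String] = env.fromSource(
  kafkaConsumer,
  WatermarkStrategy.noWatermarks[String](),
  "kafka-source") // arbitrary name, shown in the Flink UI

text.print()
env.execute("kafka-job")
```

The implicit TypeInformation[String] required by fromSource is provided by the org.apache.flink.streaming.api.scala._ wildcard import.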
https://stackoverflow.com/questions/73971448