首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >阿帕奇光束不支持Kotlin Iterable?

阿帕奇光束不支持Kotlin Iterable?
EN

Stack Overflow用户
提问于 2019-04-29 18:32:14
回答 5查看 1.1K关注 0票数 10

Apache似乎拒绝承认Kotlin的Iterable。下面是一个示例代码:

代码语言:javascript
复制
@ProcessElement
fun processElement(
    @Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
    val output = input.key + "|" + input.value.toString()
    println("output: $output")
    receiver.output(output)
}

我得到了以下奇怪的错误:

代码语言:javascript
复制
java.lang.IllegalArgumentException:
   ...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
   @Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>

当然,如果我用Iterable替换java.lang.Iterable,那么相同的代码就能正常工作。我做错了什么?

退行性疾病的版本:

  • kotlin-jvm:1.3.21
  • org.apache.beam:2.11.0

下面是一个包含完整代码和堆栈跟踪的要点:

更新

经过一些尝试和错误之后,我发现虽然List<String>抛出了类似的异常,但MutableList<String>实际上是有效的:

代码语言:javascript
复制
class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
    @ProcessElement
    fun processElement(
        @Element input: KV<String, MutableList<String>>, receiver: OutputReceiver<String>
    ) {
        val output = input.key + "|" + input.value.toString()
        println("output: $output")
        receiver.output(output)
    }
}

因此,这提醒我,Kotlin的不可变集合实际上只是接口,底层集合仍然是可变的。但是,试图将Iterable替换为MutableIterable将继续引发错误。

更新2

我使用上面的MutableList per部署了我的Kotlin作业,作业失败了:

代码语言:javascript
复制
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException:
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable cannot be cast to java.util.List
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)

我不得不切换回使用java.lang.Iterable

EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2020-03-16 04:00:28

我在使用ParDo后的GroupByKey时也遇到了这个问题。结果表明,在编写接受@JvmWildcard结果的转换时,Iterable泛型中需要一个GroupByKey注释。

请参阅下面的人为示例,该示例按每一行的第一个字符读取文件和组。

代码语言:javascript
复制
class BeamPipe {
  class ConcatLines : DoFn<KV<String, Iterable<@JvmWildcard String>>, KV<String, String>>() {
    @ProcessElement
    fun processElement(@Element input: KV<String, Iterable<@JvmWildcard String>>, receiver: OutputReceiver<KV<String, String>>) {
      receiver.output(KV.of(input.key, input.value.joinToString("\n")))
    }
  }

  fun pipe(options: PipelineOptions) {
    val file =
        "testFile.txt"
    val p = Pipeline.create(options)
    p.apply(TextIO.read().from(file))
        .apply("Key lines by first character",
            WithKeys.of { line: String -> line[0].toString() }
                .withKeyType(TypeDescriptors.strings()))
        .apply("Group lines by first character", GroupByKey.create<String, String>())
        .apply("Concatenate lines", ParDo.of(ConcatLines()))
        .apply("Write to files", FileIO.writeDynamic<String, KV<String, String>>()
            .by { it.key }
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(ProcessFunction { it.value }), TextIO.sink())
            .to("whatever")
            .withNaming { key -> FileIO.Write.defaultNaming(key, ".txt") }
        )
    p.run()
  }
}
票数 6
EN

Stack Overflow用户

发布于 2019-05-08 09:39:24

这看起来像中的一个bug。您的@ProcessElement方法的反射分析不能正常工作。您可能可以通过使用ProcessContext ctx而不是使用@Element参数来解决这个问题。

票数 3
EN

Stack Overflow用户

发布于 2019-04-29 23:03:26

我对kotlin不是很熟悉,但是在代码中使用import java.lang.Iterable之前,您似乎需要导入它。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55908999

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档