我有一个工作正常的嵌入式作业,我想部署更多的同地作业。这些新增的工作将收到来自旧工作的信息,并将其发送到kafka主题。
代码如下
@AutoService(StatefulFunctionModule.class)
public final class CoLocatedModule implements StatefulFunctionModule {
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
FunctionProvider provider = new FunctionProvider();
binder.bindFunctionProvider( CoLocated.TYPE,provider );
binder.bindEgress(KafkaSpecs.TO_TRANSACTION_SPEC);
}
}我得到一个错误如下所示
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no ingress defined.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.IllegalStateException: There are no ingress defined.
at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:41)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)这个错误很容易解释,想让我给入口下个定义。
在链接-> https://ci.apache.org/projects/flink/flink-statefun-docs-stable/sdk/modules.html#embedded-module中有一个类似定义的嵌入式模块。
新定义的模块将接收来自另一个模块的消息,并将它们发送给kafka。
,
上通信是否就足够了?
发布于 2020-05-04 16:19:13
响应是内联的,而且FYI -没有什么是你要求的-具体位置。这些属性适用于包含共存和远程混合工作负载的远程模块和作业。
,我必须为每一份同地工作定义入口吗?如果不是的话,我怎样才能做到这一点呢?
是的,每项工作(远程或并置)至少需要一个入口。入口是一个将来自外部世界的消息消耗到一个状态有趣的应用程序中的通道。想想卡夫卡或动感。如果没有入口,作业将永远不会做任何事情,因为将没有初始消息开始处理。
对于每个入口,您将绑定一个或多个路由器,这些路由器从入口接收每条消息,并根据其函数types1将它们转发到0或多个函数。
,我怎样才能得到同地办公的工作来沟通呢?使用相同的FunctionType就足够了吗?
是的,函数只是使用它们的函数类型相互传递消息。
是在入口/出口上通信的共存功能吗?
不,使用Apache运行时在函数之间传递消息,该运行库包含高度优化的网络堆栈。一旦信息被从入口拉出来,它就再也不会与入口交互了。如果感兴趣,您可以在社区撰写的一些博客文章中了解Flink的网络堆栈是如何工作的,但这并不一定要在production2中成功地使用状态乐趣。
https://stackoverflow.com/questions/61578082
复制相似问题