我想测试RMQSource类来接收来自RabbitMQ的数据,但是我不知道如何为我的交换机配置兔子虚拟主机,我认为这就是我遇到的问题。我的代码:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object rabbitjob {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
def main (args:Array[String]){
env.execute("Test Rabbit")
}
} IntelliJ IDE中的错误:Error:(10, 29) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String] val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
^
Error:(10, 29) not enough arguments for method addSource: (implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.streaming.api.scala.DataStream[String]. Unspecified value parameter evidence$7. val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
^
知道如何解决这个问题吗?提前谢谢你。
发布于 2016-06-08 15:01:14
您所看到的错误是Scala编译时错误,它是由于一些需要的导入不存在而引起的。无论何时使用Flink Scala,都应该包括以下内容:
import org.apache.flink.api.scala._
这将解决您正在进行的编译时问题。
发布于 2016-10-08 14:57:07
事情随着时间的推移而改变。请看一下RMQConnectionConfig:在这里您可以找到通过构建器模式指定虚拟主机的方法。
发布于 2016-06-07 09:07:58
您还需要提供vhost名称。看看AMQP URI规范。
在您的例子中,整个AMQP看起来应该是"user:pass@192.168.1.11:5672/TestVHost"。
https://stackoverflow.com/questions/37673609
复制相似问题