首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >纱线上的Spring XD :无法将kafka源文件流式传输到hdfs接收器

纱线上的Spring XD :无法将kafka源文件流式传输到hdfs接收器
EN

Stack Overflow用户
提问于 2015-07-29 00:37:47
回答 1查看 254关注 0票数 0

纱线上的Spring XD :无法将kafka源文件流式传输到hdfs接收器

我已经启动并运行了一个合适的HDFS资源管理器。我能够在YARN中成功地启动管理服务器和容器。

但我无法将Kafka(源)流式传输到HDFS(接收器)

我配置了为Kafka(源)和hdfs(宿)提供的自定义模块。但是,当我为一个主题生成一条kafka消息时,YARN集群中什么也没有发生。

设置详细信息:

HDFS / YARN apache版本2.6.0

纱线上的弹簧XD - Spring -xd-1.2.0 yarn.zip YARN.zip

EN

回答 1

Stack Overflow用户

发布于 2015-08-05 15:48:07

我刚刚用ambari部署了XD集群,然后在YARN上部署了XD,这两种方法都运行得很好。对于yarn,我使用spring-xd-1.2.1.RELEASE-yarn.zip,并且只使用ambari部署的xd-shell。我只是从集群中获取设置,并使用ambari的postgred db,其中我与springxd用户一起创建了xdjob数据库。

我的config/servers.yml在bin/xd-yarn push之前是这样的。

代码语言:javascript
复制
xd:
  appmasterMemory: 512M
  adminServers: 1
  adminMemory: 512M
  adminJavaOpts: -XX:MaxPermSize=128m
  adminLocality: false
  containers: 3
  containerMemory: 512M
  containerJavaOpts: -XX:MaxPermSize=128m
  containerLocality: false

spring:
  yarn:
    applicationBaseDir: /xd/yarn/
---
xd:
  container:
    groups: yarn
---
spring:
  yarn:
    siteYarnAppClasspath: "/etc/hadoop/conf,/usr/hdp/current/hadoop-client/*,/usr/hdp/current/hadoop-client/lib/*,/usr/hdp/current/hadoop-hdfs-client/*,/usr/hdp/current/hadoop-hdfs-client/lib/*,/usr/hdp/current/hadoop-yarn-client/*,/usr/hdp/current/hadoop-yarn-client/lib/*"
    siteMapreduceAppClasspath: "/usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*"
    config:
      mapreduce.application.framework.path: '/hdp/apps/2.2.6.0-2800/mapreduce/mapreduce.tar.gz#mr-framework'
      mapreduce.application.classpath: '$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/2.2.6.0-2800/hadoop/lib/hadoop-lzo-0.6.0.2.2.4.2-2.jar:/etc/hadoop/conf/secure'
---
spring:
  hadoop:
    fsUri: hdfs://ambari-2.localdomain:8020
    resourceManagerHost: ambari-3.localdomain
    resourceManagerPort: 8050
    resourceManagerSchedulerAddress: ambari-3.localdomain:8030
    jobHistoryAddress: ambari-3.localdomain:10020
---
zk:
  namespace: xd
  client:
    connect: ambari-3.localdomain:2181
    sessionTimeout: 60000
    connectionTimeout: 30000
    initialRetryWait: 1000
    retryMaxAttempts: 3

---
xd:
  customModule:
    home: ${spring.hadoop.fsUri}/xd/yarn/custom-modules

---
xd:
  transport: kafka
  messagebus:
    kafka:
      brokers: ambari-2.localdomain:6667
      zkAddress: ambari-3.localdomain:2181

---
spring:
  datasource:
    url: jdbc:postgresql://ambari-1.localdomain/xdjob
    username: springxd
    password: springxd
    driverClassName: org.postgresql.Driver
    validationQuery: select 1
---
server:
  port: 0
---
spring:
  profiles: admin
management:
  port: 0

你可以使用time源代码,但我使用的是http源代码。查找正确的地址使用runtime containersruntime modules查看http源程序的运行位置。在xd中,您可以使用admininfo来查询xd-yarn,以查看xd管理在何处运行。这是必需的,以便您可以将xd-shell连接到在yarn上运行的管理员。

代码语言:javascript
复制
xd:>admin config server http://ambari-2.localdomain:50254
Successfully targeted http://ambari-2.localdomain:50254

然后让我们使用sink/source来创建kafka流。

代码语言:javascript
复制
xd:>stream create httpToKafkaStream --definition "http | kafka --topic=mytopic --brokerList=ambari-2.localdomain:6667" --deploy
xd:>stream create kafkaToHdfsStream --definition "kafka --zkconnect=ambari-3.localdomain:2181 --topic=mytopic --outputType=text/

xd:>http post --target http://ambari-5.localdomain:9000 --data "message1"
xd:>http post --target http://ambari-5.localdomain:9000 --data "message2"
xd:>http post --target http://ambari-5.localdomain:9000 --data "message3"
xd:>http post --target http://ambari-5.localdomain:9000 --data "message4"
xd:>http post --target http://ambari-5.localdomain:9000 --data "message5"

xd:>hadoop fs cat /xd/kafkaToHdfsStream/kafkaToHdfsStream-0.txt
message1
message2
message3
message4
message5

注意:您需要创建mytopic,目前kafka源码如果不存在将无法启动。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31682304

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档