I am using Cloudera's CDH 5.12 quickstart VM, Spark v1.6, Kafka v0.10 (installed via yum), Python 2.6.6 and Scala 2.10.
Below is a simple Spark application I am running. It takes events from Kafka and prints them after a map-reduce.
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

When I submit the code above with the following command (local), it runs fine:
spark-submit --master local[2] --jars /usr/lib/spark/lib/spark-examples.jar testfile.py <ZKhostname>:2181 <kafka-topic>

But when I submit the same code with the command below (YARN), it does not work:
spark-submit --master yarn --deploy-mode client --jars /usr/lib/spark/lib/spark-examples.jar testfile.py <ZKhostname>:2181 <kafka-topic>

Here are the logs generated when running on YARN (trimmed; they may be from a slightly different run than the Spark setup described above):
INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 192.168.134.143
ApplicationMaster RPC port: 0
queue: root.cloudera
start time: 1515766709025
final status: UNDEFINED
tracking URL: http://quickstart.cloudera:8088/proxy/application_1515761416282_0010/
user: cloudera
40 INFO YarnClientSchedulerBackend: Application application_1515761416282_0010 has started running.
40 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53694.
40 INFO NettyBlockTransferService: Server created on 53694
53 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
54 INFO BlockManagerMasterEndpoint: Registering block manager quickstart.cloudera:56220 with 534.5 MB RAM, BlockManagerId(1, quickstart.cloudera, 56220)
07 INFO ReceiverTracker: Starting 1 receivers
07 INFO ReceiverTracker: ReceiverTracker started
07 INFO PythonTransformedDStream: metadataCleanupDelay = -1
07 INFO KafkaInputDStream: metadataCleanupDelay = -1
07 INFO KafkaInputDStream: Slide time = 10000 ms
07 INFO KafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
07 INFO KafkaInputDStream: Checkpoint interval = null
07 INFO KafkaInputDStream: Remember duration = 10000 ms
07 INFO KafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.KafkaInputDStream@7137ea0e
07 INFO PythonTransformedDStream: Slide time = 10000 ms
07 INFO PythonTransformedDStream: Storage level = StorageLevel(false, false, false, false, 1)
07 INFO PythonTransformedDStream: Checkpoint interval = null
07 INFO PythonTransformedDStream: Remember duration = 10000 ms
07 INFO PythonTransformedDStream: Initialized and validated org.apache.spark.streaming.api.python.PythonTransformedDStream@de77734
10 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 5.8 KB, free 534.5 MB)
10 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.5 KB, free 534.5 MB)
20 INFO JobScheduler: Added jobs for time 1515766760000 ms
30 INFO JobScheduler: Added jobs for time 1515766770000 ms
40 INFO JobScheduler: Added jobs for time 1515766780000 ms

After this, the job just keeps repeating the "Added jobs for time" line (after a delay matching the streaming context's batch interval) and never prints the Kafka stream, while a job on master local with exactly the same code does.
Interestingly, every time a Kafka event occurs it does print the lines above (the screenshot shows the increased Spark memory settings).
Please note:
The data is in Kafka and I can see it in the consumer console. I have also tried increasing the executor memory (3G) and the network timeout (800s), but without success.
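As an aside, the flatMap/map/reduceByKey chain in the app is straightforward and can be reproduced in plain Python as a sanity check (the sample lines here are made up), so the word-count logic itself is unlikely to be the problem:

```python
from collections import Counter

# Simulate one micro-batch of lines coming off the Kafka DStream.
batch = ["hello world", "hello spark"]

# flatMap: split every line into words
words = [w for line in batch for w in line.split(" ")]

# map + reduceByKey: count occurrences per word
counts = Counter(words)

print(sorted(counts.items()))  # [('hello', 2), ('spark', 1), ('world', 1)]
```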
Posted on 2018-01-16 14:12:18
Can you check the application's stdout logs through the YARN Resource Manager UI?
From the list of running applications, follow the YARN Resource Manager link (http://localhost:8088).
Hope this helps.
Posted on 2018-01-21 15:26:15
In local mode the application runs on a single machine, so you can see all the prints in your code. When it runs on a cluster, everything runs in distributed mode on different machines/cores, and you will not see those prints there. Try fetching the logs Spark produces with the yarn logs -applicationId <appId> command.
Posted on 2019-12-14 00:11:14
It is possible that your hostname alias is not defined on the YARN nodes, or is not being resolved there for some other reason.
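A quick way to test this is to try resolving the relevant hostnames on each YARN node. This is a hypothetical check script; the host names listed are placeholders you would replace with the ZK/Kafka hosts passed to spark-submit:

```python
import socket

def resolves(hostname):
    """Return True if this machine can resolve the given hostname."""
    try:
        socket.gethostbyname(hostname)
        return True
    except socket.gaierror:
        return False

# Replace with the ZK/Kafka hostnames used in your spark-submit command.
for host in ["localhost", "quickstart.cloudera"]:
    print(host, resolves(host))
```

If any name prints False on a worker node, add it to /etc/hosts (or DNS) on that node.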
https://stackoverflow.com/questions/48272945