首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >pyspark结构化流式处理不使用query.lastProgress或其他标准指标更新查询指标

pyspark结构化流式处理不使用query.lastProgress或其他标准指标更新查询指标
EN

Stack Overflow用户
提问于 2020-11-18 09:41:16
回答 1查看 228关注 0票数 2

我正在尝试将日志记录添加到我的pyspark结构化流应用程序中,以便查看每个微批处理的进度和统计数据。writestream方法使用foreach编写器将数据帧中的行写入postgres数据库。我正在使用.lastProgress和其他由pyspark提供的标准指标来记录日志。writestream方法和我的日志尝试如下所示。

代码语言:javascript
复制
query_1 = eventsDF \
    .writeStream \
    .foreach(writer) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoint_a/") \
    .trigger(processingTime="5 seconds") \
    .start()


query_progress =  query_1.lastProgress
print("progress ", query_progress)
print("status ", query_1.status)
print("active ", query_1.isActive)

query_1.awaitTermination()

我的第一个循环的结果是:

代码语言:javascript
复制
progress  None
status  {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
active  True

但是,当事件数据到达时,进一步的批处理不会导致更多的日志记录消息。我预计日志记录消息将在流式作业中的每个微批处理后发出。

我很感谢任何建议或指导。谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-12-08 04:12:42

startawaitTermination之间的所有代码只执行一次。只有loadstart之间的代码会在每个查询触发器上连续执行。

根据"Spark - the definitive Guide“一书的说法,这种监控方式是在你的应用程序内部运行的。但是,对于独立应用程序,您通常没有附加shell来运行任意代码。在书中,他们建议“通过实现监视服务器来公开查询状态,例如侦听端口并在收到请求时返回query.status的小型HTTP服务器。

因此,您需要创建一个专用的可运行线程,该线程频繁调用查询的监视API。我真的不熟悉Python,但它基本上看起来如下所示:

代码语言:javascript
复制
# import the threading module 
import threading  
  
class thread(threading.Thread):  
    def __init__(self, query):  
        threading.Thread.__init__(self)  
        self.query = query  
  
        # helper function to execute the threads 
    def run(self):  
        print("progress ", query.lastProgress);  

完成此操作后,您需要将其放在startawaitTermination之间

代码语言:javascript
复制
query_1 = eventsDF \
    [...]
    .start()

monitoring = thread(query_1)

query_1.awaitTermination()

您还可以使用while(query_1.isActive)遍历查询的状态,而不是使用专用线程。

对于Scala用户:

How to get progress of streaming query after awaitTermination?

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64885800

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档