首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SparkStreaming应用程序太慢

SparkStreaming应用程序太慢
EN

Stack Overflow用户
提问于 2017-05-02 13:13:54
回答 1查看 539关注 0票数 0

在开发SparkStreaming应用程序(python)时,我不太清楚它是如何工作的。我只需读取一个json文件流(在目录中弹出)并对每个json对象和一个引用执行连接操作,然后将其写回文本文件。这是我的代码:

代码语言:javascript
复制
config = configparser.ConfigParser()
config.read("config.conf")

def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
    globals()["sparkSessionSingletonInstance"] = SparkSession \
        .builder \
        .config(conf=sparkConf) \
        .getOrCreate()
return globals()["sparkSessionSingletonInstance"]

# Création du contexte
sc = SparkContext()
ssc = StreamingContext(sc, int(config["Variables"]["batch_period_spark"]))
sqlCtxt = getSparkSessionInstance(sc.getConf())
df_ref = sqlCtxt.read.json("file://" + config["Paths"]["path_ref"])
df_ref.createOrReplaceTempView("REF")
df_ref.cache()
output = config["Paths"]["path_DATAs_enri"]


# Fonction de traitement des DATAs
def process(rdd):
        if rdd.count() > 0:
                #print(rdd.toDebugString)
                df_DATAs = sqlCtxt.read.json(rdd)
                df_DATAs.createOrReplaceTempView("DATAs")
                df_enri=sqlCtxt.sql("SELECT DATAs.*, REF.Name, REF.Mail FROM DATAs, REF WHERE DATAs.ID = REF.ID")
                df_enri.createOrReplaceTempView("DATAs_enri")
                df_enri.write.mode('append').json("file://" + output)
                if(df_enri.count() < df_DATAs.count()):
                        df_fail = sqlCtxt.sql("SELECT * FROM DATAs WHERE DATAs.ID NOT IN (SELECT ID FROM DATAs_enri)")
                        df_fail.show()


# Configuration du stream et lancement
files = ssc.textFileStream("file://" + config["Paths"]["path_stream_DATAs"])
files.foreachRDD(process)
print("[GO]")
ssc.start()
ssc.awaitTermination()

这里是我的星火配置:

代码语言:javascript
复制
spark.master                    local[*]
spark.executor.memory           3g
spark.driver.memory             3g
spark.python.worker.memory      3g
spark.memory.fraction           0.9
spark.driver.maxResultSize      3g
spark.memory.storageFraction    0.9
spark.eventLog.enabled          true

嗯,这是可行的,但我有一个问题:这个过程是缓慢的,而且过程的延迟也在增加。我在当地工作*,恐怕没有家长制.在监视UI中,我一次只看到一个执行器和一个作业。有更简单的方法吗?就像DStream上的转换函数一样?我缺少一个配置变量吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-05-02 14:04:14

好吧,有几个原因可以说明您的代码是缓慢的。

关于工人,正如我所看到的,我没有看到你设定工人人数的任何地方。因此,它将从默认的工作人员数开始,这意味着可能有1人。在另一边,您正在读取的文件可能没有那么大,而且side没有执行并行操作。

另一方面,您需要取消代码的几个步骤:

  1. 您有很多计数:if rdd.count() > 0:; if(df_enri.count() < df_DATAs.count()):,计数昂贵,流数据是减少阶段,您正在做3倍的计数。
  2. 连接也很昂贵,在流过程中做一个连接也不是很好,你做了正确的df_ref.cache(),但是,join做了洗牌,而且代价很高。

我建议您不要执行失败步骤,将其从代码中删除。它不起作用,只是不要保存数据。另一件事是,设置更多的工作人员或更多的核心来执行:spark.executor.cores=2,如您所能看到的这里

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43738857

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档