首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何提高写星火DataFrame到Redis的速度?

如何提高写星火DataFrame到Redis的速度?
EN

Stack Overflow用户
提问于 2020-05-20 20:47:21
回答 1查看 590关注 0票数 0

我正在开发一个基于Flask的图书推荐API,我发现要管理多个请求,我需要预先计算相似度矩阵,并将其存储在某个地方以供将来查询。这个矩阵是使用PySpark基于大约150万个带有图书id、名称和元数据的数据库条目创建的,结果可以用这个模式来描述(ij用于图书索引,dot用于它们的元数据的相似性):

代码语言:javascript
复制
StructType(List(StructField(i,IntegerType,true),StructField(j,IntegerType,true),StructField(dot,DoubleType,true)))

最初,我打算使用spark-redis连接器将其存储在Redis上。然而,以下命令似乎以非常慢的速度工作(即使初始图书数据库查询大小被限制为非常适中的40k批处理):

代码语言:javascript
复制
similarities.write.format("org.apache.spark.sql.redis").option("table", "similarities").option("key.column", "i").save()

在9个阶段中的3个阶段中,Spark将初始任务分成了3个阶段,大约花了6个小时。奇怪的是,Spark executors的存储内存使用率非常低,大约为20kb。典型的stage active stage由Spark Application UI描述:

代码语言:javascript
复制
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)

有没有可能以某种方式加速这个过程?我的Spark会话是这样设置的:

代码语言:javascript
复制
SUBMIT_ARGS = "  --driver-memory 2G --executor-memory 2G --executor-cores 4 --packages mysql:mysql-connector-java:5.1.39 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
conf = SparkConf().set("spark.jars", "spark-redis/target/spark-redis_2.11-2.4.3-SNAPSHOT-jar-with-dependencies.jar").set("spark.executor.memory", "4g")
sc = SparkContext('local','example', conf=conf) 
sql_sc = SQLContext(sc)
EN

回答 1

Stack Overflow用户

发布于 2020-05-20 23:03:49

您可以尝试使用Append保存模式,以避免检查表中是否已存在数据:

代码语言:javascript
复制
similarities.write.format("org.apache.spark.sql.redis").option("table", "similarities").mode('append').option("key.column", "i").save()

此外,您可能想要更改

代码语言:javascript
复制
sc = SparkContext('local','example', conf=conf) 

代码语言:javascript
复制
sc = SparkContext('local[*]','example', conf=conf) 

来利用你机器上的所有内核。

顺便说一句,在Redis中使用i作为密钥是正确的吗?它不应该是ij的组合吗

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61913509

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档