首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Pandas的PySpark环境设置

Pandas的PySpark环境设置
EN

Stack Overflow用户
提问于 2021-07-15 20:31:01
回答 1查看 202关注 0票数 2

-编辑-

这个简单的例子只显示了3条记录,但我需要对数十亿条记录这样做,所以我需要使用Pandas,而不是仅仅将Spark转换为Pandas并使用简单的应用程序。

输入数据

期望输出

-END编辑-

我一直把头撞在墙上试图解决这个问题,我希望有人能帮我解决这个问题。我正在尝试将PySpark数据中的纬度/经度值转换为优步的H3十六进制系统。这是对函数h3.geo_to_h3(lat=lat, lng=lon, resolution=7)的非常直接的使用。但是,我的PySpark集群一直存在问题。

我正在使用以下命令设置PySpark集群,如databricks文章这里中所述:

  1. conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas h3 numpy python=3.7 conda-pack
  2. 然后conda init --all关闭并重新打开终端窗口
  3. conda activate pyspark_conda_env
  4. conda pack -f -o pyspark_conda_env.tar.gz

我将创建火花集群时创建的tar.gz文件包括在我的jupyter笔记本中,如spark = SparkSession.builder.master("yarn").appName("test").config("spark.yarn.dist.archives","<path>/pyspark_conda_env.tar.gz#environment").getOrCreate()

我的熊猫udf是这样设置的,我可以在一个单一节点星火集群上工作,但是现在在有多个工作节点的集群上遇到了麻烦:

代码语言:javascript
复制
#create udf to convert lat lon to h3 hex
def convert_to_h3(lat : pd.Series, lon : pd.Series) -> pd.Series:
    import h3 as h3
    import numpy as np
    if ((None in [lat, lon]) | (np.isnan(lat))):
        return None
    else:
        return (h3.geo_to_h3(lat=lat, lng=lon, resolution=7))

@f.pandas_udf('string', f.PandasUDFType.SCALAR)
def udf_convert_to_h3(lat : pd.Series, lon : pd.Series) -> pd.Series:
    import pandas as pd
    import numpy as np
    df = pd.DataFrame({'lat' : lat, 'lon' : lon})
    df['h3_res7'] = df.apply(lambda x : convert_to_h3(x['lat'], x['lon']), axis = 1)
    return df['h3_res7']

在使用熊猫udf创建新列并试图查看该列之后:

代码语言:javascript
复制
trip_starts = trip_starts.withColumn('h3_res7', udf_convert_to_h3(f.col('latitude'), f.col('longitude')))

我得到以下错误:

代码语言:javascript
复制
21/07/15 20:05:22 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 139 for reason Container marked as failed: container_1626376534301_0015_01_000158 on host: ip-xx-xxx-xx-xxx.aws.com. Exit status: -100. Diagnostics: Container released on a *lost* node.

我不知道在这里该怎么做,因为我已经尝试将记录的数量缩小到一个更易于管理的数目,并且仍然遇到这个问题。理想情况下,我想知道如何使用我链接的databricks博客文章中描述的PySpark环境,而不是在集群中运行引导脚本,因为公司的策略使得引导脚本更难运行。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-01-04 19:46:27

我最终解决了这个问题,将我的数据重新划分为更小的分区,每个分区中的记录更少。这为我解决了这个问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68400430

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档