首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PySpark如何将CSV读入数据帧,并对其进行操作

PySpark如何将CSV读入数据帧,并对其进行操作
EN

Stack Overflow用户
提问于 2016-10-30 17:56:48
回答 1查看 38.3K关注 0票数 7

我对pyspark非常陌生,正在尝试使用它来处理一个保存为csv文件的大型数据集。我想将CSV文件读入spark dataframe,删除一些列,然后添加新列。我该怎么做呢?

我在将这些数据放入数据帧时遇到了问题。这是我目前所学内容的精简版本:

代码语言:javascript
复制
def make_dataframe(data_portion, schema, sql):
    fields = data_portion.split(",")
    return sql.createDateFrame([(fields[0], fields[1])], schema=schema)

if __name__ == "__main__":
    sc = SparkContext(appName="Test")
    sql = SQLContext(sc)

    ...

    big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql))
                .reduce(lambda a, b: a.union(b))

    big_frame.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshift://<...>") \
        .option("dbtable", "my_table_copy") \
        .option("tempdir", "s3n://path/for/temp/data") \
        .mode("append") \
        .save()

    sc.stop()

这会在reduce步骤中产生一个错误TypeError: 'JavaPackage' object is not callable

有可能做到这一点吗?reducing to a dataframe的想法是能够将结果数据写入数据库(Redshift,使用spark-redshift包)。

我也尝试过在partial()中使用unionAll()map(),但都不能正常工作。

我使用亚马逊的EMR、spark-redshift_2.10:2.0.0和亚马逊的JDBC driver RedshiftJDBC41-1.1.17.1017.jar来运行它。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-10-30 20:13:23

更新-在评论中回答你的问题:

将数据从CSV读取到数据帧:您似乎只尝试将CSV文件读取到spark数据帧中。

如果是这样的话--我的答案是:https://stackoverflow.com/a/37640154/5088142覆盖这个。

下面的代码应该将CSV读入spark-data-frame

代码语言:javascript
复制
import pyspark
sc = pyspark.SparkContext()
sql = SQLContext(sc)

df = (sql.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .load("/path/to_csv.csv"))

// these lines are equivalent in Spark 2.0 - using [SparkSession][1]
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark.read.format("csv").option("header", "true").load("/path/to_csv.csv") 
spark.read.option("header", "true").csv("/path/to_csv.csv")

drop列

您可以使用" drop (col)“https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html删除列

drop(col)

代码语言:javascript
复制
Returns a new DataFrame that drops the specified column.
Parameters: col – a string name of the column to drop, or a Column to drop.

>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]

添加列您可以使用"withColumn“https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

withColumn(colName,col)

代码语言:javascript
复制
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
Parameters: 

    colName – string, name of the new column.
    col – a Column expression for the new column.

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]

注意: spark还有很多其他的功能可以使用(例如,你可以使用"select“而不是"drop")

票数 11
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40327859

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档