首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Spark读取文本文件数据,并使用comma - python拆分数据

使用Spark读取文本文件数据,并使用comma - python拆分数据
EN

Stack Overflow用户
提问于 2019-03-18 05:47:10
回答 3查看 3.8K关注 0票数 0

我有以下格式的数据。

代码语言:javascript
复制
abc, x1, x2, x3  
def, x1, x3, x4,x8,x9   
ghi, x7, x10, x11  

我想要的输出是

代码语言:javascript
复制
0,abc, [x1, x2, x3]  
1,def, [x1, x3, x4,x8,x9]  
2,ghi, [x7, x10, x11]
EN

回答 3

Stack Overflow用户

发布于 2019-03-18 06:19:16

您的数据不是CSV格式。CSV表示具有固定架构的逗号分隔的文本文件。您的数据的CSV为:

代码语言:javascript
复制
abc,x1,x2,x3,,
def,x1,x3,x4,x8,x9
ghi,x7,x10,x11,,

请注意数据中没有的第1行和第3行中的尾随逗号。

因为你有一个不是CSV的文本文件,所以在Spark中获得你想要的模式的方法是用Python语言读取整个文件,解析成你想要的内容,然后使用spark.crateDataFrame()。或者,如果在一个目录中有多个这样的文件,可以使用SparkContext.wholeTextFiles,然后使用flatMap解析函数。

假设您已经完成了像open("Your File.txt").readlines这样的操作,剩下的就很简单了:

代码语言:javascript
复制
import re
from pyspark.sql import *

lines = [
  "abc, x1, x2, x3",
  "def, x1, x3, x4,x8,x9",
  "ghi, x7, x10, x11"
]

split = re.compile("\s*,\s*")
Line = Row("id", "first", "rest")

def parse_line(id, line):
  tokens = split.split(line.strip)
  return Line(id, tokens[0], tokens.pop(0))

def parse_lines(lines):
  return [parse_line(i, x) for i,x in enumerate(lines)]

spark.createDataFrame(parse_lines(lines))
票数 2
EN

Stack Overflow用户

发布于 2019-03-18 20:14:03

您可以做的是首先使用zipWithIndex生成id,然后在map函数中使用r[0].split(",")[0]获取字符串的第一部分,使用r[0].split(",")[1:]获取第二部分。

下面是上面描述的代码:

代码语言:javascript
复制
from pyspark.sql.types import StringType

lines = ["abc, x1, x2, x3",
        "def, x1, x3, x4,x8,x9",
        "ghi, x7, x10, x11"]

df = spark.createDataFrame(lines, StringType())
df = df.rdd.zipWithIndex() \
           .map(lambda (r, indx): (indx, r[0].split(",")[0], r[0].split(",")[1:])) \
           .toDF(["id", "name", "x_col"])

df.show(10, False)

和输出:

代码语言:javascript
复制
+---+----+-----------------------+
|id |name|x_col                  |
+---+----+-----------------------+
|0  |abc |[ x1,  x2,  x3]        |
|1  |def |[ x1,  x3,  x4, x8, x9]|
|2  |ghi |[ x7,  x10,  x11]      |
+---+----+-----------------------+
票数 1
EN

Stack Overflow用户

发布于 2019-03-18 22:47:06

如果数据是以文件形式出现的,可以这样实现:

  1. 以CSV格式读取文件;
  2. 添加第一列为CSV的索引列,其余所有列均为数组。

在Scala上可以这样实现:

代码语言:javascript
复制
val df = spark.read.option("header", "false").csv("non-csv.txt")
val remainingColumns = df.columns.tail
df.withColumn("id", monotonically_increasing_id).
  select(
    col("id"),
    col(df.columns(0)),
    array(remainingColumns.head, remainingColumns.tail: _*)
  ).show(false)

输出:

代码语言:javascript
复制
+---+---+--------------------+
|id |_c0|array(_c1, _c2, _c3)|
+---+---+--------------------+
|0  |abc|[ x1,  x2,  x3]     |
|1  |def|[ x1,  x3,  x4]     |
|2  |ghi|[ x7,  x10,  x11]   |
+---+---+--------------------+
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55212255

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档