首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何访问pyspark中的"_corrupt_record“列?

如何访问pyspark中的"_corrupt_record“列?
EN

Stack Overflow用户
提问于 2021-07-08 10:18:38
回答 3查看 127关注 0票数 2

我在处理坏记录和文件(CSV)时遇到了一些问题。这是我的CSV文件

代码语言:javascript
复制
+------+---+---+----+
|  Name| ID|int|int2|
+------+---+---+----+
| Sohel|  1|  4|  33|
| Sohel|  2|  5|  56|
| Sohel|  3|  6| 576|
| Sohel|  a|  7| 567|
|Sohel2|  c|  7| 567|
+------+---+---+----+

我正在使用预定义架构读取此文件

代码语言:javascript
复制
schema = StructType([
  StructField("Name",StringType(),True),
  StructField("ID",IntegerType(),True),
  StructField("int",IntegerType(),True),
  StructField("int2",IntegerType(),True),
  StructField("_corrupt_record", StringType(),True) 
  ])
df = spark.read.csv('dbfs:/tmp/test_file/test_csv.csv', header=True, schema=schema, 
columnNameOfCorruptRecord='_corrupt_record')

结果就是

代码语言:javascript
复制
+------+----+---+----+---------------+
|  Name|  ID|int|int2|_corrupt_record|
+------+----+---+----+---------------+
| Sohel|   1|  4|  33|           null|
| Sohel|   2|  5|  56|           null|
| Sohel|   3|  6| 576|           null|
| Sohel|null|  7| 567|  Sohel,a,7,567|
|Sohel2|null|  7| 567| Sohel2,c,7,567|
+------+----+---+----+---------------+

它给了我我所期望的结果,但问题从这里开始,我只想访问那些"_corrupt_record“并创建一个新的df。我确实在df中过滤了"_corrupt_record“,但它似乎原始的CSV文件没有"_corrupt_record”列,这就是为什么它给我错误的原因。

badRows = df.filter("_corrupt_record不为空“).show()

错误消息

代码语言:javascript
复制
Error while reading file dbfs:/tmp/test_file/test_csv.csv.
Caused by: java.lang.IllegalArgumentException: _corrupt_record does not exist. Available: Name, ID, int, int2

我是流动数据库文档,https://docs.databricks.com/data/data-sources/read-csv.html#read-files,但他们也有同样的错误,然后为什么他们甚至添加到文档上!!

我只想访问“_corrupt_record”列并创建新的DF。任何帮助或建议都将是感恩的。

EN

回答 3

Stack Overflow用户

发布于 2021-07-08 12:45:52

您需要添加enforceSchema=True

代码语言:javascript
复制
df = spark.read.csv('dbfs:/tmp/test_file/test_csv.csv', header=True, schema=schema, 
enforceSchema=True, columnNameOfCorruptRecord='_corrupt_record')

这应该会显示corrupt record列。

票数 0
EN

Stack Overflow用户

发布于 2021-07-08 14:17:34

试试这个,因为我可以看到DF已经被创建了-

df = df.filter(F.col("_corrupt_record").isNotNull())

票数 0
EN

Stack Overflow用户

发布于 2021-11-08 16:36:41

问题在于您创建的dataframe df不是增量表

尝试将df的内容存储到增量表中:

代码语言:javascript
复制
%sql
CREATE TABLE IF NOT EXISTS df_delta_temp
 USING delta AS
   SELECT *
   FROM df;

如果您现在要查询您的增量表df_delta_temp,您的数据将会出现。

您的增量表将在本地创建到您的工作区中(它将出现在左侧菜单的数据刀片中)。为了保持环境的整洁,您可以在精化结束时删除增量表(以这种方式,它将用作临时表)。

代码语言:javascript
复制
%sql
DROP TABLE IF EXISTS df_delta_temp

我相信这篇有洞察力的文章会给你一些关于这个话题的有趣的知识:https://python.plainenglish.io/how-to-handle-bad-data-in-spark-sql-5e0276d37ca1

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68294839

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档