我在处理坏记录和文件(CSV)时遇到了一些问题。这是我的CSV文件
+------+---+---+----+
| Name| ID|int|int2|
+------+---+---+----+
| Sohel| 1| 4| 33|
| Sohel| 2| 5| 56|
| Sohel| 3| 6| 576|
| Sohel| a| 7| 567|
|Sohel2| c| 7| 567|
+------+---+---+----+我正在使用预定义架构读取此文件
schema = StructType([
StructField("Name",StringType(),True),
StructField("ID",IntegerType(),True),
StructField("int",IntegerType(),True),
StructField("int2",IntegerType(),True),
StructField("_corrupt_record", StringType(),True)
])
df = spark.read.csv('dbfs:/tmp/test_file/test_csv.csv', header=True, schema=schema,
columnNameOfCorruptRecord='_corrupt_record')结果就是
+------+----+---+----+---------------+
| Name| ID|int|int2|_corrupt_record|
+------+----+---+----+---------------+
| Sohel| 1| 4| 33| null|
| Sohel| 2| 5| 56| null|
| Sohel| 3| 6| 576| null|
| Sohel|null| 7| 567| Sohel,a,7,567|
|Sohel2|null| 7| 567| Sohel2,c,7,567|
+------+----+---+----+---------------+它给了我我所期望的结果,但问题从这里开始,我只想访问那些"_corrupt_record“并创建一个新的df。我确实在df中过滤了"_corrupt_record“,但它似乎原始的CSV文件没有"_corrupt_record”列,这就是为什么它给我错误的原因。
badRows = df.filter("_corrupt_record不为空“).show()
错误消息
Error while reading file dbfs:/tmp/test_file/test_csv.csv.
Caused by: java.lang.IllegalArgumentException: _corrupt_record does not exist. Available: Name, ID, int, int2我是流动数据库文档,https://docs.databricks.com/data/data-sources/read-csv.html#read-files,但他们也有同样的错误,然后为什么他们甚至添加到文档上!!
我只想访问“_corrupt_record”列并创建新的DF。任何帮助或建议都将是感恩的。
发布于 2021-07-08 12:45:52
您需要添加enforceSchema=True。
df = spark.read.csv('dbfs:/tmp/test_file/test_csv.csv', header=True, schema=schema,
enforceSchema=True, columnNameOfCorruptRecord='_corrupt_record')这应该会显示corrupt record列。
发布于 2021-07-08 14:17:34
试试这个,因为我可以看到DF已经被创建了-
df = df.filter(F.col("_corrupt_record").isNotNull())
发布于 2021-11-08 16:36:41
问题在于您创建的dataframe df不是增量表。
尝试将df的内容存储到增量表中:
%sql
CREATE TABLE IF NOT EXISTS df_delta_temp
USING delta AS
SELECT *
FROM df;如果您现在要查询您的增量表df_delta_temp,您的数据将会出现。
您的增量表将在本地创建到您的工作区中(它将出现在左侧菜单的数据刀片中)。为了保持环境的整洁,您可以在精化结束时删除增量表(以这种方式,它将用作临时表)。
%sql
DROP TABLE IF EXISTS df_delta_temp我相信这篇有洞察力的文章会给你一些关于这个话题的有趣的知识:https://python.plainenglish.io/how-to-handle-bad-data-in-spark-sql-5e0276d37ca1
https://stackoverflow.com/questions/68294839
复制相似问题