我正在尝试从一个'excel‘文件中读取一个火花DataFrame。我利用了克赖克主义的依赖性。
没有任何预定义的架构,所有行都将正确读取,但仅作为字符串类型列读取。
为了防止这种情况,我使用了自己的模式(其中我提到了某些列为Integer类型),但在本例中,大多数行在读取文件时都会删除。
Build.sbt中使用的库依赖项:
"com.crealytics" %% "spark-excel" % "0.11.1",
Scala version - 2.11.8
Spark version - 2.3.2val inputDF = sparkSession.read.excel(useHeader = true).load(inputLocation(0))上面的数据读取所有数据-大约25000行。
但,
val inputWithSchemaDF: DataFrame = sparkSession.read
.format("com.crealytics.spark.excel")
.option("useHeader" , "false")
.option("inferSchema", "false")
.option("addColorColumns", "true")
.option("treatEmptyValuesAsNulls" , "false")
.option("keepUndefinedRows", "true")
.option("maxRowsInMey", 2000)
.schema(templateSchema)
.load(inputLocation)这只给我450行。有什么办法可以防止这种情况吗?提前感谢!(编辑)
发布于 2022-01-03 12:27:51
到目前为止,我还没有找到解决这个问题的方法,,但是我尝试通过手动类型转换以不同的方式解决它。为了在代码行数方面做得更好,我求助于for循环。我的解决办法如下:
步骤1:创建我自己的模式,类型为“StructType”:
val requiredSchema = new StructType()
.add("ID", IntegerType, true)
.add("Vendor", StringType, true)
.add("Brand", StringType, true)
.add("Product Name", StringType, true)
.add("Net Quantity", StringType, true)步骤2:类型在数据从excel文件中读取(没有自定义模式)后进行转换(而不是在读取数据时使用模式):
def convertInputToDesiredSchema(inputDF: DataFrame, requiredSchema: StructType)(implicit sparkSession: SparkSession) : DataFrame =
{
var schemaDf: DataFrame = inputDF
for(i <- inputDF.columns.indices)
{
if(inputDF.schema(i).dataType.typeName != requiredSchema(i).dataType.typeName)
{
schemaDf = schemaDf.withColumn(schemaDf.columns(i), col(schemaDf.columns(i)).cast(requiredSchema.apply(i).dataType))
}
}
schemaDf
}这可能不是一个有效的解决方案,但比键入过多行代码来键入多列要好。
我仍然在为我原来的问题寻找一个解决方案。
这个解决方案是为了防止有人想尝试,并且迫切需要快速修复.。
发布于 2022-03-30 23:35:40
下面是一个解决办法,使用PySpark,使用由“字段名”和“dataType”组成的模式:
# 1st load the dataframe with StringType for all columns
from pyspark.sql.types import *
input_df = spark.read.format("com.crealytics.spark.excel") \
.option("header", isHeaderOn) \
.option("treatEmptyValuesAsNulls", "true") \
.option("dataAddress", xlsxAddress1) \
.option("setErrorCellsToFallbackValues", "true") \
.option("ignoreLeadingWhiteSpace", "true") \
.option("ignoreTrailingWhiteSpace", "true") \
.load(inputfile)# 2nd Modify the datatypes within the dataframe using a file containing column names and the expected data type.
dtypes = pd.read_csv("/dbfs/mnt/schema/{}".format(file_schema_location), header=None).to_records(index=False).tolist()
fields = [StructField(dtype[0], globals()[f'{dtype[1]}']()) for dtype in dtypes]
schema = StructType(fields)
for dt in dtypes:
colname =dt[0]
coltype = dt[1].replace("Type","")
input_df = input_df.withColumn(colname, col(colname).cast(coltype))https://stackoverflow.com/questions/70540400
复制相似问题