
Alternative to reading line by line in Spark

Stack Overflow user
Asked 2021-05-06 04:37:55
3 answers, 67 views, 0 followers, 0 votes

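I have a use case that requires reading a file line by line: I need to take a value from each Header record and attach it to every record between that Header and its Trailer. Spark does not allow reading line by line, so how can this use case be implemented?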

*H,TextStart,1244
I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd
*H,TextStart,1235
I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd
*H,TextStart,1244
I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd
*H,TextStart,1236
I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,
I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,
*T,TextEnd

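The expected output takes the value from each header and prepends it to the records between that header and its trailer. I don't know how to do this.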

1244,I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU,    
1244,I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU,    
1235,I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU,    
1235,I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU,    
1244,I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU ,    
1244,I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU ,    
1236,I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU,    
1236,I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU, 

I used zipWithIndex to add a line number to each row.

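df = spark.read.text('/hdfsData/file.csv')
df_1 = df.rdd.map(lambda r: r).zipWithIndex().toDF(['value', 'index'])
# Register the indexed rows so the SQL below can refer to them as "linenumber"
df_1.createOrReplaceTempView("linenumber")
df_11 = spark.sql("select value.value, index from linenumber where value.value like '*H,%' or value.value like '*T,%'")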

Now I plan to join df_11 with df_1 and use some range-join logic to pick the value. Is there a more efficient way to achieve the same result?


3 Answers

Stack Overflow user

Accepted answer

Posted 2021-05-07 04:07:38

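I achieved this with zipWithIndex and a window function that carries the last header value forward. However, the window has no partition key, so the work is done in a single partition; the nature of the data requires it. Provide adequate executor memory.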

from pyspark.sql import functions as F
from pyspark.sql import Window

df = spark.read.text('MQ_out.csv')
# Add an index column so that each row carries its line number
df_1 = df.rdd.map(lambda r: r).zipWithIndex().toDF(['value', 'index'])
df_1.createOrReplaceTempView("linenumber")
# zipWithIndex wraps each row in a struct; pull the string back out
df_2 = spark.sql("select value.value as value, index from linenumber")
df_2.createOrReplaceTempView("linenumber1")
# Drop null and blank lines (both conditions must hold, hence "and")
df_3 = spark.sql("select * from linenumber1 where value is not null and trim(value) != ''")
df_3.createOrReplaceTempView("linenumber2")
# Extract the location value from header rows; every other row gets null
df_new = spark.sql("select value, case when value like '*H,%' then split(value, ',')[2] else null end as location, index from linenumber2")
# Carry the last non-null location forward to the rows that follow each header
df_new = df_new.withColumn('newlocation', F.last('location', ignorenulls=True).over(Window.orderBy('index')))
# Count the commas in each row (the result must be assigned back)
df_new = df_new.withColumn('Comma_Count', F.size(F.split(F.col('value'), ',')) - 1)
df_new.createOrReplaceTempView("FinalData")
# Remove header and trailer rows, which do not have the required number of commas
df_final = spark.sql("select value, newlocation as location from FinalData where Comma_Count = 11")
df_final.createOrReplaceTempView("FinalData_1")
# Prepend the location to each record, matching the expected output
df_write = spark.sql("select concat(location, ',', value) as value from FinalData_1")
df_write.write.text('Filename')
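
To make the carry-forward step easier to follow, here is a minimal plain-Python sketch of what a "last non-null value so far" window computes over rows ordered by index. The function name `forward_fill` and the sample list are illustrative assumptions, not part of the answer above, and the sketch runs without Spark.

```python
from typing import List, Optional


def forward_fill(values: List[Optional[str]]) -> List[Optional[str]]:
    """Replace each None with the most recent non-None value before it."""
    filled = []
    last_seen = None
    for value in values:
        if value is not None:
            # A header row: remember its location
            last_seen = value
        filled.append(last_seen)
    return filled


# Hypothetical location column: set on header rows, None everywhere else
location = ["1244", None, None, None, "1235", None, None, None]
new_location = forward_fill(location)
print(new_location)
```

Trailer rows also receive a location here, which is why the answer filters them out afterwards by comma count.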
Votes: 0

Stack Overflow user

Posted 2021-05-14 05:14:28

I managed to do this by using two lag functions.

from pyspark.sql import functions as F
from pyspark.sql import Window as W

# df is the single-column DataFrame produced by spark.read.text(...)
(df
    .withColumn('id', F.monotonically_increasing_id())            # make fake ID for window functions below
    .withColumn('lag_1', F.lag('value', 1).over(W.orderBy('id'))) # grab the previous row (offset 1)
    .withColumn('lag_2', F.lag('value', 2).over(W.orderBy('id'))) # grab the row before that (offset 2)
    .withColumn('heading', F                                      # take the heading from lag_1 or lag_2, whichever is a header
        .when(F.col('lag_1').startswith('*H,TextStart'), F.col('lag_1'))
        .when(F.col('lag_2').startswith('*H,TextStart'), F.col('lag_2'))
    )
    .where(F.col('heading').isNotNull())                          # remove header and trailer rows, which do not have a proper heading
    .withColumn('value', F.concat(F.split(F.col('heading'), ',')[2], F.lit(','), F.col('value')))
                                                                  # concatenate heading number with current value
    .drop('id', 'lag_1', 'lag_2', 'heading')                      # remove temporary columns
    .show(20, False)
)

# +-------------------------------------------------------------------------------------------------------+
# |value                                                                                                  |
# +-------------------------------------------------------------------------------------------------------+
# |1244,I,000000001,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,|
# |1244,I,000000062,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,|
# |1235,I,000000002,GOOD,-000000001,DMGD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,|
# |1235,I,000000035,GOOD,+000000004,DMGD,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,|
# |1244,I,000000004,GOOD,-000000001,CRT,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       , |
# |1244,I,000000062,GOOD,+000000004,DPT,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       , |
# |1236,I,000000005,GOOD,-000000001,ABCD,+000000000,SOM ,00002,+000000000,ONHAND              ,LCU       ,|
# |1236,I,000000035,GOOD,+000000004,EFGF,+000000000,SOM ,00001,+000000000,ONHAND              ,LCU       ,|
# +-------------------------------------------------------------------------------------------------------+
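
One caveat worth checking before reusing this: looking back one and two rows fits the sample because every block there holds exactly two records. The plain-Python sketch below imitates the two lag columns to show what would happen with other block sizes. The helper names `lag` and `two_lag_headings` and the shortened rows are illustrative assumptions, not Spark APIs.

```python
from typing import List, Optional


def lag(rows: List[str], offset: int) -> List[Optional[str]]:
    """Plain-Python stand-in for a lag window: the value `offset` rows earlier, or None."""
    return [rows[i - offset] if i >= offset else None for i in range(len(rows))]


def two_lag_headings(rows: List[str]) -> List[Optional[str]]:
    """For each row, return the header found one or two rows earlier, if any."""
    headings = []
    for prev_1, prev_2 in zip(lag(rows, 1), lag(rows, 2)):
        if prev_1 is not None and prev_1.startswith("*H,TextStart"):
            headings.append(prev_1)
        elif prev_2 is not None and prev_2.startswith("*H,TextStart"):
            headings.append(prev_2)
        else:
            headings.append(None)
    return headings


# Hypothetical block with three records instead of two
rows = ["*H,TextStart,1244", "I,a", "I,b", "I,c", "*T,TextEnd"]
headings = two_lag_headings(rows)
for row, heading in zip(rows, headings):
    print(row, "->", heading)
```

In this sketch the third record finds no heading and would be filtered out, and in a block with a single record the trailer would pick up the header two rows back. If block sizes can vary, a carry-forward of the last header value is the safer choice.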
Votes: 1

Stack Overflow user

Posted 2021-05-06 15:58:03

Read the file into an RDD:

val rdd = sc.textFile("/FileStore/tables/sf0.txt")

Then split each line of the file on commas and filter out the header and trailer rows.

val rdd2 = rdd.map(x => x.split(",")).filter(x => x(0) != "*H" && x(0) != "*T")

Next, convert each array to a tuple, which can be converted to a DataFrame. If we skip this step, the DataFrame will have only one column holding the whole array.

val finalRDD = rdd2.map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10)))

Convert the RDD to a DataFrame:

val myDF = finalRDD.toDF()

Check the DataFrame contents:

myDF.show()

+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
| _1|       _2|  _3|        _4|  _5|        _6|  _7|   _8|        _9|                 _10|       _11|
+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
|  I|000000001|GOOD|-000000001|DMGD|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000062|GOOD|+000000004|DMGD|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
|  I|000000002|GOOD|-000000001|DMGD|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000035|GOOD|+000000004|DMGD|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
|  I|000000004|GOOD|-000000001| CRT|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000062|GOOD|+000000004| DPT|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
|  I|000000005|GOOD|-000000001|ABCD|+000000000|SOM |00002|+000000000|ONHAND              |LCU       |
|  I|000000035|GOOD|+000000004|EFGF|+000000000|SOM |00001|+000000000|ONHAND              |LCU       |
+---+---------+----+----------+----+----------+----+-----+----------+--------------------+----------+
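
The steps above drop the header and trailer rows but do not attach the header value to the records, which is what the question asks for. As a rough sketch of one way to close that gap, written in Python rather than Scala, the generator below walks the lines in order and prefixes each record with the third field of the most recent header. The name `attach_header` and the shortened sample rows are assumptions for illustration; it also assumes every header has at least three comma-separated fields.

```python
from typing import Iterable, Iterator


def attach_header(lines: Iterable[str]) -> Iterator[str]:
    """Yield each record prefixed with the value from the most recent header."""
    location = None
    for line in lines:
        if line.startswith("*H,"):
            # Header: remember its third field, e.g. "1244"
            location = line.split(",")[2]
        elif line.startswith("*T,"):
            # Trailer: the current block is over
            location = None
        elif line.strip() and location is not None:
            # Record inside a block: prepend the header value
            yield f"{location},{line}"


# Shortened, hypothetical rows in the same layout as the question's file
sample = [
    "*H,TextStart,1244",
    "I,000000001,GOOD",
    "I,000000062,GOOD",
    "*T,TextEnd",
    "*H,TextStart,1235",
    "I,000000002,GOOD",
    "*T,TextEnd",
]
result = list(attach_header(sample))
for out in result:
    print(out)
```

In PySpark, one possible way to apply it (an assumption, not something this answer does) is `sc.wholeTextFiles(path).flatMap(lambda kv: attach_header(kv[1].splitlines()))`. That keeps all lines of a file together in order, at the cost of loading each file whole on a single executor. With `sc.textFile` a block may be split across partitions, so the generator could miss the header for records at the start of a partition.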
Votes: 0
Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/67408448
