I am using Spark Structured Streaming to read data from a Kafka topic, where the messages are in a complex JSON format.
I read from Kafka with Spark Structured Streaming as follows -
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder \
    .appName("PythonSparkStreamingKafka") \
    .getOrCreate()

kafkaStreamDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "main.test.mysql.main.test_bank_data") \
    .option("startingOffsets", "earliest") \
    .load()
kafkaStreamDF1 = kafkaStreamDF.selectExpr("CAST(value AS STRING)")
message_schema = StructType().add("payload",StringType())
kafkaStreamDF2 = kafkaStreamDF1.select(from_json(col("value"),message_schema).alias("message"))
consoleOutput = kafkaStreamDF2.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

This extracts the data up to the payload part of the Kafka JSON message and writes it to the console, as shown below -
|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
|[{"before":null,"after":{"transaction_id":21,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":18490,"withdrawal_amt":"AV741A==","deposit_amt":null,"balance_amt":"KkPpRA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|

Now I want to extract the fields of the "after" section and read them in the following format -
transaction_id|account_no|transaction_date|transaction_details|value_date|withdrawal_amt|deposit_amt| balance_amt
20 | 409000611074 | 16/08/2020 | INDO GIBL Indiaforensic STL12071 | 16/08/2020 | 129000.00 | (null) | 7320950.00
21 | 409000611074 | 16/08/2020 | INDO GIBL Indiaforensic STL13071 | 16/08/2020 | 230013.00 | (null) | 7090937.00

Please suggest how I can achieve this expected output in a PySpark DataFrame.
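For reference, the raw values in the console rows above are Debezium-encoded: DECIMAL columns arrive as the base64 of the big-endian two's-complement unscaled integer (scale 2 here, per the connector schema), and DATE columns as days since the Unix epoch. A minimal plain-Python sketch of that decoding (the helper names are mine, not part of Spark or Debezium):

```python
import base64
from datetime import date, timedelta

def decode_debezium_decimal(b64: str, scale: int = 2) -> float:
    # org.apache.kafka.connect.data.Decimal: base64 of the big-endian
    # two's-complement unscaled integer, shifted by the schema's scale
    unscaled = int.from_bytes(base64.b64decode(b64), byteorder="big", signed=True)
    return unscaled / (10 ** scale)

def decode_debezium_date(epoch_days: int) -> date:
    # io.debezium.time.Date: number of days since 1970-01-01
    return date(1970, 1, 1) + timedelta(days=epoch_days)

print(decode_debezium_decimal("AMTWoA=="))  # 129000.0  (withdrawal_amt of row 20)
print(decode_debezium_date(18490))          # 2020-08-16 (transaction_date of row 20)
```

Alternatively, Debezium connector options such as decimal.handling.mode can emit these columns in a friendlier representation so no decoding is needed downstream.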
Edit: adding the exact value of the Kafka message below -
{"schema":{"type":"struct","fields":[
{"type":"struct","fields":[{"type":"int32","optional":false,"field":"transaction_id"},{"type":"int64","optional":false,"field":"account_no"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"transaction_date"},{"type":"string","optional":true,"field":"transaction_details"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"value_date"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"withdrawal_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"deposit_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"balance_amt"}],"optional":true,"name":"main.test.mysql.main.test_bank_data.Value","field":"before"},
{"type":"struct","fields":[{"type":"int32","optional":false,"field":"transaction_id"},{"type":"int64","optional":false,"field":"account_no"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"transaction_date"},{"type":"string","optional":true,"field":"transaction_details"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"value_date"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"withdrawal_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"deposit_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"balance_amt"}],"optional":true,"name":"main.test.mysql.main.test_bank_data.Value","field":"after"},
{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},
{"type":"string","optional":false,"field":"op"},
{"type":"int64","optional":true,"field":"ts_ms"},
{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}
],"optional":false,"name":"main.test.mysql.main.test_bank_data.Envelope"},
"payload":{"before":null,"after":{"transaction_id":146,"account_no":409000611076,"transaction_date":18652,"transaction_details":"TRF FROM Indiaforensic SERVICES","value_date":18652,"withdrawal_amt":"AA==","deposit_amt":"B6Eg","balance_amt":"B6Eg"},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":1611587463000,"snapshot":"false","db":"main","table":"test_bank_data","server_id":19105,"gtid":null,"file":"binlog.000584","pos":46195052,"row":0,"thread":1604558,"query":null},"op":"c","ts_ms":1611587463181,"transaction":null}}
As of now, I have cast the value to a string as DF1 and parsed the payload part out of it as DF2, as shown in the code above.
Edit - final working setup: after adding the unwrap transform (SMT) to the Debezium MySQL connector on the Kafka Connect side, the message value arrives in PySpark Structured Streaming as below -

Value =
{"transaction_id":21,"account_no":409000611074,"transaction_date":"2020-08-22","transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":"2020-08-22","withdrawal_amt":"230013.00","deposit_amt":null,"balance_amt":"7090937.00"}
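With the unwrapped message, the amounts and dates arrive as plain strings, so a final typing step is still needed. In Spark that would be a cast such as col('withdrawal_amt').cast('decimal(12,2)') and to_date(col('value_date')); the snippet below is a plain-Python illustration of the same conversion, using a literal trimmed from the value above:

```python
from decimal import Decimal
from datetime import datetime

# trimmed record from the unwrapped message value above
record = {"transaction_id": 21, "withdrawal_amt": "230013.00",
          "deposit_amt": None, "value_date": "2020-08-22"}

# cast the string amounts to exact decimals, keeping nulls as None
amt = Decimal(record["withdrawal_amt"]) if record["withdrawal_amt"] else None
dep = Decimal(record["deposit_amt"]) if record["deposit_amt"] else None
# parse the ISO date string into a date object
value_date = datetime.strptime(record["value_date"], "%Y-%m-%d").date()

print(amt, dep, value_date)  # 230013.00 None 2020-08-22
```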
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType

message_schema = StructType([
    StructField('transaction_id', IntegerType(), True),
    StructField('account_no', LongType(), True),
    StructField('transaction_date', StringType(), True),
    StructField('transaction_details', StringType(), True),
    StructField('value_date', StringType(), True),
    StructField('withdrawal_amt', StringType(), True),
    StructField('deposit_amt', StringType(), True),
    StructField('balance_amt', StringType(), True)
])

Answered on 2021-01-26 11:49:59
You can pass the schema of the JSON string message to the from_json function.
Given messages received like this:
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|value |
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
#|[{"before":null,"after":{"transaction_id":21,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":18490,"withdrawal_amt":"AV741A==","deposit_amt":null,"balance_amt":"KkPpRA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

you can modify your code to parse the after field of the JSON as a MapType, then select the desired keys as columns:
from pyspark.sql.types import StructType, StructField, MapType, StringType

message_schema = StructType([
    StructField('before', MapType(StringType(), StringType(), True), True),
    StructField('after', MapType(StringType(), StringType(), True), True),
    StructField('source', MapType(StringType(), StringType(), True), True),
    StructField('op', StringType(), True),
    StructField('ts_ms', StringType(), True),
    StructField('transaction', StringType(), True)
])
after_fields = [
"account_no", "balance_amt", "deposit_amt", "transaction_date",
"transaction_details", "transaction_id", "value_date", "withdrawal_amt"
]
import pyspark.sql.functions as F

# parse the JSON strings using from_json and select message.after.*
# (cast value to string first, since the raw Kafka column is binary)
kafkaStreamDF.withColumn(
    "message",
    F.from_json(F.col("value").cast("string"), message_schema)
).select(
    *[F.col("message.after").getItem(f).alias(f) for f in after_fields]
).writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()

Source: https://stackoverflow.com/questions/65899471
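For intuition, the same extraction the streaming query performs can be mirrored on one of the sample messages in plain Python (value abbreviated here; since the message value is a JSON array, the single change event is taken from index 0). Note that with MapType(StringType(), StringType()), the values extracted in Spark should all come back as strings, so a further cast is generally needed:

```python
import json

# abbreviated sample of one Kafka message value from the console output above
raw = ('[{"before":null,'
       '"after":{"transaction_id":20,"account_no":409000611074,'
       '"transaction_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null},'
       '"op":"c","ts_ms":1611582308774}]')

after_fields = ["transaction_id", "account_no", "transaction_date",
                "withdrawal_amt", "deposit_amt"]

event = json.loads(raw)[0]  # the single change event inside the array
row = {f: event["after"].get(f) for f in after_fields}
print(row)
```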