
PySpark - Create a DataFrame from Kafka messages

Asked by a Stack Overflow user on 2021-01-26 10:18:53
1 answer · 379 views · 0 followers · score 1

I am using PySpark Structured Streaming to read data from a Kafka topic whose messages are in a complex JSON format.

I read the topic with Spark Structured Streaming using the Kafka source format. The code is as follows:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructType

spark = SparkSession.builder \
        .appName("PythonSparkStreamingKafka") \
        .getOrCreate()

kafkaStreamDF = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "main.test.mysql.main.test_bank_data") \
            .option("startingOffsets", "earliest") \
            .load()

# Kafka delivers `value` as binary; cast it to a JSON string
kafkaStreamDF1 = kafkaStreamDF.selectExpr("CAST(value AS STRING)")

# Keep only the top-level "payload" field of the Kafka Connect envelope
message_schema = StructType().add("payload", StringType())
kafkaStreamDF2 = kafkaStreamDF1.select(from_json(col("value"), message_schema).alias("message"))

consoleOutput = kafkaStreamDF2.writeStream \
                .outputMode("append") \
                .format("console") \
                .option("truncate", "false") \
                .start()

I extract the data from the message down to the payload part of the Kafka JSON message and print it to the console, as shown below:

|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|

|[{"before":null,"after":{"transaction_id":21,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":18490,"withdrawal_amt":"AV741A==","deposit_amt":null,"balance_amt":"KkPpRA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|

Now I want to extract the data from the `after` section and read its fields into a DataFrame in the following format:

transaction_id | account_no   | transaction_date | transaction_details              | value_date | withdrawal_amt | deposit_amt | balance_amt
20             | 409000611074 | 16/08/2020       | INDO GIBL Indiaforensic STL12071 | 16/08/2020 | 129000.00      | (null)      | 7320950.00
21             | 409000611074 | 16/08/2020       | INDO GIBL Indiaforensic STL13071 | 16/08/2020 | 230013.00      | (null)      | 7090937.00
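The expected table is more than a projection of `after`: Debezium writes `transaction_date`/`value_date` as `io.debezium.time.Date` (days since 1970-01-01), and the amounts as Kafka Connect `Decimal` values, which the JSON converter emits as base64 strings of the big-endian two's-complement unscaled integer. A minimal pure-Python sketch of both decodings, assuming scale 2 for every amount (as the connector schema in the message declares); the helper names are illustrative, not part of any library:

```python
import base64
from datetime import date, timedelta
from decimal import Decimal

EPOCH = date(1970, 1, 1)


def decode_debezium_date(days: int) -> date:
    """io.debezium.time.Date: number of days since the Unix epoch."""
    return EPOCH + timedelta(days=days)


def decode_connect_decimal(b64: str, scale: int = 2) -> Decimal:
    """Kafka Connect Decimal: base64 of a big-endian two's-complement unscaled integer."""
    unscaled = int.from_bytes(base64.b64decode(b64), byteorder="big", signed=True)
    # Shift the decimal point left by `scale` digits
    return Decimal(unscaled).scaleb(-scale)


# Values taken from the first message above (transaction_id 20)
print(decode_debezium_date(18490).strftime("%d/%m/%Y"))  # 16/08/2020
print(decode_connect_decimal("AMTWoA=="))                # 129000.00
print(decode_connect_decimal("K6LiGA=="))                # 7320950.00
```

Decoding the sample rows this way reproduces the figures in the expected output (for example `AV741A==` becomes 230013.00), which suggests the target table is just these two conversions applied to the `after` fields.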

Please advise how I can produce this expected output in a PySpark DataFrame.

Adding the exact value of the Kafka message below:

{"schema":{"type":"struct","fields":[
  {"type":"struct","fields":[
    {"type":"int32","optional":false,"field":"transaction_id"},
    {"type":"int64","optional":false,"field":"account_no"},
    {"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"transaction_date"},
    {"type":"string","optional":true,"field":"transaction_details"},
    {"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"value_date"},
    {"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"withdrawal_amt"},
    {"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"deposit_amt"},
    {"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"balance_amt"}
  ],"optional":true,"name":"main.test.mysql.main.test_bank_data.Value","field":"before"},
  {"type":"struct","fields":[
    {"type":"int32","optional":false,"field":"transaction_id"},
    {"type":"int64","optional":false,"field":"account_no"},
    {"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"transaction_date"},
    {"type":"string","optional":true,"field":"transaction_details"},
    {"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"value_date"},
    {"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"withdrawal_amt"},
    {"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"deposit_amt"},
    {"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"balance_amt"}
  ],"optional":true,"name":"main.test.mysql.main.test_bank_data.Value","field":"after"},
  {"type":"struct","fields":[
    {"type":"string","optional":false,"field":"version"},
    {"type":"string","optional":false,"field":"connector"},
    {"type":"string","optional":false,"field":"name"},
    {"type":"int64","optional":false,"field":"ts_ms"},
    {"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},
    {"type":"string","optional":false,"field":"db"},
    {"type":"string","optional":true,"field":"table"},
    {"type":"int64","optional":false,"field":"server_id"},
    {"type":"string","optional":true,"field":"gtid"},
    {"type":"string","optional":false,"field":"file"},
    {"type":"int64","optional":false,"field":"pos"},
    {"type":"int32","optional":false,"field":"row"},
    {"type":"int64","optional":true,"field":"thread"},
    {"type":"string","optional":true,"field":"query"}
  ],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},
  {"type":"string","optional":false,"field":"op"},
  {"type":"int64","optional":true,"field":"ts_ms"},
  {"type":"struct","fields":[
    {"type":"string","optional":false,"field":"id"},
    {"type":"int64","optional":false,"field":"total_order"},
    {"type":"int64","optional":false,"field":"data_collection_order"}
  ],"optional":true,"field":"transaction"}
],"optional":false,"name":"main.test.mysql.main.test_bank_data.Envelope"},
"payload":{"before":null,"after":{"transaction_id":146,"account_no":409000611076,"transaction_date":18652,"transaction_details":"TRF FROM Indiaforensic SERVICES","value_date":18652,"withdrawal_amt":"AA==","deposit_amt":"B6Eg","balance_amt":"B6Eg"},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":1611587463000,"snapshot":"false","db":"main","table":"test_bank_data","server_id":19105,"gtid":null,"file":"binlog.000584","pos":46195052,"row":0,"thread":1604558,"query":null},"op":"c","ts_ms":1611587463181,"transaction":null}}

So far, I have cast the value to a string in DF1 and extracted the payload part into DF2.
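Building on that pipeline, one possible sketch (not the only way) goes past `payload` into `payload.after`, parses it as a typed struct, and then converts the Debezium-encoded columns inside Spark. It assumes the envelope shape of the exact message value above (`schema` + `payload`), scale 2 for every amount, and a Python UDF for the base64 decimals; `ENVELOPE_DDL`, `decode_decimal`, and `extract_after` are hypothetical names:

```python
import base64
from decimal import Decimal
from typing import Optional

# DDL schema for the only part of the envelope we need: payload.after.
# Types follow the connector schema; amounts stay STRING (base64) at this stage.
ENVELOPE_DDL = (
    "payload STRUCT<after: STRUCT<"
    "transaction_id: INT, account_no: BIGINT, transaction_date: INT, "
    "transaction_details: STRING, value_date: INT, withdrawal_amt: STRING, "
    "deposit_amt: STRING, balance_amt: STRING>>"
)


def decode_decimal(b64: Optional[str], scale: int = 2) -> Optional[Decimal]:
    """Base64 big-endian two's-complement unscaled value -> Decimal; None stays None."""
    if b64 is None:
        return None
    unscaled = int.from_bytes(base64.b64decode(b64), byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


def extract_after(kafka_df):
    """kafka_df: the raw DataFrame from spark.readStream.format("kafka")...load()."""
    # Imported here so decode_decimal can be used without a Spark installation.
    from pyspark.sql import functions as F
    from pyspark.sql.types import DecimalType

    to_decimal = F.udf(decode_decimal, DecimalType(12, 2))
    # io.debezium.time.Date is days since the epoch, so add them to 1970-01-01
    epoch_plus = "date_add(CAST('1970-01-01' AS DATE), {})"

    after = (
        kafka_df
        .selectExpr("CAST(value AS STRING) AS value")
        .select(F.from_json("value", ENVELOPE_DDL).alias("message"))
        .select("message.payload.after.*")
    )
    return after.select(
        "transaction_id",
        "account_no",
        F.expr(epoch_plus.format("transaction_date")).alias("transaction_date"),
        "transaction_details",
        F.expr(epoch_plus.format("value_date")).alias("value_date"),
        to_decimal("withdrawal_amt").alias("withdrawal_amt"),
        to_decimal("deposit_amt").alias("deposit_amt"),
        to_decimal("balance_amt").alias("balance_amt"),
    )
```

Usage would be along the lines of `extract_after(kafkaStreamDF).writeStream.format("console").option("truncate", "false").start()`. The date columns come out as `DateType`; wrapping them in `date_format(..., 'dd/MM/yyyy')` would give the exact `16/08/2020` rendering if a string is preferred.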

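Update (final working setup): after applying an SMT transform in the Debezium MySQL connector on the Kafka Connect side, I now receive the message value in PySpark Structured Streaming as shown below: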

Value =
{"transaction_id":21,"account_no":409000611074,"transaction_date":"2020-08-229","transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":"2020-08-22","withdrawal_amt":"230013.00","deposit_amt":null,"balance_amt":"7090937.00"}

from pyspark.sql.types import (IntegerType, LongType, StringType,
                               StructField, StructType)

# Schema of the flattened (post-SMT) message value
message_schema = StructType([
    StructField('transaction_id', IntegerType(), True),
    StructField('account_no', LongType(), True),
    StructField('transaction_date', StringType(), True),
    StructField('transaction_details', StringType(), True),
    StructField('value_date', StringType(), True),
    StructField('withdrawal_amt', StringType(), True),
    StructField('deposit_amt', StringType(), True),
    StructField('balance_amt', StringType(), True)
])
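Once the SMT has flattened the record, what remains is parsing and casting. A sketch under the assumption that the dates arrive as ISO `yyyy-MM-dd` strings and the amounts as plain decimal strings, as in the sample value; `FLAT_DDL` and `typed_after` are hypothetical names, and the DDL string simply restates `message_schema`:

```python
# Same fields as message_schema, written as a DDL string that from_json accepts.
FLAT_DDL = (
    "transaction_id INT, account_no BIGINT, transaction_date STRING, "
    "transaction_details STRING, value_date STRING, withdrawal_amt STRING, "
    "deposit_amt STRING, balance_amt STRING"
)
AMOUNT_COLUMNS = ("withdrawal_amt", "deposit_amt", "balance_amt")


def typed_after(kafka_df):
    """Parse the SMT-flattened JSON value and cast dates/amounts to proper types."""
    # Imported lazily so this module can be loaded without Spark installed.
    from pyspark.sql import functions as F

    rows = (
        kafka_df
        .selectExpr("CAST(value AS STRING) AS value")
        .select(F.from_json("value", FLAT_DDL).alias("r"))
        .select("r.*")
    )
    return rows.select(
        "transaction_id",
        "account_no",
        F.to_date("transaction_date").alias("transaction_date"),  # expects yyyy-MM-dd
        "transaction_details",
        F.to_date("value_date").alias("value_date"),
        *[F.col(c).cast("decimal(12,2)").alias(c) for c in AMOUNT_COLUMNS],
    )
```

With `spark.sql.ansi.enabled` set to false, strings that fail to parse (a malformed date, for instance) become null instead of failing the query, so checking for unexpected nulls may be worthwhile.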

1 Answer

Stack Overflow user

Accepted answer

Answered 2021-01-26 11:49:59

You can pass the schema of the JSON string messages to the `from_json` function.

Given messages like these:

#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|value                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
#|[{"before":null,"after":{"transaction_id":21,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":18490,"withdrawal_amt":"AV741A==","deposit_amt":null,"balance_amt":"KkPpRA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

You can modify the code to parse the `after` field of the JSON as a `MapType`, and then select the keys you need as columns:

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, StructField, StructType

message_schema = StructType([
     StructField('before', MapType(StringType(), StringType(), True), True),
     StructField('after', MapType(StringType(), StringType(), True), True),
     StructField('source', MapType(StringType(), StringType(), True), True),
     StructField('op', StringType(), True),
     StructField('ts_ms', StringType(), True),
     StructField('transaction', StringType(), True)
     ]
)

after_fields = [
    "account_no", "balance_amt", "deposit_amt", "transaction_date",
    "transaction_details", "transaction_id", "value_date", "withdrawal_amt"
]

# Kafka's `value` column is binary, so cast it to a string first; then parse
# the JSON with from_json and pull each key of the `after` map into its own column
kafkaStreamDF.selectExpr("CAST(value AS STRING) AS value").withColumn(
    "message",
    F.from_json(F.col("value"), message_schema)
).select(
    *[F.col("message.after").getItem(f).alias(f) for f in after_fields]
).writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()
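One caveat: the exact message value quoted in the question is a full Kafka Connect envelope (`{"schema": ..., "payload": {...}}`), while the schema above matches only the inner payload object, so applied to that raw value `message.after` would likely come back null. A sketch of the same map-based idea one level deeper, assuming that envelope; `PAYLOAD_DDL`, `AFTER_FIELDS`, `select_after`, and `after_as_strings` are illustrative names, and the plain-Python helper only approximates what Spark's `map<string,string>` parsing returns:

```python
import json
from typing import Dict, Optional

PAYLOAD_DDL = "payload STRUCT<after: MAP<STRING, STRING>>"
AFTER_FIELDS = [
    "account_no", "balance_amt", "deposit_amt", "transaction_date",
    "transaction_details", "transaction_id", "value_date", "withdrawal_amt",
]


def select_after(kafka_df):
    """Same MapType approach as the answer, but reading payload.after."""
    from pyspark.sql import functions as F  # lazy import: no Spark needed to load this module

    return (
        kafka_df
        .selectExpr("CAST(value AS STRING) AS value")
        .withColumn("message", F.from_json("value", PAYLOAD_DDL))
        .select(*[F.col("message.payload.after").getItem(f).alias(f) for f in AFTER_FIELDS])
    )


def after_as_strings(value_json: str) -> Optional[Dict[str, Optional[str]]]:
    """Rough plain-Python view of payload.after as map<string,string>:
    JSON nulls stay None, strings pass through, other values become their JSON text."""
    after = (json.loads(value_json).get("payload") or {}).get("after")
    if after is None:
        return None
    return {
        k: v if v is None or isinstance(v, str) else json.dumps(v, separators=(",", ":"))
        for k, v in after.items()
    }
```

Either way, the map values are still the raw encodings (`"18490"` for a date, `"AMTWoA=="` for an amount), so the day-offset and base64-decimal conversions are still needed to reach the table in the question.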
Score: 1
Original page content provided by Stack Overflow; translation support by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/65899471
