Simply put, is it possible to stream a DStream to a Kafka topic?
I have a Spark Streaming job that does all the data processing, and now I want to push that data to a Kafka topic. Is this possible in PySpark?
Posted on 2018-06-11 14:34:31
It is best to convert the rows to JSON before writing to Kafka; otherwise, specify explicit key and value columns to write (a sketch of that variant follows the example below).
query = jdf.selectExpr("to_json(struct(*)) AS value")\
    .writeStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("topic", "test-spark")\
    .option("checkpointLocation", "/root/")\
    .outputMode("append")\
    .start()
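If you would rather not pack the whole row into a single JSON value, the Structured Streaming Kafka sink also reads explicit key and value columns from the DataFrame. A minimal sketch of that variant, assuming jdf has columns named id and payload (hypothetical names):

# Write the id column as the Kafka message key and payload as the value.
query = jdf.selectExpr("CAST(id AS STRING) AS key", "CAST(payload AS STRING) AS value")\
    .writeStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("topic", "test-spark")\
    .option("checkpointLocation", "/tmp/spark-checkpoint-kv")\
    .outputMode("append")\
    .start()

Note that Kafka's default partitioner hashes the message key to pick a partition, so choose the key column accordingly.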
Posted on 2019-01-19 01:41:08

If your messages are in Avro format, you can serialize and write them to Kafka directly, as in this DStream-based example.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Hypothetical connection settings -- replace with your own.
var_bootstrap_servr = "localhost:9092"
var_schema_url = "http://localhost:8081"
var_topic_tgt_name = "test-out"

# Placeholder schemas; a fuller record-schema example follows below.
key_schema = avro.loads('{"type": "string"}')
value_schema = avro.loads('{"type": "string"}')

def handler(message):
    records = message.collect()
    for record in records:
        # <Data processing whatever you want, creating the var_val_value, var_val_key pair>
        var_kafka_parms_tgt = {'bootstrap.servers': var_bootstrap_servr,
                               'schema.registry.url': var_schema_url}
        avroProducer = AvroProducer(var_kafka_parms_tgt,
                                    default_key_schema=key_schema,
                                    default_value_schema=value_schema)
        avroProducer.produce(topic=var_topic_tgt_name, value=var_val_value, key=var_val_key)
        avroProducer.flush()

sc = SparkContext(appName="dstream-to-kafka")
ssc = StreamingContext(sc, 2)

topic = "test"
brokers = "localhost:9092"
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()
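For reference, the key_schema and value_schema objects above are usually loaded from Avro schema strings via confluent_kafka. A minimal sketch with a hypothetical record schema for the value (namespace and field names are illustrative only):

from confluent_kafka import avro

# Hypothetical record schema for the message value; adapt names and types to your data.
value_schema_str = """
{
  "namespace": "example.streaming",
  "type": "record",
  "name": "ProcessedEvent",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "score",    "type": "double"}
  ]
}
"""
key_schema_str = '{"type": "string"}'

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)

AvroProducer registers these schemas with the Schema Registry on first use and serializes each record in the Confluent Avro wire format.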
https://stackoverflow.com/questions/50752504