首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >pyspark streaming DStreams to kafka主题

pyspark streaming DStreams to kafka主题
EN

Stack Overflow用户
提问于 2018-06-08 10:27:28
回答 2查看 229关注 0票数 0

简单地说,有没有可能将DStream流式传输到Kafka主题?

我有火花流作业,它做了所有的数据处理,现在我想把数据推送到卡夫卡主题。在pyspark中可以做到这一点吗?

EN

回答 2

Stack Overflow用户

发布于 2018-06-11 14:34:31

最好在写入kafka之前转换为json,否则请指定要写入kafka的键列和值列。

代码语言:javascript
复制
    query = jdf.selectExpr("to_json(struct(*)) AS value")\
  .writeStream\
  .format("kafka")\
  .option("zookeeper.connect", "localhost:2181")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("topic", "test-spark")\
  .option("checkpointLocation", "/root/")\
  .outputMode("append")\
  .start()
票数 0
EN

Stack Overflow用户

发布于 2019-01-19 01:41:08

如果您的消息是AVRO格式的,我们可以直接在kafka中插入和写入消息。

代码语言:javascript
复制
from pyspark import SparkConf, SparkContext
from kafka import KafkaProducer
from kafka.errors import KafkaError
from pyspark.sql import SQLContext, SparkSession

    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    import json
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer
    from pyspark.streaming.kafka import KafkaUtils, OffsetRange, TopicAndPartition
    import avro.schema
    from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
    from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
    import pandas as pd


    ssc = StreamingContext(sc, 2)
    ssc = StreamingContext(sc, 2)
    topic = "test"
    brokers = "localhost:9092"
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)
    def handler(message):
        records = message.collect()
        for record in records:
             <Data processing whatever you want and creating the var_val_value,var_val_key pair >


               var_kafka_parms_tgt = {'bootstrap.servers': var_bootstrap_servr,'schema.registry.url': var_schema_url} 
               avroProducer = AvroProducer(var_kafka_parms_tgt,default_key_schema=key_schema, default_value_schema=value_schema)
               avroProducer.produce(topic=var_topic_tgt_name, value=var_val_value, key=var_val_key)
               avroProducer.flush()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50752504

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档