首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >数据转换后,将kafka流数据帧保存到Databricks中的Redis

数据转换后,将kafka流数据帧保存到Databricks中的Redis
EN

Stack Overflow用户
提问于 2021-02-27 03:51:48
回答 1查看 276关注 0票数 0

在对数据执行聚合后,我使用pyspark将kafka流定向到redis。最终输出是一个流数据名。

我连接到kafka streams的代码。(您可能会发现我的代码是外行工作,请忽略)

代码语言:javascript
复制
app_schema = StructType([
        StructField("applicationId",StringType(),True),
        StructField("applicationTimeStamp",StringType(),True)
    ])

# group_id = "mygroup"
topic = "com.mobile-v1"
bootstrap_servers = "server-1:9093,server-2:9093,server-3:9093"

options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user@stream.com" password="xxxxx";',\
    "kafka.ssl.ca.location": "/tmp/cert.crt",\
    "kafka.sasl.mechanism": "PLAIN",\
    "kafka.security.protocol" : "SASL_SSL",\
    "kafka.bootstrap.servers": bootstrap_servers,\
    "failOnDataLoss": "false",\
    "subscribe": topic,\
    "startingOffsets": "latest",\
    "enable.auto.commit": "false",\
    "auto.offset.reset": "false",\
    "enable.partition.eof": "true",\
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",\
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_mobile_apps_df = spark.readStream.format("kafka").options(**options).options().load()

kafka_mobile_apps_df = kafka_mobile_apps_df\
    .select(from_json(col("value").cast("string"), app_schema).alias("mob_apps"))

由于订阅到代理,这给了我流数据帧。之后,我将数据聚合到count_df,如下所示

代码语言:javascript
复制
count_df = kafka_mobile_apps_df.withColumn("diff_days", ((col("TimeStamp_")) - (col("TimeStamp")))/(60.0*60.0*24))\
                            .withColumn("within_7d_ind", when(col("diff_days") < 7.0, 1).otherwise(0))\
                            .groupBy("_applicationId")
                            .agg(sum(col("within_7d_ind")).alias(feature+"_7day_velocity"))

现在,我正在尝试将这个count_df流写入redis。在我的研究之后,我发现我可以使用" spark-redis _2.11“来连接spark-redis。

我不知道scala,我在scala中找到了一个spark-redis github示例。有没有人能帮个忙,用pyspark写这个count_df到redis的认证的确切方法是什么?

请查找spark-redis github here

我已经在集群上安装了所需的jar "com.redislabs:spark-redis_2.12:2.5.0“。

谢谢。

刚刚发现他们还不支持python,请让我知道有没有其他方法来写这个?

EN

回答 1

Stack Overflow用户

发布于 2021-07-02 10:13:56

你应该这样做,我已经在这里回答了这个问题,https://stackoverflow.com/a/68218806/2986344

更多有用的链接:https://github.com/RedisLabs/spark-redis/issues/307

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66392010

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档