首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在kafka生成器中,Json列作为键,并在键的基础上在不同的分区中推送。

在kafka生成器中,Json列作为键,并在键的基础上在不同的分区中推送。
EN

Stack Overflow用户
提问于 2020-12-22 14:12:41
回答 1查看 321关注 0票数 1

正如我们所知道的,我们可以向kafka生产者发送一个内部散列的密钥,以查找主题数据中的哪个分区。我有一个生产者,在那里我用JSON格式发送数据。

代码语言:javascript
复制
[
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 07:50:42",
    "TIME": 75042,
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:02:26",
    "TIME": 80226
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:39:55",
    "TIME": 83955
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:43:26",
    "TIME": 84326
},
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:44:22",
    "TIME": 84422
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:45:09",
    "TIME": 84509
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
          },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  }
]

我想在主题中推送数据,但基于键(DEVICEID)进行不同的分区。我已经创建了两个分区0 &1的主题,但是它存储了分区-0中的所有数据。我希望所有唯一的密钥(DeviceID)存储在不同的分区中。代码:

代码语言:javascript
复制
object Producer extends App{
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")
    val producer = new KafkaProducer[String,JsonNode](props)
    println("inside prducer")
    val mapper = (new ObjectMapper() with ScalaObjectMapper).
        registerModule(DefaultScalaModule).
        configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).
        findAndRegisterModules(). // register joda and java-time modules automatically
        asInstanceOf[ObjectMapper with ScalaObjectMapper] 
     val filename = "/Users/rishunigam/Documents/devicd.json"
     val jsonNode: JsonNode=  mapper.readTree(new File(filename))
     val s = jsonNode.size()
     for(i <- 0 to jsonNode.size()-1) {
     val js = jsonNode.get(i)
       val keys = jsonNode.get(i).findValue("DEVICEID").toString
       println(keys)
       println(js)
     val record = new ProducerRecord[String,JsonNode]( "tpch.devices_logs",keys,js)
   println(record)
  producer.send(record)
}
    println("producer complete")
    producer.close()
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-12-22 19:41:28

将所有数据存储在分区-0中。

但这并不意味着它不起作用。只意味着密钥的散列最终在同一个分区中结束。

如果您想要覆盖默认的分区处理程序,则需要定义自己的分区处理程序类来解析消息并分配适当的分区,然后在生产者属性中设置partitioner.class

我希望所有唯一的密钥(DeviceID)都存储在不同的分区中

然后,您必须提前知道您的竞争数据集,以便为N个设备创建N个分区。当你添加一个全新的设备时会发生什么呢?

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65410209

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档