首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >弹性搜索- Kafka连接没有发布正确的键值作为文档id

弹性搜索- Kafka连接没有发布正确的键值作为文档id
EN

Stack Overflow用户
提问于 2020-06-10 08:18:46
回答 2查看 893关注 0票数 0

我试图发布数据从卡夫卡主题到弹性搜索使用卡夫卡连接。下面是我的配置。

代码语言:javascript
复制
{
  "name": "elasticsearch_sink_19",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "k-connect-status",
    "connection.url": "http://docker.for.mac.host.internal:9200",
    "type.name": "connectstatus",
    "behavior.on.malformed.documents": "ignore",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter", 
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "validate.non.null": "false",
    "key.ignore":"true",
    "schema.ignore":"true",
    "value.converter.schemas.enable": "false"
  }
}

"key.ignore“true是用错误的_id (不期望的_id)发布数据。

来自主题的数据:

格式:JSON

代码语言:javascript
复制
{
  "ROWTIME":1591743374742,
  "ROWKEY":"status-connector-elasticsearch_sink_31",
  "state":"RUNNING",
  "trace":null,
  "worker_id":"connect:8083",
  "generation":2
}

示例弹性搜索输出:

代码语言:javascript
复制
{
  "_index" : "k-connect-status",
  "_type" : "connectstatus",
  "_id" : "k-connect-status+1+17",
  "_score" : 1.0,
  "_source" : {
    "generation" : 11,
    "trace" : null,
    "state" : "UNASSIGNED",
    "worker_id" : "connect:8083"
  }
}

期望弹性搜索输出

代码语言:javascript
复制
{
  "_index" : "k-connect-status",
  "_type" : "connectstatus",
  "_id" : "status-connector-elasticsearch_sink_31",
  "_score" : 1.0,
  "_source" : {
    "generation" : 11,
    "trace" : null,
    "state" : "UNASSIGNED",
    "worker_id" : "connect:8083"
  }
}

ROWKEY作为弹性搜索的_id。至少带有主题名称的ROWKEY。

"key.ignore“false不发布任何数据。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-06-10 10:05:28

"key.ignore":"true",正在做它应该做的事情。它忽略了Kafka消息的键,而是使用了topic+partition+offset的元组,在引用的Elasticsearch输出中可以看到这一点:

代码语言:javascript
复制
  "_id" : "k-connect-status+1+17",

如果要使用Kafka消息的键,则需要设置"key.ignore":"false"

请参阅本教程以了解更多信息,并查看有关键视频 / 代码的说明。

票数 2
EN

Stack Overflow用户

发布于 2020-06-10 10:10:59

默认情况下,Elasticsearch Sink连接器通过连接创建文档_id :主题、分区和偏移量。它发生在你的坐席上。您应该在key.ignore上设置false并传递正确的密钥。

如果您需要从消息值中提取一些信息并将其放入键中,则应该使用适当的转型。我想你可以试试ValueToKey

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62298918

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档