首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >以cloudevents格式向Kafka topic发送数据

以cloudevents格式向Kafka topic发送数据
EN

Stack Overflow用户
提问于 2021-09-21 13:04:19
回答 1查看 315关注 0票数 1

现在我有了这段代码,它工作得很好。(向Kafka topic发送一些json格式的数据)

代码语言:javascript
复制
j, err := json.Marshal(data)
if err != nil {
    log.Fatal(err)
}
msg := &sarama.ProducerMessage{
    Topic: tName,
    Value: sarama.StringEncoder(j),
}

_, _, err = producer.SendMessage(msg)

但是有人希望ho拥有cloudevents格式的数据。我应该怎么做,因为这个事件结构不能直接转换成->。

代码语言:javascript
复制
type Event struct {
    Context     EventContext
    DataEncoded []byte
    // DataBase64 indicates if the event, when serialized, represents
    // the data field using the base64 encoding.
    // In v0.3, this field is superseded by DataContentEncoding
    DataBase64  bool
    FieldErrors map[string]error
}

所以这段代码甚至不能编译。

代码语言:javascript
复制
j, err := json.Marshal(data)
if err != nil {
    log.Fatal(err)
}

//...

event := cloudevents.NewEvent()
event.SetSource("example/uri") 
event.SetType("example.type")
event.SetData(cloudevents.ApplicationJSON, j)

producerMsg := &sarama.ProducerMessage{
    Topic: s.outputTopic,
    Value: sarama.StringEncoder(event),
}
_, _, err = s.producer.SendMessage(producerMsg)

如何将此事件发送到Kafka?尝试将event.DataEncoded转换为字符串或类似的内容?顺便说一句。编程语言是golang。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-09-21 13:33:12

你看到文档中序列化事件的部分了吗?

https://github.com/cloudevents/sdk-go#serializedeserialize-a-cloudevent

代码语言:javascript
复制
event := cloudevents.NewEvent()
event.SetSource("example/uri") 
event.SetType("example.type") 
// data here is a map[string] interface{}, or some other Struct type representing the "example.type" schema type above 
event.SetData(cloudevents.ApplicationJSON, data)

bytes, err := json.Marshal(event)
if err != nil {
  log.Fatal(err)
}
producerMsg := &sarama.ProducerMessage{
    Topic: s.outputTopic,
    Value: bytes,  // you've already encoded the event 
}

否则,请务必查看所提供的使用CloudEvent客户端https://github.com/cloudevents/sdk-go/blob/main/samples/kafka/sender/main.go的示例代码

代码语言:javascript
复制
sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, "test-topic")
if err != nil {
    log.Fatalf("failed to create protocol: %s", err.Error())
}

defer sender.Close(context.Background())

c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
    log.Fatalf("failed to create client, %v", err)
}

event := cloudevents.NewEvent() 
event.Set... 

c.Send(..., event)

... 
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69269403

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档