首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Shopify Sarama处理Kafka错误

使用Shopify Sarama处理Kafka错误
EN

Stack Overflow用户
提问于 2015-04-01 18:24:30
回答 1查看 3K关注 0票数 3

因此,我试图将Kafka用于我的应用程序,该应用程序有一个生产者将操作记录到Kafka MQ中,而使用者将它从我的应用程序在Go中的MQ.Since中读取出来,我正在使用Shopify Sarama来实现这一点。

现在,我能够读取MQ并使用

代码语言:javascript
复制
fmt.Printf

但是,我真的希望错误处理比控制台打印更好,我愿意多走一步。

用于消费者连接的当前代码:

代码语言:javascript
复制
mqCfg := sarama.NewConfig()

master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
    panic(err) // Don't want to panic when error occurs, instead handle it
}

和信息的处理:

代码语言:javascript
复制
    go func() {
    defer wg.Done()
    for message := range consumer.Messages() {
        var msgContent Message
        _ = json.Unmarshal(message.Value, &msgContent)
        fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it
    }
}()

我的问题(我不熟悉测试卡夫卡,也不熟悉卡夫卡):

  1. 在上面的程序中哪里会出现错误,这样我就可以处理它们了?任何示例代码对我来说都是很好的开始。我可以想到的错误条件是,msgContent实际上并不包含JSON中的任何类型的ContentId字段。
  2. 在kafka中,是否存在消费者试图在当前偏移量下阅读的情况,但由于某种原因(即使JSON格式良好)?我的使用者是否有可能回溯到失败的偏移读和重新处理偏移量之上的x步?还是有更好的方法来做这件事?再说一遍,这些情况会是什么?

我愿意阅读和尝试一些事情。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-04-02 02:58:12

关于1)检查我在下面记录错误消息的位置。这或多或少是我会做的事。

关于2)我不知道如何尝试在一个话题上倒退。这是非常可能的,只要创造一个消费者一遍又一遍,它的起始偏移量减去一次。但我不建议,因为很有可能你最终会一遍又一遍地重复同样的信息。我建议经常保存你的胶印,这样你就能在事情恶化的时候恢复过来。

下面是我认为能解决你大部分问题的代码。我还没试过编译这个。最近,sarama api也发生了变化,因此api目前可能有一些不同。

代码语言:javascript
复制
func StartKafkaReader(wg *sync.WaitGroup, lastgoodoff int64, out chan<- *Message) (error) {
    wg.Add(1)
    go func(){
        defer wg.Done()
        //to track the last known good offset we processed, which is 
        // updated after each successfully processed event. 
        saveprogress := func(off int64){
            //Save the offset somewhere...a file... 
            //Ive also used kafka to store progress 
            //using a special topic as a WAL
        }
        defer saveprogress(lastgoodoffset)

        client, err := sarama.NewClient("clientId", brokers, sarama.NewClientConfig())
        if err != nil {
            log.Error(err)
            return
        }
        defer client.Close()
        sarama.NewConsumerConfig()
        consumerConfig.OffsetMethod = sarama.OffsetMethodManual
        consumerConfig.OffsetValue = int64(lastgoodoff)
        consumer, err := sarama.NewConsumer(client, topic, partition, "consumerId", consumerConfig)
        if err != nil {
            log.Error(err)
            return
        }
        defer consumer.Close()
        for {
            select {
            case event := <-consumer.Events():
                if event.Err != nil {
                    log.Error(event.Err)
                    return
                }
                msgContent := &Message{}
                err = json.Unmarshal(message.Value, msgContent)
                if err != nil {
                    log.Error(err)
                    continue //continue to skip this message or return to stop without updating the offset.
                }
                // Send the message on to be processed.
                out <- msgContent 

                lastgoodoff = event.Offset
            }
        }
    }()
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29398051

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档