kafka作为消息队列,go消费数据常用的库,有sarama包和其扩展sarama-cluster。 https://github.com/Shopify/sarama https://github.com/bsm/sarama-cluster 问题 最近遇到消费出现问题,报错: consumer/broker FetchRequest: kafka: error decoding packet: message of length 115804741 too large or too small 定位 找到sarama The // global `sarama.MaxResponseSize` still applies. 修复 sarama调用改为v3协议。但是运行一段时间后,还是出现了大于100M,但是只是略微大几十个字节。
概述 sarama 是一个纯 Go 客户端库,用于处理 Apache Kafka(0.8 及更高版本)。 开源包:https://github.com/Shopify/sarama 文档地址:https://pkg.go.dev/github.com/shopify/sarama 闲话少叙,上示例 package string{"localhost:9092"}, sarama.NewConfig()) if err ! = nil { log.Fatalln(err) } }() msg := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder https://github.com/Shopify/sarama 目录 上一节: 下一节:
第一版配置 在sarama的配置中,很自然的想起了带Timeout和Retry的配置。 config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max 我决定把sarama的代码clone下来,加点日志。看看它的执行流程。 go.mod ... require( .... ) replace github.com/Shopify/sarama => /home/honoryin/workspace/personal/sarama
Sarama Go客户端存在以下已知问题: 当Topic新增分区时,Sarama Go客户端无法感知并消费新增分区,需要客户端重启后,才能消费到新增分区。 当Sarama Go客户端同时订阅两个以上的Topic时,有可能会导致部分分区无法正常消费消息。 当Sarama Go客户端的消费位点重置策略设置为Oldest(earliest)时,如果客户端宕机或服务端版本升级,由于Sarama Go客户端自行实现OutOfRange机制,有可能会导致客户端从最小位点开始重新消费所有消息 解决方案 建议尽早将Sarama Go客户端替换为Confluent Go客户端。 看到这句话我就犯嘀咕了,Sarama Go客户端无法感知并消费新增分区,需要客户端重启后,才能消费到新增分区。 // Wait for session exit signal <-sess.ctx.Done() 1,获取元数据 元数据的同步可以参考我前面的分享golang源码分析:sarama
本文我们只介绍 Apache Kafka 的 Golang 客户端库 Sarama。Sarama 是 MIT 许可的 Apache Kafka 0.8 及更高版本的 Golang 客户端库。 , value sarama.Encoder) { producer, err := sarama.NewSyncProducer(brokerAddr, config) if err ! 除此之外,Sarama 库还提供了很多其它 Api,感兴趣的读者朋友可以阅读官方文档了解更多。 参考资料: https://github.com/Shopify/sarama https://shopify.github.io/sarama/ https://pkg.go.dev/github.com /Shopify/sarama https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions https://kafka.apache.org
这一讲,我们接着介绍下sarama kafka client的消费者的实现,先从例子开始: package main import ( "fmt" "log" "sync" "github.com/Shopify/sarama" ) // 消费者练习 func main() { // 生成消费者 实例 consumer, err := sarama.NewConsumer nil } func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim () config.Consumer.Return.Errors = false config.Version = sarama.V0_10_2_0 client, err := sarama.NewClient = nil { panic(err) } group1, err := sarama.NewConsumerGroupFromClient("c1", client) if err
https://github.com/Shopify/sarama 是一个纯go实现的kafka客户端,是gopher学习kafka一个很好的资料。 说实话sarama的代码组织很烂,密密麻麻一堆源码文件都在一个目录,让人无从下手,下面列出了一部分: examples mocks tools //基于客户端,实现的kafka客户端工具 tools " ) func main() { // 构建 生产者 // 生成 生产者配置文件 config := sarama.NewConfig() // 设置生产者 消息 回复等级 0 1 all config.Producer.RequiredAcks = sarama.NoResponse //sarama.WaitForAll //kafka server: Replication-factor // 构建 消息 msg := &sarama.ProducerMessage{} msg.Topic = "test" msg.Value = sarama.StringEncoder
理解client的角色对我们理解kafka和sarama非常有帮助。 child *partitionConsumer) preferredBroker() (*Broker, error) 上面就是client的四个角色和应用场景,理解他们的含义对理解kafka和sarama
[ERROR] sarama.NewSyncProducer error:kafka: client has run out of available brokers to talk to (Is your
使用 Golang IBM/sarama 在 Kafka 主题上消费新添加的分区中的事件。 我们将使用Golang作为编程语言,并使用IBM/sarama作为Kafka库。 我将使用Sarama的源代码注释来解释负载均衡策略。 轮询 此策略返回轮询负载均衡策略,该策略以交替顺序将分区分配给成员。 {"localhost:9092"} topic := "example-topic" config := sarama.NewConfig() config.Version = sarama.V3 , err := sarama.NewSyncProducer(brokers, config) if err !
. package main import ( "fmt" "log" "time" "github.com/IBM/sarama" ) func main() { // 配置生产者 config := sarama.NewConfig() config.Producer.Return.Successes = true { Topic: "test-topic", Key: sarama.StringEncoder("message-key"), Value: sarama.StringEncoder returnnil } func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim // 创建消费者组 consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config) if err !
最近在弄golang框架的事情,连接kafka,目前采用的是sarama进行连接,开发测试是ok的,但是考虑到在生产环境中使用。 sarama.SyncProducer consumer sarama.Consumer consumerGroup sarama.ConsumerGroup } func var kafkaAsyncProducer sarama.AsyncProducer var kafkaConsumer sarama.Consumer func init() { brokerListStr 为了解决这个问题,通过查询资料和网上的相关内容发现sarama有一个cluster,已经解决了这个问题的。 " cluster "github.com/bsm/sarama-cluster" "strings" ) type KafkaClient struct { asyncProducer sarama.AsyncProducer
下面我们来一起看一下如何使用sarama包来解决这些问题。 config := sarama.NewConfig() 2. 1. consumerConfig := sarama.NewConfig() 2. consumerConfig.Version = sarama.V2_8_0_0consumerConfig. 3. 6. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim nil } 5. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim
("fmt""log""time""github.com/Shopify/sarama")funcmain(){//1️⃣Kafka配置config:=sarama.NewConfig()config.Producer.Return.Successes ,error){varproducers[]sarama.SyncProducerfori:=0;i<poolSize;i++{config:=sarama.NewConfig()config.Producer.Return.Successes =trueproducer,err:=sarama.NewSyncProducer(brokers,config)iferr! {Topic:topic,Key:sarama.StringEncoder(userID),//相同userID=相同分区Value:sarama.StringEncoder(payload),}原理: 用Go和Sarama库实现Kafka,可以获得高度的灵活性和性能。关键是理解基本概念并遵循经过验证的最佳实践。
/config/server.properties 操作kafka需要安装一个包:go get github.com/Shopify/sarama 写一个简单的代码,通过go调用往kafka里扔数据: package main import ( "github.com/Shopify/sarama" "fmt" ) func main() { config := sarama.NewConfig () config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner sarama.NewSyncProducer([]string{"192.168.0.118:9092"},config) if err ! () config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner
使用golang创建同步消息生产者package mainimport ("fmt""github.com/Shopify/sarama""log""time")var address = []string = trueconfig.Producer.Timeout = 5 * time.Secondproducer, err := sarama.NewSyncProducer(address, config {Topic: "topic1", // 主题名称Value: sarama.ByteEncoder(value), // 消息内容}// 发送消息part, offset ""log""time")var address = []string{"192.168.10.232:9092"}func main() {// 配置config := sarama.NewConfig mainimport ("fmt""os""os/signal"cluster "github.com/bsm/sarama-cluster")func main() {// 配置config :=
看上去似乎 kafka-go 最好,confluent-kafka-go 次之,sarama 最烂,可是当我问一个鹅厂小伙伴的时候,他说他们都用 sarama,信大厂得永生,于是乎我也决定选 sarama 但是不管怎么说,使用 sarama 的案例相对更多,用起来也更安心些,不过用之前要清楚坑在哪: Golang中如何正确的使用sarama包操作Kafka? 为什么不推荐使用Sarama Go客户端收发消息? Sarama 的版本 一开始用 sarama 的时候,就遭到了当头棒喝,遇到了如下错误: ERROR: Failed to open Kafka producer: kafka: client has go.mod 文件中: replace github.com/Shopify/sarama => github.com/Shopify/sarama v1.27.0 多个 goroutines 的协同
关于go的client,官方推荐有个如下几个: https://github.com/Shopify/sarama https://github.com/stealthly/go_kafka_client /stealthly/siesta https://github.com/optiopay/kafka https://github.com/nuance/kafka 我这里使用的就是官方推荐的第一个sarama sarama.SyncProducer func InitKafkaProducer(addressList string) { var err error mqConfig := sarama.NewConfig () // 设置producer // 发送完数据需要leader和follow都确认 mqConfig.Producer.RequiredAcks = sarama.WaitForAll 1 kafkaClient, err := sarama.NewClient(strings.Split(addressList, ","), mqConfig) if err !
local kafka_data: driver: local 推数据测试 package main import ( "fmt" "github.com/Shopify/sarama " ) func main() { //kafka配置项 config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认 config.Producer.Partitioner = sarama.NewRandomPartitioner {} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log") defer client.Close } fmt.Printf("pid:%v offset:%v\n", pid, offset) } 拉数据测试 func consumer() { consumer, err := sarama.NewConsumer
我用的是shopify出的sarama,依赖如下github.com/Shopify/sarama v1.38.1。在搜资料的过程中,还发现有使用其他客户端的,选择挺多。 Kafka配置 Sarama框架中的生产者和消费者的配置类是一个,不太清楚这么设计的意图,两个配置重合度并不高,在Sarama中也是分开配置,但使用了同一个配置类。 = true config.Producer.RequiredAcks = sarama.NoResponse config.Producer.Compression = sarama.CompressionLZ4 = nil { log.Fatal(err) return } }() // 定义需要发送的消息 headers := []sarama.RecordHeader{sarama.RecordHeader Topic: "topic_test", Key: sarama.StringEncoder("test"), Value: sarama.StringEncoder("ddddddddddddddddd