首页
学习
活动
专区
圈层
工具
发布
    • 综合排序
    • 最热优先
    • 最新优先
    时间不限
  • 来自专栏技术交流学习

    sarama消费kafka协议版本问题

    kafka作为消息队列,go消费数据常用的库,有sarama包和其扩展sarama-cluster。 https://github.com/Shopify/sarama https://github.com/bsm/sarama-cluster 问题 最近遇到消费出现问题,报错: consumer/broker FetchRequest: kafka: error decoding packet: message of length 115804741 too large or too small 定位 找到sarama The // global `sarama.MaxResponseSize` still applies. 修复 sarama调用改为v3协议。但是运行一段时间后,还是出现了大于100M,但是只是略微大几十个字节。

    5.2K10发布于 2020-07-07
  • 来自专栏golang开发笔记

    Go 操作kafka包sarama

    概述 sarama 是一个纯 Go 客户端库,用于处理 Apache Kafka(0.8 及更高版本)。 开源包:https://github.com/Shopify/sarama 文档地址:https://pkg.go.dev/github.com/shopify/sarama 闲话少叙,上示例 package string{"localhost:9092"}, sarama.NewConfig()) if err ! = nil { log.Fatalln(err) } }() msg := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder https://github.com/Shopify/sarama 目录 上一节: 下一节:

    4.7K20编辑于 2022-03-19
  • 来自专栏大猪的笔记

    go sarama拾遗:有趣的超时

    第一版配置 在sarama的配置中,很自然的想起了带Timeout和Retry的配置。 config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max 我决定把sarama的代码clone下来,加点日志。看看它的执行流程。 go.mod ... require( .... ) replace github.com/Shopify/sarama => /home/honoryin/workspace/personal/sarama

    4.5K40发布于 2020-11-19
  • 来自专栏golang算法架构leetcode技术php

    golang源码分析:sarama kafka client(part IV:reblance)

    Sarama Go客户端存在以下已知问题: 当Topic新增分区时,Sarama Go客户端无法感知并消费新增分区,需要客户端重启后,才能消费到新增分区。 当Sarama Go客户端同时订阅两个以上的Topic时,有可能会导致部分分区无法正常消费消息。 当Sarama Go客户端的消费位点重置策略设置为Oldest(earliest)时,如果客户端宕机或服务端版本升级,由于Sarama Go客户端自行实现OutOfRange机制,有可能会导致客户端从最小位点开始重新消费所有消息 解决方案 建议尽早将Sarama Go客户端替换为Confluent Go客户端。 看到这句话我就犯嘀咕了,Sarama Go客户端无法感知并消费新增分区,需要客户端重启后,才能消费到新增分区。 // Wait for session exit signal <-sess.ctx.Done() 1,获取元数据 元数据的同步可以参考我前面的分享golang源码分析:sarama

    1K10编辑于 2022-08-03
  • 来自专栏Golang语言开发栈

    Golang 语言中 kafka 客户端库 sarama

    本文我们只介绍 Apache Kafka 的 Golang 客户端库 SaramaSarama 是 MIT 许可的 Apache Kafka 0.8 及更高版本的 Golang 客户端库。 , value sarama.Encoder) { producer, err := sarama.NewSyncProducer(brokerAddr, config) if err ! 除此之外,Sarama 库还提供了很多其它 Api,感兴趣的读者朋友可以阅读官方文档了解更多。 参考资料: https://github.com/Shopify/sarama https://shopify.github.io/sarama/ https://pkg.go.dev/github.com /Shopify/sarama https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions https://kafka.apache.org

    7.3K30发布于 2021-05-13
  • 来自专栏golang算法架构leetcode技术php

    golang源码分析:sarama kafka client(part II:消费者)

    这一讲,我们接着介绍下sarama kafka client的消费者的实现,先从例子开始: package main import ( "fmt" "log" "sync" "github.com/Shopify/sarama" ) // 消费者练习 func main() { // 生成消费者 实例 consumer, err := sarama.NewConsumer nil } func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim () config.Consumer.Return.Errors = false config.Version = sarama.V0_10_2_0 client, err := sarama.NewClient = nil { panic(err) } group1, err := sarama.NewConsumerGroupFromClient("c1", client) if err

    1.1K20编辑于 2022-08-02
  • 来自专栏golang算法架构leetcode技术php

    golang源码分析:sarama kafka client(part I:生产者)

    https://github.com/Shopify/sarama 是一个纯go实现的kafka客户端,是gopher学习kafka一个很好的资料。 说实话sarama的代码组织很烂,密密麻麻一堆源码文件都在一个目录,让人无从下手,下面列出了一部分: examples mocks tools //基于客户端,实现的kafka客户端工具 tools " ) func main() { // 构建 生产者 // 生成 生产者配置文件 config := sarama.NewConfig() // 设置生产者 消息 回复等级 0 1 all config.Producer.RequiredAcks = sarama.NoResponse //sarama.WaitForAll //kafka server: Replication-factor // 构建 消息 msg := &sarama.ProducerMessage{} msg.Topic = "test" msg.Value = sarama.StringEncoder

    76210编辑于 2022-08-02
  • 来自专栏golang算法架构leetcode技术php

    golang源码分析:sarama kafka client(part III:client的角色)

    理解client的角色对我们理解kafka和sarama非常有帮助。 child *partitionConsumer) preferredBroker() (*Broker, error) 上面就是client的四个角色和应用场景,理解他们的含义对理解kafka和sarama

    43520编辑于 2022-08-02
  • 来自专栏Cloud Native 云原生自习室

    sarama.NewSyncProducer error:kafka: client has run out of available brokers to talk to (Is y

    [ERROR] sarama.NewSyncProducer error:kafka: client has run out of available brokers to talk to (Is your

    3.8K20发布于 2021-08-16
  • 来自专栏云云众生s

    Kafka主题分区时不要丢失消息

    使用 Golang IBM/sarama 在 Kafka 主题上消费新添加的分区中的事件。 我们将使用Golang作为编程语言,并使用IBM/sarama作为Kafka库。 我将使用Sarama的源代码注释来解释负载均衡策略。 轮询 此策略返回轮询负载均衡策略,该策略以交替顺序将分区分配给成员。 {"localhost:9092"} topic := "example-topic" config := sarama.NewConfig() config.Version = sarama.V3 , err := sarama.NewSyncProducer(brokers, config) if err !

    81910编辑于 2024-12-23
  • 来自专栏福大大架构师每日一题

    docker和k3s安装kafka,go语言发送和接收kafka消息

    . package main import ( "fmt" "log" "time" "github.com/IBM/sarama" ) func main() { // 配置生产者 config := sarama.NewConfig() config.Producer.Return.Successes = true { Topic: "test-topic", Key: sarama.StringEncoder("message-key"), Value: sarama.StringEncoder returnnil } func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim // 创建消费者组 consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config) if err !

    30110编辑于 2025-12-18
  • 来自专栏Java架构师必看

    golang kafka客户端实现

    最近在弄golang框架的事情,连接kafka,目前采用的是sarama进行连接,开发测试是ok的,但是考虑到在生产环境中使用。 sarama.SyncProducer consumer sarama.Consumer consumerGroup sarama.ConsumerGroup } func var kafkaAsyncProducer sarama.AsyncProducer var kafkaConsumer sarama.Consumer func init() { brokerListStr 为了解决这个问题,通过查询资料和网上的相关内容发现sarama有一个cluster,已经解决了这个问题的。 " cluster "github.com/bsm/sarama-cluster" "strings" ) type KafkaClient struct { asyncProducer sarama.AsyncProducer

    2.9K30发布于 2021-05-14
  • 来自专栏OpenIM

    Golang正确使用kafka的姿势-细节决定成败

    下面我们来一起看一下如何使用sarama包来解决这些问题。 config := sarama.NewConfig() 2. 1. consumerConfig := sarama.NewConfig() 2. consumerConfig.Version = sarama.V2_8_0_0consumerConfig. 3. 6. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim nil } 5. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim

    2.6K00发布于 2021-09-22
  • 来自专栏golang从入门到进阶

    Go + Kafka实战指南!

    ("fmt""log""time""github.com/Shopify/sarama")funcmain(){//1️⃣Kafka配置config:=sarama.NewConfig()config.Producer.Return.Successes ,error){varproducers[]sarama.SyncProducerfori:=0;i<poolSize;i++{config:=sarama.NewConfig()config.Producer.Return.Successes =trueproducer,err:=sarama.NewSyncProducer(brokers,config)iferr! {Topic:topic,Key:sarama.StringEncoder(userID),//相同userID=相同分区Value:sarama.StringEncoder(payload),}原理: 用Go和Sarama库实现Kafka,可以获得高度的灵活性和性能。关键是理解基本概念并遵循经过验证的最佳实践。

    17010编辑于 2026-03-27
  • 来自专栏coder修行路

    Go实现海量日志收集系统(二)

    /config/server.properties 操作kafka需要安装一个包:go get github.com/Shopify/sarama 写一个简单的代码,通过go调用往kafka里扔数据: package main import ( "github.com/Shopify/sarama" "fmt" ) func main() { config := sarama.NewConfig () config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner sarama.NewSyncProducer([]string{"192.168.0.118:9092"},config) if err ! () config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner

    3.8K101发布于 2018-03-30
  • 来自专栏Stephen

    Golang中使用Kafka实现消息队列发布订阅

    使用golang创建同步消息生产者package mainimport ("fmt""github.com/Shopify/sarama""log""time")var address = []string = trueconfig.Producer.Timeout = 5 * time.Secondproducer, err := sarama.NewSyncProducer(address, config {Topic: "topic1", // 主题名称Value: sarama.ByteEncoder(value), // 消息内容}// 发送消息part, offset ""log""time")var address = []string{"192.168.10.232:9092"}func main() {// 配置config := sarama.NewConfig mainimport ("fmt""os""os/signal"cluster "github.com/bsm/sarama-cluster")func main() {// 配置config :=

    1.8K41编辑于 2022-08-09
  • 来自专栏火丁笔记

    关于OCR项目的流水账

    看上去似乎 kafka-go 最好,confluent-kafka-go 次之,sarama 最烂,可是当我问一个鹅厂小伙伴的时候,他说他们都用 sarama,信大厂得永生,于是乎我也决定选 sarama 但是不管怎么说,使用 sarama 的案例相对更多,用起来也更安心些,不过用之前要清楚坑在哪: Golang中如何正确的使用sarama包操作Kafka? 为什么不推荐使用Sarama Go客户端收发消息? Sarama 的版本 一开始用 sarama 的时候,就遭到了当头棒喝,遇到了如下错误: ERROR: Failed to open Kafka producer: kafka: client has go.mod 文件中: replace github.com/Shopify/sarama => github.com/Shopify/sarama v1.27.0 多个 goroutines 的协同

    1.2K10编辑于 2021-12-14
  • 来自专栏利志分享

    go的kafka生产和消费

    关于go的client,官方推荐有个如下几个: https://github.com/Shopify/sarama https://github.com/stealthly/go_kafka_client /stealthly/siesta https://github.com/optiopay/kafka https://github.com/nuance/kafka 我这里使用的就是官方推荐的第一个sarama sarama.SyncProducer func InitKafkaProducer(addressList string) { var err error mqConfig := sarama.NewConfig () // 设置producer // 发送完数据需要leader和follow都确认 mqConfig.Producer.RequiredAcks = sarama.WaitForAll 1 kafkaClient, err := sarama.NewClient(strings.Split(addressList, ","), mqConfig) if err !

    1.9K30编辑于 2022-04-25
  • 来自专栏仙士可博客

    kafka学习一:docker安装kafka

    local kafka_data: driver: local 推数据测试 package main import ( "fmt" "github.com/Shopify/sarama " ) func main() { //kafka配置项 config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认 config.Producer.Partitioner = sarama.NewRandomPartitioner {} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log") defer client.Close } fmt.Printf("pid:%v offset:%v\n", pid, offset) } 拉数据测试 func consumer() { consumer, err := sarama.NewConsumer

    5.1K10编辑于 2023-02-16
  • 来自专栏FunTester

    Kafka测试初探【Go】

    我用的是shopify出的sarama,依赖如下github.com/Shopify/sarama v1.38.1。在搜资料的过程中,还发现有使用其他客户端的,选择挺多。 Kafka配置 Sarama框架中的生产者和消费者的配置类是一个,不太清楚这么设计的意图,两个配置重合度并不高,在Sarama中也是分开配置,但使用了同一个配置类。 = true config.Producer.RequiredAcks = sarama.NoResponse config.Producer.Compression = sarama.CompressionLZ4 = nil { log.Fatal(err) return } }() // 定义需要发送的消息 headers := []sarama.RecordHeader{sarama.RecordHeader Topic: "topic_test", Key: sarama.StringEncoder("test"), Value: sarama.StringEncoder("ddddddddddddddddd

    42050编辑于 2023-08-04
领券