我试图让kafka成为消费者,它将在一段特定的时间内收集消息,之后我可以手动提交已收集的消息。但是我在shopify sarama找不到可以用来提交一条或一批消息的方法或api,请帮帮忙
发布于 2020-11-25 20:09:17
使用自动提交,你不能完全控制它什么时候发生。它是周期性的,并且在幕后为你发生。如果对您不合适,您还可以使用ConsumerGroupSession.MarkOffset(topic string, partition int32, offset int64, metadata string)在您想要的时候(因此,即使在特定的时间之后)将偏移量作为一批使用的消息的最后一个提交。
发布于 2020-11-25 18:30:39
您可以使用偏移配置的AutoCommit字段的间隔参数:
// Offsets specifies configuration for how and when to commit consumed
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
// Deprecated: CommitInterval exists for historical compatibility
// and should not be used. Please use Consumer.Offsets.AutoCommit
CommitInterval time.Duration
// AutoCommit specifies configuration for commit messages automatically.
AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
Enable bool
// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration
}举个例子:
// init (custom) config, enable errors and notifications
config := cluster.NewConfig()
...
// Autocommit after two minutes
config.Consumer.Offsets.AutoCommit.Interval = 2 * time.Minutehttps://stackoverflow.com/questions/65001853
复制相似问题