首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Google pubsub client in golang

Google pubsub client in golang
EN

Code Review用户
提问于 2019-08-21 06:43:58
回答 1查看 1.6K关注 0票数 4

我一直在为golang开发一个Google /Sub客户端库。对戈朗来说很新鲜,我的目的是学习这门语言。不太确定什么是最佳实践,目的是在一段时间内学习它们。

让我直截了当地谈谈图书馆正在做的事情:

  • 创建一个主题。
  • 创建订阅。
  • 将消息发布到主题。
  • 使用拉式订阅服务器输出单个主题消息。

代码语言:javascript
复制
// Creates a client, and exposes deleteTopic, topicExists and createSubscription though the client
package pubsubclient

import (
    "context"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
    "google.golang.org/api/iterator"
)

type pubSubClient struct {
    psclient *pubsub.Client
}

// getClient creates a pubsub client
func getClient(projectID string) (*pubSubClient, error) {
    client, err := pubsub.NewClient(context.Background(), projectID)
    if err != nil {
        log.Printf("Error when creating pubsub client. Err: %v", err)
        return nil, err
    }
    return &pubSubClient{psclient: client}, nil
}

// topicExists checks if a given topic exists
func (client *pubSubClient) topicExists(topicName string) (bool, error) {
    topic := client.psclient.Topic(topicName)
    return topic.Exists(context.Background())
}

// createTopic creates a topic if a topic name does not exist or returns one
// if it is already present
func (client *pubSubClient) createTopic(topicName string) (*pubsub.Topic, error) {
    topicExists, err := client.topicExists(topicName)
    if err != nil {
        log.Printf("Could not check if topic exists. Error: %+v", err)
        return nil, err
    }
    var topic *pubsub.Topic

    if !topicExists {
        topic, err = client.psclient.CreateTopic(context.Background(), topicName)
        if err != nil {
            log.Printf("Could not create topic. Err: %+v", err)
            return nil, err
        }
    } else {
        topic = client.psclient.Topic(topicName)
    }

    return topic, nil
}

// deleteTopic Deletes a topic
func (client *pubSubClient) deleteTopic(topicName string) error {
    return client.psclient.Topic(topicName).Delete(context.Background())
}

// createSubscription creates the subscription to a topic
func (client *pubSubClient) createSubscription(subscriptionName string, topic *pubsub.Topic) (*pubsub.Subscription, error) {
    subscription := client.psclient.Subscription(subscriptionName)

    subscriptionExists, err := subscription.Exists(context.Background())
    if err != nil {
        log.Printf("Could not check if subscription %s exists. Err: %v", subscriptionName, err)
        return nil, err
    }

    if !subscriptionExists {

        cfg := pubsub.SubscriptionConfig{
            Topic: topic,
            // The subscriber has a configurable, limited amount of time -- known as the ackDeadline -- to acknowledge
            // the outstanding message. Once the deadline passes, the message is no longer considered outstanding, and
            // Cloud Pub/Sub will attempt to redeliver the message.
            AckDeadline: 60 * time.Second,
        }

        subscription, err = client.psclient.CreateSubscription(context.Background(), subscriptionName, cfg)
        if err != nil {
            log.Printf("Could not create subscription %s. Err: %v", subscriptionName, err)
            return nil, err
        }
        subscription.ReceiveSettings = pubsub.ReceiveSettings{
            // This is the maximum amount of messages that are allowed to be processed by the callback function at a time.
            // Once this limit is reached, the client waits for messages to be acked or nacked by the callback before
            // requesting more messages from the server.
            MaxOutstandingMessages: 100,
            // This is the maximum amount of time that the client will extend a message's deadline. This value should be
            // set as high as messages are expected to be processed, plus some buffer.
            MaxExtension: 10 * time.Second,
        }
    }
    return subscription, nil
}

这是发行者代码

代码语言:javascript
复制
package pubsubclient

import (
    "context"
    "encoding/json"

    "cloud.google.com/go/pubsub"
)

// Publisher contract to be returned to the consumer
type Publisher struct {
    topic *pubsub.Topic
}

// PublisherConfig to be provided by the consumer.
type PublisherConfig struct {
    ProjectID string
    TopicName string
}

// GetPublisher gives a publisher
func GetPublisher(config PublisherConfig) (*Publisher, error) {
    client, err := getClient(config.ProjectID)
    if err != nil {
        return nil, err
    }
    topic, err := client.createTopic(config.TopicName)
    if err != nil {
        return nil, err
    }
    return &Publisher{
        topic: topic,
    }, nil
}

// Publish message to pubsub
func (publisher *Publisher) Publish(payload interface{}) (string, error) {
    data, err := json.Marshal(payload)
    if err != nil {
        return ``, err
    }
    message := &pubsub.Message{
        Data: data,
    }
    response := publisher.topic.Publish(context.Background(), message)
    return response.Get(context.Background())
}

这是订户代码

代码语言:javascript
复制
package pubsubclient

import (
    "context"
    "log"
    "sync"

    "cloud.google.com/go/pubsub"
)

// SubscribeMessageHandler that handles the message
type SubscribeMessageHandler func(chan *pubsub.Message)

// ErrorHandler that logs the error received while reading a message
type ErrorHandler func(error)

// SubscriberConfig subscriber config
type SubscriberConfig struct {
    ProjectID        string
    TopicName        string
    SubscriptionName string
    ErrorHandler     ErrorHandler
    Handle           SubscribeMessageHandler
}

// Subscriber subscribe to a topic and pass each message to the
// handler function
type Subscriber struct {
    topic        *pubsub.Topic
    subscription *pubsub.Subscription
    errorHandler ErrorHandler
    handle       SubscribeMessageHandler
    cancel       func()
}

// CreateSubscription creates a subscription
func CreateSubscription(config SubscriberConfig) (*Subscriber, error) {
    client, err := getClient(config.ProjectID)
    if err != nil {
        return nil, err
    }
    topic, err := client.createTopic(config.TopicName)
    if err != nil {
        return nil, err
    }
    subscription, err := client.createSubscription(config.SubscriptionName, topic)
    if err != nil {
        return nil, err
    }
    return &Subscriber{
        topic:        topic,
        subscription: subscription,
        errorHandler: config.ErrorHandler,
        handle:       config.Handle,
    }, nil
}

// Process will start pulling from the pubsub. The process accepts a waitgroup as
// it will be easier to orchestrate a use case where one application needs
// to subscribe to more than one topic
func (subscriber *Subscriber) Process(wg *sync.WaitGroup) {
    log.Printf("Starting a Subscriber on topic %s", subscriber.topic.String())
    output := make(chan *pubsub.Message)
    go func(subscriber *Subscriber, output chan *pubsub.Message) {
        defer close(output)

        ctx := context.Background()
        ctx, subscriber.cancel = context.WithCancel(ctx)
        err := subscriber.subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
            output <- msg
        })

        if err != nil {
            // The wait group is stopped or marked done when an error is encountered
            subscriber.errorHandler(err)
            subscriber.Stop()
            wg.Done()
        }
    }(subscriber, output)

    subscriber.handle(output)
}

// Stop the subscriber, closing the channel that was returned by Start.
func (subscriber *Subscriber) Stop() {
    if subscriber.cancel != nil {
        log.Print("Stopped the subscriber")
        subscriber.cancel()
    }
}

如果这是一个非常大的代码审查,我将分解它。我的主要问题是订阅者的Process()方法。现在,这个方法接受一个waitGroup,并且不太确定这是否是一个好的设计。为了演示我设想如何使用该方法的示例:

代码语言:javascript
复制
// Process will start pulling from the pubsub. The process accepts a waitgroup as
// it will be easier for us to orchestrate a use case where one application needs
// more than one subscriber
var wg sync.WaitGroup
wg.Add(1)
go subscriber.Process(&wg)
publishMessages(publisher)
wg.Wait()

这是正确的设计方法吗?还有其他我可能需要遵循的好设计模式吗?请让我知道。

EN

回答 1

Code Review用户

发布于 2019-08-21 21:13:14

  • getClient,我确信您应该从外部通过context对象,而不是简单地用Background创建一个虚拟对象。而且,在某个时候,同时记录一个错误并将其传回会咬你一口,因为你最终会得到两个或更多相同错误的副本(除非你100%的努力只记录在原点的错误)。
  • 同样的context注释实际上适用于所有其他方法。作为库的用户,我希望在上下文中传递,否则就没有什么意义了。
  • createSubscription中的配置看起来应该是从外部来的,也就是说,有默认值,但是让用户重写它们。不过,如果没有将其解释为pubsub库的一部分,那么这些注释是很棒的。
  • Process中,匿名函数不需要参数,它只需自动捕获subscriberoutput的值。

我会说等待小组很好吗?假设Stop调用将导致适当的错误,从而关闭运行Process的goroutine。对我来说有点模糊的是,如果errorHandler总是被调用,即使是通过Stop终止订阅时也是故意的呢?当然,这不是最糟糕的问题。

票数 1
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/226544

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档