我一直在为golang开发一个Google /Sub客户端库。对戈朗来说很新鲜,我的目的是学习这门语言。不太确定什么是最佳实践,目的是在一段时间内学习它们。
让我直截了当地谈谈图书馆正在做的事情:
// Creates a client, and exposes deleteTopic, topicExists and createSubscription though the client
package pubsubclient
import (
"context"
"log"
"time"
"cloud.google.com/go/pubsub"
"google.golang.org/api/iterator"
)
type pubSubClient struct {
psclient *pubsub.Client
}
// getClient creates a pubsub client
func getClient(projectID string) (*pubSubClient, error) {
client, err := pubsub.NewClient(context.Background(), projectID)
if err != nil {
log.Printf("Error when creating pubsub client. Err: %v", err)
return nil, err
}
return &pubSubClient{psclient: client}, nil
}
// topicExists checks if a given topic exists
func (client *pubSubClient) topicExists(topicName string) (bool, error) {
topic := client.psclient.Topic(topicName)
return topic.Exists(context.Background())
}
// createTopic creates a topic if a topic name does not exist or returns one
// if it is already present
func (client *pubSubClient) createTopic(topicName string) (*pubsub.Topic, error) {
topicExists, err := client.topicExists(topicName)
if err != nil {
log.Printf("Could not check if topic exists. Error: %+v", err)
return nil, err
}
var topic *pubsub.Topic
if !topicExists {
topic, err = client.psclient.CreateTopic(context.Background(), topicName)
if err != nil {
log.Printf("Could not create topic. Err: %+v", err)
return nil, err
}
} else {
topic = client.psclient.Topic(topicName)
}
return topic, nil
}
// deleteTopic Deletes a topic
func (client *pubSubClient) deleteTopic(topicName string) error {
return client.psclient.Topic(topicName).Delete(context.Background())
}
// createSubscription creates the subscription to a topic
func (client *pubSubClient) createSubscription(subscriptionName string, topic *pubsub.Topic) (*pubsub.Subscription, error) {
subscription := client.psclient.Subscription(subscriptionName)
subscriptionExists, err := subscription.Exists(context.Background())
if err != nil {
log.Printf("Could not check if subscription %s exists. Err: %v", subscriptionName, err)
return nil, err
}
if !subscriptionExists {
cfg := pubsub.SubscriptionConfig{
Topic: topic,
// The subscriber has a configurable, limited amount of time -- known as the ackDeadline -- to acknowledge
// the outstanding message. Once the deadline passes, the message is no longer considered outstanding, and
// Cloud Pub/Sub will attempt to redeliver the message.
AckDeadline: 60 * time.Second,
}
subscription, err = client.psclient.CreateSubscription(context.Background(), subscriptionName, cfg)
if err != nil {
log.Printf("Could not create subscription %s. Err: %v", subscriptionName, err)
return nil, err
}
subscription.ReceiveSettings = pubsub.ReceiveSettings{
// This is the maximum amount of messages that are allowed to be processed by the callback function at a time.
// Once this limit is reached, the client waits for messages to be acked or nacked by the callback before
// requesting more messages from the server.
MaxOutstandingMessages: 100,
// This is the maximum amount of time that the client will extend a message's deadline. This value should be
// set as high as messages are expected to be processed, plus some buffer.
MaxExtension: 10 * time.Second,
}
}
return subscription, nil
}这是发行者代码
package pubsubclient
import (
"context"
"encoding/json"
"cloud.google.com/go/pubsub"
)
// Publisher contract to be returned to the consumer
type Publisher struct {
topic *pubsub.Topic
}
// PublisherConfig to be provided by the consumer.
type PublisherConfig struct {
ProjectID string
TopicName string
}
// GetPublisher gives a publisher
func GetPublisher(config PublisherConfig) (*Publisher, error) {
client, err := getClient(config.ProjectID)
if err != nil {
return nil, err
}
topic, err := client.createTopic(config.TopicName)
if err != nil {
return nil, err
}
return &Publisher{
topic: topic,
}, nil
}
// Publish message to pubsub
func (publisher *Publisher) Publish(payload interface{}) (string, error) {
data, err := json.Marshal(payload)
if err != nil {
return ``, err
}
message := &pubsub.Message{
Data: data,
}
response := publisher.topic.Publish(context.Background(), message)
return response.Get(context.Background())
}这是订户代码
package pubsubclient
import (
"context"
"log"
"sync"
"cloud.google.com/go/pubsub"
)
// SubscribeMessageHandler that handles the message
type SubscribeMessageHandler func(chan *pubsub.Message)
// ErrorHandler that logs the error received while reading a message
type ErrorHandler func(error)
// SubscriberConfig subscriber config
type SubscriberConfig struct {
ProjectID string
TopicName string
SubscriptionName string
ErrorHandler ErrorHandler
Handle SubscribeMessageHandler
}
// Subscriber subscribe to a topic and pass each message to the
// handler function
type Subscriber struct {
topic *pubsub.Topic
subscription *pubsub.Subscription
errorHandler ErrorHandler
handle SubscribeMessageHandler
cancel func()
}
// CreateSubscription creates a subscription
func CreateSubscription(config SubscriberConfig) (*Subscriber, error) {
client, err := getClient(config.ProjectID)
if err != nil {
return nil, err
}
topic, err := client.createTopic(config.TopicName)
if err != nil {
return nil, err
}
subscription, err := client.createSubscription(config.SubscriptionName, topic)
if err != nil {
return nil, err
}
return &Subscriber{
topic: topic,
subscription: subscription,
errorHandler: config.ErrorHandler,
handle: config.Handle,
}, nil
}
// Process will start pulling from the pubsub. The process accepts a waitgroup as
// it will be easier to orchestrate a use case where one application needs
// to subscribe to more than one topic
func (subscriber *Subscriber) Process(wg *sync.WaitGroup) {
log.Printf("Starting a Subscriber on topic %s", subscriber.topic.String())
output := make(chan *pubsub.Message)
go func(subscriber *Subscriber, output chan *pubsub.Message) {
defer close(output)
ctx := context.Background()
ctx, subscriber.cancel = context.WithCancel(ctx)
err := subscriber.subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
output <- msg
})
if err != nil {
// The wait group is stopped or marked done when an error is encountered
subscriber.errorHandler(err)
subscriber.Stop()
wg.Done()
}
}(subscriber, output)
subscriber.handle(output)
}
// Stop the subscriber, closing the channel that was returned by Start.
func (subscriber *Subscriber) Stop() {
if subscriber.cancel != nil {
log.Print("Stopped the subscriber")
subscriber.cancel()
}
}如果这是一个非常大的代码审查,我将分解它。我的主要问题是订阅者的Process()方法。现在,这个方法接受一个waitGroup,并且不太确定这是否是一个好的设计。为了演示我设想如何使用该方法的示例:
// Process will start pulling from the pubsub. The process accepts a waitgroup as
// it will be easier for us to orchestrate a use case where one application needs
// more than one subscriber
var wg sync.WaitGroup
wg.Add(1)
go subscriber.Process(&wg)
publishMessages(publisher)
wg.Wait()这是正确的设计方法吗?还有其他我可能需要遵循的好设计模式吗?请让我知道。
发布于 2019-08-21 21:13:14
getClient,我确信您应该从外部通过context对象,而不是简单地用Background创建一个虚拟对象。而且,在某个时候,同时记录一个错误并将其传回会咬你一口,因为你最终会得到两个或更多相同错误的副本(除非你100%的努力只记录在原点的错误)。context注释实际上适用于所有其他方法。作为库的用户,我希望在上下文中传递,否则就没有什么意义了。createSubscription中的配置看起来应该是从外部来的,也就是说,有默认值,但是让用户重写它们。不过,如果没有将其解释为pubsub库的一部分,那么这些注释是很棒的。Process中,匿名函数不需要参数,它只需自动捕获subscriber和output的值。我会说等待小组很好吗?假设Stop调用将导致适当的错误,从而关闭运行Process的goroutine。对我来说有点模糊的是,如果errorHandler总是被调用,即使是通过Stop终止订阅时也是故意的呢?当然,这不是最糟糕的问题。
https://codereview.stackexchange.com/questions/226544
复制相似问题