Skip to content

Commit 983105d

Browse files
authored
feat(pubsub): expose common errors for easier handling (#7940)
* feat(pubsub): expose common errors for easier handling * unexport topic ordering not enabled error, switch ErrPublishingPaused to custom error
1 parent f037795 commit 983105d

File tree

3 files changed

+40
-9
lines changed

3 files changed

+40
-9
lines changed

pubsub/integration_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"bufio"
1919
"bytes"
2020
"context"
21+
"errors"
2122
"fmt"
2223
"io/ioutil"
2324
"os"
@@ -1347,7 +1348,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
13471348
Data: []byte("should fail"),
13481349
OrderingKey: orderingKey,
13491350
})
1350-
if _, err := r.Get(ctx); err == nil || !strings.Contains(err.Error(), "pubsub: Publishing for ordering key") {
1351+
if _, err := r.Get(ctx); err == nil || !errors.As(err, &ErrPublishingPaused{}) {
13511352
t.Fatalf("expected ordering keys publish error, got %v", err)
13521353
}
13531354

pubsub/topic.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,8 @@ func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator {
535535
}
536536
}
537537

538-
var errTopicStopped = errors.New("pubsub: Stop has been called for this topic")
538+
// ErrTopicStopped indicates that topic has been stopped and further publishing will fail.
539+
var ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic")
539540

540541
// A PublishResult holds the result from a call to Publish.
541542
//
@@ -548,6 +549,8 @@ var errTopicStopped = errors.New("pubsub: Stop has been called for this topic")
548549
// }
549550
type PublishResult = ipubsub.PublishResult
550551

552+
var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")
553+
551554
// Publish publishes msg to the topic asynchronously. Messages are batched and
552555
// sent according to the topic's PublishSettings. Publish never blocks.
553556
//
@@ -565,7 +568,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
565568

566569
r := ipubsub.NewPublishResult()
567570
if !t.EnableMessageOrdering && msg.OrderingKey != "" {
568-
ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering"))
571+
ipubsub.SetPublishResult(r, "", errTopicOrderingNotEnabled)
569572
return r
570573
}
571574

@@ -582,7 +585,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
582585
defer t.mu.RUnlock()
583586
// TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here
584587
if t.stopped {
585-
ipubsub.SetPublishResult(r, "", errTopicStopped)
588+
ipubsub.SetPublishResult(r, "", ErrTopicStopped)
586589
return r
587590
}
588591

@@ -697,6 +700,16 @@ func (t *Topic) initBundler() {
697700
t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5
698701
}
699702

703+
// ErrPublishingPaused is a custom error indicating that the publish paused for the specified ordering key.
704+
type ErrPublishingPaused struct {
705+
OrderingKey string
706+
}
707+
708+
func (e ErrPublishingPaused) Error() string {
709+
return fmt.Sprintf("pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing", e.OrderingKey)
710+
711+
}
712+
700713
func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) {
701714
ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
702715
if err != nil {
@@ -716,7 +729,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
716729
var res *pb.PublishResponse
717730
start := time.Now()
718731
if orderingKey != "" && t.scheduler.IsPaused(orderingKey) {
719-
err = fmt.Errorf("pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing", orderingKey)
732+
err = ErrPublishingPaused{OrderingKey: orderingKey}
720733
} else {
721734
// Apply custom publish retryer on top of user specified retryer and
722735
// default retryer.

pubsub/topic_test.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package pubsub
1717
import (
1818
"bytes"
1919
"context"
20+
"errors"
2021
"fmt"
2122
"strings"
2223
"sync"
@@ -167,8 +168,8 @@ func TestStopPublishOrder(t *testing.T) {
167168
topic.Stop()
168169
r := topic.Publish(ctx, &Message{})
169170
_, err := r.Get(ctx)
170-
if err != errTopicStopped {
171-
t.Errorf("got %v, want errTopicStopped", err)
171+
if !errors.Is(err, ErrTopicStopped) {
172+
t.Errorf("got %v, want ErrTopicStopped", err)
172173
}
173174
}
174175

@@ -460,8 +461,8 @@ func TestFlushStopTopic(t *testing.T) {
460461
r5 := topic.Publish(ctx, &Message{
461462
Data: []byte("this should fail"),
462463
})
463-
if _, err := r5.Get(ctx); err != errTopicStopped {
464-
t.Errorf("got %v, want errTopicStopped", err)
464+
if _, err := r5.Get(ctx); !errors.Is(err, ErrTopicStopped) {
465+
t.Errorf("got %v, want ErrTopicStopped", err)
465466
}
466467
}
467468

@@ -673,3 +674,19 @@ func addSingleResponse(srv *pstest.Server, id string) {
673674
MessageIds: []string{id},
674675
}, nil)
675676
}
677+
678+
func TestPublishOrderingNotEnabled(t *testing.T) {
679+
ctx := context.Background()
680+
c, srv := newFake(t)
681+
defer c.Close()
682+
defer srv.Close()
683+
684+
topic, err := c.CreateTopic(ctx, "test-topic")
685+
if err != nil {
686+
t.Fatal(err)
687+
}
688+
res := publishSingleMessageWithKey(ctx, topic, "test", "non-existent-key")
689+
if _, err := res.Get(ctx); !errors.Is(err, errTopicOrderingNotEnabled) {
690+
t.Errorf("got %v, want errTopicOrderingNotEnabled", err)
691+
}
692+
}

0 commit comments

Comments
 (0)