From 19dd12903757fa16075f8dab4ed5a2fa0bbe5299 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Mon, 25 Nov 2019 10:04:56 -0500 Subject: [PATCH 1/5] Modifying Publish example in README to match other examples given, and fix issue #6784 --- README.md | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3b928096b..ca3e1c833 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,10 @@ With Pub/Sub you can publish messages to a topic. Add the following import at th ```java import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.cloud.pubsub.v1.Publisher; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; ``` @@ -135,6 +138,16 @@ try { ByteString data = ByteString.copyFromUtf8("my-message"); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() { + public void onSuccess(String messageId) { + System.out.println("published with message id: " + messageId); + } + + public void onFailure(Throwable t) { + System.out.println("failed to publish: " + t); + } + }, MoreExecutors.directExecutor()); + //... } finally { if (publisher != null) { publisher.shutdown(); @@ -284,4 +297,4 @@ Java 11 | [![Kokoro CI][kokoro-badge-image-5]][kokoro-badge-link-5] [troubleshooting]: https://github.com/googleapis/google-cloud-common/blob/master/troubleshooting/readme.md#troubleshooting [contributing]: https://github.com/googleapis/java-pubsub/blob/master/CONTRIBUTING.md [code-of-conduct]: https://github.com/googleapis/java-pubsub/blob/master/CODE_OF_CONDUCT.md#contributor-code-of-conduct -[license]: https://github.com/googleapis/java-pubsub/blob/master/LICENSE \ No newline at end of file +[license]: https://github.com/googleapis/java-pubsub/blob/master/LICENSE From 41585299b46e4a471252c124bb412a92d371ce7a Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Mon, 25 Nov 2019 10:04:56 -0500 Subject: [PATCH 2/5] fix: Modifying Publish example in README to match other examples, and fix Issue #11 --- README.md | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3b928096b..ca3e1c833 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,10 @@ With Pub/Sub you can publish messages to a topic. Add the following import at th ```java import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.cloud.pubsub.v1.Publisher; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; ``` @@ -135,6 +138,16 @@ try { ByteString data = ByteString.copyFromUtf8("my-message"); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() { + public void onSuccess(String messageId) { + System.out.println("published with message id: " + messageId); + } + + public void onFailure(Throwable t) { + System.out.println("failed to publish: " + t); + } + }, MoreExecutors.directExecutor()); + //... } finally { if (publisher != null) { publisher.shutdown(); @@ -284,4 +297,4 @@ Java 11 | [![Kokoro CI][kokoro-badge-image-5]][kokoro-badge-link-5] [troubleshooting]: https://github.com/googleapis/google-cloud-common/blob/master/troubleshooting/readme.md#troubleshooting [contributing]: https://github.com/googleapis/java-pubsub/blob/master/CONTRIBUTING.md [code-of-conduct]: https://github.com/googleapis/java-pubsub/blob/master/CODE_OF_CONDUCT.md#contributor-code-of-conduct -[license]: https://github.com/googleapis/java-pubsub/blob/master/LICENSE \ No newline at end of file +[license]: https://github.com/googleapis/java-pubsub/blob/master/LICENSE From 01d41b726a0c5655b079d641594ef7dd9c279672 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Mon, 13 Jan 2020 12:40:07 -0500 Subject: [PATCH 3/5] feat: Adding support for DLQs Adding delivery attempt count to PubsubMessages as a message attribute, and creating helper function to allow users to get the count without knowing implementation details. --- .../com/google/cloud/pubsub/v1/MessageDispatcher.java | 9 ++++++++- .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 8 ++++++++ .../google/cloud/pubsub/v1/MessageDispatcherTest.java | 8 +++++++- .../java/com/google/cloud/pubsub/v1/SubscriberTest.java | 9 +++++++++ 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 53b979d5e..34b482cd8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -341,10 +341,17 @@ private void processBatch(List batch) { // This should be a blocking flow controller and never throw an exception. throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } - processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler); + processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler); } } + private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) { + return PubsubMessage.newBuilder(receivedMessage.getMessage()) + .putAttributes( + "googclient_deliveryattempt", Integer.toString(receivedMessage.getDeliveryAttempt())) + .build(); + } + private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) { final SettableApiFuture response = SettableApiFuture.create(); final AckReplyConsumer consumer = diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 059ac0ce4..d08ae921b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -44,6 +44,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -205,6 +206,13 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver) return new Builder(subscription, receiver); } + public static int getDeliveryAttempt(PubsubMessage message) { + if (!message.containsAttributes("googclient_deliveryattempt")) { + throw new RuntimeException("Message does not contain delivery attempt information"); + } + return Integer.parseInt(message.getAttributesOrThrow("googclient_deliveryattempt")); + } + /** Subscription which the subscriber is subscribed to. */ public String getSubscriptionNameString() { return subscriptionName; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 163475407..b84e82c8c 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -37,10 +37,13 @@ import org.threeten.bp.Duration; public class MessageDispatcherTest { + private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data"); + private static final int DELIVERY_INFO_COUNT = 3; private static final ReceivedMessage TEST_MESSAGE = ReceivedMessage.newBuilder() .setAckId("ackid") - .setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build()) + .setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build()) + .setDeliveryAttempt(DELIVERY_INFO_COUNT) .build(); private static final Runnable NOOP_RUNNABLE = new Runnable() { @@ -78,6 +81,9 @@ public void setUp() { new MessageReceiver() { @Override public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { + assertThat(message.getData()).isEqualTo(MESSAGE_DATA); + assertThat(message.getAttributesOrThrow("googclient_deliveryattempt")) + .isEqualTo(Integer.toString(DELIVERY_INFO_COUNT)); consumers.add(consumer); } }; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 5512145e1..4f0a9600d 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -84,6 +84,15 @@ public void tearDown() throws Exception { testChannel.shutdown(); } + @Test + public void testDeliveryAttemptHelper() { + int deliveryAttempt = 3; + PubsubMessage message = PubsubMessage.newBuilder() + .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) + .build(); + assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt); + } + @Test public void testOpenedChannels() throws Exception { int expectedChannelCount = 1; From ed3b1eeefcc5b5eaa941e6138ab9c9850553577b Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Mon, 13 Jan 2020 12:53:49 -0500 Subject: [PATCH 4/5] Fix formatting --- .../java/com/google/cloud/pubsub/v1/SubscriberTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 4f0a9600d..11a67c0d3 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -87,9 +87,10 @@ public void tearDown() throws Exception { @Test public void testDeliveryAttemptHelper() { int deliveryAttempt = 3; - PubsubMessage message = PubsubMessage.newBuilder() - .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) - .build(); + PubsubMessage message = + PubsubMessage.newBuilder() + .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) + .build(); assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt); } From 72f7996fd7c0fd9f5e786c151ca2168b8eefb520 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Tue, 14 Jan 2020 09:27:51 -0500 Subject: [PATCH 5/5] fix: making changes requested in pull request --- .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 6 ++---- .../java/com/google/cloud/pubsub/v1/SubscriberTest.java | 3 +++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index d08ae921b..422a0577f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -206,11 +206,9 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver) return new Builder(subscription, receiver); } + /** Returns the delivery attempt count for a received {@link PubsubMessage} */ public static int getDeliveryAttempt(PubsubMessage message) { - if (!message.containsAttributes("googclient_deliveryattempt")) { - throw new RuntimeException("Message does not contain delivery attempt information"); - } - return Integer.parseInt(message.getAttributesOrThrow("googclient_deliveryattempt")); + return Integer.parseInt(message.getAttributesOrDefault("googclient_deliveryattempt", "0")); } /** Subscription which the subscriber is subscribed to. */ diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 11a67c0d3..8f8489f21 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -92,6 +92,9 @@ public void testDeliveryAttemptHelper() { .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) .build(); assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt); + + PubsubMessage emptyMessage = PubsubMessage.newBuilder().build(); + assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), 0); } @Test