@@ -94,6 +94,86 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) ->
9494 # [END pubsub_create_pull_subscription]
9595
9696
97+ def optimistic_subscribe (
98+ project_id : str ,
99+ topic_id : str ,
100+ subscription_id : str ,
101+ timeout : Optional [float ] = None ,
102+ ) -> None :
103+ """Optimistically subscribe to messages instead of making calls to verify existence
104+ of a subscription first and then subscribing to messages from it. This avoids admin
105+ operation calls to verify the existence of a subscription and reduces the probability
106+ of running out of quota for admin operations."""
107+ # [START pubsub_optimistic_subscribe]
108+ from google .api_core .exceptions import NotFound
109+ from google .cloud import pubsub_v1
110+ from concurrent .futures import TimeoutError
111+
112+ # TODO(developer)
113+ # project_id = "your-project-id"
114+ # subscription_id = "your-subscription-id"
115+ # Number of seconds the subscriber should listen for messages
116+ # timeout = 5.0
117+ # topic_id = "your-topic-id"
118+
119+ # Create a subscriber client.
120+ subscriber = pubsub_v1 .SubscriberClient ()
121+
122+ # The `subscription_path` method creates a fully qualified identifier
123+ # in the form `projects/{project_id}/subscriptions/{subscription_id}`
124+ subscription_path = subscriber .subscription_path (project_id , subscription_id )
125+
126+ # Define callback to be called when a message is received.
127+ def callback (message : pubsub_v1 .subscriber .message .Message ) -> None :
128+ # Ack message after processing it.
129+ message .ack ()
130+
131+ # Wrap subscriber in a 'with' block to automatically call close() when done.
132+ with subscriber :
133+ try :
134+ # Optimistically subscribe to messages on the subscription.
135+ streaming_pull_future = subscriber .subscribe (
136+ subscription_path , callback = callback
137+ )
138+ streaming_pull_future .result (timeout = timeout )
139+ except TimeoutError :
140+ print ("Successfully subscribed until the timeout passed." )
141+ streaming_pull_future .cancel () # Trigger the shutdown.
142+ streaming_pull_future .result () # Block until the shutdown is complete.
143+ except NotFound :
144+ print (f"Subscription { subscription_path } not found, creating it." )
145+
146+ try :
147+ # If the subscription does not exist, then create it.
148+ publisher = pubsub_v1 .PublisherClient ()
149+ topic_path = publisher .topic_path (project_id , topic_id )
150+ subscription = subscriber .create_subscription (
151+ request = {"name" : subscription_path , "topic" : topic_path }
152+ )
153+
154+ if subscription :
155+ print (f"Subscription { subscription .name } created" )
156+ else :
157+ raise ValueError ("Subscription creation failed." )
158+
159+ # Subscribe on the created subscription.
160+ try :
161+ streaming_pull_future = subscriber .subscribe (
162+ subscription .name , callback = callback
163+ )
164+ streaming_pull_future .result (timeout = timeout )
165+ except TimeoutError :
166+ streaming_pull_future .cancel () # Trigger the shutdown.
167+ streaming_pull_future .result () # Block until the shutdown is complete.
168+ except Exception as e :
169+ print (
170+ f"Exception occurred when creating subscription and subscribing to it: { e } "
171+ )
172+ except Exception as e :
173+ print (f"Exception occurred when attempting optimistic subscribe: { e } " )
174+ # [END pubsub_optimistic_subscribe]
175+
176+
97177def create_subscription_with_dead_letter_topic (
98178 project_id : str ,
99179 topic_id : str ,
@@ -1161,6 +1241,15 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
11611241 remove_dead_letter_policy_parser .add_argument ("topic_id" )
11621242 remove_dead_letter_policy_parser .add_argument ("subscription_id" )
11631243
1244+ optimistic_subscribe_parser = subparsers .add_parser (
1245+ "optimistic-subscribe" , help = optimistic_subscribe .__doc__
1246+ )
1247+ optimistic_subscribe_parser .add_argument ("topic_id" )
1248+ optimistic_subscribe_parser .add_argument ("subscription_id" )
1249+ optimistic_subscribe_parser .add_argument (
1250+ "timeout" , default = None , type = float , nargs = "?"
1251+ )
1252+
11641253 receive_parser = subparsers .add_parser ("receive" , help = receive_messages .__doc__ )
11651254 receive_parser .add_argument ("subscription_id" )
11661255 receive_parser .add_argument ("timeout" , default = None , type = float , nargs = "?" )
@@ -1303,6 +1392,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
13031392 )
13041393 elif args .command == "remove-dead-letter-policy" :
13051394 remove_dead_letter_policy (args .project_id , args .topic_id , args .subscription_id )
1395+ elif args .command == "optimistic-subscribe" :
1396+ optimistic_subscribe (
1397+ args .project_id , args .topic_id , args .subscription_id , args .timeout
1398+ )
13061399 elif args .command == "receive" :
13071400 receive_messages (args .project_id , args .subscription_id , args .timeout )
13081401 elif args .command == "receive-custom-attributes" :
0 commit comments