2323import sys
2424import threading
2525import time
26+ from typing import Any , Callable , cast , TypeVar
2627
2728# special case python < 3.8
2829if sys .version_info .major == 3 and sys .version_info .minor < 8 :
2930 import mock
3031else :
3132 from unittest import mock
3233
34+ from flaky import flaky
3335import pytest
3436
3537import google .auth
4345
4446from test_utils .system import unique_resource_id
4547
48+ C = TypeVar ("C" , bound = Callable [..., Any ])
49+ typed_flaky = cast (Callable [[C ], C ], flaky (max_runs = 3 , min_passes = 1 ))
50+
4651
4752@pytest .fixture (scope = "module" )
4853def project ():
@@ -61,13 +66,13 @@ def subscriber(request):
6166
6267
6368@pytest .fixture
64- def topic_path (project , publisher ):
69+ def topic_path_base (project , publisher ):
6570 topic_name = "t" + unique_resource_id ("-" )
6671 yield publisher .topic_path (project , topic_name )
6772
6873
6974@pytest .fixture
70- def subscription_path (project , subscriber ):
75+ def subscription_path_base (project , subscriber ):
7176 sub_name = "s" + unique_resource_id ("-" )
7277 yield subscriber .subscription_path (project , sub_name )
7378
@@ -82,7 +87,9 @@ def cleanup():
8287 to_call (* args , ** kwargs )
8388
8489
85- def test_publish_messages (publisher , topic_path , cleanup ):
90+ def test_publish_messages (publisher , topic_path_base , cleanup ):
91+ # Customize topic path to test.
92+ topic_path = topic_path_base + "-publish-messages"
8693 # Make sure the topic gets deleted.
8794 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
8895
@@ -100,7 +107,9 @@ def test_publish_messages(publisher, topic_path, cleanup):
100107 assert isinstance (result , str )
101108
102109
103- def test_publish_large_messages (publisher , topic_path , cleanup ):
110+ def test_publish_large_messages (publisher , topic_path_base , cleanup ):
111+ # Customize topic path to test.
112+ topic_path = topic_path_base + "-publish-large-messages"
104113 # Make sure the topic gets deleted.
105114 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
106115
@@ -130,8 +139,11 @@ def test_publish_large_messages(publisher, topic_path, cleanup):
130139
131140
132141def test_subscribe_to_messages (
133- publisher , topic_path , subscriber , subscription_path , cleanup
142+ publisher , topic_path_base , subscriber , subscription_path_base , cleanup
134143):
144+ # Customize topic path to test.
145+ topic_path = topic_path_base + "-subscribe-to-messages"
146+ subscription_path = subscription_path_base + "-subscribe-to-messages"
135147 # Make sure the topic and subscription get deleted.
136148 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
137149 cleanup .append (
@@ -175,8 +187,12 @@ def test_subscribe_to_messages(
175187
176188
177189def test_subscribe_to_messages_async_callbacks (
178- publisher , topic_path , subscriber , subscription_path , cleanup
190+ publisher , topic_path_base , subscriber , subscription_path_base , cleanup
179191):
192+ # Customize topic path to test.
193+ custom_str = "-subscribe-to-messages-async-callback"
194+ topic_path = topic_path_base + custom_str
195+ subscription_path = subscription_path_base + custom_str
180196 # Make sure the topic and subscription get deleted.
181197 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
182198 cleanup .append (
@@ -227,8 +243,12 @@ def test_subscribe_to_messages_async_callbacks(
227243
228244
229245def test_creating_subscriptions_with_non_default_settings (
230- publisher , subscriber , project , topic_path , subscription_path , cleanup
246+ publisher , subscriber , project , topic_path_base , subscription_path_base , cleanup
231247):
248+ # Customize topic path to test.
249+ custom_str = "-creating-subscriptions-with-non-default-settings"
250+ topic_path = topic_path_base + custom_str
251+ subscription_path = subscription_path_base + custom_str
232252 # Make sure the topic and subscription get deleted.
233253 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
234254 cleanup .append (
@@ -346,7 +366,8 @@ def test_listing_topic_subscriptions(publisher, subscriber, project, cleanup):
346366 assert subscriptions == {subscription_paths [0 ], subscription_paths [2 ]}
347367
348368
349- def test_managing_topic_iam_policy (publisher , topic_path , cleanup ):
369+ def test_managing_topic_iam_policy (publisher , topic_path_base , cleanup ):
370+ topic_path = topic_path_base + "-managing-topic-iam-policy"
350371 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
351372
352373 # create a topic and customize its policy
@@ -375,8 +396,11 @@ def test_managing_topic_iam_policy(publisher, topic_path, cleanup):
375396
376397
377398def test_managing_subscription_iam_policy (
378- publisher , subscriber , topic_path , subscription_path , cleanup
399+ publisher , subscriber , topic_path_base , subscription_path_base , cleanup
379400):
401+ custom_str = "-managing-subscription-iam-policy"
402+ topic_path = topic_path_base + custom_str
403+ subscription_path = subscription_path_base + custom_str
380404 # Make sure the topic and subscription get deleted.
381405 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
382406 cleanup .append (
@@ -410,7 +434,7 @@ def test_managing_subscription_iam_policy(
410434
411435
412436def test_subscriber_not_leaking_open_sockets (
413- publisher , topic_path , subscription_path , cleanup
437+ publisher , topic_path_base , subscription_path_base , cleanup
414438):
415439 # Make sure the topic and the supscription get deleted.
416440 # NOTE: Since subscriber client will be closed in the test, we should not
@@ -419,8 +443,12 @@ def test_subscriber_not_leaking_open_sockets(
419443 # Also, since the client will get closed, we need another subscriber client
420444 # to clean up the subscription. We also need to make sure that auxiliary
421445 # subscriber releases the sockets, too.
446+ custom_str = "-not-leaking-open-sockets"
447+ subscription_path = subscription_path_base + custom_str
448+ topic_path = topic_path_base + custom_str
422449 subscriber = pubsub_v1 .SubscriberClient (transport = "grpc" )
423450 subscriber_2 = pubsub_v1 .SubscriberClient (transport = "grpc" )
451+
424452 cleanup .append (
425453 (subscriber_2 .delete_subscription , (), {"subscription" : subscription_path })
426454 )
@@ -460,8 +488,11 @@ def test_subscriber_not_leaking_open_sockets(
460488
461489
462490def test_synchronous_pull_no_deadline_error_if_no_messages (
463- publisher , topic_path , subscriber , subscription_path , cleanup
491+ publisher , topic_path_base , subscriber , subscription_path_base , cleanup
464492):
493+ custom_str = "-synchronous-pull-deadline-error-if-no-messages"
494+ topic_path = topic_path_base + custom_str
495+ subscription_path = subscription_path_base + custom_str
465496 # Make sure the topic and subscription get deleted.
466497 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
467498 cleanup .append (
@@ -485,8 +516,11 @@ def test_synchronous_pull_no_deadline_error_if_no_messages(
485516
486517class TestStreamingPull (object ):
487518 def test_streaming_pull_callback_error_propagation (
488- self , publisher , topic_path , subscriber , subscription_path , cleanup
519+ self , publisher , topic_path_base , subscriber , subscription_path_base , cleanup
489520 ):
521+ custom_str = "-streaming-pull-callback-error-propagation"
522+ topic_path = topic_path_base + custom_str
523+ subscription_path = subscription_path_base + custom_str
490524 # Make sure the topic and subscription get deleted.
491525 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
492526 cleanup .append (
@@ -512,9 +546,19 @@ class CallbackError(Exception):
512546 with pytest .raises (CallbackError ):
513547 future .result (timeout = 30 )
514548
549+ @typed_flaky
515550 def test_streaming_pull_ack_deadline (
516- self , publisher , subscriber , project , topic_path , subscription_path , cleanup
551+ self ,
552+ publisher ,
553+ subscriber ,
554+ project ,
555+ topic_path_base ,
556+ subscription_path_base ,
557+ cleanup ,
517558 ):
559+ custom_str = "-streaming-pull-ack-deadline"
560+ topic_path = topic_path_base + custom_str
561+ subscription_path = subscription_path_base + custom_str
518562 # Make sure the topic and subscription get deleted.
519563 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
520564 cleanup .append (
@@ -564,8 +608,11 @@ def test_streaming_pull_ack_deadline(
564608 subscription_future .cancel ()
565609
566610 def test_streaming_pull_max_messages (
567- self , publisher , topic_path , subscriber , subscription_path , cleanup
611+ self , publisher , topic_path_base , subscriber , subscription_path_base , cleanup
568612 ):
613+ custom_str = "-streaming-pull-max-messages"
614+ topic_path = topic_path_base + custom_str
615+ subscription_path = subscription_path_base + custom_str
569616 # Make sure the topic and subscription get deleted.
570617 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
571618 cleanup .append (
@@ -619,9 +666,13 @@ def test_streaming_pull_max_messages(
619666 finally :
620667 subscription_future .cancel () # trigger clean shutdown
621668
669+ @typed_flaky
622670 def test_streaming_pull_blocking_shutdown (
623- self , publisher , topic_path , subscriber , subscription_path , cleanup
671+ self , publisher , topic_path_base , subscriber , subscription_path_base , cleanup
624672 ):
673+ custom_str = "-streaming-pull-blocking-shutdown"
674+ topic_path = topic_path_base + custom_str
675+ subscription_path = subscription_path_base + custom_str
625676 # Make sure the topic and subscription get deleted.
626677 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
627678 cleanup .append (
@@ -702,9 +753,11 @@ def callback2(message):
702753)
703754class TestBasicRBAC (object ):
704755 def test_streaming_pull_subscriber_permissions_sufficient (
705- self , publisher , topic_path , subscriber , subscription_path , cleanup
756+ self , publisher , topic_path_base , subscriber , subscription_path_base , cleanup
706757 ):
707-
758+ custom_str = "-streaming-pull-subscriber-permissions-sufficient"
759+ topic_path = topic_path_base + custom_str
760+ subscription_path = subscription_path_base + custom_str
708761 # Make sure the topic and subscription get deleted.
709762 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
710763 cleanup .append (
@@ -739,9 +792,11 @@ def test_streaming_pull_subscriber_permissions_sufficient(
739792 future .cancel ()
740793
741794 def test_publisher_role_can_publish_messages (
742- self , publisher , topic_path , subscriber , subscription_path , cleanup
795+ self , publisher , topic_path_base , subscriber , subscription_path_base , cleanup
743796 ):
744-
797+ custom_str = "-publisher-role-can-publish-messages"
798+ topic_path = topic_path_base + custom_str
799+ subscription_path = subscription_path_base + custom_str
745800 # Make sure the topic and subscription get deleted.
746801 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
747802 cleanup .append (
@@ -767,8 +822,17 @@ def test_publisher_role_can_publish_messages(
767822 "Snapshot creation is not instant on the backend, causing test falkiness."
768823 )
769824 def test_snapshot_seek_subscriber_permissions_sufficient (
770- self , project , publisher , topic_path , subscriber , subscription_path , cleanup
825+ self ,
826+ project ,
827+ publisher ,
828+ topic_path_base ,
829+ subscriber ,
830+ subscription_path_base ,
831+ cleanup ,
771832 ):
833+ custom_str = "-snapshot-seek-subscriber-permissions-sufficient"
834+ topic_path = topic_path_base + custom_str
835+ subscription_path = subscription_path_base + custom_str
772836 snapshot_name = "snap" + unique_resource_id ("-" )
773837 snapshot_path = "projects/{}/snapshots/{}" .format (project , snapshot_name )
774838
@@ -813,10 +877,10 @@ def test_snapshot_seek_subscriber_permissions_sufficient(
813877 assert len (response .received_messages ) == 1
814878
815879 def test_viewer_role_can_list_resources (
816- self , project , publisher , topic_path , subscriber , cleanup
880+ self , project , publisher , topic_path_base , subscriber , cleanup
817881 ):
818882 project_path = "projects/" + project
819-
883+ topic_path = topic_path_base + "-viewer-role-can-list-resources"
820884 # Make sure the created topic gets deleted.
821885 cleanup .append ((publisher .delete_topic , (), {"topic" : topic_path }))
822886
@@ -844,8 +908,17 @@ def test_viewer_role_can_list_resources(
844908 next (iter (viewer_only_subscriber .list_snapshots (project = project_path )), None )
845909
846910 def test_editor_role_can_create_resources (
847- self , project , publisher , topic_path , subscriber , subscription_path , cleanup
911+ self ,
912+ project ,
913+ publisher ,
914+ topic_path_base ,
915+ subscriber ,
916+ subscription_path_base ,
917+ cleanup ,
848918 ):
919+ custom_str = "-editor-role-can-create-resources"
920+ topic_path = topic_path_base + custom_str
921+ subscription_path = subscription_path_base + custom_str
849922 snapshot_name = "snap" + unique_resource_id ("-" )
850923 snapshot_path = "projects/{}/snapshots/{}" .format (project , snapshot_name )
851924
0 commit comments