Skip to content

Commit 0b51b1b

Browse files
authored
CI: Add typed_flaky to streaming_pull system tests (#895)
1 parent ee2ea73 commit 0b51b1b

File tree

3 files changed

+98
-24
lines changed

3 files changed

+98
-24
lines changed

noxfile.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
]
5656
SYSTEM_TEST_EXTERNAL_DEPENDENCIES = [
5757
"psutil",
58+
"flaky",
5859
]
5960
SYSTEM_TEST_LOCAL_DEPENDENCIES = []
6061
SYSTEM_TEST_DEPENDENCIES = []

owlbot.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@
338338
versions=gcp.common.detect_versions(path="./google", default_first=True),
339339
unit_test_python_versions=["3.7", "3.8", "3.9", "3.10", "3.11"],
340340
system_test_python_versions=["3.10"],
341-
system_test_external_dependencies=["psutil"],
341+
system_test_external_dependencies=["psutil","flaky"],
342342
)
343343
s.move(templated_files, excludes=[".coveragerc", ".github/release-please.yml", "README.rst", "docs/index.rst"])
344344

tests/system.py

Lines changed: 96 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@
2323
import sys
2424
import threading
2525
import time
26+
from typing import Any, Callable, cast, TypeVar
2627

2728
# special case python < 3.8
2829
if sys.version_info.major == 3 and sys.version_info.minor < 8:
2930
import mock
3031
else:
3132
from unittest import mock
3233

34+
from flaky import flaky
3335
import pytest
3436

3537
import google.auth
@@ -43,6 +45,9 @@
4345

4446
from test_utils.system import unique_resource_id
4547

48+
C = TypeVar("C", bound=Callable[..., Any])
49+
typed_flaky = cast(Callable[[C], C], flaky(max_runs=3, min_passes=1))
50+
4651

4752
@pytest.fixture(scope="module")
4853
def project():
@@ -61,13 +66,13 @@ def subscriber(request):
6166

6267

6368
@pytest.fixture
64-
def topic_path(project, publisher):
69+
def topic_path_base(project, publisher):
6570
topic_name = "t" + unique_resource_id("-")
6671
yield publisher.topic_path(project, topic_name)
6772

6873

6974
@pytest.fixture
70-
def subscription_path(project, subscriber):
75+
def subscription_path_base(project, subscriber):
7176
sub_name = "s" + unique_resource_id("-")
7277
yield subscriber.subscription_path(project, sub_name)
7378

@@ -82,7 +87,9 @@ def cleanup():
8287
to_call(*args, **kwargs)
8388

8489

85-
def test_publish_messages(publisher, topic_path, cleanup):
90+
def test_publish_messages(publisher, topic_path_base, cleanup):
91+
# Customize topic path to test.
92+
topic_path = topic_path_base + "-publish-messages"
8693
# Make sure the topic gets deleted.
8794
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
8895

@@ -100,7 +107,9 @@ def test_publish_messages(publisher, topic_path, cleanup):
100107
assert isinstance(result, str)
101108

102109

103-
def test_publish_large_messages(publisher, topic_path, cleanup):
110+
def test_publish_large_messages(publisher, topic_path_base, cleanup):
111+
# Customize topic path to test.
112+
topic_path = topic_path_base + "-publish-large-messages"
104113
# Make sure the topic gets deleted.
105114
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
106115

@@ -130,8 +139,11 @@ def test_publish_large_messages(publisher, topic_path, cleanup):
130139

131140

132141
def test_subscribe_to_messages(
133-
publisher, topic_path, subscriber, subscription_path, cleanup
142+
publisher, topic_path_base, subscriber, subscription_path_base, cleanup
134143
):
144+
# Customize topic path to test.
145+
topic_path = topic_path_base + "-subscribe-to-messages"
146+
subscription_path = subscription_path_base + "-subscribe-to-messages"
135147
# Make sure the topic and subscription get deleted.
136148
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
137149
cleanup.append(
@@ -175,8 +187,12 @@ def test_subscribe_to_messages(
175187

176188

177189
def test_subscribe_to_messages_async_callbacks(
178-
publisher, topic_path, subscriber, subscription_path, cleanup
190+
publisher, topic_path_base, subscriber, subscription_path_base, cleanup
179191
):
192+
# Customize topic path to test.
193+
custom_str = "-subscribe-to-messages-async-callback"
194+
topic_path = topic_path_base + custom_str
195+
subscription_path = subscription_path_base + custom_str
180196
# Make sure the topic and subscription get deleted.
181197
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
182198
cleanup.append(
@@ -227,8 +243,12 @@ def test_subscribe_to_messages_async_callbacks(
227243

228244

229245
def test_creating_subscriptions_with_non_default_settings(
230-
publisher, subscriber, project, topic_path, subscription_path, cleanup
246+
publisher, subscriber, project, topic_path_base, subscription_path_base, cleanup
231247
):
248+
# Customize topic path to test.
249+
custom_str = "-creating-subscriptions-with-non-default-settings"
250+
topic_path = topic_path_base + custom_str
251+
subscription_path = subscription_path_base + custom_str
232252
# Make sure the topic and subscription get deleted.
233253
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
234254
cleanup.append(
@@ -346,7 +366,8 @@ def test_listing_topic_subscriptions(publisher, subscriber, project, cleanup):
346366
assert subscriptions == {subscription_paths[0], subscription_paths[2]}
347367

348368

349-
def test_managing_topic_iam_policy(publisher, topic_path, cleanup):
369+
def test_managing_topic_iam_policy(publisher, topic_path_base, cleanup):
370+
topic_path = topic_path_base + "-managing-topic-iam-policy"
350371
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
351372

352373
# create a topic and customize its policy
@@ -375,8 +396,11 @@ def test_managing_topic_iam_policy(publisher, topic_path, cleanup):
375396

376397

377398
def test_managing_subscription_iam_policy(
378-
publisher, subscriber, topic_path, subscription_path, cleanup
399+
publisher, subscriber, topic_path_base, subscription_path_base, cleanup
379400
):
401+
custom_str = "-managing-subscription-iam-policy"
402+
topic_path = topic_path_base + custom_str
403+
subscription_path = subscription_path_base + custom_str
380404
# Make sure the topic and subscription get deleted.
381405
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
382406
cleanup.append(
@@ -410,7 +434,7 @@ def test_managing_subscription_iam_policy(
410434

411435

412436
def test_subscriber_not_leaking_open_sockets(
413-
publisher, topic_path, subscription_path, cleanup
437+
publisher, topic_path_base, subscription_path_base, cleanup
414438
):
415439
# Make sure the topic and the supscription get deleted.
416440
# NOTE: Since subscriber client will be closed in the test, we should not
@@ -419,8 +443,12 @@ def test_subscriber_not_leaking_open_sockets(
419443
# Also, since the client will get closed, we need another subscriber client
420444
# to clean up the subscription. We also need to make sure that auxiliary
421445
# subscriber releases the sockets, too.
446+
custom_str = "-not-leaking-open-sockets"
447+
subscription_path = subscription_path_base + custom_str
448+
topic_path = topic_path_base + custom_str
422449
subscriber = pubsub_v1.SubscriberClient(transport="grpc")
423450
subscriber_2 = pubsub_v1.SubscriberClient(transport="grpc")
451+
424452
cleanup.append(
425453
(subscriber_2.delete_subscription, (), {"subscription": subscription_path})
426454
)
@@ -460,8 +488,11 @@ def test_subscriber_not_leaking_open_sockets(
460488

461489

462490
def test_synchronous_pull_no_deadline_error_if_no_messages(
463-
publisher, topic_path, subscriber, subscription_path, cleanup
491+
publisher, topic_path_base, subscriber, subscription_path_base, cleanup
464492
):
493+
custom_str = "-synchronous-pull-deadline-error-if-no-messages"
494+
topic_path = topic_path_base + custom_str
495+
subscription_path = subscription_path_base + custom_str
465496
# Make sure the topic and subscription get deleted.
466497
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
467498
cleanup.append(
@@ -485,8 +516,11 @@ def test_synchronous_pull_no_deadline_error_if_no_messages(
485516

486517
class TestStreamingPull(object):
487518
def test_streaming_pull_callback_error_propagation(
488-
self, publisher, topic_path, subscriber, subscription_path, cleanup
519+
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
489520
):
521+
custom_str = "-streaming-pull-callback-error-propagation"
522+
topic_path = topic_path_base + custom_str
523+
subscription_path = subscription_path_base + custom_str
490524
# Make sure the topic and subscription get deleted.
491525
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
492526
cleanup.append(
@@ -512,9 +546,19 @@ class CallbackError(Exception):
512546
with pytest.raises(CallbackError):
513547
future.result(timeout=30)
514548

549+
@typed_flaky
515550
def test_streaming_pull_ack_deadline(
516-
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
551+
self,
552+
publisher,
553+
subscriber,
554+
project,
555+
topic_path_base,
556+
subscription_path_base,
557+
cleanup,
517558
):
559+
custom_str = "-streaming-pull-ack-deadline"
560+
topic_path = topic_path_base + custom_str
561+
subscription_path = subscription_path_base + custom_str
518562
# Make sure the topic and subscription get deleted.
519563
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
520564
cleanup.append(
@@ -564,8 +608,11 @@ def test_streaming_pull_ack_deadline(
564608
subscription_future.cancel()
565609

566610
def test_streaming_pull_max_messages(
567-
self, publisher, topic_path, subscriber, subscription_path, cleanup
611+
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
568612
):
613+
custom_str = "-streaming-pull-max-messages"
614+
topic_path = topic_path_base + custom_str
615+
subscription_path = subscription_path_base + custom_str
569616
# Make sure the topic and subscription get deleted.
570617
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
571618
cleanup.append(
@@ -619,9 +666,13 @@ def test_streaming_pull_max_messages(
619666
finally:
620667
subscription_future.cancel() # trigger clean shutdown
621668

669+
@typed_flaky
622670
def test_streaming_pull_blocking_shutdown(
623-
self, publisher, topic_path, subscriber, subscription_path, cleanup
671+
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
624672
):
673+
custom_str = "-streaming-pull-blocking-shutdown"
674+
topic_path = topic_path_base + custom_str
675+
subscription_path = subscription_path_base + custom_str
625676
# Make sure the topic and subscription get deleted.
626677
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
627678
cleanup.append(
@@ -702,9 +753,11 @@ def callback2(message):
702753
)
703754
class TestBasicRBAC(object):
704755
def test_streaming_pull_subscriber_permissions_sufficient(
705-
self, publisher, topic_path, subscriber, subscription_path, cleanup
756+
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
706757
):
707-
758+
custom_str = "-streaming-pull-subscriber-permissions-sufficient"
759+
topic_path = topic_path_base + custom_str
760+
subscription_path = subscription_path_base + custom_str
708761
# Make sure the topic and subscription get deleted.
709762
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
710763
cleanup.append(
@@ -739,9 +792,11 @@ def test_streaming_pull_subscriber_permissions_sufficient(
739792
future.cancel()
740793

741794
def test_publisher_role_can_publish_messages(
742-
self, publisher, topic_path, subscriber, subscription_path, cleanup
795+
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
743796
):
744-
797+
custom_str = "-publisher-role-can-publish-messages"
798+
topic_path = topic_path_base + custom_str
799+
subscription_path = subscription_path_base + custom_str
745800
# Make sure the topic and subscription get deleted.
746801
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
747802
cleanup.append(
@@ -767,8 +822,17 @@ def test_publisher_role_can_publish_messages(
767822
"Snapshot creation is not instant on the backend, causing test falkiness."
768823
)
769824
def test_snapshot_seek_subscriber_permissions_sufficient(
770-
self, project, publisher, topic_path, subscriber, subscription_path, cleanup
825+
self,
826+
project,
827+
publisher,
828+
topic_path_base,
829+
subscriber,
830+
subscription_path_base,
831+
cleanup,
771832
):
833+
custom_str = "-snapshot-seek-subscriber-permissions-sufficient"
834+
topic_path = topic_path_base + custom_str
835+
subscription_path = subscription_path_base + custom_str
772836
snapshot_name = "snap" + unique_resource_id("-")
773837
snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name)
774838

@@ -813,10 +877,10 @@ def test_snapshot_seek_subscriber_permissions_sufficient(
813877
assert len(response.received_messages) == 1
814878

815879
def test_viewer_role_can_list_resources(
816-
self, project, publisher, topic_path, subscriber, cleanup
880+
self, project, publisher, topic_path_base, subscriber, cleanup
817881
):
818882
project_path = "projects/" + project
819-
883+
topic_path = topic_path_base + "-viewer-role-can-list-resources"
820884
# Make sure the created topic gets deleted.
821885
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
822886

@@ -844,8 +908,17 @@ def test_viewer_role_can_list_resources(
844908
next(iter(viewer_only_subscriber.list_snapshots(project=project_path)), None)
845909

846910
def test_editor_role_can_create_resources(
847-
self, project, publisher, topic_path, subscriber, subscription_path, cleanup
911+
self,
912+
project,
913+
publisher,
914+
topic_path_base,
915+
subscriber,
916+
subscription_path_base,
917+
cleanup,
848918
):
919+
custom_str = "-editor-role-can-create-resources"
920+
topic_path = topic_path_base + custom_str
921+
subscription_path = subscription_path_base + custom_str
849922
snapshot_name = "snap" + unique_resource_id("-")
850923
snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name)
851924

0 commit comments

Comments
 (0)