Skip to content

Commit e01d3f1

Browse files
docs(samples): Update Topic with Kinesis Ingestion Settings
Add code to demonstrate updating a topic with Kinesis Ingestion Settings IngestionDataSourceSettings were added as an attribute to Topic [here](https://github.com/googleapis/googleapis/blob/65277ddce9caa1cfd1a0eb7ab67980fc73d20b50/google/pubsub/v1/pubsub.proto#L316)
1 parent 83dc9ff commit e01d3f1

File tree

2 files changed

+115
-0
lines changed

2 files changed

+115
-0
lines changed

samples/snippets/publisher.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,58 @@ def create_topic_kinesis_ingestion(
105105
# [END pubsub_create_topic_kinesis_ingestion]
106106

107107

108+
def update_topic_kinesis_ingestion(
109+
project_id: str,
110+
topic_id: str,
111+
stream_arn: str,
112+
consumer_arn: str,
113+
aws_role_arn: str,
114+
gcp_service_account: str,
115+
) -> None:
116+
"""Update Pub/Sub topic with AWS Kinesis Ingestion Settings."""
117+
# [START pubsub_quickstart_update_topic_kinesis_ingestion]
118+
# [START pubsub_update_topic_kinesis_ingestion]
119+
from google.cloud import pubsub_v1
120+
from google.pubsub_v1.types import Topic
121+
from google.pubsub_v1.types import IngestionDataSourceSettings
122+
from google.pubsub_v1.types import UpdateTopicRequest
123+
from google.protobuf import field_mask_pb2
124+
125+
# TODO(developer)
126+
# project_id = "your-project-id"
127+
# topic_id = "your-topic-id"
128+
# stream_arn = "your-stream-arn"
129+
# consumer_arn = "your-consumer-arn"
130+
# aws_role_arn = "your-aws-role-arn"
131+
# gcp_service_account = "your-gcp-service-account"
132+
133+
publisher = pubsub_v1.PublisherClient()
134+
topic_path = publisher.topic_path(project_id, topic_id)
135+
136+
update_request = UpdateTopicRequest(
137+
topic=Topic(
138+
name=topic_path,
139+
ingestion_data_source_settings=IngestionDataSourceSettings(
140+
aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
141+
stream_arn=stream_arn,
142+
consumer_arn=consumer_arn,
143+
aws_role_arn=aws_role_arn,
144+
gcp_service_account=gcp_service_account,
145+
)
146+
),
147+
),
148+
update_mask=field_mask_pb2.FieldMask(
149+
paths=["ingestion_data_source_settings"]),
150+
)
151+
152+
topic = publisher.update_topic(request=update_request)
153+
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")
154+
155+
156+
# [END pubsub_quickstart_update_topic_kinesis_ingestion]
157+
# [END pubsub_update_topic_kinesis_ingestion]
158+
159+
108160
def delete_topic(project_id: str, topic_id: str) -> None:
109161
"""Deletes an existing Pub/Sub topic."""
110162
# [START pubsub_delete_topic]
@@ -483,6 +535,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
483535
create_topic_kinesis_ingestion_parser.add_argument("consumer_arn")
484536
create_topic_kinesis_ingestion_parser.add_argument("aws_role_arn")
485537
create_topic_kinesis_ingestion_parser.add_argument("gcp_service_account")
538+
539+
update_topic_kinesis_ingestion_parser = subparsers.add_parser(
540+
"update_kinesis_ingestion", help=update_topic_kinesis_ingestion.__doc__
541+
)
542+
update_topic_kinesis_ingestion_parser.add_argument("topic_id")
543+
update_topic_kinesis_ingestion_parser.add_argument("stream_arn")
544+
update_topic_kinesis_ingestion_parser.add_argument("consumer_arn")
545+
update_topic_kinesis_ingestion_parser.add_argument("aws_role_arn")
546+
update_topic_kinesis_ingestion_parser.add_argument("gcp_service_account")
486547

487548
delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__)
488549
delete_parser.add_argument("topic_id")
@@ -553,6 +614,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
553614
args.aws_role_arn,
554615
args.gcp_service_account,
555616
)
617+
elif args.command == "update_kinesis_ingestion":
618+
update_topic_kinesis_ingestion(
619+
args.project_id,
620+
args.topic_id,
621+
args.stream_arn,
622+
args.consumer_arn,
623+
args.aws_role_arn,
624+
args.gcp_service_account,
625+
)
556626
elif args.command == "delete":
557627
delete_topic(args.project_id, args.topic_id)
558628
elif args.command == "publish":

samples/snippets/publisher_test.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ def test_create(
123123
out, _ = capsys.readouterr()
124124
assert f"Created topic: {topic_path}" in out
125125

126+
# Clean up resource created for the test.
127+
publisher_client.delete_topic(request={"topic": topic_path})
128+
126129

127130
def test_create_kinesis_ingestion(
128131
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
@@ -155,6 +158,48 @@ def test_create_kinesis_ingestion(
155158
out, _ = capsys.readouterr()
156159
assert f"Created topic: {topic_path} with AWS Kinesis Ingestion Settings" in out
157160

161+
# Clean up resource created for the test.
162+
publisher_client.delete_topic(request={"topic": topic_path})
163+
164+
def test_update_kinesis_ingestion(
165+
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
166+
) -> None:
167+
# The scope of `topic_path` is limited to this function.
168+
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)
169+
170+
# Outside of automated CI tests, these values must be of actual AWS resources for the test to pass.
171+
stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name"
172+
consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111"
173+
aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name"
174+
gcp_service_account = (
175+
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com"
176+
)
177+
178+
try:
179+
publisher_client.delete_topic(request={"topic": topic_path})
180+
except NotFound:
181+
pass
182+
183+
publisher.create_topic(PROJECT_ID, TOPIC_ID)
184+
185+
out, _ = capsys.readouterr()
186+
assert f"Created topic: {topic_path}" in out
187+
188+
publisher.update_topic_kinesis_ingestion(
189+
PROJECT_ID,
190+
TOPIC_ID,
191+
stream_arn,
192+
consumer_arn,
193+
aws_role_arn,
194+
gcp_service_account,
195+
)
196+
197+
out, _ = capsys.readouterr()
198+
assert f"Updated topic: {topic_path} with AWS Kinesis Ingestion Settings" in out
199+
200+
# Clean up resource created for the test.
201+
publisher_client.delete_topic(request={"topic": topic_path})
202+
158203

159204
def test_list(topic_path: str, capsys: CaptureFixture[str]) -> None:
160205
publisher.list_topics(PROJECT_ID)

0 commit comments

Comments
 (0)