@@ -105,6 +105,58 @@ def create_topic_kinesis_ingestion(
105105 # [END pubsub_create_topic_kinesis_ingestion]
106106
107107
108+ def update_topic_kinesis_ingestion (
109+ project_id : str ,
110+ topic_id : str ,
111+ stream_arn : str ,
112+ consumer_arn : str ,
113+ aws_role_arn : str ,
114+ gcp_service_account : str ,
115+ ) -> None :
116+ """Update Pub/Sub topic with AWS Kinesis Ingestion Settings."""
117+ # [START pubsub_quickstart_update_topic_kinesis_ingestion]
118+ # [START pubsub_update_topic_kinesis_ingestion]
119+ from google .cloud import pubsub_v1
120+ from google .pubsub_v1 .types import Topic
121+ from google .pubsub_v1 .types import IngestionDataSourceSettings
122+ from google .pubsub_v1 .types import UpdateTopicRequest
123+ from google .protobuf import field_mask_pb2
124+
125+ # TODO(developer)
126+ # project_id = "your-project-id"
127+ # topic_id = "your-topic-id"
128+ # stream_arn = "your-stream-arn"
129+ # consumer_arn = "your-consumer-arn"
130+ # aws_role_arn = "your-aws-role-arn"
131+ # gcp_service_account = "your-gcp-service-account"
132+
133+ publisher = pubsub_v1 .PublisherClient ()
134+ topic_path = publisher .topic_path (project_id , topic_id )
135+
136+ update_request = UpdateTopicRequest (
137+ topic = Topic (
138+ name = topic_path ,
139+ ingestion_data_source_settings = IngestionDataSourceSettings (
140+ aws_kinesis = IngestionDataSourceSettings .AwsKinesis (
141+ stream_arn = stream_arn ,
142+ consumer_arn = consumer_arn ,
143+ aws_role_arn = aws_role_arn ,
144+ gcp_service_account = gcp_service_account ,
145+ )
146+ ),
147+ ),
148+ update_mask = field_mask_pb2 .FieldMask (
149+ paths = ["ingestion_data_source_settings" ]),
150+ )
151+
152+ topic = publisher .update_topic (request = update_request )
153+ print (f"Updated topic: { topic .name } with AWS Kinesis Ingestion Settings" )
154+
155+
156+ # [END pubsub_quickstart_update_topic_kinesis_ingestion]
157+ # [END pubsub_update_topic_kinesis_ingestion]
158+
159+
108160def delete_topic (project_id : str , topic_id : str ) -> None :
109161 """Deletes an existing Pub/Sub topic."""
110162 # [START pubsub_delete_topic]
@@ -483,6 +535,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
483535 create_topic_kinesis_ingestion_parser .add_argument ("consumer_arn" )
484536 create_topic_kinesis_ingestion_parser .add_argument ("aws_role_arn" )
485537 create_topic_kinesis_ingestion_parser .add_argument ("gcp_service_account" )
538+
539+ update_topic_kinesis_ingestion_parser = subparsers .add_parser (
540+ "update_kinesis_ingestion" , help = update_topic_kinesis_ingestion .__doc__
541+ )
542+ update_topic_kinesis_ingestion_parser .add_argument ("topic_id" )
543+ update_topic_kinesis_ingestion_parser .add_argument ("stream_arn" )
544+ update_topic_kinesis_ingestion_parser .add_argument ("consumer_arn" )
545+ update_topic_kinesis_ingestion_parser .add_argument ("aws_role_arn" )
546+ update_topic_kinesis_ingestion_parser .add_argument ("gcp_service_account" )
486547
487548 delete_parser = subparsers .add_parser ("delete" , help = delete_topic .__doc__ )
488549 delete_parser .add_argument ("topic_id" )
@@ -553,6 +614,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
553614 args .aws_role_arn ,
554615 args .gcp_service_account ,
555616 )
617+ elif args .command == "update_kinesis_ingestion" :
618+ update_topic_kinesis_ingestion (
619+ args .project_id ,
620+ args .topic_id ,
621+ args .stream_arn ,
622+ args .consumer_arn ,
623+ args .aws_role_arn ,
624+ args .gcp_service_account ,
625+ )
556626 elif args .command == "delete" :
557627 delete_topic (args .project_id , args .topic_id )
558628 elif args .command == "publish" :
0 commit comments