@@ -103,6 +103,88 @@ def create_topic_with_kinesis_ingestion(
103103 # [END pubsub_create_topic_with_kinesis_ingestion]
104104
105105
106+ def create_topic_with_cloud_storage_ingestion (
107+ project_id : str ,
108+ topic_id : str ,
109+ bucket : str ,
110+ input_format : str ,
111+ text_delimiter : str ,
112+ match_glob : str ,
113+ minimum_object_create_time : str ,
114+ ) -> None :
115+ """Create a new Pub/Sub topic with Cloud Storage Ingestion Settings."""
116+ # [START pubsub_create_topic_with_cloud_storage_ingestion]
117+ from google .cloud import pubsub_v1
118+ from google .protobuf import timestamp_pb2
119+ from google .pubsub_v1 .types import Topic
120+ from google .pubsub_v1 .types import IngestionDataSourceSettings
121+
122+ # TODO(developer)
123+ # project_id = "your-project-id"
124+ # topic_id = "your-topic-id"
125+ # bucket = "your-bucket"
126+ # input_format = "text" (can be one of "text", "avro", "pubsub_avro")
127+ # text_delimiter = "\n"
128+ # match_glob = "**.txt"
129+ # minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"
130+
131+ publisher = pubsub_v1 .PublisherClient ()
132+ topic_path = publisher .topic_path (project_id , topic_id )
133+
134+ cloud_storage_settings = IngestionDataSourceSettings .CloudStorage (
135+ bucket = bucket ,
136+ )
137+ if input_format == "text" :
138+ cloud_storage_settings .text_format = (
139+ IngestionDataSourceSettings .CloudStorage .TextFormat (
140+ delimiter = text_delimiter
141+ )
142+ )
143+ elif input_format == "avro" :
144+ cloud_storage_settings .avro_format = (
145+ IngestionDataSourceSettings .CloudStorage .AvroFormat ()
146+ )
147+ elif input_format == "pubsub_avro" :
148+ cloud_storage_settings .pubsub_avro_format = (
149+ IngestionDataSourceSettings .CloudStorage .PubSubAvroFormat ()
150+ )
151+ else :
152+ print (
153+ "Invalid input_format: "
154+ + input_format
155+ + "; must be in ('text', 'avro', 'pubsub_avro')"
156+ )
157+ return
158+
159+ if match_glob :
160+ cloud_storage_settings .match_glob = match_glob
161+
162+ if minimum_object_create_time :
163+ try :
164+ minimum_object_create_time_timestamp = timestamp_pb2 .Timestamp ()
165+ minimum_object_create_time_timestamp .FromJsonString (
166+ minimum_object_create_time
167+ )
168+ cloud_storage_settings .minimum_object_create_time = (
169+ minimum_object_create_time_timestamp
170+ )
171+ except ValueError :
172+ print ("Invalid minimum_object_create_time: " + minimum_object_create_time )
173+ return
174+
175+ request = Topic (
176+ name = topic_path ,
177+ ingestion_data_source_settings = IngestionDataSourceSettings (
178+ cloud_storage = cloud_storage_settings ,
179+ ),
180+ )
181+
182+ topic = publisher .create_topic (request = request )
183+
184+ print (f"Created topic: { topic .name } with Cloud Storage Ingestion Settings" )
185+ # [END pubsub_create_topic_with_cloud_storage_ingestion]
186+
187+
106188def update_topic_type (
107189 project_id : str ,
108190 topic_id : str ,
@@ -615,6 +697,19 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
615697 create_topic_with_kinesis_ingestion_parser .add_argument ("aws_role_arn" )
616698 create_topic_with_kinesis_ingestion_parser .add_argument ("gcp_service_account" )
617699
700+ create_topic_with_cloud_storage_ingestion_parser = subparsers .add_parser (
701+ "create_cloud_storage_ingestion" ,
702+ help = create_topic_with_cloud_storage_ingestion .__doc__ ,
703+ )
704+ create_topic_with_cloud_storage_ingestion_parser .add_argument ("topic_id" )
705+ create_topic_with_cloud_storage_ingestion_parser .add_argument ("bucket" )
706+ create_topic_with_cloud_storage_ingestion_parser .add_argument ("input_format" )
707+ create_topic_with_cloud_storage_ingestion_parser .add_argument ("text_delimiter" )
708+ create_topic_with_cloud_storage_ingestion_parser .add_argument ("match_glob" )
709+ create_topic_with_cloud_storage_ingestion_parser .add_argument (
710+ "minimum_object_create_time"
711+ )
712+
618713 update_topic_type_parser = subparsers .add_parser (
619714 "update_kinesis_ingestion" , help = update_topic_type .__doc__
620715 )
@@ -693,6 +788,16 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
693788 args .aws_role_arn ,
694789 args .gcp_service_account ,
695790 )
791+ elif args .command == "create_cloud_storage_ingestion" :
792+ create_topic_with_cloud_storage_ingestion (
793+ args .project_id ,
794+ args .topic_id ,
795+ args .bucket ,
796+ args .input_format ,
797+ args .text_delimiter ,
798+ args .match_glob ,
799+ args .minimum_object_create_time ,
800+ )
696801 elif args .command == "update_kinesis_ingestion" :
697802 update_topic_type (
698803 args .project_id ,
0 commit comments