@@ -55,6 +55,11 @@ public class AdminIT {
5555 private static final String kinesisIngestionTopicId = "kinesis-ingestion-topic-" + _suffix ;
5656 private static final String cloudStorageIngestionTopicId =
5757 "cloud-storage-ingestion-topic-" + _suffix ;
58+ private static final String awsMskIngestionTopicId = "aws-msk-ingestion-topic-" + _suffix ;
59+ private static final String confluentCloudIngestionTopicId =
60+ "confluent-cloud-ingestion-topic-" + _suffix ;
61+ private static final String azureEventHubsIngestionTopicId =
62+ "azure-event-hubs-ingestion-topic-" + _suffix ;
5863 private static final String pullSubscriptionId = "iam-pull-subscription-" + _suffix ;
5964 private static final String pushSubscriptionId = "iam-push-subscription-" + _suffix ;
6065 private static final String orderedSubscriptionId = "iam-ordered-subscription-" + _suffix ;
@@ -66,6 +71,9 @@ public class AdminIT {
6671 "java_samples_data_set" + _suffix .replace ("-" , "_" );
6772 private static final String bigquerySubscriptionId = "iam-bigquery-subscription-" + _suffix ;
6873 private static final String bigqueryTableId = "java_samples_table_" + _suffix ;
74+ private static final String gcpServiceAccount =
75+ "fake-service-account@fake-gcp-project.iam.gserviceaccount.com" ;
76+ // AWS Kinesis ingestion settings.
6977 private static final String streamArn =
7078 "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name" ;
7179 private static final String consumerArn =
@@ -75,20 +83,41 @@ public class AdminIT {
7583 "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/"
7684 + "consumer/consumer-2:2222222222" ;
7785 private static final String awsRoleArn = "arn:aws:iam::111111111111:role/fake-role-name" ;
78- private static final String gcpServiceAccount =
79- "fake-service-account@fake-gcp-project.iam.gserviceaccount.com" ;
86+ // GCS ingestion settings.
8087 private static final String cloudStorageBucket = "pubsub-cloud-storage-bucket" ;
8188 private static final String cloudStorageInputFormat = "text" ;
8289 private static final String cloudStorageTextDelimiter = "," ;
8390 private static final String cloudStorageMatchGlob = "**.txt" ;
8491 private static final String cloudStorageMinimumObjectCreateTime = "1970-01-01T00:00:01Z" ;
8592 private static final String cloudStorageMinimumObjectCreateTimeSeconds = "seconds: 1" ;
93+ // AWS MSK ingestion settings.
94+ String clusterArn =
95+ "arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1" ;
96+ String mskTopic = "fake-msk-topic-name" ;
97+ // Confluent Cloud ingestion settings.
98+ String bootstrapServer = "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092" ;
99+ String clusterId = "fake-cluster-id" ;
100+ String confluentTopic = "fake-confluent-topic-name" ;
101+ String identityPoolId = "fake-pool-id" ;
102+ // Azure Event Hubs ingestion settings.
103+ String resourceGroup = "fake-resource-group" ;
104+ String namespace = "fake-namespace" ;
105+ String eventHub = "fake-event-hub" ;
106+ String clientId = "11111111-1111-1111-1111-111111111111" ;
107+ String tenantId = "22222222-2222-2222-2222-222222222222" ;
108+ String subscriptionId = "33333333-3333-3333-3333-333333333333" ;
86109
87110 private static final TopicName topicName = TopicName .of (projectId , topicId );
88111 private static final TopicName kinesisIngestionTopicName =
89112 TopicName .of (projectId , kinesisIngestionTopicId );
90113 private static final TopicName cloudStorageIngestionTopicName =
91114 TopicName .of (projectId , cloudStorageIngestionTopicId );
115+ private static final TopicName awsMskIngestionTopicName =
116+ TopicName .of (projectId , awsMskIngestionTopicId );
117+ private static final TopicName confluentCloudIngestionTopicName =
118+ TopicName .of (projectId , confluentCloudIngestionTopicId );
119+ private static final TopicName azureEventHubsIngestionTopicName =
120+ TopicName .of (projectId , azureEventHubsIngestionTopicId );
92121 private static final SubscriptionName pullSubscriptionName =
93122 SubscriptionName .of (projectId , pullSubscriptionId );
94123 private static final SubscriptionName pushSubscriptionName =
@@ -361,5 +390,76 @@ public void testAdmin() throws Exception {
361390 // Test delete Cloud Storage ingestion topic.
362391 DeleteTopicExample .deleteTopicExample (projectId , cloudStorageIngestionTopicId );
363392 assertThat (bout .toString ()).contains ("Deleted topic." );
393+
394+ bout .reset ();
395+ // Test create topic with AWS MSK ingestion settings.
396+ CreateTopicWithAwsMskIngestionExample .createTopicWithAwsMskIngestionExample (
397+ projectId ,
398+ awsMskIngestionTopicId ,
399+ clusterArn ,
400+ mskTopic ,
401+ awsRoleArn ,
402+ gcpServiceAccount );
403+ assertThat (bout .toString ())
404+ .contains ("google.pubsub.v1.Topic.name=" + awsMskIngestionTopicName .toString ());
405+ assertThat (bout .toString ()).contains (clusterArn );
406+ assertThat (bout .toString ()).contains (mskTopic );
407+ assertThat (bout .toString ()).contains (awsRoleArn );
408+ assertThat (bout .toString ()).contains (gcpServiceAccount );
409+
410+ bout .reset ();
411+ // Test delete AWS MSK ingestion topic.
412+ DeleteTopicExample .deleteTopicExample (projectId , awsMskIngestionTopicId );
413+ assertThat (bout .toString ()).contains ("Deleted topic." );
414+
415+ bout .reset ();
416+ // Test create topic with Confluent Cloud ingestion settings.
417+ CreateTopicWithConfluentCloudIngestionExample .createTopicWithConfluentCloudIngestionExample (
418+ projectId ,
419+ confluentCloudIngestionTopicId ,
420+ bootstrapServer ,
421+ clusterId ,
422+ confluentTopic ,
423+ identityPoolId ,
424+ gcpServiceAccount );
425+ assertThat (bout .toString ())
426+ .contains ("google.pubsub.v1.Topic.name=" + confluentCloudIngestionTopicName .toString ());
427+ assertThat (bout .toString ()).contains (bootstrapServer );
428+ assertThat (bout .toString ()).contains (clusterId );
429+ assertThat (bout .toString ()).contains (confluentTopic );
430+ assertThat (bout .toString ()).contains (identityPoolId );
431+ assertThat (bout .toString ()).contains (gcpServiceAccount );
432+
433+ bout .reset ();
434+ // Test delete Confluent Cloud ingestion topic.
435+ DeleteTopicExample .deleteTopicExample (projectId , confluentCloudIngestionTopicId );
436+ assertThat (bout .toString ()).contains ("Deleted topic." );
437+
438+ bout .reset ();
439+ // Test create topic with Azure Event Hubs ingestion settings.
440+ CreateTopicWithAzureEventHubsIngestionExample .createTopicWithAzureEventHubsIngestionExample (
441+ projectId ,
442+ azureEventHubsIngestionTopicId ,
443+ resourceGroup ,
444+ namespace ,
445+ eventHub ,
446+ clientId ,
447+ tenantId ,
448+ subscriptionId ,
449+ gcpServiceAccount );
450+ assertThat (bout .toString ()).contains (
451+ "google.pubsub.v1.Topic.name=" + azureEventHubsIngestionTopicName .toString ());
452+ assertThat (bout .toString ()).contains (resourceGroup );
453+ assertThat (bout .toString ()).contains (namespace );
454+ assertThat (bout .toString ()).contains (eventHub );
455+ assertThat (bout .toString ()).contains (clientId );
456+ assertThat (bout .toString ()).contains (tenantId );
457+ assertThat (bout .toString ()).contains (subscriptionId );
458+ assertThat (bout .toString ()).contains (gcpServiceAccount );
459+
460+ bout .reset ();
461+ // Test delete Azure Event Hubs ingestion topic.
462+ DeleteTopicExample .deleteTopicExample (projectId , azureEventHubsIngestionTopicId );
463+ assertThat (bout .toString ()).contains ("Deleted topic." );
364464 }
365465}
0 commit comments