Samples showing how to use Pub/Sub Lite with Cloud Dataflow.
This sample shows how to create an Apache Beam streaming pipeline that reads messages from Pub/Sub Lite, groups the messages using a fixed-size windowing function, and writes them to Cloud Storage.
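The fixed-size windowing step can be pictured with plain arithmetic: each message's event timestamp is floored to the nearest window boundary, and messages sharing a boundary are grouped into the same output file. A minimal sketch using shell arithmetic on epoch seconds (the actual pipeline uses Beam's `FixedWindows`; this is an illustration only):

```sh
# Illustration only: bucket an event timestamp into a 1-minute fixed window.
window_size=60                        # window size in seconds (--windowSize=1)
ts=1700000125                         # example event timestamp, epoch seconds
window_start=$(( ts - ts % window_size ))
window_end=$(( window_start + window_size ))
echo "window: [$window_start, $window_end)"
```

All messages whose timestamps fall in the same `[start, end)` range are grouped and written together.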
Resources needed for this example:

- A Pub/Sub Lite topic and subscription.
- A Cloud Storage bucket.
- The following APIs enabled: Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Pub/Sub Lite. When you enable Cloud Dataflow, which uses Compute Engine, a default Compute Engine service account with the Editor role (`roles/editor`) is created.
- Set up credentials. You can skip this step if you are trying this example in a Google Cloud environment like Cloud Shell. Otherwise, create a user-managed service account and grant it the following roles on your project:

  - `roles/dataflow.admin`
  - `roles/pubsublite.viewer`
  - `roles/pubsublite.subscriber`
  - `roles/logging.viewer`

  Then create a service account key and point `GOOGLE_APPLICATION_CREDENTIALS` to your downloaded key file.

```sh
export GOOGLE_APPLICATION_CREDENTIALS=path/to/your/key/file
```

- Create a Cloud Storage bucket. Your bucket name needs to be globally unique.
```sh
export PROJECT_ID=$(gcloud config get-value project)
export BUCKET=your-gcs-bucket

gsutil mb gs://$BUCKET
```

- Create a Pub/Sub Lite topic and subscription. Set `LITE_LOCATION` to a Pub/Sub Lite location.
```sh
export TOPIC=your-lite-topic
export SUBSCRIPTION=your-lite-subscription
export LITE_LOCATION=your-lite-location

gcloud pubsub lite-topics create $TOPIC \
    --zone=$LITE_LOCATION \
    --partitions=1 \
    --per-partition-bytes=30GiB

gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
    --zone=$LITE_LOCATION \
    --topic=$TOPIC
```

- Set `DATAFLOW_REGION` to a Dataflow region close to your Pub/Sub Lite location.

```sh
export DATAFLOW_REGION=your-dataflow-region
```

The following example runs a streaming pipeline. Choose `DirectRunner` to test it locally or `DataflowRunner` to run it on Dataflow.
- `--subscription`: the Pub/Sub Lite subscription to read messages from
- `--output`: the full filepath of the output files
- `--windowSize` [optional]: the window size in minutes, defaults to 1
- `--runner` [optional]: `DataflowRunner` or `DirectRunner`
- `--project` [optional]: your project ID, optional if using `DirectRunner`
- `--region` [optional]: the Dataflow region, optional if using `DirectRunner`
- `--tempLocation`: a Cloud Storage location for temporary files, optional if using `DirectRunner`
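Note that `--subscription` takes the fully qualified resource path, not the short subscription name. A quick sketch of how that path is assembled from the variables exported earlier (the values below are placeholders for illustration only):

```sh
# Placeholder values standing in for the earlier exports.
PROJECT_ID=my-project
LITE_LOCATION=us-central1-a
SUBSCRIPTION=my-lite-subscription

path="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION"
echo "$path"
```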
Gradle:

```sh
gradle execute -Dexec.args="\
    --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
    --output=gs://$BUCKET/samples/output \
    --windowSize=1 \
    --runner=DataflowRunner \
    --project=$PROJECT_ID \
    --region=$DATAFLOW_REGION \
    --tempLocation=gs://$BUCKET/temp"
```

Maven:
```sh
mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args="\
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --runner=DataflowRunner \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp"
```

Publish some messages to your Lite topic. Then check for files in your Cloud Storage bucket.

```sh
gsutil ls "gs://$BUCKET/samples/output*"
```

With a `metadata.json`, you can create a Dataflow Flex template. Custom Dataflow Flex templates can be shared and run with different input parameters.
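The `metadata.json` referenced in the build step below describes the template and its parameters. A minimal sketch, assuming the parameter names this pipeline accepts (`subscription`, `output`, `windowSize`); the field names follow the Dataflow Flex template metadata format:

```json
{
  "name": "Pub/Sub Lite to Cloud Storage",
  "description": "Streaming pipeline that reads messages from Pub/Sub Lite and writes them to Cloud Storage.",
  "parameters": [
    {
      "name": "subscription",
      "label": "Pub/Sub Lite subscription",
      "helpText": "Full resource path of the Pub/Sub Lite subscription to read from.",
      "isOptional": false
    },
    {
      "name": "output",
      "label": "Output file path",
      "helpText": "Cloud Storage path prefix for the output files.",
      "isOptional": false
    },
    {
      "name": "windowSize",
      "label": "Window size in minutes",
      "helpText": "Fixed window size in minutes. Defaults to 1.",
      "isOptional": true
    }
  ]
}
```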
- Create a fat JAR. You should see `target/pubsublite-streaming-bundled-1.0.jar` as an output.

```sh
mvn clean package -DskipTests=true
ls -lh
```

- Provide names and locations for your template file and template container image.
```sh
export TEMPLATE_PATH="gs://$BUCKET/samples/pubsublite-to-gcs.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/pubsublite-to-gcs:latest"
```

- Build a custom Flex template.
```sh
gcloud dataflow flex-template build $TEMPLATE_PATH \
    --image-gcr-path "$TEMPLATE_IMAGE" \
    --sdk-language "JAVA" \
    --flex-template-base-image JAVA11 \
    --metadata-file "metadata.json" \
    --jar "target/pubsublite-streaming-bundled-1.0.jar" \
    --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
```

- Run a job with the custom Flex template using `gcloud` or in the Cloud Console.

Note: Pub/Sub Lite allows only one subscriber to pull messages from one partition. If your Pub/Sub Lite topic has only one partition and you use a subscription attached to that topic in more than one Dataflow job, only one of the jobs will receive messages.
```sh
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
    --template-file-gcs-location "$TEMPLATE_PATH" \
    --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
    --parameters output="gs://$BUCKET/samples/template-output" \
    --parameters windowSize=1 \
    --region "$DATAFLOW_REGION"
```

- Stop the pipeline. If you use `DirectRunner`, press `Ctrl+C` to cancel. If you use `DataflowRunner`, in the Cloud Console click on the job you want to stop, then choose "Cancel".
- Delete the Lite topic and subscription.

```sh
gcloud pubsub lite-topics delete $TOPIC --zone=$LITE_LOCATION
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION --zone=$LITE_LOCATION
```

- Delete the Cloud Storage objects:

```sh
gsutil -m rm -rf "gs://$BUCKET/samples/output*"
```

- Delete the template image in Container Registry and the Flex template if you have created them.

```sh
gcloud container images delete $TEMPLATE_IMAGE
gsutil rm $TEMPLATE_PATH
```

- Delete the Cloud Storage bucket:

```sh
gsutil rb "gs://$BUCKET"
```