
Google Cloud’s Dataproc lets you run native Apache Spark and Hadoop clusters on Google Cloud in a simpler, more cost-effective way. In this post, we will talk about the newest optional components available in Dataproc’s Component Exchange: Docker and Apache Flink.

Docker on Dataproc

Docker is a widely used container technology. Now that it is a Dataproc optional component, you can install the Docker daemon on every node of a Dataproc cluster. This lets you run containerized applications on the cluster and have them interact easily with the cluster's Hadoop services.

Docker is also required for these features:

  1. Running containers on YARN

  2. Portable Apache Beam jobs

Running containers on YARN lets you manage your YARN application's dependencies separately, and lets you create containerized services on YARN; see the Apache Hadoop documentation on Docker containers in YARN for details. Portable Apache Beam packages jobs into Docker containers and submits them to the Flink cluster; see the Apache Beam documentation on portability for more detail.

The Docker optional component is also configured to use Google Container Registry in addition to the default Docker registry, so you can use container images managed by your organization.

Here is how to create a Dataproc cluster with the Docker optional component:

gcloud beta dataproc clusters create <cluster-name> \
  --optional-components=DOCKER

When you run a Docker application, its logs are streamed to Cloud Logging using the gcplogs logging driver.
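As a sketch, once the cluster is up you could SSH to the master node, pull an image your organization keeps in Container Registry, and run it; with gcplogs as the driver, its output should land in Cloud Logging without extra flags. The project ID, image, tag, and zone are hypothetical placeholders, and the sketch assumes the cluster's service account can read from that registry.

```shell
# From your workstation: open a shell on the master node
# (Dataproc names it <cluster-name>-m).
gcloud compute ssh <cluster-name>-m --zone=<zone>

# On the node: pull an image from Google Container Registry.
# <project-id>, <image> and <tag> are hypothetical placeholders.
sudo docker pull gcr.io/<project-id>/<image>:<tag>

# Run the container. Assuming the component sets gcplogs as the default
# logging driver, stdout/stderr go to Cloud Logging with no --log-driver flag.
sudo docker run --rm gcr.io/<project-id>/<image>:<tag>
```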

If your application does not depend on any Hadoop services, check out Kubernetes and Google Kubernetes Engine to run containers natively. For more on using Dataproc, check out our documentation.

Apache Flink on Dataproc

Among streaming analytics technologies, Apache Beam and Apache Flink stand out. Apache Flink is a distributed processing engine for stateful computations. Apache Beam is a unified model for defining batch and streaming processing pipelines. Using Apache Flink as an execution engine, you can run Apache Beam jobs on Dataproc as well as on Google’s Cloud Dataflow service.

Flink, and Beam running on Flink, are well suited to large-scale, continuous jobs and provide:

  • A streaming-first runtime that supports both batch processing and data streaming programs

  • A runtime that supports very high throughput and low event latency at the same time

  • Fault-tolerance with exactly-once processing guarantees

  • Natural back-pressure in streaming programs

  • Custom memory management for efficient and robust switching between in-memory and out-of-core data processing algorithms

  • Integration with YARN and other components of the Apache Hadoop ecosystem

Our Dataproc team here at Google Cloud recently announced that the Flink Operator on Kubernetes is now available. It lets you run Apache Flink jobs on Kubernetes, which reduces platform dependency and improves hardware efficiency.

Basic Flink Concepts

A Flink cluster consists of a Flink JobManager and a set of Flink TaskManagers. Like the master role in other distributed systems such as YARN, the JobManager is responsible for accepting jobs, managing resources, and supervising jobs. TaskManagers are responsible for running the actual tasks.

When running Flink on Dataproc, we use YARN as Flink's resource manager. You can run Flink jobs in two ways: in a job cluster or in a session cluster. For a job cluster, YARN creates a JobManager and TaskManagers for the job and destroys the cluster once the job finishes. For a session cluster, YARN creates a JobManager and a few TaskManagers, and the cluster can serve multiple jobs until the user shuts it down.

How to create a cluster with Flink

Use this command to get started:

gcloud beta dataproc clusters create <cluster-name> \
  --optional-components=FLINK

How to run a Flink job

After a Dataproc cluster with Flink starts, you can submit Flink jobs directly to YARN in job cluster mode. After accepting a job, Flink starts a JobManager and task slots for it in YARN. The job runs in the YARN cluster until it finishes, and then the JobManager is shut down. Job logs are available in the regular YARN logs. Try this command to run a word-count example:
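A minimal sketch of such a run, executed on the cluster's master node. It assumes Flink is installed under /usr/lib/flink with the stock examples directory, and that your Flink version accepts the per-job "-m yarn-cluster" target; Flink on YARN also expects the Hadoop classpath to be exported.

```shell
# Run on the master node. Export the Hadoop classpath for this command only,
# then submit the bundled batch WordCount example as a per-job YARN cluster.
# The examples path assumes Flink lives under /usr/lib/flink.
HADOOP_CLASSPATH="$(hadoop classpath)" \
  flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar
```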

By default, the Dataproc cluster does not start a Flink session cluster. Instead, Dataproc creates the script “/usr/bin/flink-yarn-daemon”, which starts a Flink session.

To start a Flink session when the cluster is created, set the metadata key that enables it:
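For example, a sketch of creating the cluster with a session started at boot, assuming the flink-start-yarn-session metadata key documented for the Dataproc Flink component:

```shell
# Create a Flink-enabled cluster whose Flink YARN session starts at boot.
# Assumes the flink-start-yarn-session metadata key; check current Dataproc docs.
gcloud beta dataproc clusters create <cluster-name> \
  --optional-components=FLINK \
  --metadata=flink-start-yarn-session=true
```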

To start the Flink session after the cluster is created, run the following command on the master node:
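A minimal sketch, assuming you reach the master node over SSH (Dataproc names it <cluster-name>-m) and that the script is sourced from your shell, as shown here; the zone is a placeholder.

```shell
# From your workstation: open a shell on the master node.
gcloud compute ssh <cluster-name>-m --zone=<zone>

# On the master node: start a Flink session on YARN using the script
# installed by the Flink component.
. /usr/bin/flink-yarn-daemon
```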

To submit jobs to that session cluster, you’ll need the Flink JobManager URL:
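One way to find it, sketched under the assumption that the session appears in YARN's list of running applications and that its Tracking-URL column reports the JobManager's host and port, which is how Flink on YARN normally registers its web address. The host:port placeholder is yours to fill in.

```shell
# List running YARN applications; find the Flink session and note the
# host:port shown in its Tracking-URL column.
yarn application -list -appStates RUNNING

# Submit a job to the running session. <jobmanager-host:port> is a
# placeholder for the address found above; the examples path assumes
# Flink lives under /usr/lib/flink.
HADOOP_CLASSPATH="$(hadoop classpath)" \
  flink run -m <jobmanager-host:port> \
  /usr/lib/flink/examples/streaming/WordCount.jar
```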

How to run a Java Beam job

Running an Apache Beam job written in Java needs no extra configuration: as long as you package your Beam job into a JAR file, you can run it on Flink directly. This is the command you can use:
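A sketch of that command, submitting a self-contained (shaded) Beam JAR in job cluster mode. The JAR name and main class are hypothetical placeholders, and the sketch assumes the JAR bundles a Beam Flink runner built for the cluster's Flink version.

```shell
# <your-beam-pipeline>.jar and <your.main.Class> are hypothetical placeholders.
# Arguments after the JAR go to the Beam pipeline; --runner=FlinkRunner
# selects Beam's Flink runner.
HADOOP_CLASSPATH="$(hadoop classpath)" \
  flink run -m yarn-cluster -c <your.main.Class> <your-beam-pipeline>.jar \
  --runner=FlinkRunner
```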

How to run a Python Beam job

Beam jobs written in Python use Beam's portable execution model, in which the Python code runs inside Docker containers. To run them in Flink on Dataproc, you will also need to enable the Docker optional component. Here’s how to create a cluster:
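A sketch of such a cluster creation, enabling both components through the same flag:

```shell
# Python Beam jobs need Docker alongside Flink, so enable both components.
gcloud beta dataproc clusters create <cluster-name> \
  --optional-components=FLINK,DOCKER
```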

You will also need to install the Python libraries Beam requires, such as apache_beam and apache_beam[gcp]. You can pass in a Flink master URL to run the job in a session cluster; if you leave the URL out, you need to run the job in job cluster mode:
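A minimal sketch of the install step, run on the machine you submit from (for example, the master node). Quoting the extras keeps shells such as zsh from expanding the brackets.

```shell
# Install the Beam Python SDK plus its Google Cloud extras.
# Pin a Beam release whose Flink runner supports the cluster's Flink version.
pip install apache_beam 'apache_beam[gcp]'
```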

After you’ve written your Python job, simply run it to submit:
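For instance, Beam's bundled word-count module can be submitted to a session cluster as sketched below. The bucket paths and JobManager address are placeholders, the --flink_version value must be one your Beam release supports (1.9 is only an example), and --environment_type=DOCKER simply makes explicit that the Python code runs in Docker containers.

```shell
# <jobmanager-host:port> and gs://<bucket>/... are placeholders.
# --flink_version=1.9 is an example; match it to your cluster and Beam release.
python -m apache_beam.examples.wordcount \
  --runner=FlinkRunner \
  --flink_master=<jobmanager-host:port> \
  --flink_version=1.9 \
  --environment_type=DOCKER \
  --input=gs://<bucket>/input.txt \
  --output=gs://<bucket>/counts
```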