This article is a book excerpt from Apache Kafka 1.0 Cookbook written by Raúl Estrada. This book will show how to use Kafka efficiently with practical solutions to the common problems that developers and administrators usually face while working with it.
In today’s tutorial, we will talk about the Confluent Platform and how to get started with organizing and managing data from several sources in one high-performance and reliable system.
The Confluent Platform is a full stream data system. It enables you to organize and manage data from several sources in one high-performance and reliable system. As mentioned in the first few chapters, the goal of an enterprise service bus is not only to provide the system a means to transport messages and data but also to provide all the tools that are required to connect the data origins (data sources), applications, and data destinations (data sinks) to the platform.
The Confluent Platform has these parts:
- Confluent Platform open source
- Confluent Platform enterprise
- Confluent Cloud
The Confluent Platform open source has the following components:
- Apache Kafka core
- Kafka Streams
- Kafka Connect
- Kafka clients
- Kafka REST Proxy
- Kafka Schema Registry
The Confluent Platform enterprise has the following components:
- Confluent Control Center
- Confluent support, professional services, and consulting
All the components are open source except the Confluent Control Center, which is proprietary software of Confluent Inc.
An explanation of each component is as follows:
- Kafka core: The Kafka brokers discussed so far in this book.
- Kafka Streams: The Kafka library used to build stream processing systems.
- Kafka Connect: The framework used to connect Kafka with databases, stores, and filesystems.
- Kafka clients: The libraries for writing/reading messages to/from Kafka. Note that there are clients for these languages: Java, Scala, C/C++, Python, and Go.
- Kafka REST Proxy: If the application doesn’t run in one of the Kafka clients’ programming languages, this proxy allows it to connect to Kafka through HTTP.
- Kafka Schema Registry: Recall that an enterprise service bus should have a message template repository. The Schema Registry is the repository of all the schemas and their historical versions, made to ensure that if an endpoint changes, all the involved parties are notified.
- Confluent Control Center: A powerful web graphic user interface for managing and monitoring Kafka systems.
- Confluent Cloud: Kafka as a service—a cloud service to reduce the burden of operations.
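To make the REST Proxy idea above more concrete, the sketch below builds the request body that the proxy's v2 produce endpoint expects for JSON messages. This is a minimal sketch, assuming a local proxy on its default port 8082; the topic name test_topic and the sample record are just examples.

```python
import json

# Sketch of the request body the Kafka REST Proxy (v2 API) expects when
# producing JSON messages to a topic over HTTP: plain values are wrapped
# in a "records" envelope.

def build_produce_request(records):
    """Wrap plain values in the REST Proxy's 'records' envelope."""
    return json.dumps({"records": [{"value": r} for r in records]})

body = build_produce_request([{"name": "Alice", "age": 27}])
print(body)

# The equivalent HTTP call (assumes the proxy is running locally):
#   curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
#        --data '<body>' http://localhost:8082/topics/test_topic
```

Any language with an HTTP client can produce this way, which is the point of the proxy: no Kafka client library is required.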
Installing the Confluent Platform
In order to use the REST proxy and the Schema Registry, we need to install the Confluent Platform. Also, the Confluent Platform has important administration, operation, and monitoring features fundamental for modern Kafka production systems.
Getting ready
At the time of writing this book, the Confluent Platform version is 4.0.0. Currently, the supported operating systems are:
- Debian 8
- Red Hat Enterprise Linux
- CentOS 6.8 or 7.2
- Ubuntu 14.04 LTS and 16.04 LTS
macOS is currently supported only for testing and development purposes, not for production environments. Windows is not yet supported. Oracle Java 1.7 or higher is required.
The default ports for the components are:
- 2181: Apache ZooKeeper
- 8081: Schema Registry (REST API)
- 8082: Kafka REST Proxy
- 8083: Kafka Connect (REST API)
- 9021: Confluent Control Center
- 9092: Apache Kafka brokers
It is important to have these ports (or the ports where the components are going to run) open.
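The default ports above can be probed with a short script before starting the platform. This is a minimal sketch assuming a local single-node installation; the host name and timeout are arbitrary choices.

```python
import socket

# Default ports of the Confluent Platform components, as listed above.
DEFAULT_PORTS = {
    2181: "Apache ZooKeeper",
    8081: "Schema Registry",
    8082: "Kafka REST Proxy",
    8083: "Kafka Connect",
    9021: "Confluent Control Center",
    9092: "Apache Kafka brokers",
}

def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

for port, component in DEFAULT_PORTS.items():
    state = "listening" if port_is_open("localhost", port) else "not listening"
    print(f"{component} ({port}): {state}")
```

Note that "listening" before installation usually means another process already occupies the port, which is exactly the conflict this check is meant to catch.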
How to do it
There are two ways to install the platform: downloading the compressed files or using the apt-get command.
To install the compressed files:
- Download the Confluent open source v4.0 or Confluent Enterprise v4.0 TAR files from https://www.confluent.io/download/
- Uncompress the archive file (the recommended path for installation is under /opt)
- To start the Confluent Platform, run this command:
$ <confluent-path>/bin/confluent start
The output should be as follows:
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
To install with the apt-get command (in Debian and Ubuntu):
- Install the Confluent public key used to sign the packages in the APT repository:
$ wget -qO - http://packages.confluent.io/deb/4.0/archive.key |sudo apt-key add -
- Add the repository to the sources list:
$ sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/4.0 stable main"
- Finally, run apt-get update and install the Confluent Platform
- To install Confluent open source:
$ sudo apt-get update && sudo apt-get install confluent-platform-oss-2.11
- To install Confluent Enterprise:
$ sudo apt-get update && sudo apt-get install confluent-platform-2.11
The end of the package name specifies the Scala version. Currently, the supported versions are 2.11 (recommended) and 2.10.
There’s more
The Confluent Platform provides the system and component packages. The commands in this recipe are for installing all components of the platform. To install individual components, follow the instructions on this page: https://docs.confluent.io/current/installation/available_packages.html#available-packages.
Using Kafka operations
With the Confluent Platform installed, the administration, operation, and monitoring of Kafka become very simple. Let’s review how to operate Kafka with the Confluent Platform.
Getting ready
For this recipe, Confluent should be installed, up, and running.
How to do it
The commands in this section should be executed from the directory where the Confluent Platform is installed:
- To start ZooKeeper, Kafka, and the Schema Registry with one command, run:
$ confluent start schema-registry
The output of this command should be:
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
To execute the commands outside the installation directory, add Confluent’s bin directory to PATH:
export PATH=<path_to_confluent>/bin:$PATH
- To manually start each service with its own command, run:
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
$ ./bin/kafka-server-start ./etc/kafka/server.properties
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
Note that the syntax of all the commands is exactly the same as always but without the .sh extension.
- To create a topic called test_topic, run the following command:
$ ./bin/kafka-topics --zookeeper localhost:2181 --create --topic test_topic --partitions 1 --replication-factor 1
- To send an Avro message to test_topic in the broker without writing a single line of code, use the following command:
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_topic --property value.schema='{"name":"person","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
- Send some messages and press Enter after each line:
{"name": "Alice", "age": 27}
{"name": "Bob", "age": 30}
{"name": "Charles", "age":57}
- Pressing Enter on an empty line is interpreted as a null message. To shut down the process, press Ctrl + C.
- To consume the Avro messages from test_topic since the beginning, type:
$ ./bin/kafka-avro-console-consumer --topic test_topic --zookeeper localhost:2181 --from-beginning
The messages created in the previous step will be written to the console in the same format as they were entered.
- To shut down the consumer, press Ctrl + C.
- To test the Avro schema validation, try to produce data on the same topic using an incompatible schema, for example, with this producer:
$ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_topic --property value.schema='{"type":"string"}'
- After you’ve hit Enter on the first message, the following exception is raised:
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:146)
- To shut down the services (Schema Registry, broker, and ZooKeeper), run: confluent stop
- To delete all the producer messages stored in the broker, run this: confluent destroy
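To make the schema validation in the steps above more concrete, here is a toy sketch of the rule the Schema Registry enforced: records must carry the fields of the registered schema, and a schema of a different shape (like the plain string schema that triggered the 409 error) is incompatible with it. This is a simplified illustration, not the registry's actual compatibility algorithm.

```python
import json

# The record schema registered for test_topic in the producer step above.
PERSON_SCHEMA = json.loads(
    '{"name":"person","type":"record",'
    '"fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
)

# Toy mapping from the Avro primitive types used here to Python types.
AVRO_TO_PY = {"string": str, "int": int}

def matches_schema(record, schema):
    """True if every schema field is present with the expected type."""
    return all(
        isinstance(record.get(f["name"]), AVRO_TO_PY[f["type"]])
        for f in schema["fields"]
    )

print(matches_schema({"name": "Alice", "age": 27}, PERSON_SCHEMA))  # True
print(matches_schema({"name": "Bob"}, PERSON_SCHEMA))               # False
# A plain '{"type":"string"}' schema is not a record at all, which is why
# the registry rejected it as incompatible for this subject.
```

The real registry tracks every historical version of a subject's schema and applies configurable compatibility rules, but the idea is the same: producers cannot silently break the contract that consumers rely on.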
There’s more
With the Confluent Platform, it is possible to manage the entire Kafka system through Kafka operations, which are classified as follows:
- Production deployment: Hardware configuration, file descriptors, and ZooKeeper configuration
- Post deployment: Admin operations, rolling restart, backup, and restoration
- Auto data balancing: Rebalancer execution and decommissioning brokers
- Monitoring: Metrics for each concept—broker, ZooKeeper, topics, producers, and consumers
- Metrics reporter: Message size, security, authentication, authorization, and verification
Monitoring with the Confluent Control Center
This recipe shows you how to use the metrics reporter of the Confluent Control Center.
Getting ready
The execution of the previous recipe is needed. Before starting the Control Center, configure the metrics reporter:
- Back up the server.properties file located at:
<confluent_path>/etc/kafka/server.properties
- In the server.properties file, uncomment the following lines:
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=localhost:9092
confluent.metrics.reporter.topic.replicas=1
- Back up the Kafka Connect configuration located in:
<confluent_path>/etc/schema-registry/connect-avro-distributed.properties
- Add the following lines at the end of the connect-avro-distributed.properties file:
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
- Start the Confluent Platform:
$ <confluent_path>/bin/confluent start
Before starting the Control Center, change its configuration:
- Back up the control-center.properties file located in:
<confluent_path>/etc/confluent-control-center/control-center.properties
- Add the following lines at the end of the control-center.properties file:
confluent.controlcenter.internal.topics.partitions=1
confluent.controlcenter.internal.topics.replication=1
confluent.controlcenter.command.topic.replication=1
confluent.monitoring.interceptor.topic.partitions=1
confluent.monitoring.interceptor.topic.replication=1
confluent.metrics.topic.partitions=1
confluent.metrics.topic.replication=1
- Start the Control Center:
<confluent_path>/bin/control-center-start
How to do it
- Open the Control Center web graphic user interface at the following URL: http://localhost:9021/.
- The test_topic created in the previous recipe is needed:
$ <confluent_path>/bin/kafka-topics --zookeeper localhost:2181 --create --topic test_topic --partitions 1 --replication-factor 1
- From the Control Center, click on the Kafka Connect button on the left. Click on the New source button.
- From the connector class drop-down menu, select SchemaSourceConnector. Specify Connection Name as Schema-Avro-Source.
- In the topic name, specify test_topic.
- Click on Continue, and then click on the Save & Finish button to apply the configuration.
To create a new sink follow these steps:
- From Kafka Connect, click on the SINKS button and then on the New sink button
- From the topics list, choose test_topic and click on the Continue button
- In the SINKS tab, set the connection class to SchemaSourceConnector; specify Connection Name as Schema-Avro-Source
- Click on the Continue button and then on Save & Finish to apply the new configuration
How it works
Click on the Data streams tab; a chart shows the total number of messages produced and consumed on the cluster.
To summarize, we discussed how to get started with the Confluent Platform for Apache Kafka.
If you liked our post, please be sure to check out Apache Kafka 1.0 Cookbook which consists of useful recipes to work with your Apache Kafka installation.