Kafka Python
How to Use the Python Client for Kafka?
We can use a Python client for Kafka by following these steps:
- Set up the Apache Kafka environment.
- Install the confluent-kafka Python library.
- Import the Producer and Consumer classes from the installed library.
- Create a Kafka producer and start sending messages.
- Create a Kafka consumer and start receiving the messages sent by the producer.
How to Install the Kafka Python Library?
Before installing the Kafka Python library, we need to set up the Apache Kafka environment by following these steps:
- Install Java 8: Kafka requires Java, so download and install Java 8 (or later) on your system.
- Download and install Apache Kafka: After downloading, add the Kafka bin path to your environment variables.
- Start Zookeeper: Open the command prompt (cmd), change the directory to the Kafka folder, and start Zookeeper with the first command shown after this list.
- Start the Kafka broker: Start the broker by entering the second command shown after this list, from the Kafka folder.
- Create a Kafka topic: Create a topic by entering the third command shown after this list, from the Kafka folder.
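These commands would look roughly as follows on Windows (the article uses cmd); the exact script paths depend on your Kafka version, and the topic name mytopic is just an example. Recent Kafka releases use --bootstrap-server with kafka-topics, while older ones use --zookeeper instead:

```
:: 1. Start Zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

:: 2. Start the Kafka broker (in a new command prompt)
bin\windows\kafka-server-start.bat config\server.properties

:: 3. Create a topic named mytopic (in a new command prompt)
bin\windows\kafka-topics.bat --create --topic mytopic --bootstrap-server localhost:9092
```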
Now we need to ensure that Python is installed on our machine. We can check this by entering the following command in the command prompt (cmd):
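```
python --version
```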
If Python is installed on your machine, we can move on to installing the required Kafka Python library; otherwise, you need to install Python first.
There are various Python libraries for writing producer and consumer code, such as confluent-kafka, pykafka, and kafka-python. Here we use confluent-kafka, which we can install by entering the following pip command in the command prompt:
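```
pip install confluent-kafka
```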
We can verify the installation by entering the same command again. If the library is installed successfully, we will get a message starting with "Requirement already satisfied".
Alternatively, by entering the command pip freeze, we can list all the libraries installed on the machine and check whether confluent-kafka is among them.
Python Client Installation
After installing the confluent-kafka library, we can import the Producer and Consumer classes to interact with Kafka.
The following is the code for importing Producer and Consumer:
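```python
from confluent_kafka import Producer, Consumer
```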
KafkaProducer
Here is the simplest code for a confluent-kafka Python producer, which writes messages to a Kafka topic.
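A minimal sketch of such a producer, assuming a broker on localhost:9092; the topic 'mytopic' and the five keyed messages follow the line-by-line explanation below, while the exact message strings are assumptions:

```python
from confluent_kafka import Producer

# Create the Producer; 'bootstrap.servers' specifies the broker's address and port
p = Producer({'bootstrap.servers': 'localhost:9092'})

# Produce 5 messages with different keys and values on the topic 'mytopic'
for i in range(5):
    p.produce('mytopic', key=str(i), value='message number {}'.format(i))

# Flush the buffer to ensure all messages are sent before the producer exits
p.flush()
```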
Let us understand each line in detail. First, we import the Producer class from the library.
Next, we create the Producer object with the name p. The bootstrap.servers setting specifies the address and port of the broker to connect to.
Then we produce 5 messages with different keys and values; the value is the message being sent, and 'mytopic' is the topic the messages are sent to. Finally, the last line flushes the buffer to ensure that all messages are delivered before the producer exits.
KafkaConsumer
The Kafka consumer is used for reading the messages from the Kafka topic.
Let us see the Python code for writing a Kafka consumer.
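A minimal sketch matching the explanation below; note that confluent-kafka also requires a group.id setting, so the group name 'mygroup' here is an assumption:

```python
from confluent_kafka import Consumer

# Create the Consumer; 'group.id' is required by confluent-kafka
c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'  # read from the beginning if no offset is stored
})

# Subscribe to the topic to start receiving messages
c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)  # fetch a message, waiting up to 1 second
    if msg is None:
        continue
    if msg.error():
        print('Consumer error: {}'.format(msg.error()))
        continue
    print('Received message: {} key: {} partition: {} topic: {}'.format(
        msg.value().decode('utf-8'), msg.key(), msg.partition(), msg.topic()))
```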
Let us understand the above code.
First, we import the Consumer class from the library.
Next, we create an instance of the Consumer object with the name c. Setting "auto.offset.reset" to earliest makes the consumer read messages from the beginning of the topic when no committed offset exists. The bootstrap.servers setting gives the Consumer the address and port of the broker.
Next, we subscribe to the topic 'mytopic' to start receiving messages, using the c.subscribe() method.
Now let us understand the while loop in the code. The poll method in confluent-kafka is used to read/fetch messages. c.poll(1.0) waits up to one second and returns None if there are no messages; otherwise it returns a message object. Finally, we check whether there was any error in fetching the message. If the message has no errors, we print the received message along with its key, partition, and topic.
Python Client Code Examples
Here are the Python client code examples for the producer and consumer.
Example 1:
Producer:
In a folder, create a file producer.py and write the following code. Make sure Kafka is running.
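A sketch of such a producer; the paragraph below says it sends 10 messages to 'het-topic', while the keys and message strings here are assumptions:

```python
from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

# Send 10 messages to the topic 'het-topic'
for i in range(10):
    p.produce('het-topic', key=str(i), value='message {}'.format(i))

p.flush()
```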
In the above code, we are sending 10 messages to the topic 'het-topic'. Run the file producer.py using the following command:
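```
python producer.py
```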
Consumer: Now create a consumer.py file in the same folder and copy the code below into it.
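A sketch mirroring the consumer shown earlier, pointed at 'het-topic'; the group name 'het-group' is an assumption:

```python
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'het-group',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['het-topic'])

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print('Consumer error: {}'.format(msg.error()))
        continue
    print('Received: {}'.format(msg.value().decode('utf-8')))
```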
Run the code. You will be able to see the output as shown below.
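With the sketches above, the output would look roughly like this (the exact lines depend on the message format used in producer.py):

```
Received: message 0
Received: message 1
...
Received: message 9
```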
We will be able to see all the messages that were sent, in order. Also, the consumer does not terminate; it keeps listening for new messages.
Open another terminal or cmd window and run producer.py again. The consumer will consume those new messages as well and print them in the same format.
Example 2: In this example, let us write producer code with a delivery report function. Copy the following code into the producer Python file.
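A sketch using confluent-kafka's standard per-message delivery callback; the report format and message contents are assumptions:

```python
from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    """Called once per produced message to report delivery success or failure."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for i in range(10):
    p.produce('het-topic', value='message {}'.format(i), callback=delivery_report)

# flush() also serves the pending delivery callbacks
p.flush()
```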
Run the producer Python file. A delivery report line is printed for each message as the broker acknowledges it, giving feedback about every message sent. Then run the consumer Python file and observe the output.
Compression
Compression is an important feature as it reduces the amount of data transferred. Compression also shrinks the data stored in the Kafka cluster, reducing storage costs.
There are various compression algorithms available in the confluent-kafka library, such as ZStandard, LZ4, Gzip, and Snappy. By default, Kafka producers apply no compression (compression.type defaults to none).
- ZStandard: Zstd (ZStandard) is a newer compression algorithm that outperforms Gzip and Snappy in compression ratio while using less CPU.
- LZ4: Provides good compression ratios with very fast compression and decompression.
- Gzip: Provides better compression but requires more CPU.
- Snappy: Has a lower compression ratio than Gzip but uses less CPU than Gzip.
To use compression, we have to specify it while creating the topic. We can create a topic with the required compression using the following command:
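Roughly, for a recent Kafka release on Windows (the topic name mytopic-gzip is just an example):

```
bin\windows\kafka-topics.bat --create --topic mytopic-gzip --bootstrap-server localhost:9092 --config compression.type=gzip
```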
Focus on the last part of the command, --config compression.type=gzip. It sets the compression type to gzip while creating the topic.
We can use snappy compression by using --config compression.type=snappy and LZ4 compression by using --config compression.type=lz4.
Protocol
In Kafka, the protocol sets the rules for how Kafka systems should communicate with each other. The confluent-kafka library takes care of network protocols such as TCP and SSL/TLS, as well as message serialization and deserialization.
We can work with protocols in confluent-kafka Python by handling errors, specifying the protocol version, enabling compression, etc.
- Handling errors in the confluent-kafka Python library: We can register an error callback when creating the producer, as shown in the sketch after this list; whenever there is an error in producing messages, the error callback is invoked and the corresponding error is printed.
- Specifying the protocol version: We can use the api.version.request setting to control the protocol version used while communicating with Kafka.
- Compression: We have already covered this topic above.
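A sketch of the error-handling pattern, using confluent-kafka's error_cb configuration key; the callback name and message are assumptions:

```python
from confluent_kafka import Producer

def error_callback(err):
    """Invoked for client-level errors, e.g. broker connection failures."""
    print('Error while producing: {}'.format(err))

# 'error_cb' registers the global error callback on the producer
p = Producer({
    'bootstrap.servers': 'localhost:9092',
    'error_cb': error_callback
})

p.produce('mytopic', value='hello kafka')
p.flush()
```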
Conclusion
From this article, we can conclude the following:
- The confluent-kafka Python library can be used to write Kafka producers and consumers.
- It is a flexible library: it lets the user choose the compression algorithm, provides delivery reports for messages, and allows users to set the number of partitions.
- The callback function helps the user with error handling, which is very useful when tracking down bugs.
- It is easy to write and understand simple Producer and Consumer code in Python.
- We can make a scalable, fault-tolerant Kafka producer and consumer by using the library.