Kafka and Python

To work with Kafka topics, you can use the command-line tools that ship with Kafka, or write a Python script using either the confluent-kafka or the kafka-python client library. Examples of each approach follow.


1. Using Kafka Command-Line Tools

Kafka ships with shell scripts (in the bin/ directory of the distribution) for common topic operations such as listing topics, consuming messages and producing messages.

List Kafka Topics

kafka-topics.sh --list --bootstrap-server <broker_address>:<broker_port>

Consume Messages from a Topic

kafka-console-consumer.sh --bootstrap-server <broker_address>:<broker_port> --topic <topic_name> --from-beginning

Produce Messages to a Topic

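kafka-console-producer.sh --bootstrap-server <broker_address>:<broker_port> --topic <topic_name>

Each line you type at the > prompt is sent as one message. Press Ctrl+C to stop. Kafka releases older than 2.5 accept only --broker-list in place of --bootstrap-server for this tool.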
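The same CLI tools can also be driven from a Python script through the standard subprocess module, which is handy for quick automation. The following is a minimal sketch under a few assumptions. The helper names (list_topics_cmd, console_consumer_cmd, run_cli) are hypothetical. The .sh scripts are assumed to be on your PATH; some distributions install them without the .sh suffix, so adjust the names if yours differ.

```python
import shutil
import subprocess
from typing import List, Optional


def list_topics_cmd(bootstrap: str) -> List[str]:
    """Argument list equivalent to `kafka-topics.sh --list --bootstrap-server ...`."""
    return ["kafka-topics.sh", "--list", "--bootstrap-server", bootstrap]


def console_consumer_cmd(bootstrap: str, topic: str, from_beginning: bool = True,
                         max_messages: Optional[int] = None) -> List[str]:
    """Argument list for kafka-console-consumer.sh."""
    cmd = ["kafka-console-consumer.sh", "--bootstrap-server", bootstrap, "--topic", topic]
    if from_beginning:
        cmd.append("--from-beginning")
    if max_messages is not None:
        # Exit after this many messages instead of waiting forever
        cmd += ["--max-messages", str(max_messages)]
    return cmd


def run_cli(cmd: List[str]) -> List[str]:
    """Run a Kafka CLI command and return its stdout as a list of lines."""
    if shutil.which(cmd[0]) is None:
        raise FileNotFoundError(f"{cmd[0]} not found on PATH")
    # check=True raises CalledProcessError if the tool exits with an error
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout.splitlines()
```

With a broker running, run_cli(list_topics_cmd("localhost:9092")) would return one topic name per list entry. Passing max_messages matters for the consumer, because without it the console consumer keeps running and run_cli never returns.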

2. Using Python with confluent-kafka Library

confluent-kafka is Confluent's Python client for Kafka. It is a wrapper around the librdkafka C library.

Install the Library

pip install confluent-kafka

Script to Consume Messages from a Kafka Topic

from confluent_kafka import Consumer, KafkaError

# Kafka configuration
conf = {
    'bootstrap.servers': '<broker_address>:<broker_port>',  # Kafka broker address
    'group.id': 'my_consumer_group',                       # Consumer group ID
    'auto.offset.reset': 'earliest',                       # With no committed offset, start from the earliest message
    'enable.partition.eof': True,                          # Report end-of-partition events (off by default)
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
topic = '<topic_name>'
consumer.subscribe([topic])

# Poll for messages
try:
    while True:
        msg = consumer.poll(timeout=1.0)  # Wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition {msg.partition()}")
            else:
                print(f"Error: {msg.error()}")
        else:
            value = msg.value()
            # Tombstone records have a None value, so guard before decoding
            text = value.decode('utf-8') if value is not None else None
            print(f"Received message: {text}")
except KeyboardInterrupt:
    print("Consumer interrupted")
finally:
    consumer.close()  # Commits offsets (auto-commit is on by default) and leaves the group
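Because the loop above runs until it is interrupted, scripts and tests often need a bounded variant. In the following sketch, collect_values is a hypothetical name. It polls until it has a given number of values or until the topic has been idle for several consecutive polls. It relies only on the poll(timeout=...), error() and value() calls used above, so you can pass either a real confluent_kafka.Consumer or a simple stand-in object.

```python
from typing import List, Optional


def collect_values(consumer, max_messages: int, max_empty_polls: int = 5,
                   timeout: float = 1.0) -> List[Optional[str]]:
    """Poll `consumer` until `max_messages` values are collected or
    `max_empty_polls` consecutive polls return nothing.

    `consumer` is any object with a confluent-kafka style poll(timeout=...)
    that returns None or a message exposing error() and value().
    """
    values: List[Optional[str]] = []
    empty_polls = 0
    while len(values) < max_messages and empty_polls < max_empty_polls:
        msg = consumer.poll(timeout=timeout)
        if msg is None:
            empty_polls += 1  # Nothing arrived within `timeout`
            continue
        empty_polls = 0
        if msg.error():
            # Error events (including partition-EOF) are skipped here for brevity;
            # real code should log or handle them
            continue
        raw = msg.value()
        # Tombstones carry a None value
        values.append(raw.decode("utf-8") if raw is not None else None)
    return values
```

A real consumer should still be closed afterwards, for example with the same try/finally pattern as above.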

Script to Produce Messages to a Kafka Topic

from confluent_kafka import Producer

# Kafka configuration
conf = {
    'bootstrap.servers': '<broker_address>:<broker_port>',  # Kafka broker address
}

# Create a producer instance
producer = Producer(conf)

# Topic to produce messages to
topic = '<topic_name>'

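# Delivery report callback: called once per message from poll() or flush()
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

# Produce messages; produce() is asynchronous and only queues each message
for i in range(10):
    message = f"Message {i}"
    producer.produce(topic, key=str(i), value=message, callback=delivery_report)
    producer.poll(0)  # Serve delivery callbacks for earlier messages without blocking

# Wait for all outstanding messages to be delivered
producer.flush()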
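Delivery callbacks run inside poll() or flush(). A common pattern is to record the results there and inspect them once flush() returns. Here is a minimal sketch. DeliveryTracker is a hypothetical helper and is not part of confluent-kafka.

```python
from typing import List, Tuple


class DeliveryTracker:
    """Callable usable as the `callback=` argument of Producer.produce().

    confluent-kafka calls it as callback(err, msg) from poll() or flush().
    """

    def __init__(self) -> None:
        self.delivered: List[Tuple[str, int, int]] = []  # (topic, partition, offset)
        self.failed: List[str] = []                      # error descriptions

    def __call__(self, err, msg) -> None:
        if err is not None:
            self.failed.append(str(err))
        else:
            self.delivered.append((msg.topic(), msg.partition(), msg.offset()))
```

To use it, pass an instance as callback=tracker to producer.produce(...), call producer.flush(), and then check tracker.failed. flush() also returns the number of messages still queued, which is non-zero if it timed out.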

3. Using Python with kafka-python Library

kafka-python is another option. It is a pure-Python Kafka client, so it has no compiled dependencies.

Install the Library

pip install kafka-python

Script to Consume Messages from a Kafka Topic

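from kafka import KafkaConsumer

# Create a Kafka consumer subscribed to one topic
consumer = KafkaConsumer(
    '<topic_name>',
    bootstrap_servers=['<broker_address>:<broker_port>'],
    auto_offset_reset='earliest',  # With no committed offset, start from the earliest message
    group_id='my_consumer_group'
)

# Iterate over messages; the loop blocks and waits for new records indefinitely
try:
    for message in consumer:
        value = message.value  # bytes, or None for a tombstone
        print(f"Received message: {value.decode('utf-8') if value is not None else None}")
except KeyboardInterrupt:
    print("Consumer interrupted")
finally:
    consumer.close()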
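Iterating over a KafkaConsumer blocks while it waits for new records, so the loop above never finishes on its own. The consumer is an ordinary Python iterator, so itertools.islice can cap the number of records. kafka-python's consumer_timeout_ms option, which stops iteration after that many idle milliseconds, is another way to end the loop. The helper first_values below is a hypothetical sketch. It works on any iterable of records that expose a .value attribute.

```python
from itertools import islice
from typing import Iterable, List, Optional


def first_values(records: Iterable, limit: int) -> List[Optional[str]]:
    """Decode the values of at most `limit` records from an iterable of
    kafka-python ConsumerRecord-like objects (anything with a `.value`)."""
    out: List[Optional[str]] = []
    for record in islice(records, limit):  # Stop after `limit` records
        # record.value is bytes, or None for a tombstone
        out.append(record.value.decode("utf-8") if record.value is not None else None)
    return out
```

For example, first_values(consumer, 10) reads at most 10 records from the consumer created above. If fewer are available, it still waits unless consumer_timeout_ms is also set.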

Script to Produce Messages to a Kafka Topic

from kafka import KafkaProducer

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers=['<broker_address>:<broker_port>'])

# Topic to produce messages to
topic = '<topic_name>'

# Produce messages
for i in range(10):
    message = f"Message {i}"
    producer.send(topic, value=message.encode('utf-8'))

# Block until all buffered messages are sent, then release the producer's resources
producer.flush()
producer.close()
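KafkaProducer and KafkaConsumer in kafka-python accept value_serializer and value_deserializer callables. These let you send structured data instead of encoding strings by hand. The following is a minimal JSON sketch; the function names are hypothetical.

```python
import json
from typing import Any, Optional


def json_serializer(value: Any) -> bytes:
    """Turn a Python object into compact UTF-8 JSON bytes."""
    return json.dumps(value, separators=(",", ":")).encode("utf-8")


def json_deserializer(raw: Optional[bytes]) -> Any:
    """Inverse of json_serializer; passes tombstones (None) through unchanged."""
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))
```

On the producer side, these would be passed as KafkaProducer(bootstrap_servers=[...], value_serializer=json_serializer), after which producer.send(topic, value={"id": i}) accepts dicts. On the reading side, they would be passed as KafkaConsumer(..., value_deserializer=json_deserializer).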

Key Notes

  • Replace <broker_address>:<broker_port> with your Kafka broker's address and port (e.g., localhost:9092).
  • Replace <topic_name> with the name of the Kafka topic you want to interact with.
  • Ensure Kafka is running and accessible before running the scripts.
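Before you run the scripts, one way to check the first and last notes is to parse the bootstrap string and try a plain TCP connection to each broker. This is a hypothetical sketch; parse_bootstrap and reachable are illustrative names. A successful connection only shows that something is listening on that port, not that it is a healthy Kafka broker. Clients also go on to connect to the addresses the broker advertises, which may differ from the bootstrap address.

```python
import socket
from typing import List, Tuple


def parse_bootstrap(servers: str) -> List[Tuple[str, int]]:
    """Split 'host1:9092,host2:9093' into [(host, port), ...]."""
    result = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        result.append((host, int(port)))
    return result


def reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # Covers refused connections, timeouts and DNS failures
        return False
```

For example, all(reachable(h, p) for h, p in parse_bootstrap("localhost:9092")) is a quick pre-flight check before you start a consumer or producer.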