To access Kafka topics, you can use the Kafka command-line tools or write a script in a programming language like Python using the confluent-kafka or kafka-python library. Below are examples of both approaches:
1. Using Kafka Command-Line Tools
Kafka provides command-line tools to interact with topics, such as listing topics, consuming messages, and producing messages.
List Kafka Topics
kafka-topics.sh --list --bootstrap-server <broker_address>:<broker_port>
Consume Messages from a Topic
kafka-console-consumer.sh --bootstrap-server <broker_address>:<broker_port> --topic <topic_name> --from-beginning
Produce Messages to a Topic
kafka-console-producer.sh --bootstrap-server <broker_address>:<broker_port> --topic <topic_name>
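If you want to run the list and consume commands from a script instead of typing them, one option is to build the argument lists in Python and hand them to subprocess. The following is a minimal sketch, assuming the Kafka scripts are on your PATH; the helper names list_topics_cmd and console_consumer_cmd are hypothetical, not part of Kafka.

```python
import shlex
from typing import List


def list_topics_cmd(bootstrap: str) -> List[str]:
    """Build the argument list for listing topics (hypothetical helper)."""
    return ["kafka-topics.sh", "--list", "--bootstrap-server", bootstrap]


def console_consumer_cmd(bootstrap: str, topic: str,
                         from_beginning: bool = True) -> List[str]:
    """Build the argument list for the console consumer (hypothetical helper)."""
    cmd = [
        "kafka-console-consumer.sh",
        "--bootstrap-server", bootstrap,
        "--topic", topic,
    ]
    if from_beginning:
        # Read the topic from its earliest available offset.
        cmd.append("--from-beginning")
    return cmd


if __name__ == "__main__":
    # Print the command line; pass the list to subprocess.run() to execute it.
    print(shlex.join(list_topics_cmd("localhost:9092")))
```

Passing a list to subprocess.run(cmd, check=True) avoids shell quoting problems with unusual topic names.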
2. Using Python with confluent-kafka Library
The confluent-kafka library is a popular Python client for Kafka.
Install the Library
pip install confluent-kafka
Script to Consume Messages from a Kafka Topic
from confluent_kafka import Consumer, KafkaError
# Kafka configuration
conf = {
    'bootstrap.servers': '<broker_address>:<broker_port>',  # Kafka broker address
    'group.id': 'my_consumer_group',  # Consumer group ID
    'auto.offset.reset': 'earliest'  # Start reading from the earliest message
}
# Create a consumer instance
consumer = Consumer(conf)
# Subscribe to a topic
topic = '<topic_name>'
consumer.subscribe([topic])
# Poll for messages
try:
    while True:
        msg = consumer.poll(timeout=1.0)  # Wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print("Reached end of partition")
            else:
                print(f"Error: {msg.error()}")
        else:
            print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    print("Consumer interrupted")
finally:
    consumer.close()
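The consumer script calls msg.value().decode('utf-8') directly. That raises an error if a message has no payload (msg.value() returns None, for example for tombstones on compacted topics) or if the bytes are not valid UTF-8. A small sketch of a more forgiving decoder follows; decode_value and the "<no value>" placeholder are hypothetical choices, not part of confluent-kafka.

```python
from typing import Optional


def decode_value(raw: Optional[bytes], encoding: str = "utf-8") -> str:
    """Return a printable string for a message payload (hypothetical helper)."""
    if raw is None:
        # The message carries no value at all.
        return "<no value>"
    # Substitute U+FFFD for undecodable bytes instead of raising.
    return raw.decode(encoding, errors="replace")
```

In the polling loop you could then print decode_value(msg.value()) in place of the direct decode call.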
Script to Produce Messages to a Kafka Topic
from confluent_kafka import Producer
# Kafka configuration
conf = {
    'bootstrap.servers': '<broker_address>:<broker_port>',  # Kafka broker address
}
# Create a producer instance
producer = Producer(conf)
# Topic to produce messages to
topic = '<topic_name>'
# Delivery callback: called once per message with the delivery result
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
# Produce messages
for i in range(10):
    message = f"Message {i}"
    producer.produce(topic, key=str(i), value=message, callback=delivery_report)
    producer.poll(0)  # Serve delivery callbacks for earlier messages
# Wait for all messages to be delivered
producer.flush()
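Printing from the delivery callback is fine for a demo, but a script often needs to know afterwards whether anything failed. One possible approach is a callable object that counts results using the same (err, msg) callback signature as delivery_report. This is only a sketch; the DeliveryTally class is hypothetical and not part of confluent-kafka.

```python
class DeliveryTally:
    """Count delivery outcomes; usable as a produce() callback (hypothetical)."""

    def __init__(self) -> None:
        self.delivered = 0
        self.failed = 0

    def __call__(self, err, msg) -> None:
        # err is None on success and an error object on failure.
        if err is not None:
            self.failed += 1
        else:
            self.delivered += 1
```

You would create tally = DeliveryTally(), pass callback=tally to producer.produce(), and inspect tally.failed after producer.flush() returns.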
3. Using Python with kafka-python Library
The kafka-python library is another option for interacting with Kafka.
Install the Library
pip install kafka-python
Script to Consume Messages from a Kafka Topic
from kafka import KafkaConsumer
# Create a Kafka consumer
consumer = KafkaConsumer(
    '<topic_name>',
    bootstrap_servers=['<broker_address>:<broker_port>'],
    auto_offset_reset='earliest',  # Start reading from the earliest message
    group_id='my_consumer_group'
)
# Iterate over messages as they arrive
for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")
Script to Produce Messages to a Kafka Topic
from kafka import KafkaProducer
# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers=['<broker_address>:<broker_port>'])
# Topic to produce messages to
topic = '<topic_name>'
# Produce messages
for i in range(10):
    message = f"Message {i}"
    producer.send(topic, value=message.encode('utf-8'))
# Wait for all messages to be delivered
producer.flush()
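The producer script encodes each string by hand before calling send(). kafka-python also accepts a value_serializer callable on KafkaProducer, which is convenient when the payload is structured data. The sketch below assumes JSON payloads; serialize_json and make_json_producer are hypothetical helper names.

```python
import json


def serialize_json(payload) -> bytes:
    """Encode a JSON-compatible object as UTF-8 bytes (hypothetical helper)."""
    # sort_keys makes the output deterministic, which helps when comparing messages.
    return json.dumps(payload, sort_keys=True).encode("utf-8")


def make_json_producer(bootstrap_servers):
    """Create a producer that serializes every value with serialize_json."""
    # Imported here so the serializer can be used without kafka-python installed.
    from kafka import KafkaProducer
    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=serialize_json,
    )
```

With such a producer, producer.send(topic, value={"id": 1}) would send the dictionary as JSON without an explicit encode step.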
Key Notes
- Replace <broker_address>:<broker_port> with your Kafka broker's address and port (e.g., localhost:9092).
- Replace <topic_name> with the name of the Kafka topic you want to interact with.
- Ensure Kafka is running and accessible before running the scripts.
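To check the last note before starting a script, you can try a plain TCP connection to the broker address. This is a rough sketch: parse_broker and broker_reachable are hypothetical helpers, the address is assumed to be in host:port form, and a successful connection only shows that something is listening on that port, not that it is a healthy Kafka broker.

```python
import socket
from typing import Tuple


def parse_broker(address: str) -> Tuple[str, int]:
    """Split 'host:port' into its parts (hypothetical helper)."""
    host, _, port = address.rpartition(":")
    if not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {address!r}")
    return host, int(port)


def broker_reachable(address: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the address succeeds."""
    host, port = parse_broker(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False
```

For example, broker_reachable("localhost:9092") can be called at the top of a script to fail early with a clear message.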