Pika is a pure Python implementation of the AMQP 0-9-1 protocol that allows Python programs to communicate with AMQP brokers like RabbitMQ. It provides an API for creating client applications that can publish messages to, and consume messages from, RabbitMQ queues.
Key Concepts:
- AMQP (Advanced Message Queuing Protocol): An open standard application layer protocol for message-oriented middleware. It defines how message producers and consumers communicate with a message broker.
- RabbitMQ: A popular open-source message broker that implements AMQP. Pika is designed to interact with RabbitMQ.
- Connection: Represents a network connection to the RabbitMQ broker.
- Channel: A virtual connection inside a connection. All AMQP commands are issued over a channel. You can have multiple channels over a single connection, which is more efficient than opening multiple connections.
- Exchange: Message producers publish messages to an exchange. The exchange then routes each message to queues according to rules called bindings, which it compares against the message's routing key (or, for headers exchanges, against the message headers). There are four exchange types: direct, fanout, topic, and headers.
- Queue: A named buffer that stores messages until they are consumed. Consumers retrieve messages from queues.
- Producer (Publisher): An application that sends messages to an exchange.
- Consumer: An application that receives messages from a queue.
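- Acknowledgment (Ack): A mechanism by which a consumer tells the broker that it has successfully processed a message. If the consumer's channel or connection closes before a delivered message is acknowledged, the broker requeues that message for redelivery, which prevents message loss when a consumer fails.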
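The exchange types listed above differ only in how they compare a message's routing key with each binding. The following is a simplified in-memory model of that decision, not RabbitMQ's actual implementation; the function names `topic_matches` and `route` are hypothetical, and headers exchanges are deliberately left out.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key.

    Words are separated by dots; '*' stands for exactly one word and
    '#' stands for zero or more words.
    """
    p = pattern.split(".") if pattern else []
    k = routing_key.split(".") if routing_key else []

    def match(i, j):
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb any number of the remaining words, including none.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and p[i] in ("*", k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def route(exchange_type, bindings, routing_key):
    """Return the sorted queue names that would receive the message.

    `bindings` is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == "fanout":
        # Fanout ignores the routing key and copies to every bound queue.
        matched = [queue for queue, _ in bindings]
    elif exchange_type == "direct":
        # Direct requires an exact match between binding key and routing key.
        matched = [queue for queue, key in bindings if key == routing_key]
    elif exchange_type == "topic":
        matched = [queue for queue, key in bindings
                   if topic_matches(key, routing_key)]
    else:
        raise ValueError(f"exchange type not modeled here: {exchange_type}")
    return sorted(set(matched))
```

For example, with the bindings `[("errors", "logs.error"), ("all_logs", "logs.#")]`, a topic exchange delivers a message with routing key `logs.error` to both queues, while a direct exchange delivers it only to `errors`.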
How Pika Works:
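1. Establish a Connection: A Pika client first connects to a RabbitMQ server, typically using `pika.BlockingConnection` for synchronous applications or `AsyncioConnection`/`TornadoConnection` for asynchronous ones. In Pika 1.x the latter two are imported from `pika.adapters.asyncio_connection` and `pika.adapters.tornado_connection` rather than from the top-level `pika` package.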
2. Create a Channel: Once connected, the client opens a channel over the connection.
3. Declare Exchanges and Queues: Producers and consumers declare the exchanges and queues they intend to use. This ensures they exist on the broker.
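4. Bind Queues to Exchanges: For messages to reach a queue, the queue must be bound to an exchange, usually with a binding key that the exchange compares against each message's routing key (fanout and headers exchanges ignore it). Every queue is also automatically bound to the default exchange (named by the empty string) under its own name, which is why the examples below need no explicit binding.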
5. Publish Messages (Producer): Producers publish messages to an exchange, specifying a routing key.
6. Consume Messages (Consumer): Consumers subscribe to a queue and receive messages. When a message is processed, the consumer sends an acknowledgment back to the broker.
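Steps 3 to 5 can be sketched for a named exchange as follows. This is a minimal sketch that assumes `channel` is an already-open Pika 1.x channel (for example from `BlockingConnection.channel()`); the helper name `declare_bind_publish` and the choice of a durable direct exchange are illustrative assumptions, not part of Pika.

```python
def declare_bind_publish(channel, exchange, queue, routing_key, body):
    """Run steps 3-5 on an already-open channel (hypothetical helper)."""
    # Step 3: declare the exchange and the queue so that both exist on the broker.
    channel.exchange_declare(exchange=exchange, exchange_type="direct", durable=True)
    channel.queue_declare(queue=queue, durable=True)

    # Step 4: bind the queue to the exchange under the given key.
    channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)

    # Step 5: publish; a direct exchange delivers to queues whose
    # binding key equals the routing key.
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
```

With a live broker, this could be called as `declare_bind_publish(connection.channel(), 'logs', 'hello_queue', 'hello', b'hi')`, where the exchange name `logs` and the key `hello` are made up for the example.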
Pika offers different adapters for various concurrency models: `BlockingConnection` for simple scripts and synchronous applications, `AsyncioConnection` for integrating with Python's `asyncio` event loop, and `TornadoConnection` for use with the Tornado web framework.
Example Code
Publisher (publisher.py)
import pika
import time

# Establish a connection to the RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a queue (idempotent: it is created only if it doesn't exist yet)
queue_name = 'hello_queue'
channel.queue_declare(queue=queue_name, durable=True)

print(f" [-] Publisher is ready. Sending messages to '{queue_name}'...")

# Publish 5 messages
for i in range(5):
    message = f"Hello World! Message {i}"
    # Publish to the default exchange; the routing key is the queue name
    channel.basic_publish(
        exchange='',
        routing_key=queue_name,
        body=message.encode(),  # Messages must be bytes
        properties=pika.BasicProperties(delivery_mode=2)  # Make message persistent
    )
    print(f" [x] Sent '{message}'")
    time.sleep(1)  # Simulate some work

# Close the connection
connection.close()
Consumer (consumer.py)
import pika
import time

# Establish a connection to the RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the same queue as the publisher
queue_name = 'hello_queue'
channel.queue_declare(queue=queue_name, durable=True)

# Ensure only one unacknowledged message is delivered to this consumer at a time
channel.basic_qos(prefetch_count=1)

# Define a callback function to process messages
def callback(ch, method, properties, body):
    print(f" [x] Received '{body.decode()}'")
    # Simulate work by sleeping one second per 'l' byte in the message body
    time.sleep(body.count(b'l'))
    print(" [x] Done")
    # Acknowledge the message, telling RabbitMQ it can be removed from the queue
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Start consuming messages from the queue
# on_message_callback: function to call when a message is received
# auto_ack=False: we will manually acknowledge messages
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=False
)

print(f" [-] Waiting for messages on '{queue_name}'. To exit press CTRL+C")

# Start blocking and waiting for messages to arrive
channel.start_consuming()
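The consumer above acknowledges every message unconditionally, so an exception raised inside the callback would leave the message unacknowledged until the channel closes. One possible way to make the outcome explicit is sketched below: acknowledge on success and negatively acknowledge on failure. The wrapper name `make_acking_callback`, the `handler` argument, and the `requeue_on_error` flag are hypothetical; `basic_ack` and `basic_nack` are the channel methods the sketch relies on, and `basic_nack` is a RabbitMQ extension to AMQP 0-9-1.

```python
def make_acking_callback(handler, requeue_on_error=False):
    """Wrap `handler(body)` in a callback with Pika's consumer signature.

    The message is acked if the handler returns normally and nacked if it
    raises. With requeue_on_error=False a failed message is not put back
    on the queue (the broker drops or dead-letters it, depending on its
    configuration).
    """
    def callback(ch, method, properties, body):
        try:
            handler(body)
        except Exception:
            # Processing failed: reject this delivery explicitly.
            ch.basic_nack(delivery_tag=method.delivery_tag,
                          requeue=requeue_on_error)
        else:
            # Processing succeeded: the broker may now discard the message.
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

The result can be passed as `on_message_callback` to `channel.basic_consume` in place of `callback`. Requeueing is off by default in this sketch because a message that always fails would otherwise be redelivered in a tight loop.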







