Pika provides multiple adapters to connect to RabbitMQ allowing for different ways of providing socket communication depending on what is appropriate for your application.
Due to the need to check for and send content on a consistent basis, Pika now implements or extends IOLoops in each of its asynchronous connection adapters. These IOLoops are blocking methods which loop and listen for events. Each asynchronous adapters follows the same standard for invoking the IOLoop. The IOLoop is created when the connection adapter is created. To start it simply call the connection.ioloop.start() method.
If you are using an external IOLoop such as Tornado’s IOLoop, you may invoke that as you normally would and then add the adapter to it.
Example:
from pika.adapters import SelectConnection
# Create our connection object
connection = SelectConnection()
try:
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on its own
connection.ioloop.start()
Interfacing with Pika asynchronously is done by passing in callback methods you would like to have invoked when a certain event has completed. For example, if you are going to declare a queue, you pass in a method that will be called when the RabbitMQ server returns a Queue.DeclareOk response.
In our example below we use the following four easy steps:
Note
Step #1 is on line #28 and Step #2 is on line #6. This is so that Python knows about the functions we’ll call in Steps #2 through #5.
Example:
from pika.adapters import SelectConnection
# Create a global channel variable to hold our channel object in
channel = None
# Step #2
def on_connected(connection):
"""Called when we are fully connected to RabbitMQ"""
# Open a channel
connection.channel(on_channel_open)
# Step #3
def on_channel_open(new_channel):
"""Called when our channel has opened"""
global channel
channel = new_channel
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared)
# Step #4
def on_queue_declared(frame):
"""Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ"""
channel.basic_consume(handle_delivery, queue='test')
# Step #5
def handle_delivery(channel, method, header, body):
"""Called when we receive a message from RabbitMQ"""
print body
# Step #1: Connect to RabbitMQ
connection = SelectConnection(parameters, on_connected)
try:
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on its own
connection.ioloop.start()
The credentials module provides the mechanism by which you pass the username and password to the connection.ConnectionParameters() class when it is created.
The PlainCredentials class returns the properly formatted username and password to the Connection. As of this version of Pika, only PlainCredentials are supported. To authenticate with Pika, simply create a credentials object passing in the username and password and pass that to the ConnectionParameters object.
If you do not pass in credentials to the ConnectionParameters object, it will create credentials for ‘guest’ with the password of ‘guest’.
If you pass True to erase_on_connect the credentials will not be stored in memory after the Connection attempt has been made. This means that you will not be able to use a Reconnection Strategy successfully if this is enabled.
Parameters:
Called by Connection when it no longer needs the credentials
Validate that our type of authentication is supported
Example:
import pika
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters(credentials=credentials)
To connect to RabbitMQ via an adapter, first you must construct a ConnectionParameters object. This has all of the options which are involved in creating a connection with RabbitMQ.
Connection parameters object that is passed into the connection adapter upon construction. The following parameters are passed and are used to negotiate communication with RabbitMQ:
Example:
import pika
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters(credentials=credentials,
host='rabbit-server1',
virtual_host='/')
As of RabbitMQ 2.0, client side Channel.Flow has been removed [1]. Instead, the RabbitMQ broker uses TCP Backpressure to slow your client if it is delivering messages too fast. Pika attempts to help you handle this situation by providing a mechanism by which you may be notified if Pika has noticed too many frames have yet to be delivered. By registering a callback function with the add_backpressure_callback method of any connection adapter, your function will be called when Pika sees that a backlog of 10 times the average frame size you have been sending has been exceeded. You may tweak the notification multiplier value by calling the set_backpressure_multiplier method passing any integer value.
The following connection adapters are available for connecting with RabbitMQ:
Note
SelectConnection is the recommended method for using Pika under most circumstances. It supports multiple event notification methods including select, epoll, kqueue and poll.
By default SelectConnection will attempt to use the most appropriate event notification method for your system. In order to override the default behavior you may set the poller type by assigning a string value to the select_connection modules POLLER_TYPE attribute prior to creating the SelectConnection object instance. Valid values are: kqueue, poll, epoll, select
Poller Type Override Example:
import select_connection
select_connection.POLLER_TYPE = 'epoll'
connection = select_connection.SelectConnection()
See the Continuation-Passing Style example for an example of using SelectConnection.
Add a callback notification when we think backpressue is being applied due to the size of the output buffer being exceeded.
Add a callback notification when the connection has closed
Add a callback notification when the connection has opened
Create a new channel with the next available channel number or pass in a channel number to use. Must be non-zero if you would like to specify but it is recommended that you let Pika manage the channel numbers.
Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.
Returns a boolean reporting the current connection state
Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.
Note
Use It is recommended that you use SelectConnection and its method signatures are the same as AsyncoreConnection.
The AsyncoreConnection class is provided for legacy support and quicker porting from applications that used Pika version 0.5.2 and prior.
Add a callback notification when we think backpressue is being applied due to the size of the output buffer being exceeded.
Add a callback notification when the connection has closed
Add a callback notification when the connection has opened
Create a new channel with the next available channel number or pass in a channel number to use. Must be non-zero if you would like to specify but it is recommended that you let Pika manage the channel numbers.
Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.
Returns a boolean reporting the current connection state
Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.
Tornado is an open source version of the scalable, non-blocking web server and tools that power FriendFeed. For more information on tornado, visit http://tornadoweb.org
Since the Tornado IOLoop blocks once it is started, it is suggested that you use a timer to add Pika to your tornado.Application instance after the HTTPServer has started.
The following is a simple, non-working example on how to add Pika to the Tornado IOLoop without blocking other applications from doing so. To see a fully workng example, see the Tornado Demo application in the examples.
Example:
from pika.adapters.tornado_connection import TornadoConnection
class PikaClient(object):
def connect(self):
self.connection = TornadoConnection(on_connected_callback=self.on_connected)
# Create our Tornado Application
application = tornado.web.Application([
(r"/", ExampleHandler)
], **settings)
# Create our Pika Client
application.pika = PikaClient()
# Start the HTTPServer
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8080)
# Get a handle to the instance of IOLoop
ioloop = tornado.ioloop.IOLoop.instance()
# Add our Pika connect to the IOLoop since we loop on ioloop.start
ioloop.add_timeout(500, application.pika.connect)
# Start the IOLoop
ioloop.start()
Add a callback notification when we think backpressue is being applied due to the size of the output buffer being exceeded.
Add a callback notification when the connection has closed
Add a callback notification when the connection has opened
Create a new channel with the next available channel number or pass in a channel number to use. Must be non-zero if you would like to specify but it is recommended that you let Pika manage the channel numbers.
Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.
Returns a boolean reporting the current connection state
Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.
The BlockingConnection creates a layer on top of Pika’s asynchronous core providng methods that will block until their expected response has returned. Due to the asynchronous nature of the Basic.Deliver and Basic.Return calls from RabbitMQ to your application, you are still required to implement continuation-passing style asynchronous methods if you’d like to receive messages from RabbitMQ using basic_consume or if you want to be notified of a delivery failure when using basic_publish.
Basic.Get is a blocking call which will either return the Method Frame, Header Frame and Body of a message, or it will return a Basic.GetEmpty frame as the Method Frame.
For more information on using the BlockingConnection, see BlockingChannel
Publishing Example:
from pika.adapters import BlockingConnection
from pika import BasicProperties
# Open a connection to RabbitMQ on localhost using all default parameters
connection = BlockingConnection()
# Open the channel
channel = connection.channel()
# Declare the queue
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)
# Send a message
channel.basic_publish(exchange='',
routing_key="test",
body="Hello World!",
properties=BasicProperties(content_type="text/plain",
delivery_mode=1))
Consuming Example:
from pika.adapters import BlockingConnection
# Open a connection to RabbitMQ on localhost using all default parameters
connection = BlockingConnection()
# Open the channel
channel = connection.channel()
# Declare the queue
channel.queue_declare(queue="test", durable=True,
exclusive=False, auto_delete=False)
# Start our counter at 0
messages = 0
# Method that will receive our messages and stop consuming after 10
def _on_message(channel, method, header, body):
print "Message:"
print "\t%r" % method
print "\t%r" % header
print "\t%r" % body
# Acknowledge message receipt
channel.basic_ack(method.delivery_tag)
# We've received 10 messages, stop consuming
global messages
messages += 1
if messages > 10:
channel.stop_consuming()
# Setup up our consumer callback
channel.basic_consume(_on_message, queue="test")
# This is blocking until channel.stop_consuming is called and will allow us to receive messages
channel.start_consuming()
The BlockingConnection adapter is meant for simple implementations where you want to have blocking behavior. The behavior layered on top of the async library. Because of the nature of AMQP there are a few callbacks one needs to do, even in a blocking implementation. These include receiving messages from Basic.Deliver, Basic.GetOk, and Basic.Return.
Create a new channel with the next available or specified channel #
Add a timeout calling callback to our stack that will execute in delay_sec.
Remove a timeout from the stack
Process our self._timeouts event stack
Add a callback notification when we think backpressue is being applied due to the size of the output buffer being exceeded.
Add a callback notification when the connection has closed
Add a callback notification when the connection has opened
Returns a boolean reporting the current connection state
Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.
Footnotes
| [1] | “more effective flow control mechanism that does not require cooperation from clients and reacts quickly to prevent the broker from exhausing memory - see http://www.rabbitmq.com/extensions.html#memsup” from http://lists.rabbitmq.com/pipermail/rabbitmq-announce/attachments/20100825/2c672695/attachment.txt |