Connecting to RabbitMQ

Pika provides multiple adapters for connecting to RabbitMQ, allowing for different ways of providing socket communication depending on what is appropriate for your application.

  • SelectConnection: A native event-based connection adapter that implements select, kqueue, poll and epoll.
  • AsyncoreConnection: Legacy adapter kept for convenience of previous Pika users.
  • TornadoConnection: Connection adapter for use with the Tornado IO Loop.
  • BlockingConnection: Enables blocking, synchronous operation on top of the library for simple uses.

IO and Event Looping

Due to the need to check for and send content on a consistent basis, Pika now implements or extends IOLoops in each of its asynchronous connection adapters. These IOLoops are blocking methods which loop and listen for events. Each asynchronous adapter follows the same standard for invoking the IOLoop. The IOLoop is created when the connection adapter is created. To start it, simply call the connection.ioloop.start() method.

If you are using an external IOLoop such as Tornado’s IOLoop, you may invoke that as you normally would and then add the adapter to it.

Example:

from pika.adapters import SelectConnection

# Create our connection object
connection = SelectConnection()

try:
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
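
To make the idea of a blocking IOLoop concrete, the following is a minimal sketch of a select-based loop. It is not Pika's implementation: TinyIOLoop, add_handler and the local socket pair are hypothetical names used only for illustration. It shows why start() blocks until something calls stop(), which is also why the example above starts the loop a second time after calling close().

```python
import select
import socket


class TinyIOLoop(object):
    """Hypothetical, simplified stand-in for an adapter's IOLoop."""

    def __init__(self):
        self._handlers = {}   # socket -> callback invoked when readable
        self._running = False

    def add_handler(self, sock, callback):
        self._handlers[sock] = callback

    def stop(self):
        self._running = False

    def start(self, poll_timeout=0.5):
        """Block, dispatching read events, until stop() is called."""
        self._running = True
        while self._running:
            readable, _, _ = select.select(list(self._handlers), [], [],
                                           poll_timeout)
            for sock in readable:
                self._handlers[sock](sock)


received = []
ioloop = TinyIOLoop()
# A local socket pair stands in for the connection to the broker
client, server = socket.socketpair()


def on_readable(sock):
    received.append(sock.recv(1024))
    ioloop.stop()  # Without this call, start() would never return


ioloop.add_handler(client, on_readable)
server.sendall(b"frame")
ioloop.start()
client.close()
server.close()
```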

Continuation-Passing Style

Interfacing with Pika asynchronously is done by passing in callback methods you would like to have invoked when a certain event has completed. For example, if you are going to declare a queue, you pass in a method that will be called when the RabbitMQ server returns a Queue.DeclareOk response.

In our example below we use the following five easy steps:

  1. We start by creating our connection object, then starting our event loop.
  2. When we are connected, the on_connected method is called. In that method we create a channel.
  3. When the channel is created, the on_channel_open method is called. In that method we declare a queue.
  4. When the queue is declared successfully, on_queue_declared is called. In that method we call channel.basic_consume telling it to call the handle_delivery for each message RabbitMQ delivers to us.
  5. When RabbitMQ has a message to send us, it calls the handle_delivery method, passing the AMQP Method frame, Header frame and Body.

Note

In the example, Step #1 appears last, after the functions for Steps #2 through #5 have been defined. This is so that Python knows about the functions we’ll call in Steps #2 through #5.

Example:

import pika
from pika.adapters import SelectConnection

# Create a global channel variable to hold our channel object in
channel = None

# Step #2
def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    # Open a channel
    connection.channel(on_channel_open)

# Step #3
def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared)

# Step #4
def on_queue_declared(frame):
    """Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ"""
    channel.basic_consume(handle_delivery, queue='test')

# Step #5
def handle_delivery(channel, method, header, body):
    """Called when we receive a message from RabbitMQ"""
    print(body)

# Step #1: Connect to RabbitMQ using the default connection parameters
parameters = pika.ConnectionParameters()
connection = SelectConnection(parameters, on_connected)

try:
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
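
The order in which the callbacks fire can be traced without a broker. The sketch below replaces the connection and channel with hypothetical stand-ins (FakeConnection and FakeChannel are not part of Pika) that invoke each callback immediately. It only illustrates how Steps #1 through #5 chain together, not real network behavior.

```python
calls = []


class FakeChannel(object):
    """Hypothetical stand-in that 'answers' every request at once."""

    def queue_declare(self, queue, callback):
        callback(None)   # None stands in for the Queue.DeclareOk frame

    def basic_consume(self, consumer_callback, queue):
        # Pretend RabbitMQ delivers exactly one message
        consumer_callback(self, None, None, b"hello")


class FakeConnection(object):
    """Hypothetical stand-in for a connection adapter."""

    def __init__(self, on_open_callback):
        on_open_callback(self)

    def channel(self, on_open_callback):
        on_open_callback(FakeChannel())


channel = None


def on_connected(connection):          # Step #2
    calls.append("on_connected")
    connection.channel(on_channel_open)


def on_channel_open(new_channel):      # Step #3
    global channel
    channel = new_channel
    calls.append("on_channel_open")
    channel.queue_declare(queue="test", callback=on_queue_declared)


def on_queue_declared(frame):          # Step #4
    calls.append("on_queue_declared")
    channel.basic_consume(handle_delivery, queue="test")


def handle_delivery(channel, method, header, body):   # Step #5
    calls.append("handle_delivery:%s" % body.decode("ascii"))


FakeConnection(on_connected)           # Step #1
print(" -> ".join(calls))
```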

Credentials

The credentials module provides the mechanism by which you pass the username and password to the connection.ConnectionParameters() class when it is created.

class credentials.PlainCredentials(username, password, erase_on_connect=False)

The PlainCredentials class returns the properly formatted username and password to the Connection. As of this version of Pika, only PlainCredentials are supported. To authenticate with Pika, simply create a credentials object passing in the username and password and pass that to the ConnectionParameters object.

If you do not pass in credentials to the ConnectionParameters object, it will create credentials for ‘guest’ with the password of ‘guest’.

If you pass True to erase_on_connect the credentials will not be stored in memory after the Connection attempt has been made. This means that you will not be able to use a Reconnection Strategy successfully if this is enabled.

Parameters:

  • username: plain text string value
  • password: plain text string value
  • erase_on_connect: bool erase credentials on connect. Default: False

erase_credentials()

Called by Connection when it no longer needs the credentials

response_for(start)

Validate that our type of authentication is supported

Example:

import pika
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters(credentials=credentials)
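
The effect of erase_on_connect on reconnection can be pictured with a simplified model. SketchCredentials and connect below are hypothetical stand-ins, not the PlainCredentials implementation. The sketch only assumes, as described above, that erase_credentials() is called once the connection attempt has been made.

```python
class SketchCredentials(object):
    """Hypothetical model of credentials that may be erased after use."""

    def __init__(self, username, password, erase_on_connect=False):
        self.username = username
        self.password = password
        self.erase_on_connect = erase_on_connect

    def erase_credentials(self):
        # Only forget the values when the caller asked for it
        if self.erase_on_connect:
            self.username = None
            self.password = None

    def can_authenticate(self):
        return self.username is not None and self.password is not None


def connect(credentials):
    """Pretend to connect, then let the credentials erase themselves."""
    if not credentials.can_authenticate():
        return False
    credentials.erase_credentials()
    return True


kept = SketchCredentials("guest", "guest")
erased = SketchCredentials("guest", "guest", erase_on_connect=True)

first_attempts = (connect(kept), connect(erased))
# A reconnection attempt reuses the same credentials objects
second_attempts = (connect(kept), connect(erased))
```

In this model the second attempt with the erased credentials fails, which mirrors why a Reconnection Strategy cannot be used when erase_on_connect is enabled.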

Connection Parameters

To connect to RabbitMQ via an adapter, first you must construct a ConnectionParameters object. This has all of the options which are involved in creating a connection with RabbitMQ.

class connection.ConnectionParameters(host='localhost', port=5672, virtual_host='/', credentials=None, channel_max=0, frame_max=131072, heartbeat=False)

Connection parameters object that is passed into the connection adapter upon construction. The following parameters are passed and are used to negotiate communication with RabbitMQ:

  • host: Hostname or IP Address to connect to, defaults to localhost.
  • port: TCP port to connect to, defaults to 5672
  • virtual_host: RabbitMQ virtual host to use, defaults to /
  • credentials: An instance of a credentials class to authenticate with. Defaults to PlainCredentials for the guest user.
  • channel_max: Maximum number of channels to allow, defaults to 0, meaning no limit
  • frame_max: The maximum byte size for an AMQP frame. Defaults to 131072
  • heartbeat: Turn heartbeat checking on or off. Defaults to False.

Example:

import pika
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters(credentials=credentials,
                                       host='rabbit-server1',
                                       virtual_host='/')

TCP Backpressure

As of RabbitMQ 2.0, client side Channel.Flow has been removed [1]. Instead, the RabbitMQ broker uses TCP Backpressure to slow your client if it is delivering messages too fast. Pika attempts to help you handle this situation by providing a mechanism by which you may be notified when it notices that too many frames have yet to be delivered. If you register a callback function with the add_backpressure_callback method of any connection adapter, your function will be called when Pika sees that the backlog exceeds 10 times the average size of the frames you have been sending. You may tweak the notification multiplier by calling the set_backpressure_multiplier method, passing any integer value.
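
The threshold described above can be modelled as follows. This is a simplified sketch of the rule as stated in this section, not Pika's internal code: BackpressureMonitor, frame_sent and check are hypothetical names, and the way the average frame size is computed is an assumption.

```python
class BackpressureMonitor(object):
    """Hypothetical model: warn when backlog > multiplier * average frame."""

    def __init__(self, multiplier=10):
        self.multiplier = multiplier
        self.bytes_sent = 0
        self.frames_sent = 0
        self.callbacks = []

    def add_backpressure_callback(self, callback):
        self.callbacks.append(callback)

    def set_backpressure_multiplier(self, value=10):
        self.multiplier = value

    def frame_sent(self, size):
        """Record a sent frame so the average size can be computed."""
        self.bytes_sent += size
        self.frames_sent += 1

    def check(self, backlog_bytes):
        """Notify callbacks if the unsent backlog exceeds the threshold."""
        if not self.frames_sent:
            return False
        average = float(self.bytes_sent) / self.frames_sent
        if backlog_bytes > average * self.multiplier:
            for callback in self.callbacks:
                callback()
            return True
        return False


warnings = []
monitor = BackpressureMonitor()
monitor.add_backpressure_callback(lambda: warnings.append("slow down"))

for size in (100, 200, 300):       # average frame size is 200 bytes
    monitor.frame_sent(size)

below = monitor.check(backlog_bytes=2000)    # 2000 is not above 10 * 200
above = monitor.check(backlog_bytes=2001)    # 2001 exceeds the threshold
monitor.set_backpressure_multiplier(20)
relaxed = monitor.check(backlog_bytes=2001)  # threshold is now 4000
```

Raising the multiplier makes the notification less sensitive. In this model the same backlog that triggered the callback with a multiplier of 10 no longer does so with a multiplier of 20.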

Available Adapters

The following connection adapters are available for connecting with RabbitMQ:

SelectConnection

Note

SelectConnection is the recommended method for using Pika under most circumstances. It supports multiple event notification methods including select, epoll, kqueue and poll.

By default SelectConnection will attempt to use the most appropriate event notification method for your system. In order to override the default behavior you may set the poller type by assigning a string value to the select_connection module's POLLER_TYPE attribute prior to creating the SelectConnection object instance. Valid values are: kqueue, poll, epoll, select.

Poller Type Override Example:

from pika.adapters import select_connection

# Must be set before the SelectConnection instance is created
select_connection.POLLER_TYPE = 'epoll'
connection = select_connection.SelectConnection()
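
Which of these event notification methods exist depends on the platform. The sketch below uses only Python's standard select module to list the ones available on the current system. The preference order and the available_pollers name are assumptions for illustration, not Pika's selection logic.

```python
import select

# Assumed preference order, for illustration only
CANDIDATES = ("epoll", "kqueue", "poll", "select")


def available_pollers():
    """Return the candidate pollers the running platform provides."""
    return [name for name in CANDIDATES if hasattr(select, name)]


pollers = available_pollers()
print("Available on this system: %s" % ", ".join(pollers))
```

Overriding POLLER_TYPE with a value that is missing from this list is unlikely to work on the current platform.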

See the Continuation-Passing Style example for an example of using SelectConnection.

class adapters.select_connection.SelectConnection(parameters=None, on_open_callback=None, reconnection_strategy=None)

add_backpressure_callback(callback)

Add a callback notification when we think backpressure is being applied due to the size of the output buffer being exceeded.

add_on_close_callback(callback)

Add a callback notification when the connection has closed

add_on_open_callback(callback)

Add a callback notification when the connection has opened

channel(on_open_callback, channel_number=None)

Create a new channel with the next available channel number, or pass in a channel number to use. If you specify a channel number it must be non-zero, but it is recommended that you let Pika manage the channel numbers.

close(code=200, text='Normal shutdown')

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

is_open

Returns a boolean reporting the current connection state

set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

AsyncoreConnection

Note

It is recommended that you use SelectConnection; its method signatures are the same as those of AsyncoreConnection.

The AsyncoreConnection class is provided for legacy support and quicker porting from applications that used Pika version 0.5.2 and prior.

class adapters.asyncore_connection.AsyncoreConnection(parameters=None, on_open_callback=None, reconnection_strategy=None)

add_backpressure_callback(callback)

Add a callback notification when we think backpressure is being applied due to the size of the output buffer being exceeded.

add_on_close_callback(callback)

Add a callback notification when the connection has closed

add_on_open_callback(callback)

Add a callback notification when the connection has opened

channel(on_open_callback, channel_number=None)

Create a new channel with the next available channel number, or pass in a channel number to use. If you specify a channel number it must be non-zero, but it is recommended that you let Pika manage the channel numbers.

close(code=200, text='Normal shutdown')

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

is_open

Returns a boolean reporting the current connection state

set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

TornadoConnection

Tornado is an open source version of the scalable, non-blocking web server and tools that power FriendFeed. For more information on tornado, visit http://tornadoweb.org

Since the Tornado IOLoop blocks once it is started, it is suggested that you use a timer to add Pika to your tornado.Application instance after the HTTPServer has started.

The following is a simple, non-working example of how to add Pika to the Tornado IOLoop without blocking other applications from doing so. To see a fully working example, see the Tornado Demo application in the examples.

Example:

import tornado.httpserver
import tornado.ioloop
import tornado.web

from pika.adapters.tornado_connection import TornadoConnection

class PikaClient(object):
    def connect(self):
        self.connection = TornadoConnection(on_open_callback=self.on_connected)

# Create our Tornado Application
application = tornado.web.Application([
    (r"/", ExampleHandler)
], **settings)

# Create our Pika Client
application.pika = PikaClient()

# Start the HTTPServer
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8080)

# Get a handle to the instance of IOLoop
ioloop = tornado.ioloop.IOLoop.instance()

# Add our Pika connect to the IOLoop since we loop on ioloop.start
ioloop.add_timeout(500, application.pika.connect)

# Start the IOLoop
ioloop.start()

class adapters.tornado_connection.TornadoConnection(parameters=None, on_open_callback=None, reconnection_strategy=None)

add_backpressure_callback(callback)

Add a callback notification when we think backpressure is being applied due to the size of the output buffer being exceeded.

add_on_close_callback(callback)

Add a callback notification when the connection has closed

add_on_open_callback(callback)

Add a callback notification when the connection has opened

channel(on_open_callback, channel_number=None)

Create a new channel with the next available channel number, or pass in a channel number to use. If you specify a channel number it must be non-zero, but it is recommended that you let Pika manage the channel numbers.

close(code=200, text='Normal shutdown')

Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.

is_open

Returns a boolean reporting the current connection state

set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

BlockingConnection

The BlockingConnection creates a layer on top of Pika’s asynchronous core, providing methods that will block until their expected response has returned. Due to the asynchronous nature of the Basic.Deliver and Basic.Return calls from RabbitMQ to your application, you are still required to implement continuation-passing style asynchronous methods if you’d like to receive messages from RabbitMQ using basic_consume or if you want to be notified of a delivery failure when using basic_publish.

Basic.Get is a blocking call which will either return the Method Frame, Header Frame and Body of a message, or it will return a Basic.GetEmpty frame as the Method Frame.

For more information on using the BlockingConnection, see BlockingChannel

Publishing Example:

from pika.adapters import BlockingConnection
from pika import BasicProperties

# Open a connection to RabbitMQ on localhost using all default parameters
connection = BlockingConnection()

# Open the channel
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)

# Send a message
channel.basic_publish(exchange='',
                      routing_key="test",
                      body="Hello World!",
                      properties=BasicProperties(content_type="text/plain",
                                                 delivery_mode=1))

Consuming Example:

from pika.adapters import BlockingConnection

# Open a connection to RabbitMQ on localhost using all default parameters
connection = BlockingConnection()

# Open the channel
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue="test", durable=True,
                      exclusive=False, auto_delete=False)

# Start our counter at 0
messages = 0

# Method that will receive our messages and stop consuming after 10
def _on_message(channel, method, header, body):
    global messages

    print("Message:")
    print("\t%r" % method)
    print("\t%r" % header)
    print("\t%r" % body)

    # Acknowledge message receipt
    channel.basic_ack(method.delivery_tag)

    # Once we've received 10 messages, stop consuming
    messages += 1
    if messages >= 10:
        channel.stop_consuming()

# Set up our consumer callback
channel.basic_consume(_on_message, queue="test")

# This is blocking until channel.stop_consuming is called and will allow us to receive messages
channel.start_consuming()

class adapters.blocking_connection.BlockingConnection(parameters=None, reconnection_strategy=None)

The BlockingConnection adapter is meant for simple implementations where you want to have blocking behavior. The behavior is layered on top of the async library. Because of the nature of AMQP there are a few callbacks one needs to implement, even in a blocking implementation. These include receiving messages from Basic.Deliver, Basic.GetOk, and Basic.Return.

channel(channel_number=None)

Create a new channel with the next available or specified channel number.

add_timeout(delay_sec, callback)

Add a timeout calling callback to our stack that will execute in delay_sec.

remove_timeout(timeout_id)

Remove a timeout from the stack

process_timeouts()

Process our self._timeouts event stack
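
The three timeout methods fit together roughly as sketched below. TimeoutStack is a hypothetical, simplified model, not the BlockingConnection code. It assumes timeouts are kept in a dictionary keyed by an id and that process_timeouts runs every callback whose deadline has passed. The now argument is an addition that makes the example deterministic.

```python
import itertools
import time


class TimeoutStack(object):
    """Hypothetical model of add_timeout / remove_timeout / process_timeouts."""

    def __init__(self):
        self._timeouts = {}            # timeout_id -> (deadline, callback)
        self._ids = itertools.count(1)

    def add_timeout(self, delay_sec, callback, now=None):
        """Schedule callback to run delay_sec from now; return its id."""
        now = time.time() if now is None else now
        timeout_id = next(self._ids)
        self._timeouts[timeout_id] = (now + delay_sec, callback)
        return timeout_id

    def remove_timeout(self, timeout_id):
        """Cancel a pending timeout; unknown ids are ignored."""
        self._timeouts.pop(timeout_id, None)

    def process_timeouts(self, now=None):
        """Run and discard every timeout whose deadline has passed."""
        now = time.time() if now is None else now
        due = sorted((deadline, timeout_id)
                     for timeout_id, (deadline, _) in self._timeouts.items()
                     if deadline <= now)
        for _, timeout_id in due:
            _, callback = self._timeouts.pop(timeout_id)
            callback()


fired = []
stack = TimeoutStack()
stack.add_timeout(5, lambda: fired.append("five"), now=0)
cancelled = stack.add_timeout(1, lambda: fired.append("one"), now=0)
stack.add_timeout(2, lambda: fired.append("two"), now=0)

stack.remove_timeout(cancelled)
stack.process_timeouts(now=3)    # only the 2-second timeout is due
after_three = list(fired)
stack.process_timeouts(now=10)   # the 5-second timeout is due now
```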

add_backpressure_callback(callback)

Add a callback notification when we think backpressure is being applied due to the size of the output buffer being exceeded.

add_on_close_callback(callback)

Add a callback notification when the connection has closed

add_on_open_callback(callback)

Add a callback notification when the connection has opened

is_open

Returns a boolean reporting the current connection state

set_backpressure_multiplier(value=10)

Alter the backpressure multiplier value. We set this to 10 by default. This value is used to raise warnings and trigger the backpressure callback.

Footnotes

[1] “more effective flow control mechanism that does not require cooperation from clients and reacts quickly to prevent the broker from exhausing memory - see http://www.rabbitmq.com/extensions.html#memsup” from http://lists.rabbitmq.com/pipermail/rabbitmq-announce/attachments/20100825/2c672695/attachment.txt