# @title Ruby AMQP gem: Working with queues
h1. Working with queues
h2. About this guide
This guide covers everything related to queues in the AMQP v0.9.1 specification, common usage scenarios and how to accomplish typical operations using the amqp gem. This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/3.0/">Creative Commons Attribution 3.0 Unported License</a> (including images & stylesheets). The source is available "on Github":github.com/ruby-amqp/amqp/tree/master/docs.
h2. Which versions of the amqp gem does this guide cover?
This guide covers v0.8.0 and later of the "Ruby amqp gem":github.com/ruby-amqp/amqp.
h2. Queues in AMQP v0.9.1 - overview
h3. What are AMQP queues?
Queues store and forward messages to consumers. They are similar to mailboxes in SMTP. Messages flow from producing applications to {file:docs/Exchanges.textile exchanges} that route them to queues and finally queues deliver the messages to consumer applications (or consumer applications fetch messages as needed).
Note that unlike some other messaging protocols/systems, messages are not delivered directly to queues. They are delivered to exchanges that route messages to queues using rules known as bindings.
AMQP is a programmable protocol, so queues and bindings alike are declared by applications.
h3. Concept of bindings
A binding is an association between a queue and an exchange. Queues must be bound to at least one exchange in order to receive messages from publishers. Learn more about bindings in the {file:docs/Bindings.textile Bindings guide}.
h3. Queue attributes
Queues have several attributes associated with them:
* Name
* Exclusivity
* Durability
* Whether the queue is auto-deleted when no longer used
* Other metadata (sometimes called _X-arguments_)
These attributes define how queues can be used, what their life-cycle is like and other aspects of queue behavior.
The amqp gem represents queues as instances of {AMQP::Queue}.
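For illustration, here is a small sketch of how these attributes map onto options passed at declaration time (the option names are the ones used throughout this guide; the queue name "images.example" is made up for the example):

<pre> <code> # a named, durable, non-exclusive queue that is not auto-deleted
queue = AMQP::Queue.new(channel, "images.example",
                        :durable     => true,
                        :exclusive   => false,
                        :auto_delete => false)
</code> </pre>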
h2. Queue names and declaring queues
Every AMQP queue has a name that identifies it. Queue names often contain several segments separated by a dot ".", in a similar fashion to URI path segments being separated by a slash "/", although almost any string can represent a segment (with some limitations - see below).
Before a queue can be used, it has to be declared. Declaring a queue will cause it to be created if it does not already exist. The declaration has no effect if the queue already exists and its attributes are the *same as those in the declaration*. When the existing queue attributes differ from those in the declaration, a channel-level exception is raised. This case is explained later in this guide.
h3. Explicitly named queues
Applications may pick queue names or ask the broker to generate a name for them.
To declare a queue with a particular name, for example, "images.resize", pass it to the Queue class constructor:
<pre> queue = AMQP::Queue.new(channel, "images.resize", :auto_delete => true) </pre>
Full example: <script src="gist.github.com/998721.js"> </script>
h3. Server-named queues
To ask an AMQP broker to generate a unique queue name for you, pass an *empty string* as the queue name argument:
<pre> <code> AMQP::Queue.new(channel, "", :auto_delete => true) do |queue, declare_ok|
puts "#{queue.name} is ready to go. AMQP method: #{declare_ok.inspect}"
end </code> </pre>
Full example: <script src="gist.github.com/998720.js"> </script>
The amqp gem allows server-named queues to be declared without callbacks:
<pre> queue = AMQP::Queue.new(channel, "", :auto_delete => true) </pre>
In this case, as soon as the AMQP broker reply (the `queue.declare-ok` AMQP method) arrives, the queue object's name is set to the value that the broker generated. Many AMQP operations require a queue name, so until an {AMQP::Queue} instance receives its name, those operations are delayed. This example demonstrates this:
<pre> <code> queue = channel.queue("") queue.bind("builds").subscribe do |metadata, payload|
# message handling implementation...
end </code> </pre>
In this example, the binding is performed as soon as the queue has received the name generated by the broker. If a particular piece of code relies on the queue name being available immediately, a callback should be used.
h3. Reserved queue name prefix
Queue names starting with "amq." are reserved for internal use by the broker. Attempts to declare a queue with a name that violates this rule will result in a channel-level exception with reply code 403 (ACCESS_REFUSED) and a reply message similar to this:
<pre>ACCESS_REFUSED - queue name 'amq.queue' contains reserved prefix 'amq.*'</pre>
h3. Queue re-declaration with different attributes
When queue declaration attributes are different from those that the queue already has, a channel-level exception with code 406 (PRECONDITION_FAILED) will be raised. The reply text will be similar to this:
<pre>PRECONDITION_FAILED - parameters for queue 'amqpgem.examples.channel_exception' in vhost '/' not equivalent</pre>
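A brief sketch of how an application might observe and handle this error (the queue name matches the error message above; the second declaration is the one that fails):

<pre> <code> channel.on_error do |ch, channel_close|
  # for an attribute mismatch, reply_text will look like the PRECONDITION_FAILED message above
  puts "Channel-level exception: #{channel_close.reply_text}"
end

# the first declaration creates the queue as durable...
AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :durable => true)
# ...re-declaring it with different attributes triggers the channel-level exception above
AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :durable => false)
</code> </pre>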
h2. Queue life-cycle patterns
According to the AMQP v0.9.1 specification, there are two common message queue life-cycle patterns:
* Durable message queues that are shared by many consumers and have an independent existence: i.e. they will continue to exist and collect messages whether or not there are consumers to receive them.
* Temporary message queues that are private to one consumer and are tied to that consumer. When the consumer disconnects, the message queue is deleted.
There are some variations of these, such as shared message queues that are deleted when the last of many consumers disconnects.
Let us examine the example of a well-known service like an event collector (event logger). A logger is usually up and running regardless of the existence of services that want to log anything at a particular point in time. Other applications know which queues to use in order to communicate with the logger and can rely on those queues being available and able to survive broker restarts. In this case, explicitly named durable queues are optimal and the coupling that is created between applications is not an issue.
Another example of a well-known long-lived service is a distributed metadata/directory/locking server like "Apache Zookeeper":zookeeper.apache.org, "Google's Chubby":labs.google.com/papers/chubby.html or DNS. Services like this benefit from using well-known, not server-generated, queue names and so do any other applications that use them.
A different sort of scenario is in "a cloud setting" when some kind of worker/instance might start and stop at any time so that other applications cannot rely on it being available. In this case, it is possible to use well-known queue names, but a much better solution is to use server-generated, short-lived queues that are bound to topic or fanout exchanges in order to receive relevant messages.
Imagine a service that processes an endless stream of events - Twitter is one example. When traffic increases, development operations may start additional application instances in the cloud to handle the load. Those new instances want to subscribe to receive messages to process, but the rest of the system does not know anything about them and cannot rely on them being online or try to address them directly. The new instances process events from a shared stream and are the same as their peers. In a case like this, there is no reason for message consumers not to use queue names generated by the broker.
In general, use of explicitly named or server-named queues depends on the messaging pattern that your application needs. "Enterprise Integration Patterns":www.eaipatterns.com/ discusses many messaging patterns in depth and the RabbitMQ FAQ also has a section on "use cases":www.rabbitmq.com/faq.html#scenarios.
h2. Declaring a durable shared queue
To declare a durable shared queue, you pass a queue name that is a non-blank string and use the ":durable" option:
<pre> queue = AMQP::Queue.new(channel, "images.resize", :durable => true) </pre>
Full example: <script src="gist.github.com/998723.js"> </script>
The same example can be rewritten to use {AMQP::Channel#queue}:
<pre> <code> channel.queue("images.resize", :durable => true) do |queue, declare_ok|
puts "#{queue.name} is ready to go."
end </code> </pre>
<script src="gist.github.com/998724.js"> </script>
h2. Declaring a temporary exclusive queue
To declare a server-named, exclusive, auto-deleted queue, pass "" (empty string) as the queue name and use the ":exclusive" and ":auto_delete" options:
<pre> <code> AMQP::Queue.new(channel, "", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
puts "#{queue.name} is ready to go."
end </code> </pre>
Full example:
<script src="gist.github.com/998725.js"> </script>
The same example can be rewritten to use {AMQP::Channel#queue}:
<pre> <code> channel.queue("", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
puts "#{queue.name} is ready to go."
end </code> </pre>
Full example:
<script src="gist.github.com/998726.js"> </script>
Exclusive queues may only be accessed by the current connection and are deleted when that connection closes. The declaration of an exclusive queue by other connections is not allowed and will result in a channel-level exception with the code 405 (RESOURCE_LOCKED) and a reply message similar to:
<pre>RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'amqpgem.examples.queue' in vhost '/'</pre>
The following example demonstrates this: <script src="gist.github.com/1008529.js"> </script>
h2. Binding queues to exchanges
In order to receive messages, a queue needs to be bound to at least one exchange. Most of the time, binding is explicit (done by applications). To bind a queue to an exchange, use {AMQP::Queue#bind} where the argument passed can be either an {AMQP::Exchange} instance or a string.
<pre> <code> queue.bind(exchange) do |bind_ok|
puts "Just bound #{queue.name} to #{exchange.name}"
end </code> </pre>
Full example: <script src="gist.github.com/998727.js"> </script>
The same example using a string without callback:
<pre> queue.bind("amq.fanout") </pre>
Full example: <script src="gist.github.com/998729.js"> </script>
h2. Subscribing to receive messages ("push API")
To set up a queue subscription to enable an application to receive messages as they arrive in a queue, one uses the {AMQP::Queue#subscribe} method. Then when a message arrives, the message header (metadata) and body (payload) are passed to the handler:
<pre> <code> queue.subscribe do |metadata, payload|
puts "Received a message: #{payload.inspect}."
end </code> </pre>
Full example: <script src="gist.github.com/998731.js"> </script>
Subscriptions for message delivery are usually referred to as consumers in the AMQP v0.9.1 specification, client library documentation and books. Consumers last as long as the channel that they were declared on, or until the client cancels them (unsubscribes).
Consumers are identified by consumer tags. If you need to obtain the consumer tag of a subscribed queue then use {AMQP::Queue#consumer_tag}.
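For example (a small sketch; `queue` is assumed to be an already declared {AMQP::Queue} instance):

<pre> <code> queue.subscribe do |metadata, payload|
  # message handling logic...
end

# the consumer tag identifies this subscription to the broker
puts "Subscribed with consumer tag #{queue.consumer_tag}"
</code> </pre>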
h3. Accessing message metadata
The `metadata` object in the example above provides access to message metadata and delivery information:
* Message content type
* Message content encoding
* Message routing key
* Message delivery mode (persistent or not)
* Consumer tag this delivery is for
* Delivery tag
* Message priority
* Whether or not message is redelivered
* Producer application id
and so on. An example to demonstrate how to access some of those attributes:
<pre> <code> # producer
exchange.publish("Hello, world!",
                 :app_id    => "amqpgem.example",
                 :priority  => 8,
                 :type      => "kinda.checkin",
                 # headers table keys can be anything
                 :headers   => {
                   :coordinates => {
                     :latitude  => 59.35,
                     :longitude => 18.066667
                   },
                   :participants => 11,
                   :venue        => "Stockholm"
                 },
                 :timestamp => Time.now.to_i)
</code> </pre>
<pre> <code> # consumer
queue.subscribe do |metadata, payload|
  puts "metadata.routing_key : #{metadata.routing_key}"
  puts "metadata.content_type: #{metadata.content_type}"
  puts "metadata.priority    : #{metadata.priority}"
  puts "metadata.headers     : #{metadata.headers.inspect}"
  puts "metadata.timestamp   : #{metadata.timestamp.inspect}"
  puts "metadata.type        : #{metadata.type}"
  puts "metadata.delivery_tag: #{metadata.delivery_tag}"
  puts "metadata.redelivered : #{metadata.redelivered?}"
  puts "metadata.app_id      : #{metadata.app_id}"
  puts "metadata.exchange    : #{metadata.exchange}"
  puts
  puts "Received a message: #{payload}."
end </code> </pre>
Full example: <script src="gist.github.com/998739.js"> </script>
h3. Exclusive consumers
Consumers can request exclusive access to the queue (meaning only this consumer can access the queue). This is useful when you want a long-lived shared queue to be temporarily accessible by just one application (or thread, or process). If the application employing the exclusive consumer crashes or loses the TCP connection to the broker, then the channel is closed and the exclusive consumer is cancelled.
To exclusively receive messages from the queue, pass the ":exclusive" option to {AMQP::Queue#subscribe}:
<pre> <code> queue.subscribe(:exclusive => true) do |metadata, payload|
# message handling logic...
end </code> </pre>
TBD: describe what happens when exclusivity property is violated and how to handle it.
h3. Using multiple consumers per queue
Historically, amqp gem versions before v0.8.0.RC14 (the current master branch in the repository at the time of writing) had a "one consumer per Queue instance" limitation. To work around this, application developers had to open multiple channels and work with multiple queue instances on different channels. This was not very convenient and surprised developers familiar with AMQP clients for other languages.
With more and more Ruby implementations dropping the "GIL":en.wikipedia.org/wiki/Global_Interpreter_Lock, load balancing between multiple consumers on the same queue in the same OS process has become increasingly common. In certain cases, even applications that do not need any concurrency benefit from having multiple consumers on the same queue in the same process.
Starting from amqp gem v0.8.0.RC14, it is possible to add any number of consumers by instantiating {AMQP::Consumer} directly:
<pre> <code> # non-exclusive consumer, consumer tag is generated
consumer1 = AMQP::Consumer.new(channel, queue)

# non-exclusive consumer, consumer tag is explicitly given
consumer2 = AMQP::Consumer.new(channel, queue, "#{queue.name}-consumer-#{rand}-#{Time.now}")

# exclusive consumer, consumer tag is generated
consumer3 = AMQP::Consumer.new(channel, queue, nil, true) </code> </pre>
Instantiated consumers do not begin consuming messages immediately. This is because in certain cases, it is useful to add a consumer but make it active at a later time. To consume messages, use the {AMQP::Consumer#consume} method in combination with {AMQP::Consumer#on_delivery}:
<pre> <code> consumer1.consume.on_delivery do |metadata, payload|
@consumer1_mailbox << payload
end </code> </pre>
{AMQP::Consumer#on_delivery} takes a block that is used exactly like the block passed to {AMQP::Queue#subscribe}. In fact, {AMQP::Queue#subscribe} uses {AMQP::Consumer} under the hood, adding a _default consumer_ to the queue.
<span class="note"> Default consumers do not have any special properties, they just provide a convenient way for application developers to register multiple consumers and a means of preserving backwards compatibility. Application developers are always free to use AMQP::Consumer instances directly, or intermix them with AMQP::Queue#subscribe. </span>
Most of the public API methods on {AMQP::Consumer} return self, so it is possible to use method chaining extensively. An example from "amqp gem spec suite":github.com/ruby-amqp/amqp/tree/master/spec:
<pre> consumer1 = AMQP::Consumer.new(@channel, @queue).consume.on_delivery { |metadata, payload| mailbox1 << payload }
consumer2 = AMQP::Consumer.new(@channel, @queue).consume.on_delivery { |metadata, payload| mailbox2 << payload } </pre>
To cancel a particular consumer, use the {AMQP::Consumer#cancel} method. To cancel the default queue consumer, use {AMQP::Queue#unsubscribe}.
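For example (a brief sketch using the consumer and queue from the snippets above):

<pre> <code> # cancel a consumer that was instantiated directly
consumer1.cancel

# cancel the default consumer that was registered with AMQP::Queue#subscribe
queue.unsubscribe
</code> </pre>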
h3. Message acknowledgements
Consumer applications - applications that receive and process messages - may occasionally fail to process individual messages or may simply crash. There is also the possibility of network issues causing problems. This raises a question: "When should the AMQP broker remove messages from queues?" The AMQP v0.9.1 specification proposes two choices:
* After the broker sends a message to an application (using either the basic.deliver or basic.get-ok method).
* After the application sends back an acknowledgement (using the basic.ack AMQP method).
The former choice is called the *automatic acknowledgement model*, while the latter is called the *explicit acknowledgement model*. With the explicit model, the application chooses when it is time to send an acknowledgement. It can be right after receiving a message, or after persisting it to a data store before processing, or after fully processing the message (for example, successfully fetching a Web page, processing and storing it into some persistent data store).
!github.com/ruby-amqp/amqp/raw/master/docs/diagrams/006_amqp_091_message_acknowledgements.png!
If a consumer dies without sending an acknowledgement, the AMQP broker will redeliver the message to another consumer, or, if none are available at the time, the broker will wait until at least one consumer is registered for the same queue before attempting redelivery.
The acknowledgement model is chosen when a new consumer is registered for a queue. By default, {AMQP::Queue#subscribe} will use the automatic model. To switch to the explicit model, the ":ack" option should be used:
<pre> <code> queue.subscribe(:ack => true) do |metadata, payload|
# message handling logic...
end </code> </pre>
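With the explicit model the application also has to send the acknowledgement itself, typically once the message has been processed. A minimal sketch, using the metadata.ack shortcut that also appears in the larger example below (`process` is a hypothetical helper standing in for application-specific work):

<pre> <code> queue.subscribe(:ack => true) do |metadata, payload|
  process(payload) # hypothetical placeholder for application-specific work

  # acknowledge only after the work has completed successfully
  metadata.ack
end </code> </pre>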
To demonstrate how redelivery works, let us have a look at the following code example:
<script src="gist.github.com/999396.js"> </script>
So what is going on here? This example uses three AMQP connections to imitate three applications: one producer and two consumers. Each AMQP connection opens a single channel:
<pre> <code> # open three connections to imitate three apps
connection1 = AMQP.connect
connection2 = AMQP.connect
connection3 = AMQP.connect

channel_exception_handler = Proc.new { |ch, channel_close| EventMachine.stop; raise "channel error: #{channel_close.reply_text}" }

# open two channels
channel1 = AMQP::Channel.new(connection1)
channel1.on_error(&channel_exception_handler)
# …

channel2 = AMQP::Channel.new(connection2)
channel2.on_error(&channel_exception_handler)
# …

# app 3 will just publish messages
channel3 = AMQP::Channel.new(connection3)
channel3.on_error(&channel_exception_handler) </code> </pre>
The consumers share a queue and the producer publishes messages to the queue periodically using an `amq.direct` exchange. Both "applications" subscribe to receive messages using the explicit acknowledgement model. The AMQP broker by default will send each message to the next consumer in sequence (this kind of load balancing is known as *round-robin*). This means that some messages will be delivered to consumer 1 and some to consumer 2.
<pre> <code> exchange = channel3.direct("amq.direct")

# …

queue1 = channel1.queue("amqpgem.examples.acknowledgements.explicit", :auto_delete => false)
# purge the queue so that we do not get any redeliveries from previous runs
queue1.purge

queue1.bind(exchange).subscribe(:ack => true) do |metadata, payload|
  # do some work
  sleep(0.2)

  # acknowledge some messages, they will be removed from the queue
  if rand > 0.5
    # FYI: there is a shortcut, metadata.ack
    channel1.acknowledge(metadata.delivery_tag, false)
    puts "[consumer1] Got message ##{metadata.headers['i']}, ack-ed"
  else
    # odd messages are not ack-ed and will remain in the queue for redelivery
    # when app #1 connection is closed (either properly or due to a crash)
    puts "[consumer1] Got message ##{metadata.headers['i']}, SKIPPED"
  end
end

queue2 = channel2.queue!("amqpgem.examples.acknowledgements.explicit", :auto_delete => false)
queue2.subscribe(:ack => true) do |metadata, payload|
  # app 2 always acks messages
  metadata.ack

  puts "[consumer2] Received #{payload}, redelivered = #{metadata.redelivered}, ack-ed"
end </code> </pre>
To demonstrate message redelivery, we make consumer 1 randomly select which messages to acknowledge. After 4 seconds we disconnect it (to imitate a crash). When that happens, the AMQP broker redelivers unacknowledged messages to consumer 2, which acknowledges them unconditionally. After 10 seconds, this example closes all outstanding connections and exits.
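A sketch of how those timers might look (the exact calls and intervals shown here are an assumption based on the description above, not an excerpt from the gist):

<pre> <code> # after 4 seconds, close app 1's connection to imitate a crash;
# its unacknowledged messages are then redelivered to consumer 2
EventMachine.add_timer(4.0) { connection1.close }

# after 10 seconds, close the remaining connections and stop the event loop
EventMachine.add_timer(10.0) do
  connection2.close { connection3.close { EventMachine.stop } }
end </code> </pre>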
An extract of output produced by this example:
<pre>