AMQ::Client::Exchange
h2. What are AMQP exchanges?
An AMQP exchange is where AMQP clients send messages. An exchange may also be described as a router or a matcher. Every published message is received by an exchange which, depending on its type and the message attributes, determines how to deliver the message.
Entities that forward messages to consumers (or from which consumers fetch messages on demand) are called {Queue queues}. Exchanges are associated with queues via bindings. Roughly speaking, bindings determine which queues receive the messages published to a given exchange.
h2. AMQP bindings
Closely related to exchanges is the concept of bindings. A binding is the relationship between an exchange and a message queue that tells the exchange how to route messages. Bindings are set up by AMQP applications (usually the application that owns and uses the message queue sets up bindings for it). An exchange may be bound to zero, one or more queues.
h2. Exchange types
There are 4 supported exchange types: direct, fanout, topic and headers. The exchange type determines how the exchange processes and routes messages.
h2. Direct exchanges
Direct exchanges are useful for 1:1 communication scenarios. Queues are bound to direct exchanges with a parameter called the “routing key”. When a message arrives at a direct exchange, the broker takes that message’s routing key (if any), finds a queue bound to the exchange with the same routing key and routes the message there.
Because queues are very often bound with a routing key equal to the queue’s name, AMQP 0.9.1 has a pre-declared direct exchange known as the default exchange. The default exchange is a bit special: the broker automatically binds all queues (in the same virtual host) to it, using each queue’s name as the routing key. In other words, a message published to the default exchange is routed to the queue whose name equals the message’s routing key. The default exchange’s name is an empty string.
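The routing rule described above can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: `ToyDirectExchange`, `bind` and `route` are hypothetical names invented for this sketch and are not part of the amqp gem or of any broker.

```ruby
# Toy in-memory model of direct routing (hypothetical, not the amqp gem API).
class ToyDirectExchange
  def initialize
    # routing key => names of the queues bound with that key
    @bindings = Hash.new { |hash, key| hash[key] = [] }
  end

  def bind(queue_name, routing_key)
    @bindings[routing_key] << queue_name
    self
  end

  # Returns the names of the queues that would receive a message
  # published with the given routing key.
  def route(routing_key)
    @bindings.fetch(routing_key, [])
  end
end

# The default exchange behaves as if every queue were bound to it
# with a routing key equal to the queue's own name.
default_exchange = ToyDirectExchange.new
%w[tasks emails].each { |queue_name| default_exchange.bind(queue_name, queue_name) }

default_exchange.route("tasks")   # => ["tasks"]
default_exchange.route("unknown") # => []
```

A message whose routing key matches no binding is simply not routed anywhere in this model.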
As part of the standard, the server must predeclare the direct exchange ‘amq.direct’ and the fanout exchange ‘amq.fanout’ (all exchange names starting with ‘amq.’ are reserved). Attempts to declare an exchange whose name starts with ‘amq.’ will fail and raise an AMQP::Error. In practice these predeclared exchanges are never used directly by client code.
h2. Fanout exchanges
Fanout exchanges are useful for 1:n and n:m communication where one or more producers feed multiple consumers. Messages published to a fanout exchange are delivered unconditionally to all queues bound to that exchange. Each queue gets its own copy of the message.
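As a rough illustration of unconditional delivery, here is a toy model in which queues are plain arrays. `ToyFanoutExchange` is a hypothetical name used only for this sketch; it does not exist in the amqp gem.

```ruby
# Toy model of fanout delivery (hypothetical, not the amqp gem API).
class ToyFanoutExchange
  def initialize
    @queues = []
  end

  def bind(queue)
    @queues << queue
    self
  end

  # The routing key is accepted but ignored: fanout routing is unconditional.
  # Every bound queue receives its own copy of the payload.
  # Returns the number of queues the message was copied to.
  def publish(payload, routing_key = nil)
    @queues.each { |queue| queue << payload.dup }
    @queues.size
  end
end

indexer_queue = []
audit_queue   = []

exchange = ToyFanoutExchange.new.bind(indexer_queue).bind(audit_queue)
exchange.publish("document 42 updated", "ignored.key")

indexer_queue # => ["document 42 updated"]
audit_queue   # => ["document 42 updated"]
```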
h2. Topic exchanges
Topic exchanges are used for 1:n and n:m communication scenarios. An exchange of this type uses the routing key to determine which queues to deliver the message to. Wildcard matching is allowed. The topic must be declared using dot notation to separate each subtopic.
As part of the AMQP standard, each server should predeclare a topic exchange called ‘amq.topic’.
The classic example is delivering market data. When publishing market data for stocks, we may subdivide the stream based on 2 characteristics: nation code and trading symbol. The topic tree for Apple may look like stock.us.aapl. NASDAQ updates may use the topic stock.us.nasdaq, while DAX may use stock.de.dax.
Queues bound to the exchange indicate which data interests them by passing a routing key pattern when binding; the pattern is matched against the routing key of each published message.
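Wildcard matching can be sketched in a few lines of plain Ruby. The sketch assumes the wildcard rules of the AMQP 0.9.1 specification, which this section does not spell out: `*` stands for exactly one word and `#` for zero or more words. `topic_match?` and `match_words` are hypothetical helper names, not amqp gem methods.

```ruby
# Returns true when a dot-separated routing key matches a binding pattern.
# Assumed rules: "*" matches exactly one word, "#" matches zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern_words, key_words)
  # An exhausted pattern only matches an exhausted routing key.
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when "#"
    # Try letting "#" consume 0, 1, 2, ... words of the routing key.
    (0..key_words.length).any? { |n| match_words(rest, key_words.drop(n)) }
  when "*"
    !key_words.empty? && match_words(rest, key_words.drop(1))
  else
    key_words.first == head && match_words(rest, key_words.drop(1))
  end
end

topic_match?("stock.us.*", "stock.us.aapl") # => true
topic_match?("stock.*",    "stock.us.aapl") # => false
topic_match?("stock.#",    "stock.de.dax")  # => true
```

With these rules a queue bound with `stock.us.*` receives all US symbols, while one bound with `stock.#` receives the whole stream.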
h2. Headers exchanges
With an exchange of type headers, queues bound to the exchange indicate which data interests them by passing arguments that are matched against the headers of published messages. The form of the matching can be controlled by the ‘x-match’ argument, which may be ‘any’ or ‘all’. If unspecified, it defaults to ‘all’.
A value of ‘all’ for ‘x-match’ implies that all values must match (i.e. it does an AND of the headers), while a value of ‘any’ implies that at least one should match (i.e. it does an OR).
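The two matching modes can be illustrated with a small pure-Ruby helper. This is a sketch, not broker code: `headers_match?` is a hypothetical name, and skipping every `x-` prefixed binding argument during comparison is an assumption made here for simplicity.

```ruby
# Decides whether a message's headers satisfy a binding's arguments.
# Assumption: binding arguments whose key starts with "x-" (such as
# "x-match") are control arguments and are not compared with headers.
def headers_match?(binding_arguments, message_headers)
  mode     = binding_arguments.fetch("x-match", "all")
  criteria = binding_arguments.reject { |key, _value| key.start_with?("x-") }
  results  = criteria.map { |key, value| message_headers[key] == value }

  mode == "any" ? results.any? : results.all?
end

binding_all = { "x-match" => "all", "format" => "pdf", "type" => "report" }
binding_any = { "x-match" => "any", "format" => "pdf", "type" => "report" }
headers     = { "format" => "pdf", "type" => "log" }

headers_match?(binding_all, headers) # => false (type differs)
headers_match?(binding_any, headers) # => true  (format matches)
```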
As part of the AMQP standard, each server should predeclare a headers exchange named ‘amq.match’.
h2. Key methods
Key methods of Exchange class are
h2. Exchange durability and persistence of messages.
Learn more in our {file:docs/Durability.textile Durability guide}.
h2. RabbitMQ extensions.
The amqp gem supports several RabbitMQ extensions that extend Exchange functionality. Learn more in {file:docs/VendorSpecificExtensions.textile}
@note Please make sure you read the section on exchange durability vs. message persistence.
@see www.rabbitmq.com/faq.html#managing-concepts-exchanges Exchanges explained in the RabbitMQ FAQ
@see www.rabbitmq.com/faq.html#Binding-and-Routing Bindings and routing explained in the RabbitMQ FAQ
@see Channel#default_exchange
@see Channel#direct
@see Channel#fanout
@see Channel#topic
@see Channel#headers
@see Queue
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.1)
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.5)
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3)
The default exchange. The default exchange is a predefined direct exchange. It cannot be removed. Every queue is bound to this (direct) exchange by default with the following routing semantics: messages will be routed to the queue with the same name as the message’s routing key. In other words, if a message is published with a routing key of “weather.usa.ca.sandiego” and there is a queue Q with this name, that message will be routed to Q.
@param [AMQP::Channel] channel Channel to use. If not given, a new AMQP channel
will be opened on the default AMQP connection (accessible as AMQP.connection).
@example Publishing a message to the tasks queue
  channel     = AMQP::Channel.new(connection)
  tasks_queue = channel.queue("tasks")
  AMQP::Exchange.default(channel).publish("make clean", :routing_key => "tasks")
@see Exchange
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.2.4)
@note Do not confuse default exchange with amq.direct: amq.direct is a pre-defined direct exchange that doesn't have any special routing semantics.
@return [Exchange] An instance that corresponds to the default exchange (of type direct).
@api public
# File lib/amqp/exchange.rb, line 169
def self.default(channel = nil)
  self.new(channel || AMQP::Channel.new, :direct, AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end
See {Exchange Exchange class documentation} for an introduction, information about exchange types, which use cases they are good for and so on.
h2. Predeclared exchanges
If the exchange name corresponds to one of those predeclared by the AMQP 0.9.1 specification (empty string, amq.direct, amq.fanout, amq.topic, amq.match), the declaration command won’t be sent to the broker (the only possible reply from the broker is to reject it, because predefined entities cannot be changed). The callback, if any, will be executed immediately.
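The rule above can be written as a small predicate. This is a sketch that follows the prose list of predeclared names; it is not the check the gem's constructor performs, and `PREDECLARED_EXCHANGES` and `needs_declaration?` are hypothetical names introduced for illustration.

```ruby
# Names listed above as predeclared by the AMQP 0.9.1 specification.
PREDECLARED_EXCHANGES = ["", "amq.direct", "amq.fanout", "amq.topic", "amq.match"].freeze

# Returns true when an exchange.declare command would have to be sent
# to the broker for an exchange with this name and these options.
def needs_declaration?(name, opts = {})
  return false if opts[:no_declare]

  !PREDECLARED_EXCHANGES.include?(name)
end

needs_declaration?("search.index.updates")                      # => true
needs_declaration?("amq.fanout")                                # => false
needs_declaration?("")                                          # => false
needs_declaration?("search.index.updates", :no_declare => true) # => false
```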
@example Instantiating a fanout exchange using constructor
  AMQP.connect do |connection|
    AMQP::Channel.new(connection) do |channel|
      AMQP::Exchange.new(channel, :fanout, "search.index.updates") do |exchange, declare_ok|
        # by now exchange is ready and waiting
      end
    end
  end
@example Instantiating a direct exchange using {Channel#direct}
  AMQP.connect do |connection|
    AMQP::Channel.new(connection) do |channel|
      channel.direct("email.replies_listener") do |exchange, declare_ok|
        # by now exchange is ready and waiting
      end
    end
  end
@param [Channel] channel AMQP channel this exchange is associated with
@param [Symbol] type Exchange type
@param [String] name Exchange name
@option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not
already exist. The client can use this to check whether an exchange exists without modifying the server state.
@option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as
durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).
@option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished
using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.
@option opts [Boolean] :internal (false) If set, the exchange may not be used directly by publishers, but
only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. *This is a RabbitMQ-specific extension.*
@option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
@option opts [Boolean] :no_declare (true) If set, the exchange declaration command won’t be sent to the broker. This makes it possible to
skip declaration forcefully. We recommend that only experienced developers consider this option.
@option opts [String] :default_routing_key (nil) Default routing key that will be used by {Exchange#publish} when no routing key is passed explicitly.
It is perfectly fine for applications to always specify routing key to {Exchange#publish}.
@option opts [Hash] :arguments (nil) A hash of optional arguments with the declaration. Some brokers implement
AMQP extensions using x-prefixed declaration arguments.
@raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
@raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.
@yield [exchange, declare_ok] Yields successfully declared exchange instance and AMQP method (exchange.declare-ok) instance. The latter is optional.
@yieldparam [Exchange] exchange Exchange that is successfully declared and is ready to be used.
@yieldparam [AMQP::Protocol::Exchange::DeclareOk] declare_ok AMQP method (exchange.declare-ok) instance.
@see Channel#default_exchange
@see Channel#direct
@see Channel#fanout
@see Channel#topic
@see Channel#headers
@see Queue
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3)
@return [Exchange]
@api public
# File lib/amqp/exchange.rb, line 298
def initialize(channel, type, name, opts = {}, &block)
  @channel             = channel
  @type                = type
  @opts                = self.class.add_default_options(type, name, opts, block)
  @default_routing_key = opts[:routing_key] || opts[:key] || AMQ::Protocol::EMPTY_STRING
  @name                = name unless name.empty?

  @status                  = :unknown
  @default_publish_options = (opts.delete(:default_publish_options) || {
    :routing_key => @default_routing_key,
    :mandatory   => false,
    :immediate   => false
  }).freeze

  @default_headers = (opts.delete(:default_headers) || {
    :content_type => DEFAULT_CONTENT_TYPE,
    :persistent   => false,
    :priority     => 0
  }).freeze

  super(channel.connection, channel, name, type)

  # The AMQP 0.8 specification (as well as 0.9.1) in 1.1.4.2 mentions
  # that Exchange.Declare-Ok confirms the name of the exchange (because
  # of automatically named), which is logical to interpret that this
  # functionality should be the same as for Queue (though it isn't
  # explicitly told in the specification). In fact, RabbitMQ (and
  # probably other implementations as well) doesn't support it and
  # there is a default exchange with an empty name (so-called default
  # or nameless exchange), so if we'd send Exchange.Declare(exchange=""),
  # then RabbitMQ interpret it as if we'd try to redefine this default
  # exchange so it'd produce an error.
  unless name == "amq.#{type}" or name.empty? or opts[:no_declare]
    @status = :opening

    unless @opts[:no_declare]
      @channel.once_open do
        if block
          shim = Proc.new do |exchange, declare_ok|
            case block.arity
            when 1 then block.call(exchange)
            else
              block.call(exchange, declare_ok)
            end
          end

          self.declare(passive = @opts[:passive], durable = @opts[:durable], auto_delete = @opts[:auto_delete], nowait = @opts[:nowait], @opts[:arguments], &shim)
        else
          self.declare(passive = @opts[:passive], durable = @opts[:durable], auto_delete = @opts[:auto_delete], nowait = @opts[:nowait], @opts[:arguments])
        end
      end
    end
  else
    # Call the callback immediately, as given exchange is already
    # declared.
    @status = :opened
    block.call(self) if block
  end

  @on_declare = block
end
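The constructor wraps the user's block in a shim that inspects its arity, so a callback may take either just the exchange or the exchange plus the declare-ok frame. The standalone sketch below isolates that dispatch; `call_declare_callback` is a hypothetical helper, and lambdas are used because they enforce their argument count strictly, which makes the difference visible.

```ruby
# Calls the callback with one argument if it declares exactly one,
# otherwise with both the exchange and the declare-ok value.
def call_declare_callback(callback, exchange, declare_ok)
  case callback.arity
  when 1 then callback.call(exchange)
  else        callback.call(exchange, declare_ok)
  end
end

one_arg  = lambda { |exchange| "ready: #{exchange}" }
two_args = lambda { |exchange, declare_ok| "ready: #{exchange} (#{declare_ok})" }

call_declare_callback(one_arg,  "search.index.updates", :declare_ok)
# => "ready: search.index.updates"
call_declare_callback(two_args, "search.index.updates", :declare_ok)
# => "ready: search.index.updates (declare_ok)"
```

Without the arity check, calling `one_arg` with two arguments would raise an ArgumentError, since lambdas reject surplus arguments.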
@return [Boolean] true if this exchange is automatically deleted when it is no longer used
@api public
# File lib/amqp/exchange.rb, line 538
def auto_deleted?
  !!@opts[:auto_delete]
end
Compatibility alias for on_declare.
@api public
@deprecated
@return [call]
# File lib/amqp/exchange.rb, line 202
def callback
  @on_declare
end
@return [Channel]
@api public
# File lib/amqp/exchange.rb, line 362
def channel
  @channel
end
This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are deleted, too. Further attempts to publish messages to a deleted exchange will result in a channel-level exception.
@example Deleting an exchange
  channel  = AMQP::Channel.new(connection)
  exchange = channel.direct("search.indexing")
  exchange.delete
@option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should
not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
@option opts [Boolean] :if_unused (false) If set, the server will only delete the exchange if it has no queue
bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead.
@return [NilClass] nil
@api public
# File lib/amqp/exchange.rb, line 510
def delete(opts = {}, &block)
  @channel.once_open do
    super(opts.fetch(:if_unused, false), opts.fetch(:nowait, false), &block)
  end

  # backwards compatibility
  nil
end
Publishes message to the exchange. The message will be routed to queues by the exchange and distributed to any active consumers. Routing logic is determined by exchange type and configuration as well as message attributes (like :routing_key or message headers).
Published data is opaque and not modified by Ruby amqp gem in any way. Serialization of data with JSON, Thrift, BSON or similar libraries before publishing is very common.
h2. Data serialization
Note that this method calls to_s on the payload argument value. Applications are encouraged to take care of data serialization before publishing (using JSON, Thrift, Protocol Buffers or another serialization library). Note that because AMQP is a binary protocol, text formats like JSON largely lose their strong point of being easy to inspect as data travels across the network, so "BSON":bsonspec.org may be a good fit.
h2. Publishing and message persistence
In cases when your application cannot afford to lose a message, AMQP 0.9.1 has several features to offer:
* Persistent messages
* Message acknowledgements
* Transactions
* (a RabbitMQ-specific extension) Publisher confirms
This is a broad topic and we dedicate a separate guide, {file:docs/Durability.textile Durability and message persistence}, to it.
h2. Publishing callback and guarantees it DOES NOT offer
The exact moment when a message is published is not determined and depends on many factors, including the machine’s networking stack configuration. For this reason the (optional) block this method takes is scheduled for the next event loop tick, and data is staged for delivery in the current event loop tick. For most applications, this is good enough. The only way to guarantee a message was delivered in a distributed system is to ask a peer to send you a message back.
@note Optional callback this method takes DOES NOT OFFER ANY GUARANTEES ABOUT DATA DELIVERY and must not be used as a “delivery callback”.
The only way to guarantee delivery in a distributed environment is to use an acknowledgement mechanism, such as AMQP transactions or the lightweight "publisher confirms" RabbitMQ extension supported by the amqp gem. See the {file:docs/Durability.textile Durability and message persistence} and {file:docs/Exchanges.textile Working With Exchanges} guides for details.
h2. Event loop blocking
When intermixing publishing of many messages with other workloads that may take some time, event loop blocking may affect performance. There are several ways to avoid it:
* Run EventMachine in a separate thread.
* Use EventMachine.next_tick.
* Use EventMachine.defer to offload the operation to the EventMachine thread pool.
TBD: this subject is worth a separate guide
h2. Sending one-off messages
If you need to send a one-off message and then stop the event loop, pass a block to {Exchange#publish} that will be executed after the message is pushed down the network stack, and use {AMQP::Session#disconnect} to properly tear down the AMQP connection (see example under Examples section below).
@example Publishing a one-off message and properly closing AMQP connection then stopping the event loop:
  exchange.publish(data) do
    connection.disconnect { EventMachine.stop }
  end
@param [to_s] payload Message payload (content). Note that this method calls to_s on payload argument value.
You are encouraged to take care of data serialization before publishing (using JSON, Thrift, Protocol Buffers or other serialization library).
@option options [String] :routing_key (nil) Specifies message routing key. Routing key determines
what queues messages are delivered to (exact routing algorithms vary between exchange types).
@option options [Boolean] :mandatory (false) This flag tells the server how to react if the message cannot be
routed to a queue. If the message is mandatory, the server will return the unroutable message back to the client with the basic.return AMQP method. If the message is not mandatory, the server silently drops the message.
@option options [Boolean] :immediate (false) This flag tells the server how to react if the message cannot be
routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed.
@option options [Boolean] :persistent (false) When true, this message will be persisted to disk and remain in the queue until
it is consumed. When false, the message is only kept in a transient store and will be lost in case of server restart. When performance and latency are more important than durability, set :persistent => false. If durability is more important, set :persistent => true.
@option options [String] :content_type (application/octet-stream) Content-type of message payload.
@example Publishing without routing key
  exchange = channel.fanout('search.indexer')
  # fanout exchanges deliver messages to bound queues unconditionally,
  # so routing key is unnecessary here
  exchange.publish("some data")
@example Publishing with a routing key
  exchange = channel.direct('search.indexer')
  exchange.publish("some data", :routing_key => "search.index.updates")
@return [Exchange] self
@note Please make sure you read the {file:docs/Durability.textile Durability and message persistence} guide that covers exchange durability vs. message persistence.
@api public
# File lib/amqp/exchange.rb, line 475
def publish(payload, options = {}, &block)
  opts = @default_publish_options.merge(options)

  @channel.once_open do
    properties                 = @default_headers.merge(options)
    properties[:delivery_mode] = properties.delete(:persistent) ? 2 : 1
    super(payload.to_s, opts[:key] || opts[:routing_key] || @default_routing_key, properties, opts[:mandatory], opts[:immediate])

    # don't pass block to AMQ::Client::Exchange#publish because it will be executed
    # immediately and we want to do it later. See ruby-amqp/amqp/#67 MK.
    EventMachine.next_tick(&block) if block
  end

  self
end
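The way publish turns the :persistent option into an AMQP delivery mode can be tried in isolation. The sketch below copies only the property-merging step; `message_properties` is a hypothetical helper, and the content type default is assumed to be the application/octet-stream value documented for :content_type.

```ruby
# Default message properties, mirroring the constructor's defaults.
# The content type value is an assumption based on the documented default.
DEFAULT_HEADERS = {
  :content_type => "application/octet-stream",
  :persistent   => false,
  :priority     => 0
}.freeze

# Merges per-message options over the defaults and replaces the boolean
# :persistent flag with :delivery_mode (2 = persistent, 1 = transient).
def message_properties(options = {})
  properties                 = DEFAULT_HEADERS.merge(options)
  properties[:delivery_mode] = properties.delete(:persistent) ? 2 : 1
  properties
end

message_properties(:persistent => true)[:delivery_mode] # => 2
message_properties[:delivery_mode]                      # => 1
```

Because `merge` returns a new hash, the frozen defaults are never modified by a publish call.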
Resets exchange state. Useful for error handling.
@api plugin
# File lib/amqp/exchange.rb, line 545
def reset
  initialize(@channel, @type, @name, @opts)
end
@return [Boolean] true if this exchange is transient (non-durable)
@note Please make sure you read the {Exchange Exchange class} documentation section on exchange durability vs. message persistence.
@api public
# File lib/amqp/exchange.rb, line 531
def transient?
  !self.durable?
end