Object
@private
API
same as in RabbitMQ Java client
# File lib/bunny/transport.rb, line 30 def initialize(session, host, port, opts) @session = session @session_thread = opts[:session_thread] @host = host @port = port @opts = opts @logger = session.logger @tls_enabled = tls_enabled?(opts) @tls_certificate_path = tls_certificate_path_from(opts) @tls_key_path = tls_key_path_from(opts) @tls_certificate = opts[:tls_certificate] || opts[:ssl_cert_string] @tls_key = opts[:tls_key] || opts[:ssl_key_string] @tls_certificate_store = opts[:tls_certificate_store] default_ca_file = ENV[OpenSSL::X509::DEFAULT_CERT_FILE_ENV] || OpenSSL::X509::DEFAULT_CERT_FILE default_ca_path = ENV[OpenSSL::X509::DEFAULT_CERT_DIR_ENV] || OpenSSL::X509::DEFAULT_CERT_DIR @tls_ca_certificates = opts.fetch(:tls_ca_certificates, [ default_ca_file, File.join(default_ca_path, 'ca-certificates.crt'), # Ubuntu/Debian File.join(default_ca_path, 'ca-bundle.crt'), # Amazon Linux & Fedora/RHEL File.join(default_ca_path, 'ca-bundle.pem') # OpenSUSE ]) @verify_peer = opts[:verify_ssl] || opts[:verify_peer] @read_write_timeout = opts[:socket_timeout] || 3 @read_write_timeout = nil if @read_write_timeout == 0 @connect_timeout = self.timeout_from(opts) @connect_timeout = nil if @connect_timeout == 0 @disconnect_timeout = @read_write_timeout || @connect_timeout @writes_mutex = @session.mutex_impl.new maybe_initialize_socket prepare_tls_context if @tls_enabled end
# File lib/bunny/transport.rb, line 181 def close(reason = nil) @socket.close if open? end
# File lib/bunny/transport.rb, line 100 def configure_socket(&block) block.call(@socket) if @socket end
# File lib/bunny/transport.rb, line 104 def configure_tls_context(&block) block.call(@tls_context) if @tls_context end
# File lib/bunny/transport.rb, line 83 def connect if uses_ssl? @socket.connect @socket.post_connection_check(host) if uses_tls? && @verify_peer @status = :connected @socket else # no-op end end
# File lib/bunny/transport.rb, line 96 def connected? :not_connected == @status && open? end
# File lib/bunny/transport.rb, line 193 def flush @socket.flush if @socket end
# File lib/bunny/transport.rb, line 251 def initialize_socket begin @socket = Bunny::Timeout.timeout(@connect_timeout, ClientTimeout) do Bunny::Socket.open(@host, @port, :keepalive => @opts[:keepalive], :socket_timeout => @connect_timeout) end rescue StandardError, ClientTimeout => e @status = :not_connected raise Bunny::TCPConnectionFailed.new(e, self.hostname, self.port) end @socket end
# File lib/bunny/transport.rb, line 266 def maybe_initialize_socket initialize_socket if !@socket || closed? end
# File lib/bunny/transport.rb, line 185 def open? @socket && !@socket.closed? end
# File lib/bunny/transport.rb, line 270 def post_initialize_socket @socket = if uses_tls? wrap_in_tls_socket(@socket) else @socket end end
# File lib/bunny/transport.rb, line 197 def read_fully(*args) @socket.read_fully(*args) end
Exposed primarily for Bunny::Channel @private
# File lib/bunny/transport.rb, line 209 def read_next_frame(opts = {}) header = @socket.read_fully(7) # TODO: network issues here will sometimes cause # the socket method return an empty string. We need to log # and handle this better. # type, channel, size = begin # AMQ::Protocol::Frame.decode_header(header) # rescue AMQ::Protocol::EmptyResponseError => e # puts "Got AMQ::Protocol::EmptyResponseError, header is #{header.inspect}" # end type, channel, size = AMQ::Protocol::Frame.decode_header(header) payload = @socket.read_fully(size) frame_end = @socket.read_fully(1) # 1) the size is miscalculated if payload.bytesize != size raise BadLengthError.new(size, payload.bytesize) end # 2) the size is OK, but the string doesn't end with FINAL_OCTET raise NoFinalOctetError.new if frame_end != AMQ::Protocol::Frame::FINAL_OCTET AMQ::Protocol::Frame.new(type, payload, channel) end
# File lib/bunny/transport.rb, line 201 def read_ready?(timeout = nil) io = IO.select([@socket].compact, nil, nil, timeout) io && io[0].include?(@socket) end
Sends frame to the peer.
@raise [ConnectionClosedError] @private
# File lib/bunny/transport.rb, line 160 def send_frame(frame) if closed? @session.handle_network_failure(ConnectionClosedError.new(frame)) else write(frame.encode) end end
Sends frame to the peer without timeout control.
@raise [ConnectionClosedError] @private
# File lib/bunny/transport.rb, line 172 def send_frame_without_timeout(frame) if closed? @session.handle_network_failure(ConnectionClosedError.new(frame)) else write_without_timeout(frame.encode) end end
# File lib/bunny/transport.rb, line 77 def uses_ssl? @tls_enabled end
# File lib/bunny/transport.rb, line 72 def uses_tls? @tls_enabled end
Writes data to the socket. If read/write timeout was specified, Bunny::ClientTimeout will be raised if the operation times out.
@raise [ClientTimeout]
# File lib/bunny/transport.rb, line 112 def write(data) begin if @read_write_timeout Bunny::Timeout.timeout(@read_write_timeout, Bunny::ClientTimeout) do if open? @writes_mutex.synchronize { @socket.write(data) } @socket.flush end end else if open? @writes_mutex.synchronize { @socket.write(data) } @socket.flush end end rescue SystemCallError, Bunny::ClientTimeout, Bunny::ConnectionError, IOError => e @logger.error "Got an exception when sending data: #{e.message} (#{e.class.name})" close @status = :not_connected if @session.automatically_recover? @session.handle_network_failure(e) else @session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) end end end
Writes data to the socket without timeout checks
# File lib/bunny/transport.rb, line 141 def write_without_timeout(data) begin @writes_mutex.synchronize { @socket.write(data) } @socket.flush rescue SystemCallError, Bunny::ConnectionError, IOError => e close if @session.automatically_recover? @session.handle_network_failure(e) else @session_thread.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) end end end
# File lib/bunny/transport.rb, line 307 def check_local_path!(s) raise ArgumentError, "cannot read TLS certificate or key from #{s}" unless File.file?(s) && File.readable?(s) end
# File lib/bunny/transport.rb, line 349 def initialize_tls_certificate_store(certs) certs = certs.select { |path| File.readable? path } @logger.debug "Using CA certificates at #{certs.join(', ')}" if certs.empty? @logger.error "No CA certificates found, add one with :tls_ca_certificates" end OpenSSL::X509::Store.new.tap do |store| certs.each { |path| store.add_file(path) } end end
# File lib/bunny/transport.rb, line 322 def initialize_tls_context(ctx) ctx.cert = OpenSSL::X509::Certificate.new(@tls_certificate) if @tls_certificate ctx.key = OpenSSL::PKey::RSA.new(@tls_key) if @tls_key ctx.cert_store = if @tls_certificate_store @tls_certificate_store else initialize_tls_certificate_store(@tls_ca_certificates) end if !@tls_certificate @logger.warn Using TLS but no client certificate is provided! If RabbitMQ is configured to verify peer certificate, connection upgrade will fail! end if @tls_certificate && !@tls_key @logger.warn "Using TLS but no client private key is provided!" end # setting TLS/SSL version only works correctly when done # vis set_params. MK. ctx.set_params(:ssl_version => @opts.fetch(:tls_protocol, DEFAULT_TLS_PROTOCOL)) ctx.set_params(:verify_mode => OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT) if @verify_peer ctx end
# File lib/bunny/transport.rb, line 292 def prepare_tls_context read_tls_keys! @tls_context = initialize_tls_context(OpenSSL::SSL::SSLContext.new) end
# File lib/bunny/transport.rb, line 311 def read_tls_keys! if @tls_certificate_path check_local_path!(@tls_certificate_path) @tls_certificate = File.read(@tls_certificate_path) end if @tls_key_path check_local_path!(@tls_key_path) @tls_key = File.read(@tls_key_path) end end
# File lib/bunny/transport.rb, line 360 def timeout_from(options) options[:connect_timeout] || options[:connection_timeout] || options[:timeout] || DEFAULT_CONNECTION_TIMEOUT end
# File lib/bunny/transport.rb, line 284 def tls_certificate_path_from(opts) opts[:tls_cert] || opts[:ssl_cert] || opts[:tls_cert_path] || opts[:ssl_cert_path] || opts[:tls_certificate_path] || opts[:ssl_certificate_path] end
# File lib/bunny/transport.rb, line 280 def tls_enabled?(opts) opts[:tls] || opts[:ssl] || (opts[:port] == AMQ::Protocol::TLS_PORT) || false end
# File lib/bunny/transport.rb, line 288 def tls_key_path_from(opts) opts[:tls_key] || opts[:ssl_key] || opts[:tls_key_path] || opts[:ssl_key_path] end
# File lib/bunny/transport.rb, line 298 def wrap_in_tls_socket(socket) raise ArgumentError, "cannot wrap nil into TLS socket, @tls_context is nil. This is a Bunny bug." unless socket raise "cannot wrap a socket into TLS socket, @tls_context is nil. This is a Bunny bug." unless @tls_context s = Bunny::SSLSocket.new(socket, @tls_context) s.sync_close = true s end
Generated with the Darkfish Rdoc Generator 2.