Parent

Class/Module Index [+]

Quicksearch

Stomp::Connection

Low level connection which maps commands and supports synchronous receives

Attributes

autoflush[RW]

Autoflush forces a flush on each transmit. This may be changed dynamically by calling code.

connection_frame[R]

The CONNECTED frame from the broker.

disconnect_receipt[R]

Any disconnect RECEIPT frame if requested.

hb_received[R]

Heartbeat receive has been on time.

hb_sent[R]

Heartbeat send has been successful.

jruby[R]

JRuby detected

protocol[R]

The Stomp Protocol version.

session[R]

A unique session ID, assigned by the broker.

Public Class Methods

default_port(ssl) click to toggle source

default_port returns the default port used by the gem for TCP or SSL.

# File lib/stomp/connection.rb, line 41
def self.default_port(ssl)
  ssl ? 61612 : 61613
end
new(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) click to toggle source

A new Connection object can be initialized using two forms:

Hash (this is the recommended Connection initialization method):

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  :reliable => true,
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2,
  :max_reconnect_attempts => 0,
  :randomize => false,
  :connect_timeout => 0,
  :connect_headers => {},
  :parse_timeout => 5,
  :logger => nil,
  :dmh => false,
  :closed_check => true,
  :hbser => false,
  :stompconn => false,
  :usecrlf => false,
  :max_hbread_fails => 0,
  :max_hbrlck_fails => 0,
  :fast_hbs_adjust => 0.0,
}

e.g. c = Stomp::Connection.new(hash)

Positional parameters:

login             (String,  default : '')
passcode          (String,  default : '')
host              (String,  default : 'localhost')
port              (Integer, default : 61613)
reliable          (Boolean, default : false)
reconnect_delay   (Integer, default : 5)

e.g. c = Stomp::Connection.new("username", "password", "localhost", 61613, true)
# File lib/stomp/connection.rb, line 88
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  @protocol = Stomp::SPL_10 # Assumed at first
  @hb_received = true       # Assumed at first
  @hb_sent = true           # Assumed at first
  @hbs = @hbr = false       # Sending/Receiving heartbeats. Assume no for now.
  @jruby = false            # Assumed at first
  if defined?(RUBY_ENGINE) && RUBY_ENGINE =~ /jruby/
    @jruby = true
  end
  if login.is_a?(Hash)
    hashed_initialize(login)
  else
    @host = host
    @port = port
    @login = login
    @passcode = passcode
    @reliable = reliable
    @reconnect_delay = reconnect_delay
    @connect_headers = connect_headers
    @ssl = false
    @parameters = nil
    @parse_timeout = 5              # To override, use hashed parameters
    @connect_timeout = 0    # To override, use hashed parameters
    @logger = nil                   # To override, use hashed parameters
    @autoflush = false    # To override, use hashed parameters or setter
    @closed_check = true  # Run closed check in each protocol method
    @hbser = false        # Raise if heartbeat send exception
    @stompconn = false    # If true, use STOMP rather than CONNECT
    @usecrlf = false      # If true, use \r\n as line ends (1.2 only)
    @max_hbread_fails = 0 # 0 means never retry for HB read failures
    @max_hbrlck_fails = 0 # 0 means never retry for HB read lock failures
    @fast_hbs_adjust = 0.0 # Fast heartbeat senders sleep adjustment 
    warn "login looks like a URL, do you have the correct parameters?" if @login =~ /:\/\//
  end

  # Use Mutexes:  only one lock per each thread.
  # Reverted to original implementation attempt using Mutex.
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new

  @subscriptions = {}
  @failure = nil
  @connection_attempts = 0

  socket
end
open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) click to toggle source

open is syntactic sugar for 'Connection.new', see 'initialize' for usage.

# File lib/stomp/connection.rb, line 160
def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers)
end

Public Instance Methods

abort(name, headers = {}) click to toggle source

Abort aborts a transaction by name.

# File lib/stomp/connection.rb, line 258
def abort(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  if @logger && @logger.respond_to?(:on_abort)
    @logger.on_abort(log_params, headers)
  end
  transmit(Stomp::CMD_ABORT, headers)
end
ack(message_id, headers = {}) click to toggle source

Acknowledge a message, used when a subscription has specified client acknowledgement i.e. connection.subscribe("/queue/a", :ack => 'client'). Accepts an optional transaction header ( :transaction => 'some_transaction_id' ) Behavior is protocol level dependent, see the specifications or comments below.

# File lib/stomp/connection.rb, line 190
def ack(message_id, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == ""
  headers = headers.symbolize_keys

  case @protocol
    when Stomp::SPL_12
      # The ACK frame MUST include an id header matching the ack header 
      # of the MESSAGE being acknowledged.
      headers[:id] = message_id
    when Stomp::SPL_11
      # ACK has two REQUIRED headers: message-id, which MUST contain a value 
      # matching the message-id for the MESSAGE being acknowledged and 
      # subscription, which MUST be set to match the value of the subscription's 
      # id header.
      headers[:'message-id'] = message_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
    else # Stomp::SPL_10
      # ACK has one required header, message-id, which must contain a value 
      # matching the message-id for the MESSAGE being acknowledged.
      headers[:'message-id'] = message_id
  end
  _headerCheck(headers)
  if @logger && @logger.respond_to?(:on_ack)
    @logger.on_ack(log_params, headers)
  end
  transmit(Stomp::CMD_ACK, headers)
end
begin(name, headers = {}) click to toggle source

Begin starts a transaction, and requires a name for the transaction

# File lib/stomp/connection.rb, line 175
def begin(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  if @logger && @logger.respond_to?(:on_begin)
    @logger.on_begin(log_params, headers)
  end
  transmit(Stomp::CMD_BEGIN, headers)
end
client_ack?(message) click to toggle source

client_ack? determines if headers contain :ack => "client".

# File lib/stomp/connection.rb, line 371
def client_ack?(message)
  headers = @subscriptions[message.headers[:destination]]
  !headers.nil? && headers[:ack] == "client"
end
closed?() click to toggle source

closed? tests if this connection is closed.

# File lib/stomp/connection.rb, line 170
def closed?
  @closed
end
commit(name, headers = {}) click to toggle source

Commit commits a transaction by name.

# File lib/stomp/connection.rb, line 246
def commit(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  if @logger && @logger.respond_to?(:on_commit)
    @logger.on_commit(log_params, headers)
  end
  transmit(Stomp::CMD_COMMIT, headers)
end
disconnect(headers = {}) click to toggle source

disconnect closes this connection. If requested, a disconnect RECEIPT will be received.

# File lib/stomp/connection.rb, line 378
def disconnect(headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  _headerCheck(headers)
  if @protocol >= Stomp::SPL_11
    @st.kill if @st # Kill ticker thread if any
    @rt.kill if @rt # Kill ticker thread if any
  end
  transmit(Stomp::CMD_DISCONNECT, headers)
  @disconnect_receipt = receive if headers[:receipt]
  if @logger && @logger.respond_to?(:on_disconnect)
    @logger.on_disconnect(log_params)
  end
  close_socket
end
hashed_initialize(params) click to toggle source

hashed_initialize prepares a new connection with a Hash of initialization parameters.

# File lib/stomp/connection.rb, line 138
def hashed_initialize(params)

  @parameters = refine_params(params)
  @reliable =  @parameters[:reliable]
  @reconnect_delay = @parameters[:initial_reconnect_delay]
  @connect_headers = @parameters[:connect_headers]
  @parse_timeout =  @parameters[:parse_timeout]
  @connect_timeout =  @parameters[:connect_timeout]
  @logger =  @parameters[:logger]
  @autoflush = @parameters[:autoflush]
  @closed_check = @parameters[:closed_check]
  @hbser = @parameters[:hbser]
  @stompconn = @parameters[:stompconn]
  @usecrlf = @parameters[:usecrlf]
  @max_hbread_fails = @parameters[:max_hbread_fails]
  @max_hbrlck_fails = @parameters[:max_hbrlck_fails]
  @fast_hbs_adjust = @parameters[:fast_hbs_adjust]
  #sets the first host to connect
  change_host
end
hbrecv_count() click to toggle source

hbrecv_count returns the current connection's heartbeat receive count.

# File lib/stomp/connection.rb, line 494
def hbrecv_count()
  return 0 unless @hbrecv_count
  @hbrecv_count
end
hbrecv_interval() click to toggle source

hbrecv_interval returns the connection's heartbeat receive interval.

# File lib/stomp/connection.rb, line 482
def hbrecv_interval()
  return 0 unless @hbrecv_interval
  @hbrecv_interval / 1000.0 # ms
end
hbsend_count() click to toggle source

hbsend_count returns the current connection's heartbeat send count.

# File lib/stomp/connection.rb, line 488
def hbsend_count()
  return 0 unless @hbsend_count
  @hbsend_count
end
hbsend_interval() click to toggle source

hbsend_interval returns the connection's heartbeat send interval.

# File lib/stomp/connection.rb, line 476
def hbsend_interval()
  return 0 unless @hbsend_interval
  @hbsend_interval / 1000.0 # ms
end
nack(message_id, headers = {}) click to toggle source

STOMP 1.1+ NACK.

# File lib/stomp/connection.rb, line 220
def nack(message_id, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::UnsupportedProtocolError if @protocol == Stomp::SPL_10
  raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == ""
  headers = headers.symbolize_keys
  case @protocol
    when Stomp::SPL_12
      # The ACK frame MUST include an id header matching the ack header 
      # of the MESSAGE being acknowledged.
      headers[:id] = message_id
    else # Stomp::SPL_11 only
      # ACK has two REQUIRED headers: message-id, which MUST contain a value 
      # matching the message-id for the MESSAGE being acknowledged and 
      # subscription, which MUST be set to match the value of the subscription's 
      # id header.
      headers[:'message-id'] = message_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
  end
  _headerCheck(headers)
  if @logger && @logger.respond_to?(:on_nack)
    @logger.on_nack(log_params, headers)
  end
  transmit(Stomp::CMD_NACK, headers)
end
open?() click to toggle source

open? tests if this connection is open.

# File lib/stomp/connection.rb, line 165
def open?
  !@closed
end
poll() click to toggle source

poll returns a pending message if one is available, otherwise returns nil.

# File lib/stomp/connection.rb, line 396
def poll()
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  # No need for a read lock here.  The receive method eventually fulfills
  # that requirement.
  return nil if @socket.nil? || !@socket.ready?
  receive()
end
publish(destination, message, headers = {}) click to toggle source

Publish message to destination. To disable content length header use header ( :suppress_content_length => true ). Accepts a transaction header ( :transaction => 'some_transaction_id' ).

# File lib/stomp/connection.rb, line 318
def publish(destination, message, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:destination] = destination
  _headerCheck(headers)
  if @logger && @logger.respond_to?(:on_publish)
    @logger.on_publish(log_params, message, headers)
  end
  transmit(Stomp::CMD_SEND, headers, message)
end
receive() click to toggle source

receive returns the next Message off of the wire. this can return nil in cases where:

  • the broker has closed the connection

  • the connection is not reliable

# File lib/stomp/connection.rb, line 408
def receive()
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  super_result = __old_receive()
  if super_result.nil? && @reliable && !closed?
    errstr = "connection.receive returning EOF as nil - resetting connection.\n"
    if @logger && @logger.respond_to?(:on_miscerr)
      @logger.on_miscerr(log_params, "es_recv: " + errstr)
    else
      $stderr.print errstr
    end
    # !!! This initiates a re-connect !!!
    # The call to __old_receive() will in turn call socket().  Before
    # that we should change the target host, otherwise the host that
    # just failed may be attempted first.
    _reconn_prep()
    #
    super_result = __old_receive()
  end
  #
  if super_result.nil? && !@reliable
    @st.kill if @st # Kill ticker thread if any
    @rt.kill if @rt # Kill ticker thread if any
    close_socket()
    @closed = true
    warn 'warning: broker sent EOF, and connection not reliable' unless defined?(Test)
  end
  if @logger && @logger.respond_to?(:on_receive)
    @logger.on_receive(log_params, super_result)
  end
  return super_result
end
set_logger(logger) click to toggle source

set_logger selects a new callback logger instance.

# File lib/stomp/connection.rb, line 441
def set_logger(logger)
  @logger = logger
end
sha1(data) click to toggle source

sha1 returns a SHA1 digest for arbitrary string data.

# File lib/stomp/connection.rb, line 457
def sha1(data)
  Digest::SHA1.hexdigest(data)
end
subscribe(name, headers = {}, subId = nil) click to toggle source

Subscribe subscribes to a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.

# File lib/stomp/connection.rb, line 271
def subscribe(name, headers = {}, subId = nil)
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:destination] = name
  if @protocol >= Stomp::SPL_11
    raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
    headers[:id] = subId if headers[:id].nil?
  end
  _headerCheck(headers)
  if @logger && @logger.respond_to?(:on_subscribe)
    @logger.on_subscribe(log_params, headers)
  end

  # Store the subscription so that we can replay if we reconnect.
  if @reliable
    subId = name if subId.nil?
    raise Stomp::Error::DuplicateSubscription if @subscriptions[subId]
    @subscriptions[subId] = headers
  end

  transmit(Stomp::CMD_SUBSCRIBE, headers)
end
unreceive(message, options = {}) click to toggle source

Send a message back to the source or to the dead letter queue. Accepts a dead letter queue option ( :dead_letter_queue => "/queue/DLQ" ). Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ). Accepts a force client acknowledgement option (:force_client_ack => true).

# File lib/stomp/connection.rb, line 333
def unreceive(message, options = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge(options)
  # Lets make sure all keys are symbols
  message.headers = message.headers.symbolize_keys
  retry_count = message.headers[:retry_count].to_i || 0
  message.headers[:retry_count] = retry_count + 1
  transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
  message_id = message.headers.delete(:'message-id')

  # Prevent duplicate 'subscription' headers on subsequent receives
  message.headers.delete(:subscription) if message.headers[:subscription]

  begin
    self.begin transaction_id

    if client_ack?(message) || options[:force_client_ack]
      self.ack(message_id, :transaction => transaction_id)
    end

    if message.headers[:retry_count] <= options[:max_redeliveries]
      self.publish(message.headers[:destination], message.body, 
        message.headers.merge(:transaction => transaction_id))
    else
      # Poison ack, sending the message to the DLQ
      self.publish(options[:dead_letter_queue], message.body, 
        message.headers.merge(:transaction => transaction_id, 
        :original_destination => message.headers[:destination], 
        :persistent => true))
    end
    self.commit transaction_id
  rescue Exception => exception
    self.abort transaction_id
    raise exception
  end
end
unsubscribe(dest, headers = {}, subId = nil) click to toggle source

Unsubscribe from a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.

# File lib/stomp/connection.rb, line 296
def unsubscribe(dest, headers = {}, subId = nil)
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:destination] = dest
  if @protocol >= Stomp::SPL_11
    raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
    headers[:id] = subId unless headers[:id]
  end
  _headerCheck(headers)
  if @logger && @logger.respond_to?(:on_unsubscribe)
    @logger.on_unsubscribe(log_params, headers)
  end
  transmit(Stomp::CMD_UNSUBSCRIBE, headers)
  if @reliable
    subId = dest if subId.nil?
    @subscriptions.delete(subId)
  end
end
uuid() click to toggle source

uuid returns a type 4 UUID.

# File lib/stomp/connection.rb, line 462
def uuid()
  b = []
  0.upto(15) do |i|
    b << rand(255)
  end
  b[6] = (b[6] & 0x0F) | 0x40
  b[8] = (b[8] & 0xbf) | 0x80
  #             0  1  2  3   4   5  6  7   8  9  10 11 12 13 14 15
  rs = sprintf("%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x%02x%02x",
  b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15])
  rs
end
valid_utf8?(s) click to toggle source

valid_utf8? returns an indicator if the given string is a valid UTF8 string.

# File lib/stomp/connection.rb, line 446
def valid_utf8?(s)
  case RUBY_VERSION
  when /1\.8/
    rv = _valid_utf8?(s)
  else
    rv = s.encoding.name != Stomp::UTF8 ? false : s.valid_encoding?
  end
  rv
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.