class Mongo::Protocol::Msg

MongoDB Wire protocol Msg message (OP_MSG), a bi-directional wire protocol opcode.

OP_MSG is only available in MongoDB 3.6 (maxWireVersion >= 6) and later.

@api private

@since 2.5.0

Constants

DATABASE_IDENTIFIER

The identifier for the database name to execute the command on.

@since 2.5.0

FLAGS

Available flags for a OP_MSG message.

INTERNAL_KEYS

Keys that the driver adds to commands. These are going to be moved to the end of the hash for better logging.

@api private

OP_CODE

The operation code required to specify a OP_MSG message. @return [ Fixnum ] the operation code.

@since 2.5.0

Public Class Methods

new(flags, options, global_args, *sections) click to toggle source

Creates a new OP_MSG protocol message

@example Create a OP_MSG wire protocol message

Msg.new([:more_to_come], {}, { ismaster: 1 },
        { type: 1, payload: { identifier: 'documents', sequence: [..] } })

@param [ Array<Symbol> ] flags The flag bits. Current supported values are :more_to_come and :checksum_present. @param [ Hash ] options The options. There are currently no supported

options, this is a placeholder for the future.

@param [ BSON::Document, Hash ] global_args The global arguments,

becomes a section of payload type 0

@param [ BSON::Document, Hash ] sections Zero or more sections, in the format

{ type: 1, payload: { identifier: <String>, sequence: <Array<BSON::Document, Hash>> } } or
{ type: 0, payload: <BSON::Document, Hash> }

@option options [ true, false ] validating_keys Whether keys should be validated.

@api private

@since 2.5.0

Calls superclass method
# File lib/mongo/protocol/msg.rb, line 61
def initialize(flags, options, global_args, *sections)
  @flags = flags || []
  @options = options
  @global_args = global_args
  @sections = [ { type: 0, payload: global_args } ] + sections
  @request_id = nil
  super
end

Public Instance Methods

compress!(compressor, zlib_compression_level = nil) click to toggle source

Compress this message.

@param [ String, Symbol ] compressor The compressor to use. @param [ Integer ] zlib_compression_level The zlib compression level to use.

@return [ Compressed, self ] A Protocol::Compressed message or self, depending on whether

this message can be compressed.

@since 2.5.0

# File lib/mongo/protocol/msg.rb, line 140
def compress!(compressor, zlib_compression_level = nil)
  if compressor && compression_allowed?(command.keys.first)
    Compressed.new(self, compressor, zlib_compression_level)
  else
    self
  end
end
payload() click to toggle source

Return the event payload for monitoring.

@example Return the event payload.

message.payload

@return [ BSON::Document ] The event payload.

@since 2.5.0

# File lib/mongo/protocol/msg.rb, line 90
def payload
  # Reorder keys in global_args for better logging - see
  # https://jira.mongodb.org/browse/RUBY-1591.
  # Note that even without the reordering, the payload is not an exact
  # match to what is sent over the wire because the command as used in
  # the published eent combines keys from multiple sections of the
  # payload sent over the wire.
  ordered_command = {}
  skipped_command = {}
  command.each do |k, v|
    if INTERNAL_KEYS.member?(k.to_s)
      skipped_command[k] = v
    else
      ordered_command[k] = v
    end
  end
  ordered_command.update(skipped_command)

  BSON::Document.new(
    command_name: ordered_command.keys.first.to_s,
    database_name: global_args[DATABASE_IDENTIFIER],
    command: ordered_command,
    request_id: request_id,
    reply: sections[0]
  )
end
replyable?() click to toggle source

Whether the message expects a reply from the database.

@example Does the message require a reply?

message.replyable?

@return [ true, false ] If the message expects a reply.

@since 2.5.0

# File lib/mongo/protocol/msg.rb, line 78
def replyable?
  @replyable ||= !flags.include?(:more_to_come)
end
serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil) click to toggle source

Serializes message into bytes that can be sent on the wire.

@param [ BSON::ByteBuffer ] buffer where the message should be inserted. @param [ Integer ] max_bson_size The maximum bson object size.

@return [ BSON::ByteBuffer ] buffer containing the serialized message.

@since 2.5.0

Calls superclass method Mongo::Protocol::Message#serialize
# File lib/mongo/protocol/msg.rb, line 125
def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil)
  super
  add_check_sum(buffer)
  buffer
end

Private Instance Methods

add_check_sum(buffer) click to toggle source
# File lib/mongo/protocol/msg.rb, line 163
def add_check_sum(buffer)
  if flags.include?(:checksum_present)
    #buffer.put_int32(checksum)
  end
end
command() click to toggle source
# File lib/mongo/protocol/msg.rb, line 150
def command
  @command ||= global_args.dup.tap do |cmd|
    cmd.delete(DATABASE_IDENTIFIER)
    sections.each do |section|
      if section[:type] == 1
        identifier = section[:payload][:identifier]
        cmd[identifier] ||= []
        cmd[identifier] += section[:payload][:sequence]
      end
    end
  end
end
global_args() click to toggle source
# File lib/mongo/protocol/msg.rb, line 169
def global_args
  @global_args ||= (sections[0] || {})
end