Module | Mongo::Networking |
In: |
lib/mongo/networking.rb
|
STANDARD_HEADER_SIZE | = | 16 |
RESPONSE_HEADER_SIZE | = | 20 |
BINARY_ENCODING | = | Encoding.find("binary") |
Sends a message to the database and waits for the response.
@param [Integer] operation a MongoDB opcode. @param [BSON::ByteBuffer] message a message to send to the database. @param [String] log_message this is currently a no-op and will be removed. @param [Socket] socket a socket to use in lieu of checking out a new one. @param [Boolean] command (false) indicate whether this is a command. If this is a command,
the message will be sent to the primary node.
@param [Boolean] command (false) indicate whether the cursor should be exhausted. Set
this to true only when the OP_QUERY_EXHAUST flag is set.
@return [Array]
An array whose indexes include [0] documents returned, [1] number of document received, and [3] a cursor_id.
# File lib/mongo/networking.rb, line 112 def receive_message(operation, message, log_message=nil, socket=nil, command=false, read=:primary, exhaust=false) request_id = add_message_headers(message, operation) packed_message = message.to_s result = '' sock = nil begin if socket sock = socket should_checkin = false else if command || read == :primary sock = checkout_writer elsif read == :secondary sock = checkout_reader else sock = checkout_tagged(read) end should_checkin = true end send_message_on_socket(packed_message, sock) result = receive(sock, request_id, exhaust) rescue SystemStackError, NoMemoryError, SystemCallError => ex close raise ex ensure if should_checkin if command || read == :primary checkin_writer(sock) elsif read == :secondary checkin_reader(sock) else # TODO: sock = checkout_tagged(read) end end end result end
Send a message to MongoDB, adding the necessary headers.
@param [Integer] operation a MongoDB opcode. @param [BSON::ByteBuffer] message a message to send to the database.
@option opts [Symbol] :connection (:writer) The connection to which
this message should be sent. Valid options are :writer and :reader.
@return [Integer] number of bytes sent
# File lib/mongo/networking.rb, line 19 def send_message(operation, message, opts={}) if opts.is_a?(String) warn "Connection#send_message no longer takes a string log message. " + "Logging is now handled within the Collection and Cursor classes." opts = {} end connection = opts.fetch(:connection, :writer) add_message_headers(message, operation) packed_message = message.to_s sock = nil begin if connection == :writer sock = checkout_writer else sock = checkout_reader end send_message_on_socket(packed_message, sock) rescue SystemStackError, NoMemoryError, SystemCallError => ex close raise ex ensure if sock if connection == :writer checkin_writer(sock) else checkin_reader(sock) end end end end
Sends a message to the database, waits for a response, and raises an exception if the operation has failed.
@param [Integer] operation a MongoDB opcode. @param [BSON::ByteBuffer] message a message to send to the database. @param [String] db_name the name of the database. used on call to get_last_error. @param [Hash] last_error_params parameters to be sent to getLastError. See DB#error for
available options.
@see DB#get_last_error for valid last error params.
@return [Hash] The document returned by the call to getlasterror.
# File lib/mongo/networking.rb, line 66 def send_message_with_safe_check(operation, message, db_name, log_message=nil, last_error_params=false) docs = num_received = cursor_id = '' add_message_headers(message, operation) last_error_message = BSON::ByteBuffer.new build_last_error_message(last_error_message, db_name, last_error_params) last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY) packed_message = message.append!(last_error_message).to_s sock = nil begin sock = checkout_writer send_message_on_socket(packed_message, sock) docs, num_received, cursor_id = receive(sock, last_error_id) checkin_writer(sock) rescue ConnectionFailure, OperationFailure, OperationTimeout => ex checkin_writer(sock) raise ex rescue SystemStackError, NoMemoryError, SystemCallError => ex close raise ex end if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg']) close if error == "not master" error = "wtimeout" if error == "timeout" raise OperationFailure.new(docs[0]['code'].to_s + ': ' + error, docs[0]['code'], docs[0]) end docs[0] end