Commit 1e4827d9 authored by Michal 'vorner' Vaner's avatar Michal 'vorner' Vaner

[master] Ruby msgq is no longer needed

parent e909ff99
# Copyright (C) 2009 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
unless respond_to?('relative_feature') # nodoc
def require_relative(relative_feature)
c = caller.first
fail "Can't parse #{c}" unless c.rindex(/:\d+(:in `.*')?$/)
file = $`
if /\A\((.*)\)/ =~ file # eval, etc.
raise LoadError, "require_relative is called in #{$1}"
end
absolute = File.expand_path(relative_feature, File.dirname(file))
require absolute
end
end
class CC
def self.set_utf8(str) #nodoc
if str.respond_to?('force_encoding')
str.force_encoding(Encoding::UTF_8)
end
end
def self.set_binary(str) #nodoc
if str.respond_to?('force_encoding')
str.force_encoding(Encoding::BINARY)
end
end
end
require_relative 'cc/message'
require_relative 'cc/session'
if $0 == __FILE__
cc = CC::Session.new
puts "Our local name: #{cc.lname}"
cc.group_subscribe("test")
counter = 0
while counter < 10000 do
cc.group_sendmsg({ :counter => counter }, "test", "foo")
routing, data = cc.group_recvmsg(false)
counter += 1
end
end
# Copyright (C) 2009 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
class CC
class DecodeError < Exception ; end
end
class CC
class Message
PROTOCOL_VERSION = 0x536b616e
ITEM_BLOB = 0x01
ITEM_HASH = 0x02
ITEM_LIST = 0x03
ITEM_NULL = 0x04
ITEM_BOOL = 0x05
ITEM_INT = 0x06
ITEM_UTF8 = 0x08
ITEM_MASK = 0x0f
ITEM_LENGTH_32 = 0x00
ITEM_LENGTH_16 = 0x10
ITEM_LENGTH_8 = 0x20
ITEM_LENGTH_MASK = 0x30
def initialize(msg = nil)
@data = [PROTOCOL_VERSION].pack("N")
if msg.is_a?(Hash)
@data += CC::Message::encode_hash(msg)
elsif msg.is_a?(String)
@data = msg
else
raise ArgumentError, "initializer is not a Hash or String"
end
end
def to_wire
CC::set_binary(@data)
@data
end
#
# Encode a message. The item passed in should be a hash, and can contain
# any number of items of any type afterwards. All keys in the hash must
# be of type String or Symbol, and the values may be of any type. If
# the value is a Hash or Array, it will be encoded as a message type
# HASH or LIST. If it is nil, it will be encoded as NULL, and if it is
# any other type, its to_s method will be called on it and it will be
# encoded as a UTF8 item.
#
def self.to_wire(msg)
encoded = [PROTOCOL_VERSION].pack("N")
encoded += encode_hash(msg)
encoded.force_encoding('binary')
encoded
end
#
# Decode a wire format message.
#
def self.from_wire(msg)
if msg.length < 4
raise CC::DecodeError, "Data is too short to decode"
end
msg.force_encoding('binary')
version, msg = msg.unpack("N a*")
unless version == PROTOCOL_VERSION
raise CC::DecodeError, "Incorrect protocol version"
end
decode_hash(msg)
end
private
# encode a simple string with a length prefix
def self.encode_tag(tag)
tag = tag.to_s
[tag.length, tag].pack("C/a*")
end
def self.encode_length_and_type(data, type)
if data.nil?
[ITEM_NULL].pack("C")
else
len = data.length
if len < 0x00000100
[type | ITEM_LENGTH_8, len, data].pack("C C/a*")
elsif len < 0x00010000
[type | ITEM_LENGTH_16, len, data].pack("C n/a*")
else
[type | ITEM_LENGTH_32, len, data].pack("C N/a*")
end
end
end
# pack a string, including its type and length.
def self.pack_utf8(str)
encode_length_and_type(str.to_s.encode('binary'), ITEM_UTF8)
end
def self.pack_bool(bool)
encode_length_and_type(encode_bool(bool), ITEM_BOOL)
end
def self.pack_int(int)
encode_length_and_type(encode_int(int), ITEM_INT)
end
def self.pack_blob(str)
encode_length_and_type(str.to_s, ITEM_BLOB)
end
def self.pack_array(arr)
encode_length_and_type(encode_array(arr), ITEM_LIST)
end
def self.pack_hash(hash)
encode_length_and_type(encode_hash(hash), ITEM_HASH)
end
def self.encode_data(data)
str.to_s
end
def self.encode_utf8(str)
str.to_s.encode('binary')
end
def self.pack_nil
encode_length_and_type(nil, ITEM_NULL)
end
def self.encode_item(item)
case item
when nil
ret = pack_nil
when Hash
ret = pack_hash(item)
when Array
ret = pack_array(item)
when String
if item.encoding == 'utf-8'
ret = pack_utf8(item)
else
ret = pack_blob(item)
end
when FalseClass
ret = pack_bool(item)
when TrueClass
ret = pack_bool(item)
when Integer
ret = pack_int(item)
else
ret = pack_blob(item.to_s)
end
ret
end
def self.encode_hash(msg)
unless msg.is_a?Hash
raise ArgumentError, "Should be a hash"
end
buffer = ""
msg.each do |key, value|
buffer += encode_tag(key)
buffer += encode_item(value)
end
buffer
end
def self.encode_bool(msg)
unless msg.class == FalseClass or msg.class == TrueClass
raise ArgumentError, "Should be true or false"
end
if msg
[0x01].pack("C")
else
[0x00].pack("C")
end
end
def self.encode_int(int)
int.to_s.encode('binary')
end
def self.encode_array(msg)
unless msg.is_a?Array
raise ArgumentError, "Should be an array"
end
buffer = ""
msg.each do |value|
buffer += encode_item(value)
end
buffer
end
def self.decode_tag(str)
if str.length < 1
raise CC::DecodeError, "Data underrun while decoding"
end
length = str.unpack("C")[0]
if str.length - 1 < length
raise CC::DecodeError, "Data underrun while decoding"
end
tag, remainder = str.unpack("x a#{length} a*")
[ tag.encode('utf-8'), remainder ]
end
def self.decode_item(msg)
if msg.length < 1
raise CC::DecodeError, "Data underrun while decoding"
end
type_and_length_format = msg.unpack("C")[0]
type = type_and_length_format & ITEM_MASK
length_format = type_and_length_format & ITEM_LENGTH_MASK
if type == ITEM_NULL
msg = msg.unpack("x a*")[0]
else
if length_format == ITEM_LENGTH_8
if msg.length - 1 < 1
raise CC::DecodeError, "Data underrun while decoding"
end
length, msg = msg.unpack("x C a*")
elsif length_format == ITEM_LENGTH_16
if msg.length - 1 < 2
raise CC::DecodeError, "Data underrun while decoding"
end
length, msg = msg.unpack("x n a*")
elsif length_format == ITEM_LENGTH_32
if msg.length - 1 < 4
raise CC::DecodeError, "Data underrun while decoding"
end
length, msg = msg.unpack("x N a*")
end
if msg.length < length
raise CC::DecodeError, "Data underrun while decoding"
end
item, msg = msg.unpack("a#{length} a*")
end
# unpack item based on type
case type
when ITEM_BLOB
value = item
when ITEM_UTF8
value = item.encode('utf-8')
when ITEM_BOOL
value = decode_bool(item)
when ITEM_INT
value = decode_int(item)
when ITEM_HASH
value = decode_hash(item)
when ITEM_LIST
value = decode_array(item)
when ITEM_NULL
value = nil
else
raise CC::DecodeError, "Unknown item type in decode: #{type}"
end
[value, msg]
end
def self.decode_bool(msg)
return msg == [0x01].pack("C")
end
def self.decode_int(msg)
return Integer(msg.encode('utf-8'))
end
def self.decode_hash(msg)
ret = {}
while msg.length > 0
tag, msg = decode_tag(msg)
value, msg = decode_item(msg)
ret[tag] = value
end
ret
end
def self.decode_array(msg)
ret = []
while msg.length > 0
value, msg = decode_item(msg)
ret << value
end
ret
end
end # class Message
end # class CC
if $0 == __FILE__
target = {
"from" => "sender@host",
"to" => "recipient@host",
"seq" => 1234,
"data" => {
"list" => [ 1, 2, nil, true, false, "this" ],
"description" => "Fun for all",
},
}
wire = CC::Message.to_wire(target)
puts wire.inspect
puts CC::Message.from_wire(wire).inspect
end
# Copyright (C) 2009 Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
require 'socket'
class CC
class ProtocolError < Exception ; end
end
class CC
class Session
attr_reader :socket
attr_reader :lname
#
# :host => host to connect to (defaults to "127.0.0.1")
# :port => port to connect to (defaults to 9913)
#
def initialize(args = {})
@socket = nil # TCP socket.
@lname = nil # local name, or nil if not connected.
@recvbuffer = "" # data buffer for partial reads.
@recvlength = nil # if non-nil, we have a length to fill buffer to.
@sendbuffer = "" # pending output data.
@sequence = "a" # per message sequence id, always unique
options = {
:host => "127.0.0.1",
:port => 9912
}.merge(args)
@socket = TCPSocket.new(options[:host], options[:port])
#
# Get our local name.
#
sendmsg({ :type => :getlname })
msg = recvmsg(false)
@lname = msg["lname"]
if @lname.nil?
raise CC::ProtocolError, "Could not get local name"
end
CC::set_utf8(@lname)
end
#
# Send a message to the controller. The item to send can either be a
# CC::Message object, or a Hash. If a Hash, it will be internally
# converted to a CC::Message before transmitting.
#
# A return value of true means the entire message was not
# transmitted, and a call to send_pending will have to be
# made to send remaining data. This should only happen when
# the socket is in non-blocking mode.
#
def sendmsg(msg)
if msg.is_a?(Hash)
msg = CC::Message.new(msg)
end
unless msg.is_a?(CC::Message)
raise ArgumentError, "msg is not a CC::Message or a Hash"
end
wire = msg.to_wire
@sendbuffer << [wire.length].pack("N")
@sendbuffer << wire
send_pending
end
#
# Send as much data as we can.
def send_pending
return false if @sendbuffer.length == 0
sent = @socket.send(@sendbuffer, 0)
@sendbuffer = @sendbuffer[sent .. -1]
@sendbuffer.length == 0 ? true : false
end
def recvmsg(nonblock = true)
data = receive_full_buffer(nonblock)
if data
CC::Message::from_wire(data)
else
nil
end
end
def group_subscribe(group, instance = "*", subtype = "normal")
sendmsg({ :type => "subscribe",
:group => group,
:instance => instance,
:subtype => subtype,
})
end
def group_unsubscribe(group, instance = "*")
sendmsg({ :type => "unsubscribe",
:group => group,
:instance => instance,
})
end
def group_sendmsg(msg, group, instance = "*", to = "*")
seq = next_sequence
sendmsg({ :type => "send",
:from => @lname,
:to => to,
:group => group,
:instance => instance,
:seq => seq,
:msg => CC::Message.to_wire(msg),
})
seq
end
def group_replymsg(routing, msg)
seq = next_sequence
sendmsg({ :type => "send",
:from => @lname,
:to => routing["from"],
:group => routing["group"],
:instance => routing["instance"],
:seq => seq,
:reply => routing["seq"],
:msg => CC::Message.to_wire(msg),
})
seq
end
def group_recvmsg(nonblock = true)
msg = recvmsg(nonblock)
return nil if msg.nil?
data = CC::Message::from_wire(msg["msg"])
msg.delete("msg")
return [data, msg]
end
private
def next_sequence
@sequence.next!
end
#
# A rather tricky function. If we have something waiting in our buffer,
# and it will satisfy the current request, we will read it all in. If
# not, we will read only what we need to satisfy a single message.
#
def receive_full_buffer(nonblock)
# read the length prefix if we need it still.
if @recvlength.nil?
length = 4
length -= @recvbuffer.length
data = nil
begin
if nonblock
data = @socket.recv_nonblock(length)
else
data = @socket.recv(length)
end
rescue Errno::EINPROGRESS
rescue Errno::EAGAIN
end
return nil if data == nil
@recvbuffer += data
return nil if @recvbuffer.length < 4
@recvlength = @recvbuffer.unpack('N')[0]
@recvbuffer = ""
CC::set_binary(@recvbuffer)
end
#
# we have a length target. Loop reading data until we get enough to
# fill our buffer.
#
length = @recvlength - @recvbuffer.length
while (length > 0) do
data = nil
begin
if nonblock
data = @socket.recv_nonblock(length)
else
data = @socket.recv(length)
end
rescue Errno::EINPROGRESS
rescue Errno::EAGAIN
end
return nil if data == 0 # blocking I/O
@recvbuffer += data
length -= data.length
end
data = @recvbuffer
@recvbuffer = ""
@recvlength = nil
data
end
end # class Session
end # class CC
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment