session.py 9.83 KB
Newer Older
Michael Graff's avatar
Michael Graff committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# Copyright (C) 2009  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import sys
import socket
import struct
19
import errno
Michael Graff's avatar
Michael Graff committed
20
import os
Jelte Jansen's avatar
Jelte Jansen committed
21
import threading
22
import bind10_config
Michael Graff's avatar
Michael Graff committed
23

24
import isc.cc.message
Michael Graff's avatar
Michael Graff committed
25 26

class ProtocolError(Exception):
    """Raised when the peer closes the connection or the handshake
       does not yield a local name."""
Jelte Jansen's avatar
Jelte Jansen committed
27
class NetworkError(Exception):
    """Exception type for network-level failures."""
28
class SessionError(Exception):
    """Raised when the session cannot be established or has already
       been closed."""
29
class SessionTimeout(Exception):
    """Raised when a blocking read on the session socket times out."""
Michael Graff's avatar
Michael Graff committed
30 31

class Session:
32 33
    # Default timeout for blocking reads, in milliseconds; __init__ passes
    # this value to set_timeout().
    MSGQ_DEFAULT_TIMEOUT = 4000
    
34
    def __init__(self, socket_file=None):
        """Connect to the message queue and obtain this session's local name.

           socket_file is the path of the UNIX domain socket to connect
           to. When it is None, the path is taken from the
           BIND10_MSGQ_SOCKET_FILE environment variable if that is set,
           and from bind10_config.BIND10_MSGQ_SOCKET_FILE otherwise.

           Raises SessionError if a socket error occurs while connecting
           or during the handshake, and ProtocolError if the reply to the
           "getlname" request does not carry a local name.
           The socket is closed before any exception leaves the
           constructor, so a failed connection attempt does not leak a
           file descriptor.
           """
        self._socket = None
        self._lname = None
        self._sequence = 1
        self._closed = False
        self._queue = []
        self._lock = threading.RLock()
        self.set_timeout(self.MSGQ_DEFAULT_TIMEOUT)
        # Progress counters for a (possibly interrupted) framed read; see
        # _receive_full_buffer().
        self._recv_len_size = 0
        self._recv_size = 0

        if socket_file is None:
            if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
                self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
            else:
                self.socket_file = bind10_config.BIND10_MSGQ_SOCKET_FILE
        else:
            self.socket_file = socket_file

        try:
            self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self._socket.connect(self.socket_file)
            self.sendmsg({ "type": "getlname" })
            env, msg = self.recvmsg(False)
            # A reply without a body, or without an "lname" entry, is a
            # protocol violation rather than a TypeError/KeyError.
            if not env or not msg or "lname" not in msg:
                raise ProtocolError("Could not get local name")
            self._lname = msg["lname"]
            if not self._lname:
                raise ProtocolError("Could not get local name")
        except socket.error as se:
            if self._socket is not None:
                self._socket.close()
            raise SessionError(se) from se
        except Exception:
            # Any other failure (protocol error, timeout, ...) also leaves
            # the session unusable, so release the socket before passing
            # the exception on unchanged.
            if self._socket is not None:
                self._socket.close()
            raise
Michael Graff's avatar
Michael Graff committed
65 66 67 68 69

    @property
    def lname(self):
        return self._lname

70 71 72 73 74
    def close(self):
        self._socket.close()
        self._lname = None
        self._closed = True

75
    def sendmsg(self, env, msg = None):
Jelte Jansen's avatar
Jelte Jansen committed
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
        with self._lock:
            if self._closed:
                raise SessionError("Session has been closed.")
            if type(env) == dict:
                env = isc.cc.message.to_wire(env)
            if type(msg) == dict:
                msg = isc.cc.message.to_wire(msg)
            self._socket.setblocking(1)
            length = 2 + len(env);
            if msg:
                length += len(msg)
            self._socket.send(struct.pack("!I", length))
            self._socket.send(struct.pack("!H", len(env)))
            self._socket.send(env)
            if msg:
                self._socket.send(msg)
Michael Graff's avatar
Michael Graff committed
92

93
    def recvmsg(self, nonblock = True, seq = None):
Jelte Jansen's avatar
Jelte Jansen committed
94 95
        with self._lock:
            if len(self._queue) > 0:
96
                i = 0;
97
                for env, msg in self._queue:
Jelte Jansen's avatar
Jelte Jansen committed
98 99 100
                    if seq != None and "reply" in env and seq == env["reply"]:
                        return self._queue.pop(i)
                    elif seq == None and "reply" not in env:
101
                        return self._queue.pop(i)
102 103
                    else:
                        i = i + 1
Jelte Jansen's avatar
Jelte Jansen committed
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
            if self._closed:
                raise SessionError("Session has been closed.")
            data = self._receive_full_buffer(nonblock)
            if data and len(data) > 2:
                header_length = struct.unpack('>H', data[0:2])[0]
                data_length = len(data) - 2 - header_length
                if data_length > 0:
                    env = isc.cc.message.from_wire(data[2:header_length+2])
                    msg = isc.cc.message.from_wire(data[header_length + 2:])
                    if (seq == None and "reply" not in env) or (seq != None and "reply" in env and seq == env["reply"]):
                        return env, msg
                    else:
                        tmp = None
                        if "reply" in env:
                            tmp = env["reply"]
                        self._queue.append((env,msg))
                        return self.recvmsg(nonblock, seq)
121
                else:
Jelte Jansen's avatar
Jelte Jansen committed
122 123
                    return isc.cc.message.from_wire(data[2:header_length+2]), None
            return None, None
Michael Graff's avatar
Michael Graff committed
124

Jelte Jansen's avatar
update  
Jelte Jansen committed
125 126 127 128 129 130 131 132 133
    def _receive_bytes(self, size):
        """Try to get size bytes of data from the socket.
           Raises a ProtocolError if the size is 0.
           Raises any error from recv().
           Returns whatever data was available (if >0 bytes).
           """
        data = self._socket.recv(size)
        if len(data) == 0: # server closed connection
            raise ProtocolError("Read of 0 bytes: connection closed")
134
        return data
Jelte Jansen's avatar
update  
Jelte Jansen committed
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
        
    def _receive_len_data(self):
        """Reads self._recv_len_size bytes of data from the socket into
           self._recv_len_data
           This is done through class variables so in the case of
           an EAGAIN we can continue on a subsequent call.
           Raises a ProtocolError, a socket.error (which may be
           timeout or eagain), or reads until we have all data we need.
           """
        while self._recv_len_size > 0:
            new_data = self._receive_bytes(self._recv_len_size)
            self._recv_len_data += new_data
            self._recv_len_size -= len(new_data)

    def _receive_data(self):
        """Reads self._recv_size bytes of data from the socket into
           self._recv_data.
           This is done through class variables so in the case of
           an EAGAIN we can continue on a subsequent call.
           Raises a ProtocolError, a socket.error (which may be
           timeout or eagain), or reads until we have all data we need.
        """
        while self._recv_size > 0:
            new_data = self._receive_bytes(self._recv_size)
            self._recv_data += new_data
            self._recv_size -= len(new_data)
161

Michael Graff's avatar
Michael Graff committed
162 163 164 165 166
    def _receive_full_buffer(self, nonblock):
        if nonblock:
            self._socket.setblocking(0)
        else:
            self._socket.setblocking(1)
167 168 169 170
            if self._socket_timeout == 0.0:
                self._socket.settimeout(None)
            else:
                self._socket.settimeout(self._socket_timeout)
Michael Graff's avatar
Michael Graff committed
171

172
        try:
Jelte Jansen's avatar
update  
Jelte Jansen committed
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
            # we might be in a call following an EAGAIN, in which case
            # we simply continue. In the first case, either
            # recv_size or recv_len size are not zero
            if self._recv_size == 0:
                if self._recv_len_size == 0:
                    # both zero, start a new full read
                    self._recv_len_size = 4
                    self._recv_len_data = bytearray()
                self._receive_len_data()

                self._recv_size = struct.unpack('>I', self._recv_len_data)[0]
                self._recv_data = bytearray()
            self._receive_data()

            # no EAGAIN, so copy data and reset internal counters
            data = self._recv_data

            self._recv_len_size = 0
            self._recv_size = 0

193
            return (data)
Jelte Jansen's avatar
update  
Jelte Jansen committed
194

195 196 197
        except socket.timeout:
            raise SessionTimeout("recv() on cc session timed out")
        except socket.error as se:
Jelte Jansen's avatar
update  
Jelte Jansen committed
198 199 200 201 202 203 204 205 206
            # Only keep data in case of EAGAIN
            if se.errno == errno.EAGAIN:
                return None
            # unknown state otherwise, best to drop data
            self._recv_len_size = 0
            self._recv_size = 0
            # ctrl-c can result in EINTR, return None to prevent
            # stacktrace output
            if se.errno == errno.EINTR:
Jelte Jansen's avatar
Jelte Jansen committed
207
                return None
208
            raise se
Michael Graff's avatar
Michael Graff committed
209 210 211 212 213

    def _next_sequence(self):
        self._sequence += 1
        return self._sequence

214
    def group_subscribe(self, group, instance = "*"):
Michael Graff's avatar
Michael Graff committed
215 216 217 218 219 220 221 222 223 224 225 226 227 228
        self.sendmsg({
            "type": "subscribe",
            "group": group,
            "instance": instance,
        })

    def group_unsubscribe(self, group, instance = "*"):
        self.sendmsg({
            "type": "unsubscribe",
            "group": group,
            "instance": instance,
        })

    def group_sendmsg(self, msg, group, instance = "*", to = "*"):
        """Send msg to a group and return the sequence number used.

           The envelope has type "send", this session's local name as
           sender, the given recipient, group and instance, and a newly
           allocated sequence number. msg is encoded with
           isc.cc.message.to_wire() before it is sent.
           """
        seq = self._next_sequence()
        envelope = {
            "type": "send",
            "from": self._lname,
            "to": to,
            "group": group,
            "instance": instance,
            "seq": seq,
        }
        body = isc.cc.message.to_wire(msg)
        self.sendmsg(envelope, body)
        return seq
Michael Graff's avatar
Michael Graff committed
239

Jelte Jansen's avatar
Jelte Jansen committed
240 241 242
    def has_queued_msgs(self):
        return len(self._queue) > 0

243 244
    def group_recvmsg(self, nonblock = True, seq = None):
        env, msg  = self.recvmsg(nonblock, seq)
Jelte Jansen's avatar
Jelte Jansen committed
245
        if env == None:
246 247 248
            # return none twice to match normal return value
            # (so caller won't get a type error on no data)
            return (None, None)
Jelte Jansen's avatar
Jelte Jansen committed
249
        return (msg, env)
Michael Graff's avatar
Michael Graff committed
250

Jelte Jansen's avatar
Jelte Jansen committed
251 252 253 254 255 256 257 258 259 260
    def group_reply(self, routing, msg):
        """Send msg as a reply to the message whose envelope is routing,
           and return the sequence number used.

           The reply is addressed to routing["from"], keeps the group and
           instance of routing, carries a newly allocated sequence
           number, and has "reply" set to routing["seq"]. msg is encoded
           with isc.cc.message.to_wire() before it is sent.
           """
        seq = self._next_sequence()
        envelope = {
            "type": "send",
            "from": self._lname,
            "to": routing["from"],
            "group": routing["group"],
            "instance": routing["instance"],
            "seq": seq,
            "reply": routing["seq"],
        }
        body = isc.cc.message.to_wire(msg)
        self.sendmsg(envelope, body)
        return seq

264 265 266 267 268 269 270 271 272
    def set_timeout(self, milliseconds):
        """Sets the socket timeout for blocking reads to the given
           number of milliseconds"""
        self._socket_timeout = milliseconds / 1000.0

    def get_timeout(self):
        """Returns the current timeout for blocking reads (in milliseconds)"""
        return self._socket_timeout * 1000.0

Michael Graff's avatar
Michael Graff committed
273 274 275
if __name__ == "__main__":
    # When executed as a script, run the doctests found in this module.
    from doctest import testmod
    testmod()