session.py 6.84 KB
Newer Older
Michael Graff's avatar
Michael Graff committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# Copyright (C) 2009  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import sys
import socket
import struct
Michael Graff's avatar
Michael Graff committed
19
import os
Michael Graff's avatar
Michael Graff committed
20

21
import isc.cc.message
Michael Graff's avatar
Michael Graff committed
22 23

class ProtocolError(Exception): pass
Jelte Jansen's avatar
Jelte Jansen committed
24
class NetworkError(Exception): pass
25
class SessionError(Exception): pass
Michael Graff's avatar
Michael Graff committed
26 27

class Session:
Michael Graff's avatar
Michael Graff committed
28
    def __init__(self, port=0):
Michael Graff's avatar
Michael Graff committed
29 30
        self._socket = None
        self._lname = None
Shane Kerr's avatar
Shane Kerr committed
31
        self._recvbuffer = bytearray()
Michael Graff's avatar
Michael Graff committed
32
        self._recvlength = 0
Michael Graff's avatar
Michael Graff committed
33
        self._sequence = 1
34
        self._closed = False
35
        self._queue = []
Michael Graff's avatar
Michael Graff committed
36

Michael Graff's avatar
Michael Graff committed
37
        if port == 0:
38 39 40 41
	        if 'B10_FROM_SOURCE' in os.environ:
	            port = int(os.environ["ISC_MSGQ_PORT"])
	        else:
	            port = 9912
Michael Graff's avatar
Michael Graff committed
42

43 44
        try:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
45
            self._socket.connect(tuple(['127.0.0.1', port]))
Michael Graff's avatar
Michael Graff committed
46

47
            self.sendmsg({ "type": "getlname" })
48
            env, msg = self.recvmsg(False)
49 50
            if not env:
                raise ProtocolError("Could not get local name")
51 52 53
            self._lname = msg["lname"]
            if not self._lname:
                raise ProtocolError("Could not get local name")
Shane Kerr's avatar
Shane Kerr committed
54
        except socket.error as se:
55
                raise SessionError(se)
Michael Graff's avatar
Michael Graff committed
56 57 58 59 60

    @property
    def lname(self):
        return self._lname

61 62 63 64 65
    def close(self):
        self._socket.close()
        self._lname = None
        self._closed = True

66
    def sendmsg(self, env, msg = None):
67 68
        XXmsg = msg
        XXenv = env
69 70
        if self._closed:
            raise SessionError("Session has been closed.")
71
        if type(env) == dict:
72
            env = isc.cc.message.to_wire(env)
Michael Graff's avatar
Michael Graff committed
73
        if type(msg) == dict:
74
            msg = isc.cc.message.to_wire(msg)
Michael Graff's avatar
Michael Graff committed
75
        self._socket.setblocking(1)
76 77 78 79 80 81 82 83
        length = 2 + len(env);
        if msg:
            length += len(msg)
        self._socket.send(struct.pack("!I", length))
        self._socket.send(struct.pack("!H", len(env)))
        self._socket.send(env)
        if msg:
            self._socket.send(msg)
Michael Graff's avatar
Michael Graff committed
84

85
    def recvmsg(self, nonblock = True, seq = None):
86
        #print("[XX] queue len: " + str(len(self._queue)))
87 88
        if len(self._queue) > 0:
            if seq == None:
89 90
                #print("[XX] return first")
                return self._queue.pop(0)
91 92
            else:
                i = 0;
93 94
                #print("[XX] check rest")
                for env, msg in self._queue:
95
                    if "reply" in env and seq == env["reply"]:
96
                        return self._queue.pop(i)
97 98
                    else:
                        i = i + 1
99
                #print("[XX] not found")
100 101
        if self._closed:
            raise SessionError("Session has been closed.")
Michael Graff's avatar
Michael Graff committed
102
        data = self._receive_full_buffer(nonblock)
103 104 105 106
        if data and len(data) > 2:
            header_length = struct.unpack('>H', data[0:2])[0]
            data_length = len(data) - 2 - header_length
            if data_length > 0:
107 108 109 110 111 112
                env = isc.cc.message.from_wire(data[2:header_length+2])
                msg = isc.cc.message.from_wire(data[header_length + 2:])
                if seq == None or "reply" in env and seq == env["reply"]:
                    return env, msg
                else:
                    self._queue.append((env,msg))
113
                    return self.recvmsg(nonblock, seq)
114
            else:
115
                return isc.cc.message.from_wire(data[2:header_length+2]), None
116
        return None, None
Michael Graff's avatar
Michael Graff committed
117 118 119 120 121 122 123

    def _receive_full_buffer(self, nonblock):
        if nonblock:
            self._socket.setblocking(0)
        else:
            self._socket.setblocking(1)

Michael Graff's avatar
Michael Graff committed
124
        if self._recvlength == 0:
Michael Graff's avatar
Michael Graff committed
125 126 127 128 129 130
            length = 4
            length -= len(self._recvbuffer)
            try:
                data = self._socket.recv(length)
            except:
                return None
Jelte Jansen's avatar
Jelte Jansen committed
131 132
            if data == "": # server closed connection
                raise ProtocolError("Read of 0 bytes: connection closed")
Michael Graff's avatar
Michael Graff committed
133 134 135 136 137

            self._recvbuffer += data
            if len(self._recvbuffer) < 4:
                return None
            self._recvlength = struct.unpack('>I', self._recvbuffer)[0]
Shane Kerr's avatar
Shane Kerr committed
138
            self._recvbuffer = bytearray()
Michael Graff's avatar
Michael Graff committed
139 140 141

        length = self._recvlength - len(self._recvbuffer)
        while (length > 0):
Jelte Jansen's avatar
Jelte Jansen committed
142 143 144 145 146 147
            try:
                data = self._socket.recv(length)
            except:
                return None
            if data == "": # server closed connection
                raise ProtocolError("Read of 0 bytes: connection closed")
Michael Graff's avatar
Michael Graff committed
148 149 150
            self._recvbuffer += data
            length -= len(data)
        data = self._recvbuffer
Shane Kerr's avatar
Shane Kerr committed
151
        self._recvbuffer = bytearray()
Michael Graff's avatar
Michael Graff committed
152
        self._recvlength = 0
Michael Graff's avatar
Michael Graff committed
153 154 155 156 157 158
        return (data)

    def _next_sequence(self):
        self._sequence += 1
        return self._sequence

159
    def group_subscribe(self, group, instance = "*"):
Michael Graff's avatar
Michael Graff committed
160 161 162 163 164 165 166 167 168 169 170 171 172 173
        self.sendmsg({
            "type": "subscribe",
            "group": group,
            "instance": instance,
        })

    def group_unsubscribe(self, group, instance = "*"):
        self.sendmsg({
            "type": "unsubscribe",
            "group": group,
            "instance": instance,
        })

    def group_sendmsg(self, msg, group, instance = "*", to = "*"):
Jelte Jansen's avatar
Jelte Jansen committed
174
        seq = self._next_sequence()
Michael Graff's avatar
Michael Graff committed
175 176 177 178 179 180
        self.sendmsg({
            "type": "send",
            "from": self._lname,
            "to": to,
            "group": group,
            "instance": instance,
Jelte Jansen's avatar
Jelte Jansen committed
181
            "seq": seq,
182
        }, isc.cc.message.to_wire(msg))
Jelte Jansen's avatar
Jelte Jansen committed
183
        return seq
Michael Graff's avatar
Michael Graff committed
184

185 186
    def group_recvmsg(self, nonblock = True, seq = None):
        env, msg  = self.recvmsg(nonblock, seq)
Jelte Jansen's avatar
Jelte Jansen committed
187
        if env == None:
188 189 190
            # return none twice to match normal return value
            # (so caller won't get a type error on no data)
            return (None, None)
Jelte Jansen's avatar
Jelte Jansen committed
191
        return (msg, env)
Michael Graff's avatar
Michael Graff committed
192

Jelte Jansen's avatar
Jelte Jansen committed
193 194 195 196 197 198 199 200 201 202
    def group_reply(self, routing, msg):
        seq = self._next_sequence()
        self.sendmsg({
            "type": "send",
            "from": self._lname,
            "to": routing["from"],
            "group": routing["group"],
            "instance": routing["instance"],
            "seq": seq,
            "reply": routing["seq"],
203
        }, isc.cc.message.to_wire(msg))
Jelte Jansen's avatar
Jelte Jansen committed
204 205
        return seq

Michael Graff's avatar
Michael Graff committed
206 207 208
if __name__ == "__main__":
    import doctest
    doctest.testmod()