session.py 12.8 KB
Newer Older
Michael Graff's avatar
Michael Graff committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Copyright (C) 2009  Internet Systems Consortium.
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import sys
import socket
import struct
19
import errno
Michael Graff's avatar
Michael Graff committed
20
import os
Jelte Jansen's avatar
Jelte Jansen committed
21
import threading
22
import bind10_config
Michael Graff's avatar
Michael Graff committed
23

24
import isc.cc.message
25
26
27
import isc.log
from isc.cc.logger import logger
from isc.log_messages.pycc_messages import *
28
from isc.cc.proto_defs import *
Michael Graff's avatar
Michael Graff committed
29
30

class ProtocolError(Exception):
    """Raised when the exchange with the message queue breaks the protocol
       (for example no local name, an oversized envelope, or a read of
       0 bytes)."""


class NetworkError(Exception):
    """Network-level error type exported by this module."""


class SessionError(Exception):
    """Raised for session-level failures: socket errors during setup,
       use of a closed session, or an undecodable message."""


class SessionTimeout(Exception):
    """Raised when a blocking read on the session times out."""
Michael Graff's avatar
Michael Graff committed
34
35

class Session:
36
    MSGQ_DEFAULT_TIMEOUT = 4000
37

38
    def __init__(self, socket_file=None):
        """Connect to the message queue and fetch this session's local name.

           Parameters:
           - socket_file Path of the UNIX domain socket to connect to.
             If None, the BIND10_MSGQ_SOCKET_FILE environment variable
             is used when set, otherwise
             bind10_config.BIND10_MSGQ_SOCKET_FILE.

           Raises a SessionError if a socket error occurs while
           connecting or during the initial exchange, and a
           ProtocolError if no local name is received.  Whatever the
           failure, the socket is closed before the exception leaves
           the constructor."""
        self._socket = None
        self._lname = None
        self._sequence = 1
        self._closed = False
        self._queue = []
        self._lock = threading.RLock()
        self.set_timeout(self.MSGQ_DEFAULT_TIMEOUT)
        # Outstanding byte counts of a partially received message; both
        # zero means no read is in progress.
        self._recv_len_size = 0
        self._recv_size = 0

        if socket_file is None:
            if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
                self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
            else:
                self.socket_file = bind10_config.BIND10_MSGQ_SOCKET_FILE
        else:
            self.socket_file = socket_file

        try:
            self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self._socket.connect(self.socket_file)
            self.sendmsg({ CC_HEADER_TYPE: CC_COMMAND_GET_LNAME })
            env, msg = self.recvmsg(False)
            if not env:
                raise ProtocolError("Could not get local name")
            self._lname = msg[CC_PAYLOAD_LNAME]
            if not self._lname:
                raise ProtocolError("Could not get local name")
            logger.debug(logger.DBGLVL_TRACE_BASIC, PYCC_LNAME_RECEIVED,
                         self._lname)
        except Exception as ex:
            # Nobody can call close() on a half-constructed session, so
            # release the socket here for every kind of failure, not
            # only for socket errors.
            if self._socket is not None:
                self._socket.close()
            if isinstance(ex, socket.error):
                raise SessionError(ex)
            raise
Michael Graff's avatar
Michael Graff committed
73
74
75
76
77

    @property
    def lname(self):
        return self._lname

78
79
80
81
82
    def close(self):
        self._socket.close()
        self._lname = None
        self._closed = True

83
    def sendmsg(self, env, msg=None):
Jelte Jansen's avatar
Jelte Jansen committed
84
85
86
87
88
        with self._lock:
            if self._closed:
                raise SessionError("Session has been closed.")
            if type(env) == dict:
                env = isc.cc.message.to_wire(env)
89
90
            if len(env) > 65535:
                raise ProtocolError("Envelope too large")
Jelte Jansen's avatar
Jelte Jansen committed
91
92
93
            if type(msg) == dict:
                msg = isc.cc.message.to_wire(msg)
            length = 2 + len(env);
94
            if msg is not None:
Jelte Jansen's avatar
Jelte Jansen committed
95
                length += len(msg)
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110

            # Build entire message.
            data = struct.pack("!I", length)
            data += struct.pack("!H", len(env))
            data += env
            if msg is not None:
                data += msg

            # Send it in the blocking mode.  On some systems send() may
            # actually send only part of the data, so we need to repeat it
            # until all data have been sent out.
            self._socket.setblocking(1)
            while len(data) > 0:
                cc = self._socket.send(data)
                data = data[cc:]
Michael Graff's avatar
Michael Graff committed
111

112
    def recvmsg(self, nonblock = True, seq = None):
113
114
115
116
117
118
119
120
121
122
123
124
125
        """Reads a message. If nonblock is true, and there is no
           message to read, it returns (None, None).
           If seq is not None, it should be a value as returned by
           group_sendmsg(), in which case only the response to
           that message is returned, and others will be queued until
           the next call to this method.
           If seq is None, only messages that are *not* responses
           will be returned, and responses will be queued.
           The queue is checked for relevant messages before data
           is read from the socket.
           Raises a SessionError if there is a JSON decode problem in
           the message that is read, or if the session has been closed
           prior to the call of recvmsg()"""
Jelte Jansen's avatar
Jelte Jansen committed
126
127
        with self._lock:
            if len(self._queue) > 0:
128
                i = 0;
129
                for env, msg in self._queue:
130
131
                    if seq != None and CC_HEADER_REPLY in env and \
                        seq == env[CC_HEADER_REPLY]:
Jelte Jansen's avatar
Jelte Jansen committed
132
                        return self._queue.pop(i)
133
                    elif seq == None and CC_HEADER_REPLY not in env:
134
                        return self._queue.pop(i)
135
136
                    else:
                        i = i + 1
Jelte Jansen's avatar
Jelte Jansen committed
137
138
139
140
141
142
            if self._closed:
                raise SessionError("Session has been closed.")
            data = self._receive_full_buffer(nonblock)
            if data and len(data) > 2:
                header_length = struct.unpack('>H', data[0:2])[0]
                data_length = len(data) - 2 - header_length
143
144
145
146
                try:
                    if data_length > 0:
                        env = isc.cc.message.from_wire(data[2:header_length+2])
                        msg = isc.cc.message.from_wire(data[header_length + 2:])
147
148
149
                        if (seq == None and CC_HEADER_REPLY not in env) or \
                            (seq != None and CC_HEADER_REPLY in env and
                             seq == env[CC_HEADER_REPLY]):
150
151
152
153
                            return env, msg
                        else:
                            self._queue.append((env,msg))
                            return self.recvmsg(nonblock, seq)
Jelte Jansen's avatar
Jelte Jansen committed
154
                    else:
155
156
157
158
159
160
                        return isc.cc.message.from_wire(data[2:header_length+2]), None
                except ValueError as ve:
                    # TODO: when we have logging here, add a debug
                    # message printing the data that we were unable
                    # to parse as JSON
                    raise SessionError(ve)
Jelte Jansen's avatar
Jelte Jansen committed
161
            return None, None
Michael Graff's avatar
Michael Graff committed
162

Jelte Jansen's avatar
update    
Jelte Jansen committed
163
164
165
166
167
168
169
170
171
    def _receive_bytes(self, size):
        """Try to get size bytes of data from the socket.
           Raises a ProtocolError if the size is 0.
           Raises any error from recv().
           Returns whatever data was available (if >0 bytes).
           """
        data = self._socket.recv(size)
        if len(data) == 0: # server closed connection
            raise ProtocolError("Read of 0 bytes: connection closed")
172
        return data
173

Jelte Jansen's avatar
update    
Jelte Jansen committed
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
    def _receive_len_data(self):
        """Reads self._recv_len_size bytes of data from the socket into
           self._recv_len_data
           This is done through class variables so in the case of
           an EAGAIN we can continue on a subsequent call.
           Raises a ProtocolError, a socket.error (which may be
           timeout or eagain), or reads until we have all data we need.
           """
        while self._recv_len_size > 0:
            new_data = self._receive_bytes(self._recv_len_size)
            self._recv_len_data += new_data
            self._recv_len_size -= len(new_data)

    def _receive_data(self):
        """Reads self._recv_size bytes of data from the socket into
           self._recv_data.
           This is done through class variables so in the case of
           an EAGAIN we can continue on a subsequent call.
           Raises a ProtocolError, a socket.error (which may be
           timeout or eagain), or reads until we have all data we need.
        """
        while self._recv_size > 0:
            new_data = self._receive_bytes(self._recv_size)
            self._recv_data += new_data
            self._recv_size -= len(new_data)
199

Michael Graff's avatar
Michael Graff committed
200
201
202
203
204
    def _receive_full_buffer(self, nonblock):
        if nonblock:
            self._socket.setblocking(0)
        else:
            self._socket.setblocking(1)
205
206
207
208
            if self._socket_timeout == 0.0:
                self._socket.settimeout(None)
            else:
                self._socket.settimeout(self._socket_timeout)
Michael Graff's avatar
Michael Graff committed
209

210
        try:
Jelte Jansen's avatar
update    
Jelte Jansen committed
211
212
213
            # we might be in a call following an EAGAIN, in which case
            # we simply continue. In the first case, either
            # recv_size or recv_len size are not zero
214
215
216
            # they may never both be non-zero (we are either starting
            # a full read, or continuing one of the reads
            assert self._recv_size == 0 or self._recv_len_size == 0
217

Jelte Jansen's avatar
update    
Jelte Jansen committed
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
            if self._recv_size == 0:
                if self._recv_len_size == 0:
                    # both zero, start a new full read
                    self._recv_len_size = 4
                    self._recv_len_data = bytearray()
                self._receive_len_data()

                self._recv_size = struct.unpack('>I', self._recv_len_data)[0]
                self._recv_data = bytearray()
            self._receive_data()

            # no EAGAIN, so copy data and reset internal counters
            data = self._recv_data

            self._recv_len_size = 0
            self._recv_size = 0

235
            return (data)
Jelte Jansen's avatar
update    
Jelte Jansen committed
236

237
238
239
        except socket.timeout:
            raise SessionTimeout("recv() on cc session timed out")
        except socket.error as se:
Jelte Jansen's avatar
update    
Jelte Jansen committed
240
241
242
243
244
245
246
247
248
            # Only keep data in case of EAGAIN
            if se.errno == errno.EAGAIN:
                return None
            # unknown state otherwise, best to drop data
            self._recv_len_size = 0
            self._recv_size = 0
            # ctrl-c can result in EINTR, return None to prevent
            # stacktrace output
            if se.errno == errno.EINTR:
Jelte Jansen's avatar
Jelte Jansen committed
249
                return None
250
            raise se
Michael Graff's avatar
Michael Graff committed
251
252
253
254
255

    def _next_sequence(self):
        self._sequence += 1
        return self._sequence

256
    def group_subscribe(self, group, instance=CC_INSTANCE_WILDCARD):
        """Send a subscribe command for the given group and instance
           (by default the wildcard instance)."""
        envelope = {
            CC_HEADER_TYPE: CC_COMMAND_SUBSCRIBE,
            CC_HEADER_GROUP: group,
            CC_HEADER_INSTANCE: instance,
        }
        self.sendmsg(envelope)

263
    def group_unsubscribe(self, group, instance=CC_INSTANCE_WILDCARD):
        """Send an unsubscribe command for the given group and instance
           (by default the wildcard instance)."""
        envelope = {
            CC_HEADER_TYPE: CC_COMMAND_UNSUBSCRIBE,
            CC_HEADER_GROUP: group,
            CC_HEADER_INSTANCE: instance,
        }
        self.sendmsg(envelope)

270
271
    def group_sendmsg(self, msg, group, instance=CC_INSTANCE_WILDCARD,
                      to=CC_TO_WILDCARD, want_answer=False):
        '''
        Send a message to a group over the CC session.

        Parameters:
        - msg The message body, given as python structures (dicts,
          lists, etc); it is encoded before being sent.
        - group The group the message is addressed to.
        - instance The instance within that group.
        - to A direct recipient (takes precedence over the above, and
          should be the lname of the recipient).
        - want_answer Whether an answer is requested. When it is true
          and there is no recipient, the message queue would send an
          error message in place of the answer.

        Return:
          The sequence number assigned to the message, which can be
          used to wait for the answer (see group_recvmsg).
        '''
        seq = self._next_sequence()
        envelope = {
            CC_HEADER_TYPE: CC_COMMAND_SEND,
            CC_HEADER_FROM: self._lname,
            CC_HEADER_TO: to,
            CC_HEADER_GROUP: group,
            CC_HEADER_INSTANCE: instance,
            CC_HEADER_SEQ: seq,
            CC_HEADER_WANT_ANSWER: want_answer
        }
        payload = isc.cc.message.to_wire(msg)
        self.sendmsg(envelope, payload)
        return seq
Michael Graff's avatar
Michael Graff committed
301

Jelte Jansen's avatar
Jelte Jansen committed
302
303
304
    def has_queued_msgs(self):
        return len(self._queue) > 0

305
306
    def group_recvmsg(self, nonblock = True, seq = None):
        env, msg  = self.recvmsg(nonblock, seq)
Jelte Jansen's avatar
Jelte Jansen committed
307
        if env == None:
308
309
310
            # return none twice to match normal return value
            # (so caller won't get a type error on no data)
            return (None, None)
Jelte Jansen's avatar
Jelte Jansen committed
311
        return (msg, env)
Michael Graff's avatar
Michael Graff committed
312

Jelte Jansen's avatar
Jelte Jansen committed
313
314
315
    def group_reply(self, routing, msg):
        """Send msg as a reply to the message whose envelope is routing.
           The reply goes to the sender named in routing, with the same
           group and instance, and carries the sequence number of the
           original message in its reply header.
           Returns the sequence number assigned to the reply itself."""
        seq = self._next_sequence()
        envelope = {
            CC_HEADER_TYPE: CC_COMMAND_SEND,
            CC_HEADER_FROM: self._lname,
            CC_HEADER_TO: routing[CC_HEADER_FROM],
            CC_HEADER_GROUP: routing[CC_HEADER_GROUP],
            CC_HEADER_INSTANCE: routing[CC_HEADER_INSTANCE],
            CC_HEADER_SEQ: seq,
            CC_HEADER_REPLY: routing[CC_HEADER_SEQ],
        }
        payload = isc.cc.message.to_wire(msg)
        self.sendmsg(envelope, payload)
        return seq

326
327
328
329
330
331
332
333
334
    def set_timeout(self, milliseconds):
        """Sets the socket timeout for blocking reads to the given
           number of milliseconds"""
        self._socket_timeout = milliseconds / 1000.0

    def get_timeout(self):
        """Returns the current timeout for blocking reads (in milliseconds)"""
        return self._socket_timeout * 1000.0

Michael Graff's avatar
Michael Graff committed
335
336
337
if __name__ == "__main__":
    # When this file is executed directly, run the doctests found in
    # this module instead of starting a session.
    import doctest
    doctest.testmod()