session.cc 15 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Copyright (C) 2009  Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.

// $Id$
16

17
#include <config.h>
Jelte Jansen's avatar
Jelte Jansen committed
18
#include "session_config.h"
19

20
#include <stdint.h>
21

22
23
24
25
26
27
// XXX: there seems to be a strange dependency between ASIO and std library
// definitions.  On some platforms if we include std headers before ASIO
// headers unexpected behaviors will happen.
// A middle term solution is to generalize our local wrapper interface
// (currently only available for the auth server), where all such portability
// issues are hidden, and to have other modules use the wrapper.
28
#include <unistd.h>             // for some IPC/network system calls
29
30
31
32
#include <asio.hpp>
#include <asio/error_code.hpp>
#include <asio/system_error.hpp>

33
#include <cstdio>
34
#include <vector>
35
36
37
#include <iostream>
#include <sstream>

JINMEI Tatuya's avatar
JINMEI Tatuya committed
38
39
#include <sys/un.h>

40
41
#include <boost/bind.hpp>
#include <boost/function.hpp>
42

43
44
45
46
47
#include <exceptions/exceptions.h>

#include "data.h"
#include "session.h"

48
using namespace std;
Jelte Jansen's avatar
Jelte Jansen committed
49
50
using namespace isc::cc;
using namespace isc::data;
51

52
// some of the asio names conflict with socket API system calls
Jelte Jansen's avatar
   
Jelte Jansen committed
53
// (e.g. write(2)) so we don't import the entire asio namespace.
54
55
using asio::io_service;
using asio::ip::tcp;
56

57
58
59
60
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

61
62
63
64
65
namespace isc {
namespace cc {

class SessionImpl {
public:
Jelte Jansen's avatar
Jelte Jansen committed
66
    SessionImpl() : sequence_(-1) { queue_ = Element::createFromString("[]"); }
67
    virtual ~SessionImpl() {}
68
    virtual void establish(const char& socket_file) = 0;
69
70
71
72
73
74
    virtual int getSocket() = 0;
    virtual void disconnect() = 0;
    virtual void writeData(const void* data, size_t datalen) = 0;
    virtual size_t readDataLength() = 0;
    virtual void readData(void* data, size_t datalen) = 0;
    virtual void startRead(boost::function<void()> user_handler) = 0;
Jelte Jansen's avatar
Jelte Jansen committed
75
    
76
77
    int sequence_; // the next sequence number to use
    std::string lname_;
Jelte Jansen's avatar
Jelte Jansen committed
78
    ElementPtr queue_;
79
80
};

81
82
83
84
85
class ASIOSession : public SessionImpl {
public:
    ASIOSession(io_service& io_service) :
        io_service_(io_service), socket_(io_service_), data_length_(0)
    {}
86
    virtual void establish(const char& socket_file);
87
88
89
90
91
92
93
    virtual void disconnect();
    virtual int getSocket() { return (socket_.native()); }
    virtual void writeData(const void* data, size_t datalen);
    virtual size_t readDataLength();
    virtual void readData(void* data, size_t datalen);
    virtual void startRead(boost::function<void()> user_handler);
private:
94
    void internalRead(const asio::error_code& error,
95
96
97
98
                      size_t bytes_transferred);

private:
    io_service& io_service_;
Jelte Jansen's avatar
   
Jelte Jansen committed
99
    asio::local::stream_protocol::socket socket_;
100
101
    uint32_t data_length_;
    boost::function<void()> user_handler_;
102
    asio::error_code error_;
103
};
104

105
106


107
void
108
ASIOSession::establish(const char& socket_file) {
109
    try {
Jelte Jansen's avatar
   
Jelte Jansen committed
110
111
        socket_.connect(asio::local::stream_protocol::endpoint(&socket_file), error_);
    } catch (asio::system_error& se) {
112
113
        isc_throw(SessionError, se.what());
    }
114
    if (error_) {
115
        isc_throw(SessionError, "Unable to connect to message queue.");
116
    }
117
118
119
}

void
120
121
122
ASIOSession::disconnect() {
    socket_.close();
    data_length_ = 0;
123
124
125
}

void
126
127
ASIOSession::writeData(const void* data, size_t datalen) {
    try {
128
        asio::write(socket_, asio::buffer(data, datalen));
129
130
    } catch (const asio::system_error& asio_ex) {
        isc_throw(SessionError, "ASIO write failed: " << asio_ex.what());
131
    }
132
133
}

134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
size_t
ASIOSession::readDataLength() {
    size_t ret_len = data_length_;
    
    if (ret_len == 0) {
        readData(&data_length_, sizeof(data_length_));
        if (data_length_ == 0) {
            isc_throw(SessionError, "ASIO read: data length is not ready");
        }
        ret_len = ntohl(data_length_);
    }

    data_length_ = 0;
    return (ret_len);
}

void
ASIOSession::readData(void* data, size_t datalen) {
    try {
153
        asio::read(socket_, asio::buffer(data, datalen));
154
    } catch (const asio::system_error& asio_ex) {
155
156
        // to hide boost specific exceptions, we catch them explicitly
        // and convert it to SessionError.
157
        isc_throw(SessionError, "ASIO read failed: " << asio_ex.what());
158
159
160
161
162
163
164
    }
}

void
ASIOSession::startRead(boost::function<void()> user_handler) {
    data_length_ = 0;
    user_handler_ = user_handler;
165
    async_read(socket_, asio::buffer(&data_length_,
166
167
                                            sizeof(data_length_)),
               boost::bind(&ASIOSession::internalRead, this,
168
169
                           asio::placeholders::error,
                           asio::placeholders::bytes_transferred));
170
171
172
}

void
173
ASIOSession::internalRead(const asio::error_code& error,
174
                          size_t bytes_transferred)
175
{
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
    if (!error) {
        assert(bytes_transferred == sizeof(data_length_));
        data_length_ = ntohl(data_length_);
        if (data_length_ == 0) {
            isc_throw(SessionError, "Invalid message length (0)");
        }
        user_handler_();
    } else {
        isc_throw(SessionError, "asynchronous read failed");
    }
}

class SocketSession : public SessionImpl {
public:
    SocketSession() : sock_(-1) {}
    virtual ~SocketSession() { disconnect(); }
    virtual int getSocket() { return (sock_); }
193
    void establish(const char& socket_file);
194
195
196
197
198
199
200
201
202
203
    virtual void disconnect()
    {
        if (sock_ >= 0) {
            close(sock_);
        }
        sock_ = -1;
    }
    virtual void writeData(const void* data, size_t datalen);
    virtual void readData(void* data, size_t datalen);
    virtual size_t readDataLength();
JINMEI Tatuya's avatar
JINMEI Tatuya committed
204
    virtual void startRead(boost::function<void()> user_handler UNUSED_PARAM)
205
206
207
208
209
210
    {} // nothing to do for this class
private:
    int sock_;
};

namespace {                     // maybe unnecessary.
211
212
// This is a helper class to make the establish() method (below) exception-safe
// with the RAII approach.
213
class SessionHolder {
214
public:
215
216
217
218
219
220
221
222
223
    SessionHolder(SessionImpl* obj) : impl_obj_(obj) {}
    ~SessionHolder()
    {
        if (impl_obj_ != NULL) {
            impl_obj_->disconnect();
        }
    }
    void clear() { impl_obj_ = NULL; }
    SessionImpl* impl_obj_;
224
};
225
226
227
}

void
228
SocketSession::establish(const char& socket_file) {
229
    struct sockaddr_un s_un;
230
#ifdef HAVE_SA_LEN
231
    s_un.sun_len = sizeof(struct sockaddr_un);
232
#endif
233

234
    if (strlen(&socket_file) >= sizeof(s_un.sun_path)) {
235
236
        isc_throw(SessionError, "Unable to connect to message queue; "
                  "socket file path too long: " << socket_file);
Michael Graff's avatar
Michael Graff committed
237
    }
238
239
    s_un.sun_family = AF_UNIX;
    strncpy(s_un.sun_path, &socket_file, sizeof(s_un.sun_path) - 1);
240

241
    int s = socket(AF_UNIX, SOCK_STREAM, 0);
242
    if (s < 0) {
243
        isc_throw(SessionError, "socket() failed");
244
    }
245

246
    if (connect(s, (struct sockaddr *)&s_un, sizeof(s_un)) < 0) {
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
        close(s);
        isc_throw(SessionError, "Unable to connect to message queue");
    }

    sock_ = s;
}

void
SocketSession::writeData(const void* data, const size_t datalen) {
    int cc = write(sock_, data, datalen);
    if (cc != datalen) {
        isc_throw(SessionError, "Write failed: expect " << datalen <<
                  ", actual " << cc);
    }
}

size_t
SocketSession::readDataLength() {
    uint32_t length;
    readData(&length, sizeof(length));
    return (ntohl(length));
}

void
SocketSession::readData(void* data, const size_t datalen) {
    int cc = read(sock_, data, datalen);
    if (cc != datalen) {
        isc_throw(SessionError, "Read failed: expect " << datalen <<
                  ", actual " << cc);
    }
}

Session::Session() : impl_(new SocketSession)
{}

Session::Session(io_service& io_service) : impl_(new ASIOSession(io_service))
{}

Session::~Session() {
    delete impl_;
}

void
Session::disconnect() {
    impl_->disconnect();
}

int
Session::getSocket() const {
    return (impl_->getSocket());
}

void
Session::startRead(boost::function<void()> read_callback) {
    impl_->startRead(read_callback);
}

void
305
Session::establish(const char* socket_file) {
306
307
308
309
310
311
312
313
    if (socket_file == NULL) {
        socket_file = getenv("BIND10_MSGQ_SOCKET_FILE");
    }
    if (socket_file == NULL) {
        socket_file = BIND10_MSGQ_SOCKET_FILE;
    }

    impl_->establish(*socket_file);
314
315
316
317
318

    // once established, encapsulate the implementation object so that we
    // can safely release the internal resource when exception happens
    // below.
    SessionHolder session_holder(impl_);
319
320
321
322

    //
    // send a request for our local name, and wait for a response
    //
323
324
    ElementPtr get_lname_msg =
        Element::createFromString("{ \"type\": \"getlname\" }");
325
326
    sendmsg(get_lname_msg);

327
328
    ElementPtr routing, msg;
    recvmsg(routing, msg, false);
329

330
331
    impl_->lname_ = msg->get("lname")->stringValue();

332
333
    // At this point there's no risk of resource leak.
    session_holder.clear();
334
335
336
337
338
339
}

//
// Convert to wire format and send this on the TCP stream with its length prefix
//
void
340
Session::sendmsg(ElementPtr& msg) {
341
    std::string header_wire = msg->toWire();
342
    unsigned int length = 2 + header_wire.length();
343
    unsigned int length_net = htonl(length);
344
345
    unsigned short header_length = header_wire.length();
    unsigned short header_length_net = htons(header_length);
346

347
348
349
    impl_->writeData(&length_net, sizeof(length_net));
    impl_->writeData(&header_length_net, sizeof(header_length_net));
    impl_->writeData(header_wire.data(), header_length);
350
351
}

352
void
353
Session::sendmsg(ElementPtr& env, ElementPtr& msg) {
354
355
    std::string header_wire = env->toWire();
    std::string body_wire = msg->toWire();
356
357
358
359
360
    unsigned int length = 2 + header_wire.length() + body_wire.length();
    unsigned int length_net = htonl(length);
    unsigned short header_length = header_wire.length();
    unsigned short header_length_net = htons(header_length);

361
362
363
364
    impl_->writeData(&length_net, sizeof(length_net));
    impl_->writeData(&header_length_net, sizeof(header_length_net));
    impl_->writeData(header_wire.data(), header_length);
    impl_->writeData(body_wire.data(), body_wire.length());
365
366
}

367
bool
Jelte Jansen's avatar
Jelte Jansen committed
368
369
370
Session::recvmsg(ElementPtr& msg, bool nonblock, int seq) {
    ElementPtr l_env;
    return recvmsg(l_env, msg, nonblock, seq);
371
372
}

373
bool
Jelte Jansen's avatar
Jelte Jansen committed
374
375
Session::recvmsg(ElementPtr& env, ElementPtr& msg,
                 bool nonblock, int seq) {
376
    size_t length = impl_->readDataLength();
Jelte Jansen's avatar
Jelte Jansen committed
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
    ElementPtr l_env, l_msg;
    if (hasQueuedMsgs()) {
        ElementPtr q_el;
        for (int i = 0; i < impl_->queue_->size(); i++) {
            q_el = impl_->queue_->get(i);
            if (( seq == -1 &&
                  !q_el->get(0)->contains("reply")
                ) || (
                  q_el->get(0)->contains("reply") &&
                  q_el->get(0)->get("reply")->intValue() == seq
                )
               ) {
                   env = q_el->get(0);
                   msg = q_el->get(1);
                   impl_->queue_->remove(i);
                   return true;
            }
        }
    }
    
397
398
    unsigned short header_length_net;
    impl_->readData(&header_length_net, sizeof(header_length_net));
399
400

    unsigned short header_length = ntohs(header_length_net);
401
402
403
404
    if (header_length > length || length < 2) {
        isc_throw(SessionError, "Length parameters invalid: total=" << length
                  << ", header=" << header_length);
    }
405
406
407

    // remove the header-length bytes from the total length
    length -= 2;
408
    std::vector<char> buffer(length);
409
    impl_->readData(&buffer[0], length);
410

411
    std::string header_wire = std::string(&buffer[0], header_length);
412
413
    std::string body_wire = std::string(&buffer[0] + header_length,
                                        length - header_length);
414
415
    std::stringstream header_wire_stream;
    header_wire_stream << header_wire;
Jelte Jansen's avatar
Jelte Jansen committed
416
    l_env = Element::fromWire(header_wire_stream, header_length);
417
    
418
419
    std::stringstream body_wire_stream;
    body_wire_stream << body_wire;
Jelte Jansen's avatar
Jelte Jansen committed
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
    l_msg = Element::fromWire(body_wire_stream, length - header_length);
    if ((seq == -1 &&
         !l_env->contains("reply")
        ) || (
         l_env->contains("reply") &&
         l_env->get("reply")->intValue() == seq
        )
       ) {
        env = l_env;
        msg = l_msg;
        return true;
    } else {
        ElementPtr q_el = Element::createFromString("[]");
        q_el->add(l_env);
        q_el->add(l_msg);
        impl_->queue_->add(q_el);
        return recvmsg(env, msg, nonblock, seq);
    }
438
439
440
    // XXXMLG handle non-block here, and return false for short reads
}

441
void
442
Session::subscribe(std::string group, std::string instance) {
443
444
445
446
447
448
449
450
451
452
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());

    env->set("type", Element::create("subscribe"));
    env->set("group", Element::create(group));
    env->set("instance", Element::create(instance));

    sendmsg(env);
}

void
453
Session::unsubscribe(std::string group, std::string instance) {
454
455
456
457
458
459
460
461
462
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());

    env->set("type", Element::create("unsubscribe"));
    env->set("group", Element::create(group));
    env->set("instance", Element::create(instance));

    sendmsg(env);
}

Jelte Jansen's avatar
Jelte Jansen committed
463
int
464
Session::group_sendmsg(ElementPtr msg, std::string group,
465
                       std::string instance, std::string to)
466
467
{
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
Jelte Jansen's avatar
Jelte Jansen committed
468
469
    int nseq = ++impl_->sequence_;
    
470
    env->set("type", Element::create("send"));
471
    env->set("from", Element::create(impl_->lname_));
472
473
474
    env->set("to", Element::create(to));
    env->set("group", Element::create(group));
    env->set("instance", Element::create(instance));
Jelte Jansen's avatar
Jelte Jansen committed
475
    env->set("seq", Element::create(nseq));
476
    //env->set("msg", Element::create(msg->toWire()));
477

478
    sendmsg(env, msg);
Jelte Jansen's avatar
Jelte Jansen committed
479
    return nseq;
480
481
482
}

bool
483
Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
Jelte Jansen's avatar
Jelte Jansen committed
484
                       bool nonblock, int seq)
485
{
Jelte Jansen's avatar
Jelte Jansen committed
486
    return (recvmsg(envelope, msg, nonblock, seq));
487
488
}

Jelte Jansen's avatar
Jelte Jansen committed
489
int
490
Session::reply(ElementPtr& envelope, ElementPtr& newmsg) {
491
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
Jelte Jansen's avatar
Jelte Jansen committed
492
493
    int nseq = ++impl_->sequence_;
    
494
    env->set("type", Element::create("send"));
495
    env->set("from", Element::create(impl_->lname_));
496
497
498
    env->set("to", Element::create(envelope->get("from")->stringValue()));
    env->set("group", Element::create(envelope->get("group")->stringValue()));
    env->set("instance", Element::create(envelope->get("instance")->stringValue()));
Jelte Jansen's avatar
Jelte Jansen committed
499
    env->set("seq", Element::create(nseq));
500
    env->set("reply", Element::create(envelope->get("seq")->intValue()));
501

502
    sendmsg(env, newmsg);
503

Jelte Jansen's avatar
Jelte Jansen committed
504
    return nseq;
505
}
Jelte Jansen's avatar
Jelte Jansen committed
506
507
508
509
510
511
512

bool
Session::hasQueuedMsgs()
{
    return (impl_->queue_->size() > 0);
}

513
}
514
}