session.cc 13.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Copyright (C) 2009  Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.

// $Id$
16

17
18
#include "config.h"

19
#include <stdint.h>
20
21

#include <cstdio>
22
#include <vector>
23
24
25
#include <iostream>
#include <sstream>

26
#ifdef HAVE_BOOSTLIB
27
28
29
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/asio.hpp>
30
#endif
31
32
33
34
35
36

#include <exceptions/exceptions.h>

#include "data.h"
#include "session.h"

37
using namespace std;
Jelte Jansen's avatar
Jelte Jansen committed
38
39
using namespace isc::cc;
using namespace isc::data;
40

41
#ifdef HAVE_BOOSTLIB
42
43
44
45
// some of the boost::asio names conflict with socket API system calls
// (e.g. write(2)) so we don't import the entire boost::asio namespace.
using boost::asio::io_service;
using boost::asio::ip::tcp;
46
#endif
47

48
49
50
51
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

52
53
54
55
56
namespace isc {
namespace cc {

// Abstract transport interface used by Session.
//
// Concrete subclasses supply connection setup/teardown and raw byte I/O.
// This base class only stores the state that Session keeps per connection:
// the counter used for the "seq" envelope field and the local name obtained
// while establishing the session.
class SessionImpl {
public:
    // The sequence counter starts at -1 and the local name starts empty.
    SessionImpl() : sequence_(-1), lname_() {}
    virtual ~SessionImpl() {}

    // Connection management.
    virtual void establish() = 0;
    virtual void disconnect() = 0;
    virtual int getSocket() = 0;

    // Raw data transfer.
    virtual void writeData(const void* data, size_t datalen) = 0;
    virtual size_t readDataLength() = 0;
    virtual void readData(void* data, size_t datalen) = 0;
    virtual void startRead(boost::function<void()> user_handler) = 0;

    int sequence_;      // counter backing the "seq" field of sent envelopes
    std::string lname_; // local name, filled in by Session::establish()
};

71
#ifdef HAVE_BOOSTLIB
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// SessionImpl backed by a boost::asio TCP socket.
//
// In addition to the blocking read/write primitives it supports an
// asynchronous read of the 4-byte length prefix (see startRead()).
class ASIOSession : public SessionImpl {
public:
    ASIOSession(io_service& io_service) :
        io_service_(io_service),
        socket_(io_service_),
        data_length_(0)
    {}

    // Connection management.
    virtual void establish();
    virtual void disconnect();
    virtual int getSocket() { return (socket_.native()); }

    // Raw data transfer.
    virtual void writeData(const void* data, size_t datalen);
    virtual size_t readDataLength();
    virtual void readData(void* data, size_t datalen);
    virtual void startRead(boost::function<void()> user_handler);

private:
    // Completion handler for the asynchronous read issued by startRead().
    void internalRead(const boost::system::error_code& error,
                      size_t bytes_transferred);

    // Note: socket_ is constructed from io_service_, so io_service_ must
    // stay declared before it.
    io_service& io_service_;
    tcp::socket socket_;
    // Length prefix of the pending message; 0 means "none read yet".
    uint32_t data_length_;
    // Callback handed to startRead(), invoked from internalRead().
    boost::function<void()> user_handler_;
    // Receives the result of the connect attempt in establish().
    boost::system::error_code error_;
};
95

96
97
98
99
100
101
102
void
ASIOSession::establish() {
    socket_.connect(tcp::endpoint(boost::asio::ip::address_v4::loopback(),
                                  9912), error_);
    if (error_) {
        isc_throw(SessionError, "Unable to connect to message queue");
    }
103
104
105
}

void
106
107
108
ASIOSession::disconnect() {
    socket_.close();
    data_length_ = 0;
109
110
111
}

void
112
113
114
115
116
117
ASIOSession::writeData(const void* data, size_t datalen) {
    try {
        boost::asio::write(socket_, boost::asio::buffer(data, datalen));
    } catch (const boost::system::system_error& boost_ex) {
        isc_throw(SessionError, "ASIO write failed: " << boost_ex.what());
    }
118
119
}

120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Return the total length of the next message and clear the stored value.
//
// If a length is already stored in data_length_ (internalRead() stores it in
// host byte order) it is returned as is.  Otherwise the 4-byte prefix is
// read synchronously and converted from network byte order.
// Throws SessionError if the prefix read from the socket is 0.
size_t
ASIOSession::readDataLength() {
    if (data_length_ != 0) {
        // A length is already available; hand it out exactly once.
        const size_t pending_len = data_length_;
        data_length_ = 0;
        return (pending_len);
    }

    readData(&data_length_, sizeof(data_length_));
    if (data_length_ == 0) {
        isc_throw(SessionError, "ASIO read: data length is not ready");
    }

    const size_t wire_len = ntohl(data_length_);
    data_length_ = 0;
    return (wire_len);
}

void
ASIOSession::readData(void* data, size_t datalen) {
    try {
        boost::asio::read(socket_, boost::asio::buffer(data, datalen));
    } catch (const boost::system::system_error& boost_ex) {
        // to hide boost specific exceptions, we catch them explicitly
        // and convert it to SessionError.
        isc_throw(SessionError, "ASIO read failed: " << boost_ex.what());
    }
}

// Start an asynchronous read of the 4-byte length prefix.
//
// Any previously stored length is reset, user_handler is remembered, and
// internalRead() is registered as the completion handler; internalRead()
// in turn invokes user_handler once a valid length has arrived.
void
ASIOSession::startRead(boost::function<void()> user_handler) {
    data_length_ = 0;
    user_handler_ = user_handler;

    async_read(socket_,
               boost::asio::buffer(&data_length_, sizeof(data_length_)),
               boost::bind(&ASIOSession::internalRead, this,
                           boost::asio::placeholders::error,
                           boost::asio::placeholders::bytes_transferred));
}

// Completion handler for the read started by startRead().
//
// On success the received length prefix is converted to host byte order and
// the user handler is called.  Throws SessionError if the read failed or if
// the received length is 0.
void
ASIOSession::internalRead(const boost::system::error_code& error,
                          size_t bytes_transferred)
{
    if (error) {
        isc_throw(SessionError, "asynchronous read failed");
    }

    // The read was issued for the whole prefix, so a successful completion
    // is expected to deliver all of it.
    assert(bytes_transferred == sizeof(data_length_));

    data_length_ = ntohl(data_length_);
    if (data_length_ == 0) {
        isc_throw(SessionError, "Invalid message length (0)");
    }

    user_handler_();
}
173
#endif
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190

class SocketSession : public SessionImpl {
public:
    SocketSession() : sock_(-1) {}
    virtual ~SocketSession() { disconnect(); }
    virtual int getSocket() { return (sock_); }
    void establish();
    virtual void disconnect()
    {
        if (sock_ >= 0) {
            close(sock_);
        }
        sock_ = -1;
    }
    virtual void writeData(const void* data, size_t datalen);
    virtual void readData(void* data, size_t datalen);
    virtual size_t readDataLength();
JINMEI Tatuya's avatar
JINMEI Tatuya committed
191
    virtual void startRead(boost::function<void()> user_handler UNUSED_PARAM)
192
193
194
195
196
197
    {} // nothing to do for this class
private:
    int sock_;
};

namespace {                     // maybe unnecessary.
198
199
// This is a helper class to make the establish() method (below) exception-safe
// with the RAII approach.
200
class SessionHolder {
201
public:
202
203
204
205
206
207
208
209
210
    SessionHolder(SessionImpl* obj) : impl_obj_(obj) {}
    ~SessionHolder()
    {
        if (impl_obj_ != NULL) {
            impl_obj_->disconnect();
        }
    }
    void clear() { impl_obj_ = NULL; }
    SessionImpl* impl_obj_;
211
};
212
213
214
}

void
215
216
SocketSession::establish() {
    int s;
217
218
    struct sockaddr_in sin;

219
220
    s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (s < 0) {
221
        isc_throw(SessionError, "socket() failed");
222
    }
Michael Graff's avatar
Michael Graff committed
223
224
225
226
227
    
    int port = atoi(getenv("ISC_MSGQ_PORT"));
    if (port == 0) {
        port = 9912;
    }
228
229

    sin.sin_family = AF_INET;
Michael Graff's avatar
Michael Graff committed
230
    sin.sin_port = htons(port);
231
    sin.sin_addr.s_addr = INADDR_ANY;
232
233
234
235
236

#ifdef HAVE_SIN_LEN
    sin.sin_len = sizeof(struct sockaddr_in);
#endif

237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
    if (connect(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        close(s);
        isc_throw(SessionError, "Unable to connect to message queue");
    }

    sock_ = s;
}

// Write exactly datalen bytes starting at data to the socket.
//
// write() on a stream socket may accept fewer bytes than requested, so the
// call is repeated until everything has been sent.  Throws SessionError if
// write() fails or makes no progress; the message reports how many bytes
// had been written up to that point.
void
SocketSession::writeData(const void* data, const size_t datalen) {
    const char* cursor = static_cast<const char*>(data);
    size_t remaining = datalen;

    while (remaining > 0) {
        const ssize_t cc = write(sock_, cursor, remaining);
        if (cc <= 0) {
            isc_throw(SessionError, "Write failed: expect " << datalen <<
                      ", actual " << (datalen - remaining));
        }
        cursor += cc;
        remaining -= static_cast<size_t>(cc);
    }
}

// Read the 4-byte length prefix from the socket and return it converted
// from network to host byte order.
size_t
SocketSession::readDataLength() {
    uint32_t length_net;
    readData(&length_net, sizeof(length_net));

    const size_t total_length = ntohl(length_net);
    return (total_length);
}

// Read exactly datalen bytes from the socket into data.
//
// read() on a stream socket may return fewer bytes than requested even
// though more data will follow, so the call is repeated until the buffer
// is full.  Throws SessionError if read() fails or returns 0 (end of
// stream) before datalen bytes have arrived; the message reports how many
// bytes had been received up to that point.
void
SocketSession::readData(void* data, const size_t datalen) {
    char* cursor = static_cast<char*>(data);
    size_t remaining = datalen;

    while (remaining > 0) {
        const ssize_t cc = read(sock_, cursor, remaining);
        if (cc <= 0) {
            isc_throw(SessionError, "Read failed: expect " << datalen <<
                      ", actual " << (datalen - remaining));
        }
        cursor += cc;
        remaining -= static_cast<size_t>(cc);
    }
}

// Default constructor: uses the plain-socket implementation.
Session::Session() :
    impl_(new SocketSession())
{}

273
#ifdef HAVE_BOOSTLIB
274
275
// Constructor taking an io_service: uses the boost::asio implementation.
Session::Session(io_service& io_service) :
    impl_(new ASIOSession(io_service))
{}
276
#endif
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304

// Destroy the implementation object created by the constructor.
Session::~Session()
{
    delete impl_;
}

// Close the connection; the work is delegated to the implementation object.
void
Session::disconnect()
{
    impl_->disconnect();
}

// Return the socket descriptor reported by the implementation object.
int
Session::getSocket() const
{
    const int fd = impl_->getSocket();
    return (fd);
}

// Forward read_callback to the implementation's startRead().
void
Session::startRead(boost::function<void()> read_callback)
{
    impl_->startRead(read_callback);
}

void
Session::establish() {
    impl_->establish();

    // once established, encapsulate the implementation object so that we
    // can safely release the internal resource when exception happens
    // below.
    SessionHolder session_holder(impl_);
305
306
307
308

    //
    // send a request for our local name, and wait for a response
    //
309
310
    ElementPtr get_lname_msg =
        Element::createFromString("{ \"type\": \"getlname\" }");
311
312
    sendmsg(get_lname_msg);

313
314
    ElementPtr routing, msg;
    recvmsg(routing, msg, false);
315

316
317
318
    impl_->lname_ = msg->get("lname")->stringValue();
    cout << "My local name is:  " << impl_->lname_ << endl;

319
320
    // At this point there's no risk of resource leak.
    session_holder.clear();
321
322
323
324
325
326
}

//
// Convert to wire format and send this on the TCP stream with its length prefix
//
void
327
Session::sendmsg(ElementPtr& msg) {
328
    std::string header_wire = msg->toWire();
329
    unsigned int length = 2 + header_wire.length();
330
    unsigned int length_net = htonl(length);
331
332
    unsigned short header_length = header_wire.length();
    unsigned short header_length_net = htons(header_length);
333

334
335
336
    impl_->writeData(&length_net, sizeof(length_net));
    impl_->writeData(&header_length_net, sizeof(header_length_net));
    impl_->writeData(header_wire.data(), header_length);
337
338
}

339
void
340
Session::sendmsg(ElementPtr& env, ElementPtr& msg) {
341
342
    std::string header_wire = env->toWire();
    std::string body_wire = msg->toWire();
343
344
345
346
347
    unsigned int length = 2 + header_wire.length() + body_wire.length();
    unsigned int length_net = htonl(length);
    unsigned short header_length = header_wire.length();
    unsigned short header_length_net = htons(header_length);

348
349
350
351
    impl_->writeData(&length_net, sizeof(length_net));
    impl_->writeData(&header_length_net, sizeof(header_length_net));
    impl_->writeData(header_wire.data(), header_length);
    impl_->writeData(body_wire.data(), body_wire.length());
352
353
}

354
bool
355
356
Session::recvmsg(ElementPtr& msg, bool nonblock) {
    size_t length = impl_->readDataLength();
357

358
359
    unsigned short header_length_net;
    impl_->readData(&header_length_net, sizeof(header_length_net));
360
361
362

    unsigned short header_length = ntohs(header_length_net);
    if (header_length != length) {
363
364
        isc_throw(SessionError, "Length parameters invalid: total=" << length
                  << ", header=" << header_length);
365
    }
JINMEI Tatuya's avatar
JINMEI Tatuya committed
366
367

    std::vector<char> buffer(length);
368
    impl_->readData(&buffer[0], length);
369

JINMEI Tatuya's avatar
JINMEI Tatuya committed
370
    std::string wire = std::string(&buffer[0], length);
371
    std::stringstream wire_stream;
JINMEI Tatuya's avatar
JINMEI Tatuya committed
372
    wire_stream << wire;
373

374
    msg = Element::fromWire(wire_stream, length);
375
376
377
378
379

    return (true);
    // XXXMLG handle non-block here, and return false for short reads
}

380
bool
381
382
Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock) {
    size_t length = impl_->readDataLength();
383

384
385
    unsigned short header_length_net;
    impl_->readData(&header_length_net, sizeof(header_length_net));
386
387

    unsigned short header_length = ntohs(header_length_net);
388
389
390
391
    if (header_length > length || length < 2) {
        isc_throw(SessionError, "Length parameters invalid: total=" << length
                  << ", header=" << header_length);
    }
392
393
394

    // remove the header-length bytes from the total length
    length -= 2;
395
    std::vector<char> buffer(length);
396
    impl_->readData(&buffer[0], length);
397

398
    std::string header_wire = std::string(&buffer[0], header_length);
399
400
    std::string body_wire = std::string(&buffer[0] + header_length,
                                        length - header_length);
401
402
    std::stringstream header_wire_stream;
    header_wire_stream << header_wire;
403
    env = Element::fromWire(header_wire_stream, header_length);
404
    
405
406
    std::stringstream body_wire_stream;
    body_wire_stream << body_wire;
407
    msg = Element::fromWire(body_wire_stream, length - header_length);
408
409
410
411
412

    return (true);
    // XXXMLG handle non-block here, and return false for short reads
}

413
void
414
Session::subscribe(std::string group, std::string instance) {
415
416
417
418
419
420
421
422
423
424
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());

    env->set("type", Element::create("subscribe"));
    env->set("group", Element::create(group));
    env->set("instance", Element::create(instance));

    sendmsg(env);
}

void
425
Session::unsubscribe(std::string group, std::string instance) {
426
427
428
429
430
431
432
433
434
435
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());

    env->set("type", Element::create("unsubscribe"));
    env->set("group", Element::create(group));
    env->set("instance", Element::create(instance));

    sendmsg(env);
}

unsigned int
436
Session::group_sendmsg(ElementPtr msg, std::string group,
437
                       std::string instance, std::string to)
438
439
440
441
{
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());

    env->set("type", Element::create("send"));
442
    env->set("from", Element::create(impl_->lname_));
443
444
445
    env->set("to", Element::create(to));
    env->set("group", Element::create(group));
    env->set("instance", Element::create(instance));
446
    env->set("seq", Element::create(impl_->sequence_));
447
    //env->set("msg", Element::create(msg->toWire()));
448

449
    sendmsg(env, msg);
450

451
    return (++impl_->sequence_);
452
453
454
}

bool
455
456
Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
                       bool nonblock)
457
{
458
    return (recvmsg(envelope, msg, nonblock));
459
460
461
}

unsigned int
462
Session::reply(ElementPtr& envelope, ElementPtr& newmsg) {
463
464
465
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());

    env->set("type", Element::create("send"));
466
    env->set("from", Element::create(impl_->lname_));
467
468
469
    env->set("to", Element::create(envelope->get("from")->stringValue()));
    env->set("group", Element::create(envelope->get("group")->stringValue()));
    env->set("instance", Element::create(envelope->get("instance")->stringValue()));
470
    env->set("seq", Element::create(impl_->sequence_));
471
    env->set("reply", Element::create(envelope->get("seq")->intValue()));
472

473
    sendmsg(env, newmsg);
474

475
    return (++impl_->sequence_);
476
477
}
}
478
}