session.cc 13.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Copyright (C) 2009  Internet Systems Consortium, Inc. ("ISC")
//
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
// PERFORMANCE OF THIS SOFTWARE.

// $Id$
16

17 18
#include "config.h"

19
#include <stdint.h>
20 21

#include <cstdio>
22
#include <vector>
23 24 25
#include <iostream>
#include <sstream>

26
#ifdef HAVE_BOOSTLIB
27 28 29
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/asio.hpp>
30
#endif
31 32 33 34 35 36

#include <exceptions/exceptions.h>

#include "data.h"
#include "session.h"

37
using namespace std;
Jelte Jansen's avatar
Jelte Jansen committed
38 39
using namespace isc::cc;
using namespace isc::data;
40

41
#ifdef HAVE_BOOSTLIB
42 43 44 45
// some of the boost::asio names conflict with socket API system calls
// (e.g. write(2)) so we don't import the entire boost::asio namespace.
using boost::asio::io_service;
using boost::asio::ip::tcp;
46
#endif
47

48 49 50 51
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

52 53 54 55 56
namespace isc {
namespace cc {

// Abstract transport used by Session.  Concrete subclasses provide the
// connection handling and the raw byte I/O; this base class only holds the
// state that is common to every transport.
class SessionImpl {
public:
    SessionImpl() : sequence_(-1) {}
    virtual ~SessionImpl() {}

    // Connection management.
    virtual void establish() = 0;
    virtual int getSocket() = 0;
    virtual void disconnect() = 0;

    // Raw I/O on the established connection.
    virtual void writeData(const void* data, size_t datalen) = 0;
    virtual size_t readDataLength() = 0;
    virtual void readData(void* data, size_t datalen) = 0;
    virtual void startRead(boost::function<void()> user_handler) = 0;

    // Message sequence counter; starts at -1.
    int sequence_;
    // Local name, filled in by Session::establish() from the "lname" reply.
    std::string lname_;
};

71
#ifdef HAVE_BOOSTLIB
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
// SessionImpl backed by a boost::asio TCP socket.  In addition to the
// blocking read/write primitives it supports an asynchronous read of the
// length prefix (startRead()), after which a user supplied handler is run.
class ASIOSession : public SessionImpl {
public:
    ASIOSession(io_service& io_service) :
        io_service_(io_service), socket_(io_service_), data_length_(0)
    {}

    virtual void establish();
    virtual void disconnect();
    virtual int getSocket() { return (socket_.native()); }
    virtual void writeData(const void* data, size_t datalen);
    virtual size_t readDataLength();
    virtual void readData(void* data, size_t datalen);
    virtual void startRead(boost::function<void()> user_handler);

private:
    // Completion handler for the asynchronous read issued by startRead().
    void internalRead(const boost::system::error_code& error,
                      size_t bytes_transferred);

    io_service& io_service_;
    tcp::socket socket_;
    // Length prefix of the next message; 0 means "not read yet".
    uint32_t data_length_;
    // Callback run by internalRead() once a length prefix has arrived.
    boost::function<void()> user_handler_;
    // Receives the result of the connect() call in establish().
    boost::system::error_code error_;
};
95

96 97 98 99 100 101 102
void
ASIOSession::establish() {
    socket_.connect(tcp::endpoint(boost::asio::ip::address_v4::loopback(),
                                  9912), error_);
    if (error_) {
        isc_throw(SessionError, "Unable to connect to message queue");
    }
103 104 105
}

void
106 107 108
ASIOSession::disconnect() {
    socket_.close();
    data_length_ = 0;
109 110 111
}

void
112 113 114 115 116 117
ASIOSession::writeData(const void* data, size_t datalen) {
    try {
        boost::asio::write(socket_, boost::asio::buffer(data, datalen));
    } catch (const boost::system::system_error& boost_ex) {
        isc_throw(SessionError, "ASIO write failed: " << boost_ex.what());
    }
118 119
}

120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
// Return the length prefix of the next message.
//
// If internalRead() has already stored a (host byte order) length it is
// handed out and cleared.  Otherwise the 4-byte prefix is read from the
// socket synchronously and converted from network byte order.  A prefix of
// zero is rejected with SessionError.
size_t
ASIOSession::readDataLength() {
    if (data_length_ != 0) {
        const size_t pending_len = data_length_;
        data_length_ = 0;
        return (pending_len);
    }

    readData(&data_length_, sizeof(data_length_));
    if (data_length_ == 0) {
        isc_throw(SessionError, "ASIO read: data length is not ready");
    }

    const size_t wire_len = ntohl(data_length_);
    data_length_ = 0;
    return (wire_len);
}

void
ASIOSession::readData(void* data, size_t datalen) {
    try {
        boost::asio::read(socket_, boost::asio::buffer(data, datalen));
    } catch (const boost::system::system_error& boost_ex) {
        // to hide boost specific exceptions, we catch them explicitly
        // and convert it to SessionError.
        isc_throw(SessionError, "ASIO read failed: " << boost_ex.what());
    }
}

// Begin an asynchronous read of the next message's length prefix.
// user_handler is remembered and run from internalRead() once the prefix
// has been received.
void
ASIOSession::startRead(boost::function<void()> user_handler) {
    // Discard any previously stored length; the read below overwrites it.
    data_length_ = 0;
    user_handler_ = user_handler;

    boost::asio::async_read(
        socket_,
        boost::asio::buffer(&data_length_, sizeof(data_length_)),
        boost::bind(&ASIOSession::internalRead, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}

// Completion handler for the asynchronous read started by startRead().
// On success the received length prefix is converted to host byte order,
// left in data_length_ for readDataLength(), and the user handler is run.
// A failed read or a zero length raises SessionError.
void
ASIOSession::internalRead(const boost::system::error_code& error,
                          size_t bytes_transferred)
{
    if (error) {
        isc_throw(SessionError, "asynchronous read failed");
    }

    assert(bytes_transferred == sizeof(data_length_));
    data_length_ = ntohl(data_length_);
    if (data_length_ == 0) {
        isc_throw(SessionError, "Invalid message length (0)");
    }

    user_handler_();
}
173
#endif
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197

// SessionImpl backed by a plain socket descriptor and blocking system
// calls.  sock_ is -1 whenever no connection is open.
class SocketSession : public SessionImpl {
public:
    SocketSession() : sock_(-1) {}
    virtual ~SocketSession() { disconnect(); }

    virtual int getSocket() { return (sock_); }
    virtual void establish();

    // Close the descriptor if one is open and mark the session as
    // disconnected.
    virtual void disconnect()
    {
        if (sock_ >= 0) {
            close(sock_);
        }
        sock_ = -1;
    }

    virtual void writeData(const void* data, size_t datalen);
    virtual void readData(void* data, size_t datalen);
    virtual size_t readDataLength();

    // This transport has no asynchronous mode, so there is nothing to
    // start; the handler is ignored.
    virtual void startRead(boost::function<void()> user_handler)
    {}

private:
    int sock_;
};

namespace {                     // maybe unnecessary.
198 199
// This is a helper class to make the establish() method (below) exception-safe
// with the RAII approach.
200
class SessionHolder {
201
public:
202 203 204 205 206 207 208 209 210
    SessionHolder(SessionImpl* obj) : impl_obj_(obj) {}
    ~SessionHolder()
    {
        if (impl_obj_ != NULL) {
            impl_obj_->disconnect();
        }
    }
    void clear() { impl_obj_ = NULL; }
    SessionImpl* impl_obj_;
211
};
212 213 214
}

void
215 216
SocketSession::establish() {
    int s;
217 218
    struct sockaddr_in sin;

219 220
    s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (s < 0) {
221
        isc_throw(SessionError, "socket() failed");
222
    }
223 224 225 226

    sin.sin_family = AF_INET;
    sin.sin_port = htons(9912);
    sin.sin_addr.s_addr = INADDR_ANY;
227 228 229 230 231

#ifdef HAVE_SIN_LEN
    sin.sin_len = sizeof(struct sockaddr_in);
#endif

232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
    if (connect(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        close(s);
        isc_throw(SessionError, "Unable to connect to message queue");
    }

    sock_ = s;
}

// Write exactly datalen bytes starting at data to the socket.
//
// write(2) may accept fewer bytes than requested, so the call is repeated
// until everything has been sent.  Throws SessionError if write() reports
// an error or makes no progress; "actual" in the message is the number of
// bytes that were written before the failure.
void
SocketSession::writeData(const void* data, const size_t datalen) {
    const char* cursor = static_cast<const char*>(data);
    size_t remaining = datalen;

    while (remaining > 0) {
        const ssize_t cc = write(sock_, cursor, remaining);
        if (cc <= 0) {
            isc_throw(SessionError, "Write failed: expect " << datalen <<
                      ", actual " << (datalen - remaining));
        }
        cursor += cc;
        remaining -= static_cast<size_t>(cc);
    }
}

// Read the 4-byte length prefix of the next message from the socket and
// return it converted from network to host byte order.
size_t
SocketSession::readDataLength() {
    uint32_t length_net;
    readData(&length_net, sizeof(length_net));

    const size_t length_host = ntohl(length_net);
    return (length_host);
}

// Read exactly datalen bytes from the socket into data.
//
// On a stream socket read(2) may return fewer bytes than requested even
// though more are on the way, so the call is repeated until the buffer is
// full.  Throws SessionError if read() reports an error or end-of-file
// before that; "actual" in the message is the number of bytes received
// before the failure.
void
SocketSession::readData(void* data, const size_t datalen) {
    char* cursor = static_cast<char*>(data);
    size_t remaining = datalen;

    while (remaining > 0) {
        const ssize_t cc = read(sock_, cursor, remaining);
        if (cc <= 0) {
            isc_throw(SessionError, "Read failed: expect " << datalen <<
                      ", actual " << (datalen - remaining));
        }
        cursor += cc;
        remaining -= static_cast<size_t>(cc);
    }
}

// Default constructor: use the plain socket based transport.
Session::Session() :
    impl_(new SocketSession())
{
}

268
#ifdef HAVE_BOOSTLIB
269 270
// Use the boost::asio based transport, driven by the given io_service.
Session::Session(io_service& io_service) :
    impl_(new ASIOSession(io_service))
{
}
271
#endif
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299

// The Session owns its transport object and releases it here.
Session::~Session()
{
    delete impl_;
}

// Close the connection; the work is done by the transport object.
void
Session::disconnect()
{
    impl_->disconnect();
}

// Return the descriptor reported by the transport object.
int
Session::getSocket() const
{
    const int fd = impl_->getSocket();
    return (fd);
}

// Hand read_callback to the transport object's startRead().
void
Session::startRead(boost::function<void()> read_callback)
{
    impl_->startRead(read_callback);
}

void
Session::establish() {
    impl_->establish();

    // once established, encapsulate the implementation object so that we
    // can safely release the internal resource when exception happens
    // below.
    SessionHolder session_holder(impl_);
300 301 302 303

    //
    // send a request for our local name, and wait for a response
    //
304 305
    ElementPtr get_lname_msg =
        Element::createFromString("{ \"type\": \"getlname\" }");
306 307
    sendmsg(get_lname_msg);

308 309
    ElementPtr routing, msg;
    recvmsg(routing, msg, false);
310

311 312 313
    impl_->lname_ = msg->get("lname")->stringValue();
    cout << "My local name is:  " << impl_->lname_ << endl;

314 315
    // At this point there's no risk of resource leak.
    session_holder.clear();
316 317 318 319 320 321
}

//
// Convert to wire format and send this on the TCP stream with its length prefix
//
void
322
Session::sendmsg(ElementPtr& msg) {
323
    std::string header_wire = msg->toWire();
324
    unsigned int length = 2 + header_wire.length();
325
    unsigned int length_net = htonl(length);
326 327
    unsigned short header_length = header_wire.length();
    unsigned short header_length_net = htons(header_length);
328

329 330 331
    impl_->writeData(&length_net, sizeof(length_net));
    impl_->writeData(&header_length_net, sizeof(header_length_net));
    impl_->writeData(header_wire.data(), header_length);
332 333
}

334
void
335
Session::sendmsg(ElementPtr& env, ElementPtr& msg) {
336 337
    std::string header_wire = env->toWire();
    std::string body_wire = msg->toWire();
338 339 340 341 342
    unsigned int length = 2 + header_wire.length() + body_wire.length();
    unsigned int length_net = htonl(length);
    unsigned short header_length = header_wire.length();
    unsigned short header_length_net = htons(header_length);

343 344 345 346
    impl_->writeData(&length_net, sizeof(length_net));
    impl_->writeData(&header_length_net, sizeof(header_length_net));
    impl_->writeData(header_wire.data(), header_length);
    impl_->writeData(body_wire.data(), body_wire.length());
347 348
}

349
bool
350 351
Session::recvmsg(ElementPtr& msg, bool nonblock) {
    size_t length = impl_->readDataLength();
352

353 354
    unsigned short header_length_net;
    impl_->readData(&header_length_net, sizeof(header_length_net));
355 356 357

    unsigned short header_length = ntohs(header_length_net);
    if (header_length != length) {
358 359
        isc_throw(SessionError, "Length parameters invalid: total=" << length
                  << ", header=" << header_length);
360
    }
JINMEI Tatuya's avatar
JINMEI Tatuya committed
361 362

    std::vector<char> buffer(length);
363
    impl_->readData(&buffer[0], length);
364

JINMEI Tatuya's avatar
JINMEI Tatuya committed
365
    std::string wire = std::string(&buffer[0], length);
366
    std::stringstream wire_stream;
JINMEI Tatuya's avatar
JINMEI Tatuya committed
367
    wire_stream << wire;
368

369
    msg = Element::fromWire(wire_stream, length);
370 371 372 373 374

    return (true);
    // XXXMLG handle non-block here, and return false for short reads
}

375
bool
376 377
Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock) {
    size_t length = impl_->readDataLength();
378

379 380
    unsigned short header_length_net;
    impl_->readData(&header_length_net, sizeof(header_length_net));
381 382

    unsigned short header_length = ntohs(header_length_net);
383 384 385 386
    if (header_length > length || length < 2) {
        isc_throw(SessionError, "Length parameters invalid: total=" << length
                  << ", header=" << header_length);
    }
387 388 389

    // remove the header-length bytes from the total length
    length -= 2;
390
    std::vector<char> buffer(length);
391
    impl_->readData(&buffer[0], length);
392

393
    std::string header_wire = std::string(&buffer[0], header_length);
394 395
    std::string body_wire = std::string(&buffer[0] + header_length,
                                        length - header_length);
396 397
    std::stringstream header_wire_stream;
    header_wire_stream << header_wire;
398
    env = Element::fromWire(header_wire_stream, header_length);
399
    
400 401
    std::stringstream body_wire_stream;
    body_wire_stream << body_wire;
402
    msg = Element::fromWire(body_wire_stream, length - header_length);
403 404 405 406 407

    return (true);
    // XXXMLG handle non-block here, and return false for short reads
}

408
void
409
Session::subscribe(std::string group, std::string instance) {
410 411 412 413 414 415 416 417 418 419
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());

    env->set("type", Element::create("subscribe"));
    env->set("group", Element::create(group));
    env->set("instance", Element::create(instance));

    sendmsg(env);
}

void
420
Session::unsubscribe(std::string group, std::string instance) {
421 422 423 424 425 426 427 428 429 430
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());

    env->set("type", Element::create("unsubscribe"));
    env->set("group", Element::create(group));
    env->set("instance", Element::create(instance));

    sendmsg(env);
}

unsigned int
431
Session::group_sendmsg(ElementPtr msg, std::string group,
432
                       std::string instance, std::string to)
433 434 435 436
{
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());

    env->set("type", Element::create("send"));
437
    env->set("from", Element::create(impl_->lname_));
438 439 440
    env->set("to", Element::create(to));
    env->set("group", Element::create(group));
    env->set("instance", Element::create(instance));
441
    env->set("seq", Element::create(impl_->sequence_));
442
    //env->set("msg", Element::create(msg->toWire()));
443

444
    sendmsg(env, msg);
445

446
    return (++impl_->sequence_);
447 448 449
}

bool
450 451
Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
                       bool nonblock)
452
{
453
    return (recvmsg(envelope, msg, nonblock));
454 455 456
}

unsigned int
457
Session::reply(ElementPtr& envelope, ElementPtr& newmsg) {
458 459 460
    ElementPtr env = Element::create(std::map<std::string, ElementPtr>());

    env->set("type", Element::create("send"));
461
    env->set("from", Element::create(impl_->lname_));
462 463 464
    env->set("to", Element::create(envelope->get("from")->stringValue()));
    env->set("group", Element::create(envelope->get("group")->stringValue()));
    env->set("instance", Element::create(envelope->get("instance")->stringValue()));
465
    env->set("seq", Element::create(impl_->sequence_));
466
    env->set("reply", Element::create(envelope->get("seq")->intValue()));
467

468
    sendmsg(env, newmsg);
469

470
    return (++impl_->sequence_);
471 472
}
}
473
}