Commit ad91697c authored by Scott Mann's avatar Scott Mann
Browse files

[trac554] last few changes

parent 725aa66b
...@@ -28,14 +28,15 @@ ...@@ -28,14 +28,15 @@
#include <asiolink/asiolink.h> #include <asiolink/asiolink.h>
#include <asiolink/internal/coroutine.h> #include <asiolink/internal/coroutine.h>
// This file contains UDP-specific implementations of generic classes // This file contains TCP/UDP-specific implementations of generic classes
// defined in asiolink.h. It is *not* intended to be part of the public // defined in asiolink.h. It is *not* intended to be part of the public
// API. // API.
namespace asiolink { namespace asiolink {
// //
// Asynchronous UDP coroutine for upstream queries // Asynchronous UDP/TCP coroutine for upstream fetches
// //
//class IOFetch : public coroutine, public UdpFetch, public TcpFetch {
class IOFetch : public coroutine { class IOFetch : public coroutine {
public: public:
// TODO Maybe this should be more generic than just for IOFetch? // TODO Maybe this should be more generic than just for IOFetch?
...@@ -66,11 +67,12 @@ public: ...@@ -66,11 +67,12 @@ public:
/// delete it if allocated on heap. /// delete it if allocated on heap.
///@param timeout in ms. ///@param timeout in ms.
/// ///
explicit IOFetch(asio::io_service& io_service, IOFetch(asio::io_service& io_service,
const isc::dns::Question& q, const isc::dns::Question& q,
const IOAddress& addr, uint16_t port, const IOAddress& addr, uint16_t port,
isc::dns::OutputBufferPtr buffer, isc::dns::OutputBufferPtr buffer,
Callback* callback, int timeout = -1); Callback* callback, int timeout = -1,
int protocol = IPPROTO_UDP);
void operator()(asio::error_code ec = asio::error_code(), void operator()(asio::error_code ec = asio::error_code(),
size_t length = 0); size_t length = 0);
/// Terminate the query. /// Terminate the query.
...@@ -87,9 +89,32 @@ private: ...@@ -87,9 +89,32 @@ private:
/// to many async_*() functions) and we want keep the same data. Some of /// to many async_*() functions) and we want keep the same data. Some of
/// the data is not copyable too. /// the data is not copyable too.
/// ///
struct IOFetchProtocol; //struct IOFetchProtocol;
boost::shared_ptr<IOFetchProtocol> data_; //boost::shared_ptr<IOFetchProtocol> data_;
//struct UdpData;
//struct TcpData;
boost::shared_ptr<UdpFetch> data_;
boost::shared_ptr<TcpFetch> tcp_data_;
}; };
class UdpFetch : public IOFetch {
public:
struct UdpData;
explicit UdpFetch(asio::io_service& io_service,
const isc::dns::Question& q,
const IOAddress& addr,
uint16_t port,
isc::dns::OutputBufferPtr buffer,
IOFetch::Callback *callback,
int timeout);
};
class TcpFetch : public IOFetch {
public:
struct TcpData;
explicit TcpFetch(io_service& io_service, const Question& q,
const IOAddress& addr, uint16_t port,
OutputBufferPtr buffer, Callback *callback, int timeout);
};
} }
......
...@@ -50,16 +50,51 @@ using namespace isc::dns; ...@@ -50,16 +50,51 @@ using namespace isc::dns;
namespace asiolink { namespace asiolink {
// Private IOFetch data (see internal/iofetch.h for reasons) struct TcpFetch::UdpData {
struct IOFetch::IOFetchProtocol {
// UDP Socket we send query to and expect reply from there // UDP Socket we send query to and expect reply from there
udp::socket socket; udp::socket socket;
// Where was the query sent // Where was the query sent
udp::endpoint remote; udp::endpoint remote;
// What we ask the server
Question question;
// We will store the answer here
OutputBufferPtr buffer;
OutputBufferPtr msgbuf;
// Temporary buffer for answer
boost::shared_array<char> data;
// This will be called when the data arrive or timeouts
Callback* callback;
//Callback* callback;
// Did we already stop operating (data arrived, we timed out, someone
// called stop). This can be so when we are cleaning up/there are
// still pointers to us.
bool stopped;
// Timer to measure timeouts.
deadline_timer timer;
// How many milliseconds are we willing to wait for answer?
int timeout;
UdpData(io_service& service,
const udp::socket::protocol_type& protocol,
const Question &q,
OutputBufferPtr b, Callback *c) :
socket(service, protocol),
question(q),
buffer(b),
msgbuf(new OutputBuffer(512)),
callback(c),
stopped(false),
timer(service)
{ }
};
struct TcpFetch::TcpData {
// TCP Socket // TCP Socket
tcp::socket tsocket; tcp::socket socket;
// tcp endpoint // tcp endpoint
tcp::endpoint tremote; tcp::endpoint remote;
// What we ask the server // What we ask the server
Question question; Question question;
// We will store the answer here // We will store the answer here
...@@ -78,13 +113,11 @@ struct IOFetch::IOFetchProtocol { ...@@ -78,13 +113,11 @@ struct IOFetch::IOFetchProtocol {
// How many milliseconds are we willing to wait for answer? // How many milliseconds are we willing to wait for answer?
int timeout; int timeout;
IOFetchProtocol(io_service& service, TcpData(io_service& service,
const udp::socket::protocol_type& protocol, const tcp::socket::protocol_type& protocol,
const tcp::socket::protocol_type& tprotocol,
const Question &q, const Question &q,
OutputBufferPtr b, Callback *c) : OutputBufferPtr b, Callback *c) :
socket(service, protocol), socket(service, protocol),
tsocket(service, tprotocol),
question(q), question(q),
buffer(b), buffer(b),
msgbuf(new OutputBuffer(512)), msgbuf(new OutputBuffer(512)),
...@@ -96,22 +129,58 @@ struct IOFetch::IOFetchProtocol { ...@@ -96,22 +129,58 @@ struct IOFetch::IOFetchProtocol {
}; };
UdpFetch::UdpFetch(io_service& io_service, const Question& q,
const IOAddress& addr, uint16_t port,
OutputBufferPtr buffer, Callback *callback, int timeout) :
data_(new UdpData(io_service,
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(),
q, buffer, callback))
{
data_->remote = UDPEndpoint(addr, port).getASIOEndpoint();
data_->timeout = timeout;
}
TcpFetch::TcpFetch(io_service& io_service, const Question& q,
const IOAddress& addr, uint16_t port,
OutputBufferPtr buffer, Callback *callback, int timeout) :
tcp_data_(new TcpData(io_service,
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(),
q, buffer, callback))
{
tcp_data_->remote = TCPEndpoint(addr, port).getASIOEndpoint();
tcp_data_->timeout = timeout;
}
//
/// The following functions implement the \c IOFetch class. /// The following functions implement the \c IOFetch class.
/// ///
/// The constructor /// The constructor
IOFetch::IOFetch(io_service& io_service, const Question& q, IOFetch::IOFetch(io_service& io_service, const Question& q,
const IOAddress& addr, uint16_t port, const IOAddress& addr, uint16_t port,
OutputBufferPtr buffer, Callback *callback, int timeout) : OutputBufferPtr buffer, Callback *callback, int timeout,
data_(new IOFetchProtocol(io_service, int protocol)
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(),
addr.getFamily() == AF_INET ? tcp::v4() : tcp::v6(),
q, buffer,
callback))
{ {
data_->tremote = TCPEndpoint(addr, port).getASIOEndpoint(); if (protocol == IPPROTO_TCP)
{
TcpFetch(io_service, q, addr, port, buffer, callback, timeout);
/*
tcp_data_ = new TcpData(io_service,
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(),
q, buffer, callback);
tcp_data_->remote = TCPEndpoint(addr, port).getASIOEndpoint();
tcp_data_->timeout = timeout;
*/
}
else
{
UdpFetch(io_service, q, addr, port, buffer, callback, timeout);
/*
data_(new UdpData(io_service,
addr.getFamily() == AF_INET ? udp::v4() : udp::v6(),
q, buffer, callback));
data_->remote = UDPEndpoint(addr, port).getASIOEndpoint(); data_->remote = UDPEndpoint(addr, port).getASIOEndpoint();
data_->timeout = timeout; data_->timeout = timeout;
*/
}
} }
/// The function operator is implemented with the "stackless coroutine" /// The function operator is implemented with the "stackless coroutine"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment