... | @@ -42,6 +42,10 @@ The following diagram illustrates the new class hierarchy: |
... | @@ -42,6 +42,10 @@ The following diagram illustrates the new class hierarchy: |
|
|
|
|
|
When a query packet arrives its queries will be pushed onto a queue. The queue is monitored a pool by one or more query worker threads. Each worker thread is responsible for submitting a single query against the lease store, iteratively fetching the results, and pushing results as XID/binding pairs to connection's outbound queue. The connection thread will add the binding pairs to the current TCP packet until it is full and then send it asynchronously. Subsequent pairs are adding to a new packet. This continues until there are no more queued queries.
|
|
When a query packet arrives its queries will be pushed onto a queue. The queue is monitored a pool by one or more query worker threads. Each worker thread is responsible for submitting a single query against the lease store, iteratively fetching the results, and pushing results as XID/binding pairs to connection's outbound queue. The connection thread will add the binding pairs to the current TCP packet until it is full and then send it asynchronously. Subsequent pairs are adding to a new packet. This continues until there are no more queued queries.
|
|
|
|
|
|
|
|
The following diagram drills down a bit further into QueryWorker and BulkLeaseQuery classes:
|
|
|
|
|
|
|
|
![blq_query_classes.svg](uploads/b5a67ea1a5b61859da67ab5da88a2ad7/blq_query_classes.svg)
|
|
|
|
|
|
A conceptualization of the processing for V6 in pseudo code follows:
|
|
A conceptualization of the processing for V6 in pseudo code follows:
|
|
|
|
|
|
```
|
|
```
|
... | @@ -59,7 +63,7 @@ LeaseQueryConnection::run() |
... | @@ -59,7 +63,7 @@ LeaseQueryConnection::run() |
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
QueryWorker::run():
|
|
QueryWorker6::run():
|
|
while (query_params = getNextQuery()) {
|
|
while (query_params = getNextQuery()) {
|
|
xid = query_params->xid
|
|
xid = query_params->xid
|
|
|
|
|
... | @@ -69,8 +73,8 @@ QueryWorker::run(): |
... | @@ -69,8 +73,8 @@ QueryWorker::run(): |
|
|
|
|
|
// Query error encountered. Queue an outbound error reply.
|
|
// Query error encountered. Queue an outbound error reply.
|
|
if (status != success) {
|
|
if (status != success) {
|
|
response = makeLeaseQueryReply(bulk_query, status);
|
|
bulk_response = makeLeaseQueryReply(bulk_query, status);
|
|
connection_->pushToSend(response);
|
|
connection_->pushToSend(bulk_response);
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
... | @@ -81,13 +85,13 @@ QueryWorker::run(): |
... | @@ -81,13 +85,13 @@ QueryWorker::run(): |
|
// TCP packets and sent until we exhaust query results.
|
|
// TCP packets and sent until we exhaust query results.
|
|
bulk_response = null;
|
|
bulk_response = null;
|
|
while (lease = query->getNextRow()) {
|
|
while (lease = query->getNextRow()) {
|
|
if (!response) {
|
|
if (!bulk_response) {
|
|
// First client starts with a REPLY.
|
|
// First client starts with a REPLY.
|
|
bulk_response = makeLeaseQueryReply(bulk_query, LEASEQUERY_REPLY);
|
|
bulk_response = makeLeaseQueryReply(bulk_query, LEASEQUERY_REPLY);
|
|
} else {
|
|
} else {
|
|
// If we've changed clients, start a new message.
|
|
// If we've changed clients, start a new message.
|
|
if (clientChanged(response, lease)) {
|
|
if (clientChanged(bulk_response, lease)) {
|
|
connection_->pushToSend(response);
|
|
connection_->pushToSend(bulk_response);
|
|
bulk_response = makeLeaseQueryData(bulk_query);
|
|
bulk_response = makeLeaseQueryData(bulk_query);
|
|
}
|
|
}
|
|
}
|
|
}
|
... | @@ -100,8 +104,8 @@ QueryWorker::run(): |
... | @@ -100,8 +104,8 @@ QueryWorker::run(): |
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// Push last response
|
|
// Push last bulk_response
|
|
if (response && !response->empty()) {
|
|
if (bulk_response && !bulk_response->empty()) {
|
|
connection_->pushToSend(bulk_response);
|
|
connection_->pushToSend(bulk_response);
|
|
}
|
|
}
|
|
|
|
|
... | @@ -110,22 +114,59 @@ QueryWorker::run(): |
... | @@ -110,22 +114,59 @@ QueryWorker::run(): |
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
LeaseQueryConnection::pushToSend(response) {
|
|
LeaseQueryConnection::pushToSend(bulk_response) {
|
|
Buffer packed = response->pack());
|
|
Buffer packed = bulk_response->pack());
|
|
|
|
|
|
// Try to push packed message onto the current packet.
|
|
// Try to push packed message onto the current packet.
|
|
if (current_response_ && (!current_response_->push(response->getXid(), packed)) {
|
|
if (current_packet_ && (!current_packet_->push(bulk_response->getXid(), packed)) {
|
|
// Packet is full, send it
|
|
// Packet is full, send it
|
|
doWrite(current_response);
|
|
doWrite(current_packet);
|
|
} else {
|
|
} else {
|
|
// Start a new packet
|
|
// Start a new packet
|
|
current_response_ = startNewResponse();
|
|
current_packet_ = startNewResponse();
|
|
current_response_->push(response->getXid(), packed));
|
|
current_packet_->push(bulk_response->getXid(), packed));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
```
|
|
```
|
|
|
|
|
|
V4 processing would be very similar but not identical as the messages and contents vary from somewhat from their V6 counter-parts.
|
|
V4 processing would be very similar, really differs only in QueryWorker4::run() due to the difference in message type:
|
|
|
|
|
|
|
|
```
|
|
|
|
QueryWorker4::run():
|
|
|
|
while (query_params = getNextQuery()) {
|
|
|
|
xid = query_params->xid
|
|
|
|
|
|
|
|
// Start the bulk lease query based on dequeued parameters.
|
|
|
|
bulk_query = BulkLeaseQuery(query_params);
|
|
|
|
status = bulk_query->start();
|
|
|
|
|
|
|
|
// Query error encountered. Queue an outbound error reply.
|
|
|
|
if (status != success) {
|
|
|
|
bulk_response = makeLeaseQueryDone(bulk_query, status);
|
|
|
|
connection_->pushToSend(bulk_response);
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Process each lease row (if any). Behind the scenes the query
|
|
|
|
// object will fetch the next chunk of leases as needed, even
|
|
|
|
// though we process it here one row at a time. We keep
|
|
|
|
// generating BLQ messages and queuing them to be bundled into
|
|
|
|
// TCP packets and sent until we exhaust query results.
|
|
|
|
while (lease = bulk_query->getNextRow()) {
|
|
|
|
if (isActive(lease)) {
|
|
|
|
bulk_response = makeLeaseQueryActive(bulk_query, lease)
|
|
|
|
} else {
|
|
|
|
bulk_response = makeLeaseQueryUnassigned(bulk_query, lease)
|
|
|
|
}
|
|
|
|
|
|
|
|
connection_->pushToSend(bulk_response);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Exhausted query results (if any). Always finish with a DONE.
|
|
|
|
connection_->pushToSend(makeLeaseQueryDone(xid));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
```
|
|
|
|
|
|
We may need a gating mechanism (e.g. condition variable in pushToSend()), based on a maximum number of bindings queued for send (per connection) so query workers do not create enormous outbound queues. In other words, we need to avoid outrunning the connection's ability to send by too avoid memory exhausting et al. In practice this may be a non-issue but it is better to allow for it now.
|
|
We may need a gating mechanism (e.g. condition variable in pushToSend()), based on a maximum number of bindings queued for send (per connection) so query workers do not create enormous outbound queues. In other words, we need to avoid outrunning the connection's ability to send by too avoid memory exhausting et al. In practice this may be a non-issue but it is better to allow for it now.
|
|
|
|
|
... | | ... | |