... | ... | @@ -64,22 +64,52 @@ QueryWorker::run(): |
|
|
xid = query_params->xid
|
|
|
|
|
|
// Start the bulk lease query based on dequeued parameters.
|
|
|
query = LeaseMgr::startBulkLeaseQuery(query_params);
|
|
|
status = LeaseMgr::startBulkLeaseQuery(query_params, bulk_query);
|
|
|
|
|
|
// Process each lease row. Behind the scenes the query object
|
|
|
// will fetch the next chunk of leases as needed, even though
|
|
|
// we process it here one row at a time.
|
|
|
// Query error encountered. Queue an outbound error reply.
|
|
|
if (status != success) {
|
|
|
response = makeErrorResponse(xid, LEASEQUERY_REPLY, status);
|
|
|
connection_->pushToSend(xid, response);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
// Process each lease row (if any). Behind the scenes the query
|
|
|
// object will fetch the next chunk of leases as needed, even
|
|
|
// though we process it here one row at a time. We keep
|
|
|
// generating BLQ messages and queuing them to be bundled into
|
|
|
// TCP packets and sent until we exhaust query results.
|
|
|
bulk_response = null;
|
|
|
while (lease = query->getNextRow()) {
|
|
|
connection_->pushToSend(xid, lease);
|
|
|
if (!response) {
|
|
|
// First client starts with a REPLY.
|
|
|
bulk_response = makeResponse(xid, LEASEQUERY_REPLY);
|
|
|
} else {
|
|
|
// If we've changed clients, start a new message.
|
|
|
if (clientChanged(response, lease)) {
|
|
|
connection_->pushToSend(response);
|
|
|
response = makeResponse(xid, LEASEQUERY_DATA);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if (!addBinding(response, lease)) {
|
|
|
// No room, queue this message, start another one
|
|
|
connection_->pushToSend(response);
|
|
|
response = makeResponse(xid, LEASEQUERY_DATA);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Push last response
|
|
|
if (response && !response->empty()) {
|
|
|
connection_->pushToSend(response);
|
|
|
}
|
|
|
|
|
|
// Exhausted query results (if any)
|
|
|
connection_->pushToSend(xid, LEASEQUERYDONE);
|
|
|
// Exhausted query results (if any). Always finish with a DONE.
|
|
|
connection_->pushToSend(makeResponse(xid, LEASEQUERYDONE));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
TcpConnection::pushToSend(xid, binding) {
|
|
|
Buffer packed = packBinding(binding);
|
|
|
TcpConnection::pushToSend(xid, response) {
|
|
|
Buffer packed = response->pack());
|
|
|
|
|
|
// Try to push packed message onto the current packet.
|
|
|
if (current_response_ && (!current_response_->push(packed)) {
|
... | ... | |