... | ... | @@ -64,12 +64,13 @@ QueryWorker::run(): |
|
|
xid = query_params->xid
|
|
|
|
|
|
// Start the bulk lease query based on dequeued parameters.
|
|
|
status = LeaseMgr::startBulkLeaseQuery(query_params, bulk_query);
|
|
|
bulk_query = BulkLeaseQuery(query_params);
|
|
|
status = bulk_query->start();
|
|
|
|
|
|
// Query error encountered. Queue an outbound error reply.
|
|
|
if (status != success) {
|
|
|
response = makeErrorResponse(xid, LEASEQUERY_REPLY, status);
|
|
|
connection_->pushToSend(xid, response);
|
|
|
response = makeLeaseQueryReply(bulk_query, status);
|
|
|
connection_->pushToSend(response);
|
|
|
continue;
|
|
|
}
|
|
|
|
... | ... | @@ -82,43 +83,44 @@ QueryWorker::run(): |
|
|
while (lease = query->getNextRow()) {
|
|
|
if (!response) {
|
|
|
// First client starts with a REPLY.
|
|
|
bulk_response = makeResponse(xid, LEASEQUERY_REPLY);
|
|
|
bulk_response = makeLeaseQueryReply(bulk_query, LEASEQUERY_REPLY);
|
|
|
} else {
|
|
|
// If we've changed clients, start a new message.
|
|
|
if (clientChanged(response, lease)) {
|
|
|
connection_->pushToSend(response);
|
|
|
response = makeResponse(xid, LEASEQUERY_DATA);
|
|
|
bulk_response = makeLeaseQueryData(bulk_query);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if (!addBinding(response, lease)) {
|
|
|
if (!bulk_response->addBinding(lease)) {
|
|
|
// No room, queue this message, start another one
|
|
|
connection_->pushToSend(response);
|
|
|
response = makeResponse(xid, LEASEQUERY_DATA);
|
|
|
connection_->pushToSend(bulk_response);
|
|
|
bulk_response = makeLeaseQueryData(bulk_query);
|
|
|
bulk_response->addBinding(lease));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Push last response
|
|
|
if (response && !response->empty()) {
|
|
|
connection_->pushToSend(response);
|
|
|
connection_->pushToSend(bulk_response);
|
|
|
}
|
|
|
|
|
|
// Exhausted query results (if any). Always finish with a DONE.
|
|
|
connection_->pushToSend(makeResponse(xid, LEASEQUERYDONE));
|
|
|
connection_->pushToSend(makeLeaseQueryDone(xid));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
TcpConnection::pushToSend(xid, response) {
|
|
|
TcpConnection::pushToSend(response) {
|
|
|
Buffer packed = response->pack());
|
|
|
|
|
|
// Try to push packed message onto the current packet.
|
|
|
if (current_response_ && (!current_response_->push(packed)) {
|
|
|
if (current_response_ && (!current_response_->push(response->getXid(), packed)) {
|
|
|
// Packet is full, send it
|
|
|
doWrite(current_response);
|
|
|
} else {
|
|
|
// Start a new packet
|
|
|
current_response_ = startNewResponse();
|
|
|
current_response_->push(packed));
|
|
|
current_response_->push(response->getXid(), packed));
|
|
|
}
|
|
|
}
|
|
|
```
|
... | ... | |