[1290] expand querying tool

......@@ -2,18 +2,45 @@ from lettuce import *
import subprocess
import re
# This script provides querying functionality
# The most important step is
# query for <name> [type X] [class X] [to <addr>[:port]] should have rcode <rc>
# By default, it will send queries to unless specified
# otherwise. The rcode is always checked. If the result is not NO_ANSWER,
# the result will be stored in last_query_result, which can then be inspected
# more closely, for instance with the step
# last query should have <property> <value>
# define a class to easily access different parts
# We may consider using our full library for this, but for now
# simply store several parts of the response as text values in
# this structure
# The following attributes are 'parsed' from the response, all as strings,
# and end up as direct attributes of the QueryResult object:
# opcode, rcode, id, flags, qdcount, ancount, nscount, adcount
# (flags is one string with all flags)
# this will set 'rcode' as the result code, we 'define' one additional
# rcode, "NO_ANSWER", if the dig process returned an error code itself
# we will extend as necessary
class QueryResult:
def __init__(self, name, qtype = None, qclass = None, port = 47806):
args = [ 'dig', '@localhost', '-p', str(port) ]
# In this case none of the other attributes will be set.
# The different sections will be lists of strings, one for each RR in the
# section. The question section will start with ';', as per dig output
# See server_from_sqlite3.feature for various examples to perform queries
class QueryResult(object):
status_re = re.compile("opcode: ([A-Z])+, status: ([A-Z]+), id: ([0-9]+)")
flags_re = re.compile("flags: ([a-z ]+); QUERY: ([0-9]+), ANSWER: " +
"([0-9]+), AUTHORITY: ([0-9]+), ADDITIONAL: ([0-9]+)")
def __init__(self, name, qtype, qclass, address, port):
args = [ 'dig', '@' + address, '-p', str(port) ]
if qtype is not None:
......@@ -27,16 +54,88 @@ class QueryResult:
if result != 0:
self.rcode = "NO_ANSWER"
rcode_re = re.compile("status: ([A-Z]+)")
self.rcode = None
parsing = "HEADER"
self.question_section = []
self.answer_section = []
self.authority_section = []
self.additional_section = []
self.line_handler = self.parse_header
for out in dig_process.stdout:
rcode_match =
if rcode_match is not None:
self.rcode =
def parse_header(self, line):
status_match =
flags_match =
if status_match is not None:
self.opcode =
self.rcode =
elif flags_match is not None:
self.flags =
self.qdcount =
self.ancount =
self.nscount =
self.adcount =
elif line == ";; QUESTION SECTION:\n":
self.line_handler = self.parse_question
def parse_question(self, line):
if line == ";; ANSWER SECTION:\n":
self.line_handler = self.parse_answer
elif line != "\n":
def parse_answer(self, line):
if line == ";; AUTHORITY SECTION:\n":
self.line_handler = self.parse_authority
elif line != "\n":
def parse_authority(self, line):
if line == ";; ADDITIONAL SECTION:\n":
self.line_handler = self.parse_additional
elif line != "\n":
@step(u'A query for ([\w.]+) should have rcode ([\w.]+)')
def query(step, query_name, rcode):
query_result = QueryResult(query_name)
def parse_authority(self, line):
if line.startswith(";; Query time"):
self.line_handler = self.parse_footer
elif line != "\n":
def parse_footer(self, line):
@step(u'A query for ([\w.]+) (?:type ([A-Z]+) )?(?:class ([A-Z]+) )?' +
'(?:to ([^:]+)(?::([0-9]+))? )?should have rcode ([\w.]+)')
def query(step, query_name, qtype, qclass, addr, port, rcode):
if qtype is None:
qtype = "A"
if qclass is None:
qclass = "IN"
if addr is None:
addr = ""
if port is None:
port = 47806
query_result = QueryResult(query_name, qtype, qclass, addr, port)
assert query_result.rcode == rcode, "Got " + query_result.rcode
world.last_query_result = query_result
@step(u'The SOA serial for ([\w.]+) should be ([0-9]+)')
def query_soa(step, query_name, serial):
query_result = QueryResult(query_name, "SOA", "IN", "", "47806")
assert "NOERROR" == query_result.rcode,\
"Got " + query_result.rcode + ", expected NOERROR"
assert len(query_result.answer_section) == 1,\
"Too few or too many answers in SOA response"
soa_parts = query_result.answer_section[0].split()
assert serial == soa_parts[6],\
"Got SOA serial " + soa_parts[6] + ", expected " + serial
@step(u'last query should have (\S+) (.+)')
def check_last_query(step, item, value):
assert world.last_query_result is not None
assert item in world.last_query_result.__dict__
lq_val = world.last_query_result.__dict__[item]
assert str(value) == str(lq_val),\
"Got: " + str(lq_val) + ", expected: " + str(value)
......@@ -11,11 +11,35 @@ Feature: SQLite3 backend
I should see a database file
Scenario: queries
# This scenario performs a number of queries and inspects the results
# This is not only to test, but also to show the different options
# we have to inspect the data
When I start bind10 with configuration
Then wait for bind10 auth to start
A query for should have rcode REFUSED
A query for should have rcode NOERROR
The last query should have qdcount 1
The last query should have ancount 1
The last query should have nscount 3
The last query should have adcount 0
The SOA serial for should be 1234
A query for should have rcode NXDOMAIN
The last query should have qdcount 1
The last query should have ancount 0
The last query should have nscount 1
The last query should have adcount 0
The last query should have flags qr aa rd
A query for type TXT should have rcode NOERROR
The last query should have ancount 0
A query for class CH should have rcode REFUSED
A query for to should have rcode NOERROR
A query for to should have rcode NOERROR
A query for type A class IN to should have rcode NOERROR
Scenario: changing database
# This scenario contains a lot of 'wait for' steps
......@@ -13,6 +13,7 @@ def initialize(feature):
# run the bind10 instance
world.bind10 = None
world.bind10_output = []
world.last_query_result = None
# Some tests can modify the settings. If the tests fail half-way, or
# don't clean up, this can leave configurations or data in a bad state,
