querying.py 5.44 KB
Newer Older
Jelte Jansen's avatar
Jelte Jansen committed
1
2
3
4
from lettuce import *
import subprocess
import re

Jelte Jansen's avatar
Jelte Jansen committed
5
6
7
8
9
10
11
12
13
14
15
16
17
# This script provides querying functionality
# The most important step is
#
# query for <name> [type X] [class X] [to <addr>[:port]] should have rcode <rc>
#
# By default, it will send queries to 127.0.0.1:47806 unless specified
# otherwise. The rcode is always checked. If the result is not NO_ANSWER,
# the result will be stored in last_query_result, which can then be inspected
# more closely, for instance with the step
#
# last query should have <property> <value>
#

Jelte Jansen's avatar
Jelte Jansen committed
18
19
20
21
22
23
#
# define a class to easily access different parts
# We may consider using our full library for this, but for now
# simply store several parts of the response as text values in
# this structure
#
Jelte Jansen's avatar
Jelte Jansen committed
24
25
26
27
28
# The following attributes are 'parsed' from the response, all as strings,
# and end up as direct attributes of the QueryResult object:
# opcode, rcode, id, flags, qdcount, ancount, nscount, adcount
# (flags is one string with all flags)
#
Jelte Jansen's avatar
Jelte Jansen committed
29
30
# this will set 'rcode' as the result code, we 'define' one additional
# rcode, "NO_ANSWER", if the dig process returned an error code itself
Jelte Jansen's avatar
Jelte Jansen committed
31
32
33
34
35
36
37
38
39
40
41
42
# In this case none of the other attributes will be set.
#
# The different sections will be lists of strings, one for each RR in the
# section. The question section will start with ';', as per dig output
#
# See server_from_sqlite3.feature for various examples to perform queries
class QueryResult(object):
    status_re = re.compile("opcode: ([A-Z])+, status: ([A-Z]+), id: ([0-9]+)")
    flags_re = re.compile("flags: ([a-z ]+); QUERY: ([0-9]+), ANSWER: " +
                          "([0-9]+), AUTHORITY: ([0-9]+), ADDITIONAL: ([0-9]+)")

    def __init__(self, name, qtype, qclass, address, port):
43
        args = [ 'dig', '+tries=1', '@' + address, '-p', str(port) ]
Jelte Jansen's avatar
Jelte Jansen committed
44
45
46
47
48
49
50
51
52
53
54
55
56
57
        if qtype is not None:
            args.append('-t')
            args.append(str(qtype))
        if qclass is not None:
            args.append('-c')
            args.append(str(qclass))
        args.append(name)
        dig_process = subprocess.Popen(args, 1, None, None, subprocess.PIPE,
                                       None)
        result = dig_process.wait()
        if result != 0:
            self.rcode = "NO_ANSWER"
        else:
            self.rcode = None
Jelte Jansen's avatar
Jelte Jansen committed
58
59
60
61
62
63
            parsing = "HEADER"
            self.question_section = []
            self.answer_section = []
            self.authority_section = []
            self.additional_section = []
            self.line_handler = self.parse_header
Jelte Jansen's avatar
Jelte Jansen committed
64
            for out in dig_process.stdout:
Jelte Jansen's avatar
Jelte Jansen committed
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
                self.line_handler(out)

    def parse_header(self, line):
        status_match = self.status_re.search(line)
        flags_match = self.flags_re.search(line)
        if status_match is not None:
            self.opcode = status_match.group(1)
            self.rcode = status_match.group(2)
        elif flags_match is not None:
            self.flags = flags_match.group(1)
            self.qdcount = flags_match.group(2)
            self.ancount = flags_match.group(3)
            self.nscount = flags_match.group(4)
            self.adcount = flags_match.group(5)
        elif line == ";; QUESTION SECTION:\n":
            self.line_handler = self.parse_question

    def parse_question(self, line):
        if line == ";; ANSWER SECTION:\n":
            self.line_handler = self.parse_answer
        elif line != "\n":
            self.question_section.append(line)

    def parse_answer(self, line):
        if line == ";; AUTHORITY SECTION:\n":
            self.line_handler = self.parse_authority
        elif line != "\n":
            self.answer_section.append(line)
Jelte Jansen's avatar
Jelte Jansen committed
93

Jelte Jansen's avatar
Jelte Jansen committed
94
95
96
97
98
    def parse_authority(self, line):
        if line == ";; ADDITIONAL SECTION:\n":
            self.line_handler = self.parse_additional
        elif line != "\n":
            self.additional_section.append(line)
Jelte Jansen's avatar
Jelte Jansen committed
99

Jelte Jansen's avatar
Jelte Jansen committed
100
101
102
103
104
105
106
107
108
    def parse_authority(self, line):
        if line.startswith(";; Query time"):
            self.line_handler = self.parse_footer
        elif line != "\n":
            self.additional_section.append(line)

    def parse_footer(self, line):
        pass

109
110
@step('A query for ([\w.]+) (?:type ([A-Z]+) )?(?:class ([A-Z]+) )?' +
      '(?:to ([^:]+)(?::([0-9]+))? )?should have rcode ([\w.]+)')
Jelte Jansen's avatar
Jelte Jansen committed
111
112
113
114
115
116
117
118
119
120
def query(step, query_name, qtype, qclass, addr, port, rcode):
    if qtype is None:
        qtype = "A"
    if qclass is None:
        qclass = "IN"
    if addr is None:
        addr = "127.0.0.1"
    if port is None:
        port = 47806
    query_result = QueryResult(query_name, qtype, qclass, addr, port)
121
122
    assert query_result.rcode == rcode,\
        "Expected: " + rcode + ", got " + query_result.rcode
Jelte Jansen's avatar
Jelte Jansen committed
123
124
    world.last_query_result = query_result

125
@step('The SOA serial for ([\w.]+) should be ([0-9]+)')
Jelte Jansen's avatar
Jelte Jansen committed
126
127
128
129
130
131
132
133
134
def query_soa(step, query_name, serial):
    query_result = QueryResult(query_name, "SOA", "IN", "127.0.0.1", "47806")
    assert "NOERROR" == query_result.rcode,\
        "Got " + query_result.rcode + ", expected NOERROR"
    assert len(query_result.answer_section) == 1,\
        "Too few or too many answers in SOA response"
    soa_parts = query_result.answer_section[0].split()
    assert serial == soa_parts[6],\
        "Got SOA serial " + soa_parts[6] + ", expected " + serial
Jelte Jansen's avatar
Jelte Jansen committed
135

136
@step('last query should have (\S+) (.+)')
Jelte Jansen's avatar
Jelte Jansen committed
137
138
139
140
141
142
def check_last_query(step, item, value):
    assert world.last_query_result is not None
    assert item in world.last_query_result.__dict__
    lq_val = world.last_query_result.__dict__[item]
    assert str(value) == str(lq_val),\
           "Got: " + str(lq_val) + ", expected: " + str(value)