Commit d3d28bdc authored by Jelte Jansen's avatar Jelte Jansen
Browse files

[2713] Addressed review comments

- More tests
- a few bugfixes and refactors
- renamed methods to be private/protected
- documentation updates
parent 54cbca7f
......@@ -20,6 +20,7 @@ This tool implements user management for b10-cmdctl. It is used to
add and remove users from the accounts file.
'''
from bind10_config import SYSCONFPATH
from collections import OrderedDict
import random
from hashlib import sha1
import csv
......@@ -50,16 +51,17 @@ class UserManager:
self.options = options
self.args = args
def print(self, msg):
def __print(self, msg):
if not self.options.quiet:
print(msg)
def gen_password_hash(self, password):
salt = "".join(chr(random.randint(33, 127)) for x in range(64))
def __gen_password_hash(self, password):
salt = "".join(chr(random.randint(ord('!'), ord('~')))\
for x in range(64))
saltedpwd = sha1((password + salt).encode()).hexdigest()
return salt, saltedpwd
def read_user_info(self):
def __read_user_info(self):
"""
Read the existing user info
Raises an IOError if the file can't be read
......@@ -68,80 +70,77 @@ class UserManager:
# check that the file is modified between read and write)
# But taking multiple simultaneous users of this tool on the
# same file seems unnecessary at this point.
self.user_info = []
self.user_info = OrderedDict()
if os.path.exists(self.options.output_file):
# Just let any file read error bubble up; it will
# be caught in the run() method
with open(self.options.output_file) as csvfile:
with open(self.options.output_file, newline='') as csvfile:
reader = csv.reader(csvfile)
for row in reader:
self.user_info.append(row)
self.user_info[row[0]] = row
def save_userinfo(self):
def __save_user_info(self):
"""
Write out the (modified) user info
Raises an IOError if the file can't be written
"""
# Just let any file write error bubble up; it will
# be caught in the run() method
with open(self.options.output_file, 'w') as csvfile:
with open(self.options.output_file, 'w',
newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerows(self.user_info)
for row in self.user_info.values():
writer.writerow(row)
def username_exists(self, name):
"""
Returns True if the username exists
"""
for row in self.user_info:
if name == row[0]:
return True
return False
def add_user(self, name, password):
def __add_user(self, name, password):
"""
Add the given username/password combination to the stored user_info.
First checks if the username exists, and returns False if so.
If not, it is added, and this method returns True.
"""
if self.username_exists(name):
if name in self.user_info:
return False
salt, pw = self.gen_password_hash(password)
self.user_info.append([name, pw, salt])
salt, pw = self.__gen_password_hash(password)
self.user_info[name] = [name, pw, salt]
return True
def delete_user(self, name):
def __delete_user(self, name):
"""
Removes the row with the given name from the stored user_info
First checks if the username exists, and returns False if not.
Otherwise, it is removed, and this mehtod returns True
"""
if not self.username_exists(name):
if name not in self.user_info:
return False
new_user_info = []
for row in self.user_info:
if row[0] != name:
new_user_info.append(row)
self.user_info = new_user_info
del self.user_info[name]
return True
def prompt_for_username(self, command):
# overridable input() call, used in testing
def _input(self, prompt):
return input(prompt)
# in essence this is private, but made 'protected' for ease
# of testing
def _prompt_for_username(self, command):
# Note, direct prints here are intentional
while True :
name = input("Username to " + command + ": ")
while True:
name = self._input("Username to " + command + ": ")
if name == "":
print("Error username can't be empty")
continue
if command == COMMAND_ADD and self.username_exists(name):
if command == COMMAND_ADD and name in self.user_info:
print("user already exists")
continue
elif command == COMMAND_DELETE and not self.username_exists(name):
elif command == COMMAND_DELETE and name not in self.user_info:
print("user does not exist")
continue
return name
def prompt_for_password(self):
# in essence this is private, but made 'protected' for ease
# of testing
def _prompt_for_password(self):
# Note, direct prints here are intentional
while True:
pwd1 = getpass.getpass("Choose a password: ")
......@@ -154,58 +153,61 @@ class UserManager:
continue
return pwd1
def verify_options_and_args(self):
def __verify_options_and_args(self):
"""
Basic sanity checks on command line arguments.
Returns False if there is a problem, True if everything seems OK.
"""
if len(self.args) < 1:
self.print("Error: no command specified")
self.__print("Error: no command specified")
return False
if len(self.args) > 3:
self.print("Error: extraneous arguments")
self.__print("Error: extraneous arguments")
return False
if self.args[0] not in [ COMMAND_ADD, COMMAND_DELETE ]:
self.print("Error: command must be either add or delete")
self.__print("Error: command must be either add or delete")
return False
if self.args[0] == COMMAND_DELETE and len(self.args) > 2:
self.print("Error: delete only needs username, not a password")
self.__print("Error: delete only needs username, not a password")
return False
return True
def run(self):
if not self.verify_options_and_args():
if not self.__verify_options_and_args():
return BAD_ARGUMENTS
try:
self.print("Using accounts file: " + self.options.output_file)
self.read_user_info()
self.__print("Using accounts file: " + self.options.output_file)
self.__read_user_info()
command = self.args[0]
if len(self.args) > 1:
username = self.args[1]
else:
username = self.prompt_for_username(command)
username = self._prompt_for_username(command)
if command == COMMAND_ADD:
if len(self.args) > 2:
password = self.args[2]
else:
password = self.prompt_for_password()
if not self.add_user(username, password):
password = self.__prompt_for_password()
if not self.__add_user(username, password):
print("Error: username exists")
return USER_EXISTS
elif command == COMMAND_DELETE:
if not self.delete_user(username):
if not self.__delete_user(username):
print("Error: username does not exist")
return USER_DOES_NOT_EXIST
self.save_userinfo()
self.__save_user_info()
return 0
except IOError as ioe:
self.print("Error accessing " + ioe.filename +\
": " + str(ioe.strerror))
self.__print("Error accessing " + ioe.filename +\
": " + str(ioe.strerror))
return FILE_ERROR
except csv.Error as csve:
self.__print("Error parsing csv file: " + str(csve))
return FILE_ERROR
def set_options(parser):
......@@ -226,7 +228,10 @@ def main():
" password\t\tthe password to set for the added user\n"\
"\n"\
"If username or password are not specified, %prog will\n"\
"prompt for them."
"prompt for them. It is recommended practice to let the\n"\
"tool prompt for the password, as command-line\n"\
"arguments can be visible through history or process\n"\
"viewers."
parser = OptionParser(usage=usage, version=VERSION_STRING)
set_options(parser)
(options, args) = parser.parse_args()
......@@ -235,6 +240,5 @@ def main():
return usermgr.run()
if __name__ == '__main__':
result = main()
sys.exit(result)
sys.exit(main())
......@@ -90,7 +90,9 @@
<para>
If a username and password are given (or just a username in case of
deletion), these are used. Otherwise, the tool shall prompt for a
username and/or password.
username and/or password. It is recommended practice to let the
tool prompt for the password, as command-line arguments can be
visible through history or process viewers.
</para>
<variablelist>
......@@ -107,7 +109,7 @@
<term><option>-f <replaceable>filename</replaceable></option></term>
<term><option>--file <replaceable>filename</replaceable></option></term>
<listitem><para>
Define the filename to append the account to. The default is
Specify the accounts file to update. The default is
<filename>cmdctl-accounts.csv</filename> in the system config
directory.
</para></listitem>
......
......@@ -2,14 +2,6 @@ PYCOVERAGE_RUN=@PYCOVERAGE_RUN@
PYTESTS = b10-cmdctl-usermgr_test.py
EXTRA_DIST = $(PYTESTS)
# If necessary (rare cases), explicitly specify paths to dynamic libraries
# required by loadable python modules.
#LIBRARY_PATH_PLACEHOLDER =
#if SET_ENV_LIBRARY_PATH
#LIBRARY_PATH_PLACEHOLDER += $(ENV_LIBRARY_PATH)=$(abs_top_builddir)/src/lib/cryptolink/.libs:$(abs_top_builddir)/src/lib/dns/.libs:$(abs_top_builddir)/src/lib/dns/python/.libs:$(abs_top_builddir)/src/lib/cc/.libs:$(abs_top_builddir)/src/lib/config/.libs:$(abs_top_builddir)/src/lib/log/.libs:$(abs_top_builddir)/src/lib/util/.libs:$(abs_top_builddir)/src/lib/exceptions/.libs:$(abs_top_builddir)/src/lib/util/io/.libs:$(abs_top_builddir)/src/lib/datasrc/.libs:$$$(ENV_LIBRARY_PATH)
#endif
#CLEANFILES = test-keyfile.pem test-certfile.pem
CLEANFILES = *.csv
# test using command-line arguments, so use check-local target instead of TESTS
......
......@@ -15,21 +15,77 @@
import csv
from hashlib import sha1
import getpass
import imp
import os
import subprocess
import stat
import sys
import unittest
from bind10_config import SYSCONFPATH
def run(command):
class PrintCatcher:
def __init__(self):
self.stdout_lines = []
def __enter__(self):
self.orig_stdout_write = sys.stdout.write
def new_write(line):
self.stdout_lines.append(line)
sys.stdout.write = new_write
return self
def __exit__(self, type, value, traceback):
sys.stdout.write = self.orig_stdout_write
return
class OverrideGetpass:
def __init__(self, new_getpass):
self.new_getpass = new_getpass
self.orig_getpass = getpass.getpass
def __enter__(self):
getpass.getpass = self.new_getpass
return self
def __exit__(self, type, value, traceback):
getpass.getpass = self.orig_getpass
# input() is a built-in function and not easily overridable
# so this one uses usermgr for that
class OverrideInput:
def __init__(self, usermgr, new_getpass):
self.usermgr = usermgr
self.new_input = new_getpass
self.orig_input = usermgr._input
def __enter__(self):
self.usermgr._input = self.new_input
return self
def __exit__(self, type, value, traceback):
self.usermgr._input = self.orig_input
def run(command, input=None):
"""
Small helper function that returns a tuple of (rcode, stdout, stderr)
after running the given command (an array of command and arguments, as
passed on to subprocess).
Parameters:
command: an array of command and argument strings, which will be
passed to subprocess.Popen()
input: if not None, a string that is written to the process stdin
stream
"""
if input is not None:
stdin = subprocess.PIPE
else:
stdin = None
subp = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stderr=subprocess.PIPE, stdin=stdin)
if input is not None:
subp.stdin.write(bytes(input, 'UTF-8'))
(stdout, stderr) = subp.communicate()
return (subp.returncode, stdout, stderr)
......@@ -39,6 +95,11 @@ class TestUserMgr(unittest.TestCase):
def setUp(self):
self.delete_output_file()
# For access to the actual module, we load it directly
self.usermgr_module = imp.load_source('usermgr',
'../b10-cmdctl-usermgr.py')
# And instantiate 1 instance (with fake options/args)
self.usermgr = self.usermgr_module.UserManager(object(), object())
def tearDown(self):
self.delete_output_file()
......@@ -51,10 +112,9 @@ class TestUserMgr(unittest.TestCase):
self.assertTrue(os.path.exists(self.OUTPUT_FILE))
csv_entries = []
with open(self.OUTPUT_FILE) as csvfile:
with open(self.OUTPUT_FILE, newline='') as csvfile:
reader = csv.reader(csvfile)
for row in reader:
csv_entries.append(row)
csv_entries = [row for row in reader]
self.assertEqual(len(expected_content), len(csv_entries))
csv_entries.reverse()
......@@ -73,7 +133,7 @@ class TestUserMgr(unittest.TestCase):
self.assertEqual(expected_hash, entry_hash)
def run_check(self, expected_returncode, expected_stdout, expected_stderr,
command):
command, stdin=None):
"""
Runs the given command, and checks return code, and outputs (if provided).
Arguments:
......@@ -83,7 +143,7 @@ class TestUserMgr(unittest.TestCase):
expected_stderr, (multiline) string that is checked against stderr.
May be None, in which case the check is skipped.
"""
(returncode, stdout, stderr) = run(command)
(returncode, stdout, stderr) = run(command, stdin)
if expected_stderr is not None:
self.assertEqual(expected_stderr, stderr.decode())
if expected_stdout is not None:
......@@ -100,7 +160,10 @@ Arguments:
password the password to set for the added user
If username or password are not specified, b10-cmdctl-usermgr will
prompt for them.
prompt for them. It is recommended practice to let the
tool prompt for the password, as command-line
arguments can be visible through history or process
viewers.
Options:
--version show program's version number and exit
......@@ -224,14 +287,102 @@ Options:
def test_default_file(self):
"""
A few checks that couldn't be done though external calls
of the tool.
Check the default file is the correct one.
Only checks the internal variable, as we don't want to overwrite
the actual file here
Check that the prompting methods do verification
"""
# Hardcoded path .. should be ok since this is run from make check
usermgr = imp.load_source('usermgr', '../b10-cmdctl-usermgr.py')
self.assertEqual(SYSCONFPATH + '/cmdctl-accounts.csv',
usermgr.DEFAULT_FILE)
self.usermgr_module.DEFAULT_FILE)
def test_prompt_for_password_different(self):
# returns a different string (the representation of the number
# of times it has been called), until it has been called
# over 10 times, in which case it will always return "11"
getpass_different_called = 0
def getpass_different(question):
nonlocal getpass_different_called
getpass_different_called = getpass_different_called + 1
if getpass_different_called > 10:
return "11"
else:
return str(getpass_different_called)
with PrintCatcher() as pc:
with OverrideGetpass(getpass_different):
pwd = self.usermgr._prompt_for_password()
self.assertEqual(12, getpass_different_called)
self.assertEqual("11", pwd)
# stdout should be 5 times the no match string;
expected_output = "passwords do not match, try again\n"*5
self.assertEqual(expected_output, ''.join(pc.stdout_lines))
def test_prompt_for_password_empty(self):
# returns an empty string until it has been called over 10
# times
getpass_empty_called = 0
def getpass_empty(prompt):
nonlocal getpass_empty_called
getpass_empty_called = getpass_empty_called + 1
if getpass_empty_called > 10:
return "nonempty"
else:
return ""
usermgr_module = imp.load_source('usermgr',
'../b10-cmdctl-usermgr.py')
options = object()
args = object()
usermgr = usermgr_module.UserManager(options, args)
with PrintCatcher() as pc:
with OverrideGetpass(getpass_empty):
pwd = usermgr._prompt_for_password()
self.assertEqual("nonempty", pwd)
self.assertEqual(12, getpass_empty_called)
# stdout should be 10 times the 'cannot be empty' string
expected_output = "Error: password cannot be empty\n"*10
self.assertEqual(expected_output, ''.join(pc.stdout_lines))
def test_prompt_for_user(self):
new_input_called = 0
input_results = [ '', '', 'existinguser', 'nonexistinguser',
'', '', 'nonexistinguser', 'existinguser' ]
def new_input(prompt):
nonlocal new_input_called
if new_input_called < len(input_results):
result = input_results[new_input_called]
else:
result = 'empty'
new_input_called += 1
return result
# add fake user (value doesn't matter, method only checks for key)
self.usermgr.user_info = { 'existinguser': None }
expected_output = ''
with PrintCatcher() as pc:
with OverrideInput(self.usermgr, new_input):
# should skip the first three since empty or existing
# are not allowed, then return 'nonexistinguser'
username = self.usermgr._prompt_for_username(
self.usermgr_module.COMMAND_ADD)
self.assertEqual('nonexistinguser', username)
expected_output += "Error username can't be empty\n"*2
expected_output += "user already exists\n"
self.assertEqual(expected_output, ''.join(pc.stdout_lines))
# For delete, should again not accept empty (in a while true
# loop), and this time should not accept nonexisting users
username = self.usermgr._prompt_for_username(
self.usermgr_module.COMMAND_DELETE)
self.assertEqual('existinguser', username)
expected_output += "Error username can't be empty\n"*2
expected_output += "user does not exist\n"
self.assertEqual(expected_output, ''.join(pc.stdout_lines))
def test_bad_file(self):
"""
......@@ -291,6 +442,46 @@ Options:
'delete', 'user1'
])
def test_missing_fields(self):
"""
Test that an invalid csv file is handled gracefully
"""
# Valid but incomplete csv; should be handled
# correctly
with open(self.OUTPUT_FILE, 'w', newline='') as f:
f.write('onlyuserfield\n')
f.write('userfield,saltfield\n')
f.write(',emptyuserfield,passwordfield\n')
self.run_check(0, None, None,
[ self.TOOL,
'-f', self.OUTPUT_FILE,
'add', 'user1', 'pass1'
])
self.run_check(0, None, None,
[ self.TOOL,
'-f', self.OUTPUT_FILE,
'delete', 'onlyuserfield'
])
self.run_check(0, None, None,
[ self.TOOL,
'-f', self.OUTPUT_FILE,
'delete', ''
])
def test_bad_data(self):
# I can only think of one invalid format, an unclosed string
with open(self.OUTPUT_FILE, 'w', newline='') as f:
f.write('a,"\n')
self.run_check(2,
'Using accounts file: test_users.csv\n'
'Error parsing csv file: newline inside string\n',
'',
[ self.TOOL,
'-f', self.OUTPUT_FILE,
'add', 'user1', 'pass1'
])
if __name__== '__main__':
unittest.main()
......
......@@ -51,6 +51,9 @@ def reload():
# When "FROM_SOURCE", it lists the directories where the programs are
# built so that when BIND 10 is experimentally started on the source
# tree the programs in the tree (not installed ones) will be used.
# SYSCONFPATH: Path where the system-wide configuration files are
# stored (e.g. <prefix>/var/<package name>). This value is *not*
# overwritten if B10_FROM_SOURCE is specified.
#
# B10_FROM_SOURCE_LOCALSTATEDIR is specifically intended to be used for
# tests where we want to use various types of configuration within the test
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment