nut-debian/debian/tests/testlib.py

1145 lines
39 KiB
Python
Raw Normal View History

#
# testlib.py quality assurance test script
# Copyright (C) 2008-2011 Canonical Ltd.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Library General Public
# License as published by the Free Software Foundation; either
# version 2 of the License.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
#
'''Common classes and functions for package tests.'''
import string, random, crypt, subprocess, pwd, grp, signal, time, unittest, tempfile, shutil, os, os.path, re, glob
import sys, socket, gzip
from stat import *
import warnings
# Silence the python-apt deprecation noise about TagFile before importing it.
warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning)
try:
    import apt_pkg
    apt_pkg.InitSystem()
except Exception:
    # On non-Debian systems (or whenever the import/initialisation above
    # fails), fall back to a simple comparison without debianisms.
    class apt_pkg(object):
        '''Minimal stand-in exposing only VersionCompare().'''

        @staticmethod
        def VersionCompare(one, two):
            '''Compare two dotted version strings.

            Returns 1 if 'one' is newer, -1 if it is older and 0 if both are
            equal. Digit runs are compared numerically (so '1.10' is newer
            than '1.9'); a version that merely extends the other one
            ('2.6.32' vs. '2.6') is considered newer.
            '''
            def tokens(component):
                # Split into digit / non-digit runs. The leading tag keeps
                # ints and strings from ever being compared to each other.
                return [(0, int(tok)) if tok.isdecimal() else (1, tok)
                        for tok in re.findall(r'\d+|\D+', component)]

            list_one = [tokens(part) for part in one.split('.')]
            list_two = [tokens(part) for part in two.split('.')]
            for left, right in zip(list_one, list_two):
                if left > right:
                    return 1
                if left < right:
                    return -1
            # All shared components are equal: the longer version wins.
            return (len(list_one) > len(list_two)) - (len(list_one) < len(list_two))
# Fixed IP address made available to tests under the name 'bogus_nxdomain'
# (presumably the address a hijacking resolver hands out for nonexistent
# domains -- TODO confirm against the tests that use it).
bogus_nxdomain = "208.69.32.132"
# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
# This is needed so that the subprocesses that produce endless output
# actually quit when the reader goes away.
import signal
def subprocess_setup():
    '''Give SIGPIPE back its default action.

    Python installs its own SIGPIPE handling by default, which is usually
    not what non-Python subprocesses expect; cmd() passes this function as
    preexec_fn.
    '''
    default_action = signal.SIG_DFL
    signal.signal(signal.SIGPIPE, default_action)
class TimedOutException(Exception):
    '''Raised by timeout() when the alarm goes off before the call returns.'''

    def __init__(self, value = "Timed Out"):
        '''Remember 'value' as the description of the timeout.'''
        # Forward the value to Exception so that .args is populated even
        # when the default message is used.
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        '''Return the repr() of the stored value.'''
        return repr(self.value)
def _restore_backup(path):
    '''Move the '<path>.autotest' backup, if there is one, back to 'path'.

    Does nothing when no backup exists.
    '''
    pathbackup = path + '.autotest'
    if not os.path.exists(pathbackup):
        return
    # shutil.move() places the source *inside* an existing destination
    # directory, so a directory backup has to replace the current tree
    # explicitly.
    if os.path.isdir(pathbackup) and os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path)
    shutil.move(pathbackup, path)
def _save_backup(path):
    '''Copy 'path' to '<path>.autotest' unless that backup already exists.

    Nothing happens when 'path' itself is missing.
    '''
    backup = path + '.autotest'
    if not os.path.exists(path) or os.path.exists(backup):
        return
    shutil.copy2(path, backup)
    # copy2 does not copy ownership, so do it here.
    # Reference: http://docs.python.org/library/shutil.html
    info = os.stat(path)
    os.chown(backup, info.st_uid, info.st_gid)
def config_copydir(path):
    '''Restore any earlier backup of directory 'path', then back it up again.

    Raises OSError if 'path' exists but is not a directory. The backup is
    written to '<path>.autotest' with symlinks preserved.
    '''
    already_there = os.path.exists(path)
    if already_there and not os.path.isdir(path):
        raise OSError("'%s' is not a directory" % (path))
    _restore_backup(path)
    # Re-check: the restore above may have just re-created the directory.
    if os.path.exists(path):
        shutil.copytree(path, path + '.autotest', symlinks=True)
def config_replace(path,contents,append=False):
    '''Replace (or append to) a config file.

    Any earlier backup is restored first; if the file exists it is then
    backed up before being overwritten. With append=True the new contents
    are added after the existing ones.
    '''
    _restore_backup(path)
    if os.path.exists(path):
        _save_backup(path)
        if append:
            with open(path) as existing:
                contents = existing.read() + contents
    # Close explicitly so the data is on disk when we return.
    with open(path, 'w') as target:
        target.write(contents)
def config_comment(path, field):
    '''Comment out every 'field = ...' line of the config file 'path'.

    The file is backed up first. 'field' is interpolated into a regular
    expression as-is. The result is written to '<path>.new' and renamed
    over 'path'.
    '''
    _save_backup(path)
    pattern = re.compile(r"^\s*%s\s*=" % (field))
    with open(path) as source:
        lines = ["#" + line if pattern.search(line) else line for line in source]
    # The temporary file must be closed (flushed) before it is renamed.
    with open(path+'.new', 'w') as target:
        target.writelines(lines)
    os.rename(path+'.new', path)
def config_set(path, field, value, spaces=True):
    '''Set 'field' to 'value' in the config file 'path'.

    The file is backed up first. Every existing 'field = ...' line is
    replaced; if there is none, the setting is appended. With spaces=True
    the line is written as 'field = value', otherwise as 'field=value'.
    'field' is interpolated into a regular expression as-is.
    '''
    _save_backup(path)
    if spaces==True:
        setting = '%s = %s\n' % (field, value)
    else:
        setting = '%s=%s\n' % (field, value)
    pattern = re.compile(r"^\s*%s\s*=" % (field))
    found = False
    lines = []
    with open(path) as source:
        for line in source:
            if pattern.search(line):
                found = True
                line = setting
            lines.append(line)
    if not found:
        # Do not glue the new setting onto an unterminated last line.
        if lines and not lines[-1].endswith('\n'):
            lines[-1] += '\n'
        lines.append(setting)
    # The temporary file must be closed (flushed) before it is renamed.
    with open(path+'.new', 'w') as target:
        target.writelines(lines)
    os.rename(path+'.new', path)
def config_patch(path, patch, depth=1):
    '''Patch a config file.

    'patch' is the patch text; it is fed to /usr/bin/patch on stdin with
    '-p<depth>'. The file is restored from and re-saved to its backup
    first. Raises Exception("Patch failed") on a non-zero exit status.
    '''
    _restore_backup(path)
    _save_backup(path)
    handle, name = mkstemp_fill(patch)
    try:
        # The output is not wanted; DEVNULL (unlike an unread PIPE) can
        # never block the child.
        rc = subprocess.call(['/usr/bin/patch', '-p%s' %(depth), path], stdin=handle, stdout=subprocess.DEVNULL)
    finally:
        handle.close()
        os.unlink(name)
    if rc != 0:
        raise Exception("Patch failed")
def config_restore(path):
    '''Put a replaced config file back the way it was.

    Thin public wrapper around _restore_backup().
    '''
    target = path
    _restore_backup(target)
def timeout(secs, f, *args):
    '''Call f(*args) and return its result, giving up after 'secs' seconds.

    A SIGALRM handler raising TimedOutException is installed for the
    duration of the call; the alarm is cancelled and the previous handler
    reinstated afterwards, whether or not the call succeeded.
    '''
    def _on_alarm(signum, frame):
        raise TimedOutException()

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(secs)
    try:
        return f(*args)
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous)
def require_nonroot():
    '''Exit with status 1 (after a message on stderr) when running as root.'''
    if os.geteuid() != 0:
        return
    print("This series of tests should be run as a regular user with sudo access, not as root.", file=sys.stderr)
    sys.exit(1)
def require_root():
    '''Exit with status 1 (after a message on stderr) unless running as root.'''
    if os.geteuid() == 0:
        return
    print("This series of tests should be run with root privileges (e.g. via sudo).", file=sys.stderr)
    sys.exit(1)
def require_sudo():
    '''Exit with status 1 unless running as root via sudo from a regular user.

    The check relies on the effective uid and the SUDO_USER environment
    variable; a message explaining the failure is printed to stderr.
    '''
    sudo_user = os.environ.get('SUDO_USER', None)
    if os.geteuid() != 0 or sudo_user is None:
        print("This series of tests must be run under sudo.", file=sys.stderr)
        sys.exit(1)
    if sudo_user == 'root':
        print('Please run this test using sudo from a regular user. (You ran sudo from root.)', file=sys.stderr)
        sys.exit(1)
def random_string(length,lower=False):
    '''Return a random string, consisting of ASCII letters, with given
    length. With lower=True only lowercase letters are used.'''
    # string.letters / string.lowercase are gone in Python 3; the ascii_*
    # constants are the equivalents.
    selection = string.ascii_lowercase if lower else string.ascii_letters
    return ''.join(random.choice(selection) for _ in range(length))
def mkstemp_fill(contents,suffix='',prefix='testlib-',dir=None):
    '''Create a temporary file like tempfile.mkstemp, write 'contents' into
    it and return a (file object, name) pair.

    The returned file object is opened in "w+" mode and rewound to the
    start; the caller is responsible for closing and removing it.'''
    fd, name = tempfile.mkstemp(suffix=suffix,prefix=prefix,dir=dir)
    # Swap the raw descriptor for a regular file object on the same name.
    os.close(fd)
    stream = open(name,"w+")
    stream.write(contents)
    stream.flush()
    stream.seek(0)
    return stream, name
def create_fill(path, contents, mode=0o644):
    '''Safely create a file at 'path' holding 'contents', then chmod it to
    'mode'.'''
    # make the temp file in the same dir as the destination file so we
    # don't get invalid cross-device link errors when we rename
    destination_dir = os.path.dirname(path)
    stream, temp_name = mkstemp_fill(contents, dir=destination_dir)
    stream.close()
    os.rename(temp_name, path)
    os.chmod(path, mode)
def login_exists(login):
    '''Return True if the password database has an entry for 'login'.'''
    try:
        pwd.getpwnam(login)
    except KeyError:
        return False
    return True
def group_exists(group):
    '''Return True if the group database has an entry for 'group'.'''
    try:
        grp.getgrnam(group)
    except KeyError:
        return False
    return True
def recursive_rm(dirPath, contents_only=False):
    '''Delete everything below dirPath and, unless contents_only is set,
    dirPath itself. Symlinks are unlinked, never followed.'''
    for entry in os.listdir(dirPath):
        full = os.path.join(dirPath, entry)
        if os.path.isdir(full) and not os.path.islink(full):
            recursive_rm(full)
        else:
            os.unlink(full)
    if contents_only == False:
        os.rmdir(dirPath)
def check_pidfile(exe, pidfile):
    '''Checks if the pid stored in pidfile belongs to a running 'exe'.

    Returns False when the pidfile is missing or unreadable; otherwise the
    first line of the file is handed to check_pid().
    '''
    if not os.path.exists(pidfile):
        return False
    # get the pid
    try:
        with open(pidfile, 'r') as fd:
            pid = fd.readline().rstrip('\n')
    except (OSError, ValueError):
        # unreadable or undecodable pidfile
        return False
    return check_pid(exe, pid)
def check_pid(exe, pid):
    '''Checks if pid is running as 'exe'.

    Reads /proc/<pid>/cmdline and matches its first argument against
    'exe', which is interpolated into regular expressions as-is. Returns
    False when the proc entry is missing or unreadable.
    '''
    cmdline = "/proc/%s/cmdline" % (str(pid))
    if not os.path.exists(cmdline):
        return False
    # get the command line
    try:
        with open(cmdline, 'r') as fd:
            argv0 = fd.readline().split('\0')[0]
    except (OSError, ValueError):
        # process vanished, permission problem or undecodable bytes
        return False
    # this allows us to match absolute paths or just the executable name
    patterns = (
        r'^' + exe + r'$',
        r'.*/' + exe + r'$',
        r'^' + exe + r': ',
        r'^\(' + exe + r'\)',
    )
    return any(re.match(pattern, argv0) for pattern in patterns)
def check_port(port, proto, ver=4):
    '''Check if something is listening on the specified port.

    Looks for 'port' among the local addresses listed in /proc/net/<proto>
    (or /proc/net/<proto>6 for ver=6). The socket state is not inspected,
    so any local socket using the port counts.

    WARNING: for some reason this does not work with a bind mounted /proc
    '''
    assert (port >= 1)
    assert (port <= 65535)
    assert (proto.lower() == "tcp" or proto.lower() == "udp")
    assert (ver == 4 or ver == 6)
    # The proc entries are lowercase, whatever case the caller used.
    fn = "/proc/net/%s" % (proto.lower())
    if ver == 6:
        fn += str(ver)
    rc, report = cmd(['cat', fn])
    assert (rc == 0)
    hport = "%0.4x" % port
    # Local addresses are 8 hex digits in the IPv4 tables, 32 in the IPv6 ones.
    addr_digits = 32 if ver == 6 else 8
    if re.search(r': [0-9a-f]{%d}:%s [0-9a-f]' % (addr_digits, hport), report.lower()):
        return True
    return False
def get_arch():
    '''Return the machine architecture as reported by 'uname -m'.'''
    rc, machine = cmd(['uname', '-m'])
    assert (rc == 0)
    return machine.strip()
def get_memory():
    '''Gets total ram and swap.

    Returns a (MemTotal, SwapTotal) tuple with the integer values found in
    /proc/meminfo (0 for an entry that is absent), or (False, False) when
    the file is missing or cannot be read/parsed.
    '''
    meminfo = "/proc/meminfo"
    memtotal = 0
    swaptotal = 0
    if not os.path.exists(meminfo):
        return (False, False)
    try:
        with open(meminfo, 'r') as fd:
            for line in fd:
                splitline = line.split()
                if splitline[0] == 'MemTotal:':
                    memtotal = int(splitline[1])
                elif splitline[0] == 'SwapTotal:':
                    swaptotal = int(splitline[1])
    except (OSError, ValueError, IndexError):
        # unreadable file or a line that does not look like 'Key: value'
        return (False, False)
    return (memtotal,swaptotal)
def is_running_in_vm():
    '''Return True if dmesg mentions a known virtualization signature.'''
    # add other virtualization environments here
    signatures = ['QEMU Virtual CPU']
    return any(cmd_pipe(['dmesg'], ['grep', signature])[0] == 0
               for signature in signatures)
def ubuntu_release():
    '''Get the Ubuntu release codename.

    Returns the value of the first DISTRIB_CODENAME line in
    /etc/lsb-release, or "UNKNOWN" when the file cannot be stat()ed or has
    no such line. Raises IOError when the file is larger than 1 MiB; errors
    while opening or reading it propagate.
    '''
    f = "/etc/lsb-release"
    try:
        size = os.stat(f).st_size
    except OSError:
        return "UNKNOWN"
    if size > 1024*1024:
        raise IOError('Could not open "%s" (too big)' % f)
    with open(f, 'r') as fh:
        lines = fh.readlines()
    for line in lines:
        if 'DISTRIB_CODENAME' in line:
            return line.split('=')[1].rstrip('\n').rstrip('\r')
    return "UNKNOWN"
def cmd(command, input = None, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, stdin = None, timeout = None):
    '''Try to execute given command (array) and return [exit code, output].

    The output is the child's stdout followed by its stderr, as text; with
    the default arguments stderr is merged into stdout. If the command
    cannot be started, [127, <error text>] is returned instead.

    'input' is only delivered to the child when stdin=subprocess.PIPE.
    'timeout' is the number of seconds to wait; when it expires the child
    is killed and whatever it produced so far is returned together with
    its exit status.
    '''
    try:
        sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup, universal_newlines=True)
    except OSError as e:
        return [127, str(e)]
    try:
        out, outerr = sp.communicate(input, timeout=timeout)
    except subprocess.TimeoutExpired:
        # Kill the child and collect the remaining output.
        sp.kill()
        out, outerr = sp.communicate()
    # Handle redirection of stdout
    if out is None:
        out = ''
    # Handle redirection of stderr
    if outerr is None:
        outerr = ''
    return [sp.returncode,out+outerr]
def cmd_pipe(command1, command2, input = None, stderr = subprocess.STDOUT, stdin = None):
    '''Try to pipe command1 into command2.

    Returns [exit code of command2, its stdout as bytes], or
    [127, <error text>] when either command cannot be started. 'stdin' is
    given to command1. 'input' is accepted for symmetry with cmd() but has
    no effect, because command2 reads from command1 rather than from a
    pipe owned by us.
    '''
    try:
        sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
    except OSError as e:
        return [127, str(e)]
    try:
        sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
    except OSError as e:
        # Do not leave command1 running with nobody reading its output.
        sp1.stdout.close()
        sp1.kill()
        sp1.wait()
        return [127, str(e)]
    # Drop our copy of the read end so command1 gets SIGPIPE if command2
    # exits first.
    sp1.stdout.close()
    out = sp2.communicate(input)[0]
    return [sp2.returncode,out]
def cwd_has_enough_space(cdir, total_bytes):
    '''Determine if the partition of the current working directory has
    more than 'total_bytes' free.

    'cdir' is ignored: the current working directory is always used (the
    parameter is kept for existing callers). Returns False when df fails
    or no matching mount point is found.
    '''
    # -P gives one line per filesystem, -k fixes the unit to 1024 bytes.
    rc, df_output = cmd(['df', '-P', '-k'])
    if rc != 0:
        return False
    kb = total_bytes / 1024
    mounts = dict()
    for line in df_output.splitlines():
        # The mount point is the 6th field and may itself contain spaces.
        fields = line.split(None, 5)
        if len(fields) < 6 or not fields[5].startswith('/'):
            continue
        try:
            mounts[fields[5]] = int(fields[3])
        except ValueError:
            # header or error line
            continue
    # Walk up from the cwd until we hit a mount point.
    cdir = os.getcwd()
    while cdir != '/' and cdir not in mounts:
        cdir = os.path.dirname(cdir)
    return kb < mounts.get(cdir, 0)
def get_md5(filename):
    '''Return the MD5 digest that /usr/bin/md5sum -b prints for 'filename'.'''
    (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename])
    assert (0 == rc)
    # The digest is everything before the first space.
    digest, _, _ = report.partition(' ')
    return digest
def dpkg_compare_installed_version(pkg, check, version):
    '''Compare the installed version of 'pkg' with 'version'.

    'check' is the relation handed to 'dpkg --compare-versions'. Returns
    True when the relation holds. Asserts that the package is installed
    and that dpkg exits with 0 or 1.
    '''
    (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg])
    assert (rc == 0)
    assert ("Status: install ok installed" in report)
    found = [line.split()[1] for line in report.splitlines()
             if line.startswith("Version: ")]
    # the last Version line wins
    installed_version = found[-1] if found else ""
    assert (installed_version != "")
    (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version])
    assert (rc == 0 or rc == 1)
    return rc == 0
def prepare_source(source, builder, cached_src, build_src, patch_system):
    '''Download and unpack a source package, install its build
    dependencies, hand the tree to the 'builder' user and return the
    directory of the unpacked source.

    If 'cached_src' exists it is copied to 'build_src'; otherwise the
    source is fetched with apt-get into 'build_src' and then copied to
    'cached_src'. 'build_src' must not exist yet, but its parent must.

    Patch system can be one of:
     - cdbs
     - dpatch
     - quilt
     - quiltv3
     - None (not the string)

    This is normally used like this:

        def setUp(self):
            ...
            self.topdir = os.getcwd()
            self.cached_src = os.path.join(os.getcwd(), "source")
            self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp')
            self.builder = testlib.TestUser()
            testlib.cmd(['chgrp', self.builder.login, self.tmpdir])
            os.chmod(self.tmpdir, 0o775)

        def tearDown(self):
            ...
            self.builder = None
            self.topdir = os.getcwd()
            if os.path.exists(self.tmpdir):
                testlib.recursive_rm(self.tmpdir)

        def test_suite_build(self):
            ...
            build_dir = testlib.prepare_source('foo',
                            self.builder,
                            self.cached_src,
                            os.path.join(self.tmpdir,
                                         os.path.basename(self.cached_src)),
                            "quilt")
            os.chdir(build_dir)

            # Example for typical build, adjust as necessary
            rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean'])
            rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug'])
            rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make'])
            rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check'])
            expected = 0
            result = 'Got exit code %d, expected %d\n' % (rc, expected)
            self.assertEqual(expected, rc, result + report)

        def test_suite_cleanup(self):
            ...
            if os.path.exists(self.cached_src):
                testlib.recursive_rm(self.cached_src)

    It is up to the caller to clean up cached_src and build_src (as in the
    above example, often the build_src is in a tmpdir that is cleaned in
    tearDown() and the cached_src is cleaned in a one time clean-up
    operation (eg 'test_suite_cleanup()') which must be run after the build
    suite test).
    '''
    # Make sure we have a clean slate
    assert (os.path.exists(os.path.dirname(build_src)))
    assert (not os.path.exists(build_src))

    start_dir = os.getcwd()
    if not os.path.exists(cached_src):
        # Only install the build dependencies on the initial setup
        rc, report = cmd(['apt-get','-y','--force-yes','build-dep',source])
        assert (rc == 0)

        os.makedirs(build_src)
        os.chdir(build_src)

        # These are always needed
        pkgs = ['build-essential', 'dpkg-dev', 'fakeroot']
        rc, report = cmd(['apt-get','-y','--force-yes','install'] + pkgs)
        assert (rc == 0)

        rc, report = cmd(['apt-get','source',source])
        assert (rc == 0)
        shutil.copytree(build_src, cached_src)
    else:
        shutil.copytree(cached_src, build_src)
        os.chdir(build_src)

    # The glob is relative: we are inside build_src at this point.
    unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0])

    # Now apply the patches. Do it here so that we don't mess up our cached
    # sources.
    os.chdir(unpacked_dir)
    assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None])
    patch_commands = {
        'quilt': ['quilt', 'push', '-a'],
        'cdbs': ['./debian/rules', 'apply-patches'],
        'dpatch': ['dpatch', 'apply-all'],
    }
    # 'quiltv3' and None have no entry: nothing is run for them.
    if patch_system in patch_commands:
        if patch_system == "quilt":
            os.environ.setdefault('QUILT_PATCHES','debian/patches')
        rc, report = cmd(patch_commands[patch_system])
        assert (rc == 0)

    cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src])
    os.chdir(start_dir)
    return unpacked_dir
def _aa_status():
    '''Run /usr/sbin/aa-status (through sudo unless already root) and
    return cmd()'s [exit code, output] result.'''
    exe = "/usr/sbin/aa-status"
    assert (os.path.exists(exe))
    argv = [exe] if os.geteuid() == 0 else ['sudo', exe]
    return cmd(argv)
def is_apparmor_loaded(path):
    '''Return True if some line of the aa-status output ends with 'path'.

    Returns False when aa-status exits non-zero.
    '''
    rc, report = _aa_status()
    if rc != 0:
        return False
    return any(line.endswith(path) for line in report.splitlines())
def is_apparmor_confined(path):
    '''Check if application is confined.

    Returns True if some line of the aa-status output contains 'path'
    followed by ' ('. 'path' is interpolated into a regular expression
    as-is. Returns False when aa-status exits non-zero.
    '''
    rc, report = _aa_status()
    if rc != 0:
        return False
    # Raw string: '\(' is not a valid escape in an ordinary string literal.
    pattern = re.compile(r'%s \(' % path)
    return any(pattern.search(line) for line in report.splitlines())
def check_apparmor(path, first_ubuntu_release, is_running=True):
    '''Check if path is loaded and confined for everything higher than the
    first Ubuntu release specified.

    Returns an (rc, message) pair: rc is -1 when the check was skipped,
    0 on success and 1 on failure.

    Usage:
    rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True)
    if rc < 0:
        return self._skipped(report)

    expected = 0
    result = 'Got exit code %d, expected %d\n' % (rc, expected)
    self.assertEqual(expected, rc, result + report)
    '''
    if manager.lsb_release["Release"] < first_ubuntu_release:
        return (-1, "Skipped apparmor check")
    if not os.path.exists('/sbin/apparmor_parser'):
        return (-1, "Skipped (couldn't find apparmor_parser)")

    if not is_apparmor_loaded(path):
        return (1, "Profile not loaded for '%s'" % path)

    # this check only makes sense if the 'path' is currently executing
    if is_running and not is_apparmor_confined(path):
        return (1, "'%s' is not running in enforce mode" % path)

    return (0, "")
def get_gcc_version(gcc, full=True):
    '''Return the version reported by the given gcc binary.

    A name without a leading '/' is looked up in /usr/bin. The result is
    'none' when the binary does not exist and 'unknown' when its '-v'
    output does not contain exactly one 'gcc version' line. With
    full=False only the first word of the version is returned.
    '''
    binary = gcc if gcc.startswith('/') else '/usr/bin/%s' % (gcc)
    if not os.path.exists(binary):
        gcc_version = 'none'
    else:
        gcc_version = 'unknown'
        output = cmd([binary,'-v'])[1].strip()
        matches = [entry for entry in output.splitlines()
                   if entry.startswith('gcc version')]
        if len(matches) == 1:
            # drop the leading 'gcc version' words
            gcc_version = " ".join(matches[0].split()[2:])
    if full:
        return gcc_version
    return gcc_version.split()[0]
def is_kdeinit_running():
    '''Return True if 'ps x' lists a running kdeinit4; otherwise print a
    hint to stderr and return False.'''
    # applications that use kdeinit will spawn it if it isn't running in the
    # test. This is a problem because it does not exit. This is a helper to
    # check for it.
    rc, report = cmd(['ps', 'x'])
    if 'kdeinit4 Running' in report:
        return True
    print(("kdeinit not running (you may start/stop any KDE application then run this script again)"), file=sys.stderr)
    return False
def get_pkgconfig_flags(libs=[]):
    '''Return the 'pkg-config --cflags --libs' output for the given list of
    libraries, split into individual flags.

    Asserts that at least one library is given and that pkg-config
    succeeds (its exit code is reported on stderr first if it does not).
    '''
    assert (len(libs) > 0)
    rc, flags = cmd(['pkg-config', '--cflags', '--libs'] + libs)
    if rc != 0:
        print('Got exit code %d, expected %d\n' % (rc, 0), file=sys.stderr)
    assert(rc == 0)
    return flags.split()
class TestDaemon:
    '''Helper class to manage daemons consistently.

    Every action returns a (success, message) pair; the message is empty
    on success and otherwise contains the init script's output.
    '''

    def __init__(self, init):
        '''Setup daemon attributes; 'init' is the path of the init script.'''
        self.initscript = init

    def _run_action(self, action):
        '''Run the init script with 'action' and judge the outcome.

        The action fails on a non-zero exit code or when the output
        contains the word "fail".
        '''
        rc, report = cmd([self.initscript, action])
        expected = 0
        if expected != rc:
            return (False, 'Got exit code %d, expected %d\n' % (rc, expected) + report)
        if "fail" in report:
            return (False, "Found 'fail' in report\n" + report)
        return (True, "")

    def start(self):
        '''Start daemon'''
        outcome = self._run_action('start')
        # Pause for two seconds after every start attempt.
        time.sleep(2)
        return outcome

    def stop(self):
        '''Stop daemon'''
        return self._run_action('stop')

    def reload(self):
        '''Reload daemon'''
        return self._run_action('force-reload')

    def restart(self):
        '''Restart daemon (stop, then start; stops early if stopping fails)'''
        (res, msg) = self.stop()
        if not res:
            return (res, msg)
        (res, msg) = self.start()
        if not res:
            return (res, msg)
        return (True, "")

    def status(self):
        '''Check daemon status'''
        return self._run_action('status')
class TestlibManager(object):
    '''Singleton class used to set up per-test-run information.

    Attributes set by __init__: verbosity, lsb_release (dict), dpkg_arch,
    kernel_version, kernel_version_signature, kernel_version_ubuntu,
    kernel_is_ubuntu, gcc_version and path_libc.
    '''

    def __init__(self):
        '''Collect distribution, architecture, kernel, gcc and libc details.

        Raises OSError when lsb_release is not installed (or reports an
        unknown HP MIE version) and ValueError for an unknown machine type.
        '''
        # Set glibc aborts to dump to stderr instead of the tty so test output
        # is more sane.
        os.environ.setdefault('LIBC_FATAL_STDERR_','1')

        # check verbosity
        self.verbosity = False
        if (len(sys.argv) > 1 and '-v' in sys.argv[1:]):
            self.verbosity = True

        # Load LSB release file
        self.lsb_release = dict()
        if not os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'):
            raise OSError("Please install 'lsb-release'")
        for line in subprocess.Popen(['lsb_release','-a'],stdout=subprocess.PIPE,stderr=subprocess.PIPE, universal_newlines=True).communicate()[0].splitlines():
            if ':' not in line:
                # not a 'Field: value' line
                continue
            field, value = line.split(':',1)
            value=value.strip()
            field=field.strip()
            # Convert numerics
            try:
                value = float(value)
            except ValueError:
                pass
            self.lsb_release.setdefault(field,value)

        # FIXME: hack OEM releases into known-Ubuntu versions
        if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)":
            if self.lsb_release['Release'] == 1.0:
                self.lsb_release['Distributor ID'] = "Ubuntu"
                self.lsb_release['Release'] = 8.04
            else:
                raise OSError("Unknown version of HP MIE")

        # FIXME: hack to assume a most-recent release if we're not
        # running under Ubuntu.
        if self.lsb_release['Distributor ID'] not in ["Ubuntu","Linaro"]:
            self.lsb_release['Release'] = 10000
        # Adjust Linaro release to pretend to be Ubuntu
        if self.lsb_release['Distributor ID'] in ["Linaro"]:
            self.lsb_release['Distributor ID'] = "Ubuntu"
            self.lsb_release['Release'] -= 0.01

        # Load arch
        if not os.path.exists('/usr/bin/dpkg'):
            machine = cmd(['uname','-m'])[1].strip()
            if machine.endswith('86'):
                self.dpkg_arch = 'i386'
            elif machine.endswith('_64'):
                self.dpkg_arch = 'amd64'
            elif machine.startswith('arm'):
                self.dpkg_arch = 'armel'
            else:
                raise ValueError("Unknown machine type '%s'" % (machine))
        else:
            self.dpkg_arch = cmd(['dpkg','--print-architecture'])[1].strip()

        # Find kernel version
        self.kernel_is_ubuntu = False
        self.kernel_version_signature = None
        self.kernel_version = cmd(["uname","-r"])[1].strip()
        versig = '/proc/version_signature'
        if os.path.exists(versig):
            self.kernel_is_ubuntu = True
            with open(versig) as f:
                self.kernel_version_signature = f.read().strip()
            self.kernel_version_ubuntu = self.kernel_version
        elif os.path.exists('/usr/bin/dpkg'):
            # this can easily be inaccurate but is only an issue for Dapper
            rc, out = cmd(['dpkg','-l','linux-image-%s' % (self.kernel_version)])
            if rc == 0:
                self.kernel_version_signature = out.strip().split('\n').pop().split()[2]
                self.kernel_version_ubuntu = self.kernel_version_signature
        if self.kernel_version_signature is None:
            # Attempt to fall back to something for non-Debian-based
            self.kernel_version_signature = self.kernel_version
            self.kernel_version_ubuntu = self.kernel_version
        # Build ubuntu version without hardware suffix
        try:
            self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)])
        except IndexError:
            # the signature has no second word; keep the value chosen above
            pass

        # Find gcc version
        self.gcc_version = get_gcc_version('gcc')

        # Find libc (None when ldd lists no 'libc.so.*' for /bin/ls)
        libc_paths = [x.split()[2] for x in cmd(['ldd','/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')]
        self.path_libc = libc_paths[0] if libc_paths else None

        # Report self
        if self.verbosity:
            kernel = self.kernel_version_ubuntu
            if kernel != self.kernel_version_signature:
                kernel += " (%s)" % (self.kernel_version_signature)
            print("Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % ( \
                sys.argv[0],
                self.lsb_release['Distributor ID'],
                self.lsb_release['Release'],
                kernel,
                self.dpkg_arch,
                os.geteuid(), os.getuid(),
                os.environ.get('SUDO_USER', '')), file=sys.stdout)
            sys.stdout.flush()

    def hello(self, msg):
        '''Print a greeting mentioning 'msg' on stderr.'''
        print("Hello from %s" % (msg), file=sys.stderr)
# The central instance: created once, at import time. TestlibCase copies its
# attributes in __init__ and check_apparmor() reads its lsb_release data.
manager = TestlibManager()
class TestlibCase(unittest.TestCase):
    '''unittest.TestCase with the per-run data of the manager singleton
    attached and helpers for running and checking shell commands.'''

    def __init__(self, *args):
        '''This is called for each TestCase test instance, which isn't much better
        than SetUp.'''
        unittest.TestCase.__init__(self, *args)

        # Attach to and duplicate dicts from manager singleton
        self.manager = manager
        self.my_verbosity = self.manager.verbosity
        self.lsb_release = self.manager.lsb_release
        self.dpkg_arch = self.manager.dpkg_arch
        self.kernel_version = self.manager.kernel_version
        self.kernel_version_signature = self.manager.kernel_version_signature
        self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu
        self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu
        self.gcc_version = self.manager.gcc_version
        self.path_libc = self.manager.path_libc

    def version_compare(self, one, two):
        '''Compare two version strings with apt_pkg.VersionCompare().'''
        return apt_pkg.VersionCompare(one,two)

    def assertFileType(self, filename, filetype):
        '''Checks the file type of the file specified.

        'filetype' is a regular expression that must match the complete
        output of '/usr/bin/file -b'.
        '''
        (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename])
        out = out.strip()
        expected = 0
        # Absolutely no idea why this happens on Hardy
        if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0:
            rc = 0
        result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report)
        self.assertEqual(expected, rc, result)

        filetype = '^%s$' % (filetype)
        result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype)
        self.assertNotEqual(None, re.search(filetype, out), result)

    def yank_commonname_from_cert(self, certfile):
        '''Extract the commonName from a given PEM.

        Falls back to socket.getfqdn() when openssl fails or no commonName
        is found.
        '''
        rc, out = cmd(['openssl','asn1parse','-in',certfile])
        if rc == 0:
            ready = False
            for line in out.splitlines():
                if ready:
                    # the value is on the line after the ':commonName' one
                    return line.split(':')[-1]
                if ':commonName' in line:
                    ready = True
        return socket.getfqdn()

    def announce(self, text):
        '''Print '(text) ' on stdout when running verbosely.'''
        if self.my_verbosity:
            print("(%s) " % (text), end=' ', file=sys.stdout)
            sys.stdout.flush()

    def make_clean(self):
        '''Run 'make clean' and assert that it succeeds.'''
        rc, output = self.shell_cmd(['make','clean'])
        self.assertEqual(rc, 0, output)

    def get_makefile_compiler(self):
        '''Return the value of the first 'CC = ...' line of ./Makefile, or
        'gcc' when there is no Makefile or no such line.'''
        # Find potential compiler name
        compiler = 'gcc'
        if os.path.exists('Makefile'):
            with open('Makefile') as makefile:
                for line in makefile:
                    if line.startswith('CC') and '=' in line:
                        items = [x.strip() for x in line.split('=')]
                        if items[0] == 'CC':
                            compiler = items[1]
                            break
        return compiler

    def make_target(self, target, expected=0):
        '''Compile a target and report output.

        Asserts the exit code and that the compiler name shows up in the
        output; returns the output.
        '''
        compiler = self.get_makefile_compiler()
        rc, output = self.shell_cmd(['make',target])
        self.assertEqual(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output)
        self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output)
        return output

    # call as return testlib.skipped()
    def _skipped(self, reason=""):
        '''Provide a visible way to indicate that a test was skipped'''
        if reason != "":
            reason = ': %s' % (reason)
        self.announce("skipped%s" % (reason))
        return False

    def _testlib_shell_cmd(self,args,stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT):
        '''Run 'args' through cmd() and return (rc, report, output), where
        report is the output prefixed with the quoted command line.'''
        argstr = "'" + "', '".join(args).strip() + "'"
        rc, out = cmd(args,stdin=stdin,stdout=stdout,stderr=stderr)
        report = 'Command: ' + argstr + '\nOutput:\n' + out
        return rc, report, out

    def shell_cmd(self, args, stdin=None):
        '''Run 'args' through cmd() and return its [rc, output] result.'''
        return cmd(args,stdin=stdin)

    def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
        '''Test a shell command matches a specific exit code'''
        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
        result = 'Got exit code %d, expected %d\n' % (rc, expected)
        self.assertEqual(expected, rc, msg + result + report)

    def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
        '''Test a shell command doesn't match a specific exit code'''
        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
        result = 'Got (unwanted) exit code %d\n' % rc
        self.assertNotEqual(unwanted, rc, msg + result + report)

    def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False):
        '''Test a shell command contains a specific output (or, with
        invert=True, does not contain it)'''
        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
        result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text)
        if not invert:
            self.assertTrue(text in out, msg + result + report)
        else:
            self.assertFalse(text in out, msg + result + report)

    def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None):
        '''Test a shell command matches a specific output (or, with
        invert=True, differs from it); 'expected' optionally checks the
        exit code as well'''
        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
        result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args))
        if not invert:
            self.assertEqual(text, out, msg + result + report)
        else:
            self.assertNotEqual(text, out, msg + result + report)
        if expected is not None:
            result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args))
            self.assertEqual(rc, expected, msg + result + report)

    def _word_find(self, report, content, invert=False):
        '''Check for a specific string'''
        if invert:
            warning = 'Found "%s"\n' % content
            self.assertTrue(content not in report, warning + report)
        else:
            warning = 'Could not find "%s"\n' % content
            self.assertTrue(content in report, warning + report)

    def _test_sysctl_value(self, path, expected, msg=None, exists=True):
        '''Assert that /proc/sys/<path> exists (or not, per 'exists') and
        holds the integer 'expected'; return the value read, or None.'''
        sysctl = '/proc/sys/%s' % (path)
        self.assertEqual(exists, os.path.exists(sysctl), sysctl)
        value = None
        if exists:
            with open(sysctl) as handle:
                value = int(handle.read())
            report = "%s is not %d: %d" % (sysctl, expected, value)
            if msg:
                report += " (%s)" % (msg)
            self.assertEqual(value, expected, report)
        return value

    def set_sysctl_value(self, path, desired):
        '''Write 'desired' to /proc/sys/<path> and verify that it stuck.'''
        sysctl = '/proc/sys/%s' % (path)
        self.assertTrue(os.path.exists(sysctl),"%s does not exist" % (sysctl))
        # Close before reading the value back.
        with open(sysctl,'w') as handle:
            handle.write(str(desired))
        self._test_sysctl_value(path, desired)

    def kernel_at_least(self, introduced):
        '''Return True if the running kernel is at least version 'introduced'.'''
        return self.version_compare(self.kernel_version_ubuntu,
                                    introduced) >= 0

    def kernel_claims_cve_fixed(self, cve):
        '''Return True if the changelog of the running kernel's linux-image
        package has a line mentioning 'cve' without the word "revert".'''
        changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version)
        if os.path.exists(changelog):
            # Text mode: in binary mode the lines are bytes and cannot be
            # searched for the str 'cve'.
            with gzip.open(changelog, 'rt', encoding='utf-8', errors='replace') as handle:
                for line in handle:
                    if cve in line and not "revert" in line and not "Revert" in line:
                        return True
        return False
class TestGroup:
    '''Create a temporary test group and remove it again in the dtor.'''

    def __init__(self, group=None, lower=False):
        '''Create a new group.

        By default the name is random (lowercase only with lower=True); an
        explicit 'group' must not exist yet, otherwise ValueError is
        raised. The new group's id is stored in self.gid.
        '''
        self.group = None
        if group:
            if group_exists(group):
                raise ValueError('group name already exists')
        else:
            while(True):
                group = random_string(7,lower=lower)
                if not group_exists(group):
                    break

        # Run the command outside of the assert: under 'python -O' asserts,
        # including their side effects, are removed.
        rc = subprocess.call(['groupadd',group])
        assert rc == 0
        self.group = group
        g = grp.getgrnam(self.group)
        self.gid = g[2]

    def __del__(self):
        '''Remove the created group.'''
        if self.group:
            rc, report = cmd(['groupdel', self.group])
            assert rc == 0
class TestUser:
    '''Create a temporary test user and remove it again in the dtor.'''

    def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None):
        '''Create a new user account with a random password.

        By default, the login name is random, too, but can be explicitly
        specified with 'login'. By default, a home directory is created, this
        can be suppressed with 'home=False'. 'group', 'uidmin' and 'shell'
        are passed to useradd as -G, -K UID_MIN= and -s.

        Raises ValueError when not run as root or when 'login' already
        exists. The account details are stored in self.login, self.uid,
        self.gid, self.gecos, self.home and self.shell.
        '''
        self.login = None

        if os.geteuid() != 0:
            raise ValueError("You must be root to run this test")

        if login:
            if login_exists(login):
                raise ValueError('login name already exists')
        else:
            while(True):
                login = 't' + random_string(7,lower=lower)
                if not login_exists(login):
                    break

        self.salt = random_string(2)
        self.password = random_string(8,lower=lower)
        self.crypted = crypt.crypt(self.password, self.salt)

        creation = ['useradd', '-p', self.crypted]
        if home:
            creation += ['-m']
        if group:
            creation += ['-G',group]
        if uidmin:
            creation += ['-K','UID_MIN=%d'%uidmin]
        if shell:
            creation += ['-s',shell]
        creation += [login]
        # Run the commands outside of the asserts: under 'python -O'
        # asserts, including their side effects, are removed.
        rc = subprocess.call(creation)
        assert rc == 0
        # Set GECOS
        rc = subprocess.call(['usermod','-c','Buddy %s' % (login),login])
        assert rc == 0

        self.login = login
        p = pwd.getpwnam(self.login)
        self.uid = p[2]
        self.gid = p[3]
        self.gecos = p[4]
        self.home = p[5]
        self.shell = p[6]

    def __del__(self):
        '''Remove the created user account.'''
        if self.login:
            # sanity check the login name so we don't accidentally wipe too much
            if len(self.login)>3 and not '/' in self.login:
                subprocess.call(['rm','-rf', '/home/'+self.login, '/var/mail/'+self.login])
            rc, report = cmd(['userdel', '-f', self.login])
            assert rc == 0

    def add_to_group(self, group):
        '''Add user to the specified group name'''
        # -a appends; a bare -G would replace the user's existing
        # supplementary groups.
        rc, report = cmd(['usermod', '-a', '-G', group, self.login])
        if rc != 0:
            print(report)
        assert rc == 0
# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
class TimeoutFunctionException(Exception):
    """Raised by TimeoutFunction when the wrapped call runs out of time."""
class TimeoutFunction:
    '''Callable wrapper that aborts 'function' after 'timeout' seconds.

    Calling the instance forwards all arguments to the wrapped function
    and returns its result; TimeoutFunctionException is raised from the
    SIGALRM handler when the time is up.
    '''

    def __init__(self, function, timeout):
        '''Remember the function to call and its time limit in seconds.'''
        self.timeout = timeout
        self.function = function

    def handle_timeout(self, signum, frame):
        '''SIGALRM handler: turn the signal into an exception.'''
        raise TimeoutFunctionException()

    def __call__(self, *args, **kwargs):
        '''Run the wrapped function under the alarm.'''
        old = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.timeout)
        try:
            result = self.function(*args, **kwargs)
        finally:
            # Cancel the alarm *before* restoring the old handler, so a
            # late SIGALRM can never be delivered to the old handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old)
        return result
def main():
    '''Print a greeting, then hand over to unittest.main().'''
    greeting = "hi"
    print(greeting)
    unittest.main()