Source code for ovirtlago.testlib

#
# Copyright 2014 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
#
# Refer to the README and COPYING files for full details of the license
#
import datetime
import functools
import logging
import os
import time

import nose.plugins
from nose.plugins.skip import SkipTest

from lago import (utils as utils, log_utils as log_utils)

import ovirtlago

LOGGER = logging.getLogger(__name__)
SHORT_TIMEOUT = 3 * 60
LONG_TIMEOUT = 10 * 60

_test_prefix = None


[docs]def get_test_prefix(): global _test_prefix if _test_prefix is None: cur_workdir_path = os.environ.get('LAGO_WORKDIR_PATH', os.curdir) workdir = ovirtlago.OvirtWorkdir(path=cur_workdir_path) _test_prefix = workdir.get_prefix('current') return _test_prefix
[docs]def with_ovirt_prefix(func): @functools.wraps(func) def wrapper(*args, **kwargs): return func(get_test_prefix(), *args, **kwargs) return wrapper
[docs]def with_ovirt_api(func): @functools.wraps(func) @with_ovirt_prefix def wrapper(prefix, *args, **kwargs): return func(prefix.virt_env.engine_vm().get_api(), *args, **kwargs) return wrapper
[docs]def continue_on_failure(func): func.continue_on_failure = True return func
[docs]def _vms_capable(vms, caps): caps = set(caps) def get_vm_caps(vm): set(vm.metadata.get('ovirt-capabilities', [])) existing_caps = set() for vm in vms: existing_caps.union(get_vm_caps(vm) or []) return caps.issubset(existing_caps)
[docs]def engine_capability(caps): def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): prefix = get_test_prefix() if not _vms_capable([prefix.virt_env.engine_vm()], caps): raise SkipTest() return func() return wrapper return decorator
[docs]def host_capability(caps): def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): prefix = get_test_prefix() if not _vms_capable(prefix.virt_env.host_vms(), caps): raise SkipTest() return func() return wrapper return decorator
[docs]def test_sequence_gen(test_list): failure_occured = [False] for test in test_list: def wrapped_test(): if failure_occured[0]: raise SkipTest() try: return test() except SkipTest: raise except: if not getattr(test, 'continue_on_failure', False): failure_occured[0] = True raise wrapped_test.description = test.__name__ yield wrapped_test
[docs]class LogCollectorPlugin(nose.plugins.Plugin): name = 'log-collector-plugin' def __init__(self, prefix): nose.plugins.Plugin.__init__(self) self._prefix = prefix
[docs] def options(self, parser, env=None): env = env if env is not None else os.environ super(LogCollectorPlugin, self).options(parser, env)
[docs] def configure(self, options, conf): super(LogCollectorPlugin, self).configure(options, conf)
[docs] def addError(self, test, err): self._addFault(test, err)
[docs] def addFailure(self, test, err): self._addFault(test, err)
[docs] def _addFault(self, test, err): suffix = datetime.datetime.now().strftime("%Y%m%d%H%M%S") test_name = '%s-%s' % (test.id(), suffix) self._prefix.collect_artifacts(self._prefix.paths.test_logs(test_name))
[docs]class TaskLogNosePlugin(nose.plugins.Plugin): name = "tasklog-plugin" def __init__(self, *args, **kwargs): self.logger = logging.getLogger('nose') super(TaskLogNosePlugin, self).__init__(*args, **kwargs)
[docs] def options(self, parser, env): return super(TaskLogNosePlugin, self).options(parser, env)
[docs] def configure(self, options, conf): res = super(TaskLogNosePlugin, self).configure(options, conf) self.logger.handlers = logging.root.handlers return res
[docs] def startTest(self, test): log_utils.start_log_task( test.shortDescription() or str(test), logger=self.logger )
[docs] def stopTest(self, test): log_utils.end_log_task( test.shortDescription() or str(test), logger=self.logger )
[docs]def _instance_of_any(obj, cls_list): return any(True for cls in cls_list if isinstance(obj, cls))
[docs]def assert_true_within(func, timeout, allowed_exceptions=None): allowed_exceptions = allowed_exceptions or [] with utils.EggTimer(timeout) as timer: while not timer.elapsed(): try: if func(): return except Exception as exc: if _instance_of_any(exc, allowed_exceptions): continue LOGGER.exception("Unhandled exception in %s", func) raise time.sleep(3) raise AssertionError('Timed out after %s seconds' % timeout)
[docs]def assert_true_within_short(func, allowed_exceptions=None): allowed_exceptions = allowed_exceptions or [] assert_true_within( func, SHORT_TIMEOUT, allowed_exceptions=allowed_exceptions, )
[docs]def assert_true_within_long(func, allowed_exceptions=None): allowed_exceptions = allowed_exceptions or [] assert_true_within( func, LONG_TIMEOUT, allowed_exceptions=allowed_exceptions, )