Source code for ovirtlago

#
# Copyright 2014 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
#
# Refer to the README and COPYING files for full details of the license
#
import ConfigParser
import functools
import logging
import os
import re
import shutil
import time

import nose.core
import nose.config
from ovirtsdk.infrastructure.errors import (RequestError, ConnectionError)
import lago
from lago import log_utils
from lago.utils import (LockFile, run_command, )
from lago.prefix import Prefix
from lago.workdir import Workdir

from . import (utils, merge_repos, paths, testlib, virt, )

# TODO: put it into some config
PROJECTS_LIST = ['vdsm', 'ovirt-engine', 'vdsm-jsonrpc-java', 'ioprocess', ]
LOGGER = logging.getLogger(__name__)
LogTask = functools.partial(log_utils.LogTask, logger=LOGGER)
log_task = functools.partial(log_utils.log_task, logger=LOGGER)


[docs]def _with_repo_server(func): @functools.wraps(func) def wrapper(*args, **kwargs): with utils.repo_server_context(args[0]): return func(*args, **kwargs) return wrapper
[docs]def _fix_reposync_issues(reposync_out, repo_path): """ Fix for the issue described at:: https://bugzilla.redhat.com/show_bug.cgi?id=1332441 """ LOGGER.warn( 'Due to bug https://bugzilla.redhat.com/show_bug.cgi?id=1332441 ' 'sometimes reposync fails to update some packages that have older ' 'versions already downloaded, will remove those if any and retry' ) package_regex = re.compile(r'(?P<package_name>[^:\r\s]+): \[Errno 256\]') for match in package_regex.findall(reposync_out): find_command = ['find', repo_path, '-name', match + '*', ] ret, out, _ = run_command(find_command) if ret: raise RuntimeError('Failed to execute %s' % find_command) for to_remove in out.splitlines(): if not to_remove.startswith(repo_path): LOGGER.warn('Skipping out-of-repo file %s', to_remove) continue LOGGER.info('Removing: %s', to_remove) os.unlink(to_remove)
[docs]def _sync_rpm_repository(repo_path, yum_config, repos): lock_path = os.path.join(repo_path, 'repolock') if not os.path.exists(repo_path): os.makedirs(repo_path) reposync_command = [ 'reposync', '--config=%s' % yum_config, '--download_path=%s' % repo_path, '--newest-only', '--delete', '--cachedir=%s/cache' % repo_path, ] + [ '--repoid=%s' % repo for repo in repos ] with LockFile(lock_path, timeout=180): with LogTask('Running reposync'): ret, out, _ = run_command(reposync_command) if not ret: return _fix_reposync_issues(reposync_out=out, repo_path=repo_path) with LogTask('Rerunning reposync'): ret, _, _ = run_command(reposync_command) if not ret: return LOGGER.warn( 'Failed to run reposync again, that usually means that ' 'some of the local rpms might be corrupted or the metadata ' 'invalid, cleaning caches and retrying a second time' ) shutil.rmtree('%s/cache' % repo_path) with LogTask('Rerunning reposync a last time'): ret, _, _ = run_command(reposync_command) if ret: raise RuntimeError( 'Failed to run reposync a second time, aborting' ) return
[docs]def _build_rpms(name, script, source_dir, output_dir, dists, env=None): with LogTask( 'Build %s(%s) from %s, for %s, store results in %s' % (name, script, source_dir, ', '.join(dists), output_dir), ): ret, out, err = run_command( [ script, source_dir, output_dir, ] + dists, env=env, ) if ret: LOGGER.error('%s returned with error %d', script, ret, ) LOGGER.error('Output was: \n%s', out) LOGGER.error('Errors were: \n%s', err) raise RuntimeError('%s failed, see logs' % script) return ret
[docs]def _build_vdsm_rpms(vdsm_dir, output_dir, dists): _build_rpms('vdsm', 'build_vdsm_rpms.sh', vdsm_dir, output_dir, dists)
[docs]def _build_engine_rpms(engine_dir, output_dir, dists, build_gwt=False): env = os.environ.copy() if build_gwt: env['BUILD_GWT'] = '1' else: env['BUILD_GWT'] = '0' _build_rpms( 'ovirt-engine', 'build_engine_rpms.sh', engine_dir, output_dir, dists, env )
[docs]def _build_vdsm_jsonrpc_java_rpms(source_dir, output_dir, dists): _build_rpms( 'vdsm-jsonrpc-java', 'build_vdsm-jsonrpc-java_rpms.sh', source_dir, output_dir, dists )
[docs]def _build_ioprocess_rpms(source_dir, output_dir, dists): _build_rpms( 'ioprocess', 'build_ioprocess_rpms.sh', source_dir, output_dir, dists )
[docs]def _git_revision_at(path): ret, out, _ = run_command(['git', 'rev-parse', 'HEAD'], cwd=path) if ret: return 'unknown' return out.strip()
[docs]def _activate_storage_domains(api, sds): if not sds: LOGGER.info('No storages to activate') return for sd in sds: if sd.status.get_state() != 'active': sd.activate() LOGGER.info('Started activation of storage domain %s', sd.name) else: LOGGER.info('Storage domain %s already active', sd.name) with LogTask('Waiting for the domains to become active'): for sd in sds: dc = api.datacenters.get(id=sd.get_data_center().get_id(), ) with LogTask( 'Waiting for storage domain %s to become active' % sd.name, level='debug' ): testlib.assert_true_within_long( lambda: ( dc.storagedomains.get(sd.name).status.state == 'active' ) )
[docs]def _deactivate_storage_domains(api, sds): if not sds: LOGGER.info('No storages to deactivate') return for sd in sds: if sd.status.get_state() != 'maintenance': sd.deactivate() LOGGER.info('Started deactivation of storage domain %s', sd.name) else: LOGGER.info('Storage domain %s already inactive', sd.name) with LogTask('Waiting for the domains to get into maintenance'): for sd in sds: dc = api.datacenters.get(id=sd.get_data_center().get_id()) with LogTask( 'Waiting for storage domain %s to become inactive' % sd.name, level='debug' ): testlib.assert_true_within_long( lambda: ( dc.storagedomains.get(sd.name).status.state == 'maintenance' ), )
@log_task('Deactivating all storage domains')
[docs]def _deactivate_all_storage_domains(api): for dc in api.datacenters.list(): with LogTask('Deactivating domains for datacenter %s' % dc.name): sds = dc.storagedomains.list() with LogTask('Deactivating non-master storage domains'): _deactivate_storage_domains( api, [sd for sd in sds if not sd.master], ) with LogTask('Deactivating master storage domains'): _deactivate_storage_domains( api, [sd for sd in sds if sd.master], )
[docs]def _deactivate_all_hosts(api): hosts = api.hosts.list() while hosts: host = hosts.pop() try: host.deactivate() LOGGER.info('Sent host %s to maintenance', host.name) except RequestError: LOGGER.exception('Failed to maintenance host %s', host.name) hosts.insert(0, host) for host in api.hosts.list(): with LogTask( 'Wait for %s to go into maintenance' % host.name, level='debug', ): testlib.assert_true_within_short( lambda: api.hosts.get(host.name).status.state == 'maintenance', )
[docs]def _activate_all_hosts(api): names = [host.name for host in api.hosts.list()] for name in names: try: api.hosts.get(name).activate() except RequestError: pass for name in names: testlib.assert_true_within_short( lambda: api.hosts.get(name).status.state == 'up', )
@log_task('Activating all storage domains')
[docs]def _activate_all_storage_domains(api): for dc in api.datacenters.list(): with LogTask('Activating domains for datacenter %s' % dc.name): sds = dc.storagedomains.list() with LogTask('Activating master storage domains'): _activate_storage_domains( api, [sd for sd in sds if sd.master], ) with LogTask('Activating non-master storage domains'): _activate_storage_domains( api, [sd for sd in sds if not sd.master], )
[docs]class OvirtPrefix(Prefix): VIRT_ENV_CLASS = virt.OvirtVirtEnv def __init__(self, *args, **kwargs): super(OvirtPrefix, self).__init__(*args, **kwargs) self.paths = paths.OvirtPaths(self._prefix)
[docs] def create_snapshots(self, name, restore=True): with lago.utils.RollbackContext() as rollback, \ LogTask('Create snapshots'): engine = self.virt_env.engine_vm() self._deactivate() rollback.prependDefer(self._activate) # stop engine: engine.service('ovirt-engine').stop() rollback.prependDefer(engine.get_api) rollback.prependDefer(engine.service('ovirt-engine').start) # stop VDSMs: def stop_host(host): host.service('vdsmd').stop() rollback.prependDefer(host.service('vdsmd').start) host.service('supervdsmd').stop() rollback.prependDefer(host.service('supervdsmd').start) lago.utils.invoke_in_parallel(stop_host, self.virt_env.host_vms()) super(OvirtPrefix, self).create_snapshots(name) if not restore: rollback.clear()
[docs] def revert_snapshots(self, name): super(OvirtPrefix, self).revert_snapshots(name) self._activate()
[docs] def _create_rpm_repository( self, dists, repos_path, repo_names, projects_list=None, ): if not projects_list: projects_list = PROJECTS_LIST def create_repo(dist): dist_output = self.paths.internal_repo(dist) rpm_dirs = [] project_roots = [ self.paths.build_dir(project_name) for project_name in projects_list ] rpm_dirs.extend( [ os.path.join(folder, dist) for folder in project_roots if os.path.exists(folder) ] ) rpm_dirs.extend( [ os.path.join(repos_path, name) for name in repo_names if name.endswith(dist) ], ) merge_repos.merge(dist_output, rpm_dirs) lago.utils.invoke_in_parallel(create_repo, dists)
@log_task('Create prefix internal repo')
[docs] def prepare_repo( self, rpm_repo=None, reposync_yum_config=None, skip_sync=False, ): # Detect distros from template metadata engine_dists = [self.virt_env.engine_vm().distro()] \ if self.virt_env.engine_vm() else [] vdsm_dists = list( set( [ host.distro() for host in self.virt_env.host_vms() ] ) ) all_dists = list(set(engine_dists + vdsm_dists)) repos = [] if rpm_repo and reposync_yum_config: parser = ConfigParser.SafeConfigParser() with open(reposync_yum_config) as repo_conf_fd: parser.readfp(repo_conf_fd) repos = [ repo for repo in parser.sections() if repo.split('-')[-1] in all_dists ] if not skip_sync: with LogTask( 'Syncing remote repos locally (this might take some time)' ): _sync_rpm_repository(rpm_repo, reposync_yum_config, repos) self._create_rpm_repository(all_dists, rpm_repo, repos) self.save()
@_with_repo_server
[docs] def run_test(self, path): with LogTask('Run test: %s' % os.path.basename(path)): env = os.environ.copy() env['LAGO_PREFIX'] = self.paths.prefix results_path = os.path.abspath( os.path.join( self.paths.prefix, 'nosetests-%s.xml' % os.path.basename(path), ) ) extra_args = [ '--with-xunit', '--xunit-file=%s' % results_path, '--with-tasklog-plugin', '--with-log-collector-plugin', ] class DummyStream(object): def write(self, *args): pass def writeln(self, *args): pass def flush(self): pass config = nose.config.Config( verbosity=3, env=env, plugins=nose.core.DefaultPluginManager(), stream=DummyStream(), stopOnError=True, ) addplugins = [ testlib.TaskLogNosePlugin(), testlib.LogCollectorPlugin(self), ] result = nose.core.run( argv=['testrunner', path] + extra_args, config=config, addplugins=addplugins, ) LOGGER.info('Results located at %s' % results_path) return result
@log_task('Deploy oVirt environment') @_with_repo_server
[docs] def deploy(self): return super(OvirtPrefix, self).deploy()
@_with_repo_server
[docs] def serve(self): try: while True: time.sleep(0.1) except: pass
[docs] def _create_virt_env(self): return virt.OvirtVirtEnv.from_prefix(self)
[docs] def _activate(self): with LogTask('Wait for ssh connectivity'): for vm in self.virt_env.get_vms().values(): vm.wait_for_ssh() with LogTask('Wait for engine to go online'): testlib.assert_true_within_long( lambda: self.virt_env.engine_vm().get_api() or True, allowed_exceptions=[RequestError, ConnectionError], ) api = self.virt_env.engine_vm().get_api() with LogTask('Activate hosts'): _activate_all_hosts(api) with LogTask('Activate storage domains'): _activate_all_storage_domains(api)
[docs] def _deactivate(self): api = self.virt_env.engine_vm().get_api() with LogTask('Deactivate storage domains'): _deactivate_all_storage_domains(api) with LogTask('Deactivate hosts'): _deactivate_all_hosts(api)
[docs] def start(self): super(OvirtPrefix, self).start() with LogTask('Activate'): self._activate()
[docs] def stop(self): with LogTask('Deactivate'): self._deactivate() super(OvirtPrefix, self).stop()
[docs]class OvirtWorkdir(Workdir): def __init__(self, *args, **kwargs): super(OvirtWorkdir, self).__init__(*args, **kwargs) self.prefix_class = OvirtPrefix