Source code for ovirtlago

# Copyright 2014 Red Hat, Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
# Refer to the README and COPYING files for full details of the license
import ConfigParser
import functools
import logging
import os
import re
import shutil
import time

import nose.core
import nose.config
from ovirtsdk.infrastructure.errors import (RequestError, ConnectionError)
import lago
from lago import log_utils
from lago.utils import (LockFile, run_command, )
from lago.prefix import Prefix
from lago.workdir import Workdir

from . import (utils, merge_repos, paths, testlib, virt, )

# TODO: put it into some config
PROJECTS_LIST = ['vdsm', 'ovirt-engine', 'vdsm-jsonrpc-java', 'ioprocess', ]
LOGGER = logging.getLogger(__name__)
LogTask = functools.partial(log_utils.LogTask, logger=LOGGER)
log_task = functools.partial(log_utils.log_task, logger=LOGGER)

[docs]def _with_repo_server(func): @functools.wraps(func) def wrapper(*args, **kwargs): with utils.repo_server_context(args[0]): return func(*args, **kwargs) return wrapper
[docs]def _fix_reposync_issues(reposync_out, repo_path): """ Fix for the issue described at:: """ LOGGER.warn( 'Due to bug ' 'sometimes reposync fails to update some packages that have older ' 'versions already downloaded, will remove those if any and retry' ) package_regex = re.compile(r'(?P<package_name>[^:\r\s]+): \[Errno 256\]') for match in package_regex.findall(reposync_out): find_command = ['find', repo_path, '-name', match + '*', ] ret, out, _ = run_command(find_command) if ret: raise RuntimeError('Failed to execute %s' % find_command) for to_remove in out.splitlines(): if not to_remove.startswith(repo_path): LOGGER.warn('Skipping out-of-repo file %s', to_remove) continue'Removing: %s', to_remove) os.unlink(to_remove)
[docs]def _sync_rpm_repository(repo_path, yum_config, repos): lock_path = os.path.join(repo_path, 'repolock') if not os.path.exists(repo_path): os.makedirs(repo_path) reposync_command = [ 'reposync', '--config=%s' % yum_config, '--download_path=%s' % repo_path, '--newest-only', '--delete', '--cachedir=%s/cache' % repo_path, ] + [ '--repoid=%s' % repo for repo in repos ] with LockFile(lock_path, timeout=180): with LogTask('Running reposync'): ret, out, _ = run_command(reposync_command) if not ret: return _fix_reposync_issues(reposync_out=out, repo_path=repo_path) with LogTask('Rerunning reposync'): ret, _, _ = run_command(reposync_command) if not ret: return LOGGER.warn( 'Failed to run reposync again, that usually means that ' 'some of the local rpms might be corrupted or the metadata ' 'invalid, cleaning caches and retrying a second time' ) shutil.rmtree('%s/cache' % repo_path) with LogTask('Rerunning reposync a last time'): ret, _, _ = run_command(reposync_command) if ret: raise RuntimeError( 'Failed to run reposync a second time, aborting' ) return
[docs]def _build_rpms(name, script, source_dir, output_dir, dists, env=None): with LogTask( 'Build %s(%s) from %s, for %s, store results in %s' % (name, script, source_dir, ', '.join(dists), output_dir), ): ret, out, err = run_command( [ script, source_dir, output_dir, ] + dists, env=env, ) if ret: LOGGER.error('%s returned with error %d', script, ret, ) LOGGER.error('Output was: \n%s', out) LOGGER.error('Errors were: \n%s', err) raise RuntimeError('%s failed, see logs' % script) return ret
[docs]def _build_vdsm_rpms(vdsm_dir, output_dir, dists): _build_rpms('vdsm', '', vdsm_dir, output_dir, dists)
[docs]def _build_engine_rpms(engine_dir, output_dir, dists, build_gwt=False): env = os.environ.copy() if build_gwt: env['BUILD_GWT'] = '1' else: env['BUILD_GWT'] = '0' _build_rpms( 'ovirt-engine', '', engine_dir, output_dir, dists, env )
[docs]def _build_vdsm_jsonrpc_java_rpms(source_dir, output_dir, dists): _build_rpms( 'vdsm-jsonrpc-java', '', source_dir, output_dir, dists )
[docs]def _build_ioprocess_rpms(source_dir, output_dir, dists): _build_rpms( 'ioprocess', '', source_dir, output_dir, dists )
[docs]def _git_revision_at(path): ret, out, _ = run_command(['git', 'rev-parse', 'HEAD'], cwd=path) if ret: return 'unknown' return out.strip()
[docs]def _activate_storage_domains(api, sds): if not sds:'No storages to activate') return for sd in sds: if sd.status.get_state() != 'active': sd.activate()'Started activation of storage domain %s', else:'Storage domain %s already active', with LogTask('Waiting for the domains to become active'): for sd in sds: dc = api.datacenters.get(id=sd.get_data_center().get_id(), ) with LogTask( 'Waiting for storage domain %s to become active' %, level='debug' ): testlib.assert_true_within_long( lambda: ( dc.storagedomains.get( == 'active' ) )
[docs]def _deactivate_storage_domains(api, sds): if not sds:'No storages to deactivate') return for sd in sds: if sd.status.get_state() != 'maintenance': sd.deactivate()'Started deactivation of storage domain %s', else:'Storage domain %s already inactive', with LogTask('Waiting for the domains to get into maintenance'): for sd in sds: dc = api.datacenters.get(id=sd.get_data_center().get_id()) with LogTask( 'Waiting for storage domain %s to become inactive' %, level='debug' ): testlib.assert_true_within_long( lambda: ( dc.storagedomains.get( == 'maintenance' ), )
@log_task('Deactivating all storage domains')
[docs]def _deactivate_all_storage_domains(api): for dc in api.datacenters.list(): with LogTask('Deactivating domains for datacenter %s' % sds = dc.storagedomains.list() with LogTask('Deactivating non-master storage domains'): _deactivate_storage_domains( api, [sd for sd in sds if not sd.master], ) with LogTask('Deactivating master storage domains'): _deactivate_storage_domains( api, [sd for sd in sds if sd.master], )
[docs]def _deactivate_all_hosts(api): hosts = api.hosts.list() while hosts: host = hosts.pop() try: host.deactivate()'Sent host %s to maintenance', except RequestError: LOGGER.exception('Failed to maintenance host %s', hosts.insert(0, host) for host in api.hosts.list(): with LogTask( 'Wait for %s to go into maintenance' %, level='debug', ): testlib.assert_true_within_short( lambda: api.hosts.get( == 'maintenance', )
[docs]def _activate_all_hosts(api): names = [ for host in api.hosts.list()] for name in names: try: api.hosts.get(name).activate() except RequestError: pass for name in names: testlib.assert_true_within_short( lambda: api.hosts.get(name).status.state == 'up', )
@log_task('Activating all storage domains')
[docs]def _activate_all_storage_domains(api): for dc in api.datacenters.list(): with LogTask('Activating domains for datacenter %s' % sds = dc.storagedomains.list() with LogTask('Activating master storage domains'): _activate_storage_domains( api, [sd for sd in sds if sd.master], ) with LogTask('Activating non-master storage domains'): _activate_storage_domains( api, [sd for sd in sds if not sd.master], )
[docs]class OvirtPrefix(Prefix): def __init__(self, *args, **kwargs): super(OvirtPrefix, self).__init__(*args, **kwargs) self.paths = paths.OvirtPaths(self._prefix)
[docs] def create_snapshots(self, name, restore=True): with lago.utils.RollbackContext() as rollback, \ LogTask('Create snapshots'): engine = self.virt_env.engine_vm() self._deactivate() rollback.prependDefer(self._activate) # stop engine: engine.service('ovirt-engine').stop() rollback.prependDefer(engine.get_api) rollback.prependDefer(engine.service('ovirt-engine').start) # stop VDSMs: def stop_host(host): host.service('vdsmd').stop() rollback.prependDefer(host.service('vdsmd').start) host.service('supervdsmd').stop() rollback.prependDefer(host.service('supervdsmd').start) lago.utils.invoke_in_parallel(stop_host, self.virt_env.host_vms()) super(OvirtPrefix, self).create_snapshots(name) if not restore: rollback.clear()
[docs] def revert_snapshots(self, name): super(OvirtPrefix, self).revert_snapshots(name) self._activate()
[docs] def _create_rpm_repository( self, dists, repos_path, repo_names, projects_list=None, ): if not projects_list: projects_list = PROJECTS_LIST def create_repo(dist): dist_output = self.paths.internal_repo(dist) rpm_dirs = [] project_roots = [ self.paths.build_dir(project_name) for project_name in projects_list ] rpm_dirs.extend( [ os.path.join(folder, dist) for folder in project_roots if os.path.exists(folder) ] ) rpm_dirs.extend( [ os.path.join(repos_path, name) for name in repo_names if name.endswith(dist) ], ) merge_repos.merge(dist_output, rpm_dirs) lago.utils.invoke_in_parallel(create_repo, dists)
@log_task('Create prefix internal repo')
[docs] def prepare_repo( self, rpm_repo=None, reposync_yum_config=None, skip_sync=False, ): # Detect distros from template metadata engine_dists = [self.virt_env.engine_vm().distro()] \ if self.virt_env.engine_vm() else [] vdsm_dists = list( set( [ host.distro() for host in self.virt_env.host_vms() ] ) ) all_dists = list(set(engine_dists + vdsm_dists)) repos = [] if rpm_repo and reposync_yum_config: parser = ConfigParser.SafeConfigParser() with open(reposync_yum_config) as repo_conf_fd: parser.readfp(repo_conf_fd) repos = [ repo for repo in parser.sections() if repo.split('-')[-1] in all_dists ] if not skip_sync: with LogTask( 'Syncing remote repos locally (this might take some time)' ): _sync_rpm_repository(rpm_repo, reposync_yum_config, repos) self._create_rpm_repository(all_dists, rpm_repo, repos)
[docs] def run_test(self, path): with LogTask('Run test: %s' % os.path.basename(path)): env = os.environ.copy() env['LAGO_PREFIX'] = self.paths.prefix results_path = os.path.abspath( os.path.join( self.paths.prefix, 'nosetests-%s.xml' % os.path.basename(path), ) ) extra_args = [ '--with-xunit', '--xunit-file=%s' % results_path, '--with-tasklog-plugin', '--with-log-collector-plugin', ] class DummyStream(object): def write(self, *args): pass def writeln(self, *args): pass def flush(self): pass config = nose.config.Config( verbosity=3, env=env, plugins=nose.core.DefaultPluginManager(), stream=DummyStream(), stopOnError=True, ) addplugins = [ testlib.TaskLogNosePlugin(), testlib.LogCollectorPlugin(self), ] result = argv=['testrunner', path] + extra_args, config=config, addplugins=addplugins, )'Results located at %s' % results_path) return result
@log_task('Deploy oVirt environment') @_with_repo_server
[docs] def deploy(self): return super(OvirtPrefix, self).deploy()
[docs] def serve(self): try: while True: time.sleep(0.1) except: pass
[docs] def _create_virt_env(self): return virt.OvirtVirtEnv.from_prefix(self)
[docs] def _activate(self): with LogTask('Wait for ssh connectivity'): for vm in self.virt_env.get_vms().values(): vm.wait_for_ssh() with LogTask('Wait for engine to go online'): testlib.assert_true_within_long( lambda: self.virt_env.engine_vm().get_api() or True, allowed_exceptions=[RequestError, ConnectionError], ) api = self.virt_env.engine_vm().get_api() with LogTask('Activate hosts'): _activate_all_hosts(api) with LogTask('Activate storage domains'): _activate_all_storage_domains(api)
[docs] def _deactivate(self): api = self.virt_env.engine_vm().get_api() with LogTask('Deactivate storage domains'): _deactivate_all_storage_domains(api) with LogTask('Deactivate hosts'): _deactivate_all_hosts(api)
[docs] def start(self): super(OvirtPrefix, self).start() with LogTask('Activate'): self._activate()
[docs] def stop(self): with LogTask('Deactivate'): self._deactivate() super(OvirtPrefix, self).stop()
[docs]class OvirtWorkdir(Workdir): def __init__(self, *args, **kwargs): super(OvirtWorkdir, self).__init__(*args, **kwargs) self.prefix_class = OvirtPrefix