On 09/10/2015 07:27 PM, Aline Manera wrote:
On 01/09/2015 14:58, chandra(a)linux.vnet.ibm.com wrote:
> From: chandrureddy <chandra(a)linux.vnet.ibm.com>
>
> ---
> plugins/gingerbase/tests/Makefile.am | 49 +++++
> plugins/gingerbase/tests/run_tests.sh.in | 55 ++++++
I suppose it is identical to the one in Kimchi. We should keep it in wok
and reuse it in the plugins.
I am fine with that approach, but could we consider doing this
as a next step,
so as not to distract from the actual goal? :-)
> plugins/gingerbase/tests/test_config.py.in | 154 ++++++++++++++++
> plugins/gingerbase/tests/test_host.py | 152 ++++++++++++++++
> plugins/gingerbase/tests/test_model.py | 280
> +++++++++++++++++++++++++++++
> plugins/gingerbase/tests/utils.py | 261
> +++++++++++++++++++++++++++
> plugins/kimchi/tests/test_host.py | 200 ---------------------
> 7 files changed, 951 insertions(+), 200 deletions(-)
> create mode 100644 plugins/gingerbase/tests/Makefile.am
> create mode 100644 plugins/gingerbase/tests/run_tests.sh.in
> create mode 100644 plugins/gingerbase/tests/test_config.py.in
> create mode 100644 plugins/gingerbase/tests/test_host.py
> create mode 100644 plugins/gingerbase/tests/test_model.py
> create mode 100644 plugins/gingerbase/tests/utils.py
> delete mode 100644 plugins/kimchi/tests/test_host.py
>
> diff --git a/plugins/gingerbase/tests/Makefile.am
> b/plugins/gingerbase/tests/Makefile.am
> new file mode 100644
> index 0000000..23e18f4
> --- /dev/null
> +++ b/plugins/gingerbase/tests/Makefile.am
> @@ -0,0 +1,49 @@
> +#
> +# Kimchi
> +#
> +# Copyright IBM Corp, 2013
> +#
> +# This library is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU Lesser General Public
> +# License as published by the Free Software Foundation; either
> +# version 2.1 of the License, or (at your option) any later version.
> +#
> +# This library is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +# Lesser General Public License for more details.
> +#
> +# You should have received a copy of the GNU Lesser General Public
> +# License along with this library; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301 USA
> +
> +EXTRA_DIST = \
> + Makefile.am \
> + run_tests.sh.in \
> + test_config.py.in \
> + $(filter-out test_config.py, $(wildcard *.py)) \
> + $(NULL)
> +
> +noinst_SCRIPTS = run_tests.sh
> +
> +do_substitution = \
> + sed -e 's,[@]HAVE_PYMOD_UNITTEST[@],$(HAVE_PYMOD_UNITTEST),g' \
> + -e 's,[@]prefix[@],$(prefix),g' \
> + -e 's,[@]datadir[@],$(datadir),g' \
> + -e 's,[@]PYTHON_VERSION[@],$(PYTHON_VERSION),g' \
> + -e 's,[@]wokdir[@],$(pythondir)/wok,g' \
> + -e 's,[@]pkgdatadir[@],$(pkgdatadir),g'
> +
> +
> +run_tests.sh: run_tests.sh.in Makefile
> + $(do_substitution) < $(srcdir)/run_tests.sh.in > run_tests.sh
> + chmod +x run_tests.sh
> +
> +test_config.py: test_config.py.in Makefile
> + $(do_substitution) < $(srcdir)/test_config.py.in > test_config.py
> +
> +check-local:
> + ./run_tests.sh
> +
> +BUILT_SOURCES = test_config.py
> +CLEANFILES = run_tests.sh test_config.py
> diff --git a/plugins/gingerbase/tests/run_tests.sh.in
> b/plugins/gingerbase/tests/run_tests.sh.in
> new file mode 100644
> index 0000000..beef75e
> --- /dev/null
> +++ b/plugins/gingerbase/tests/run_tests.sh.in
> @@ -0,0 +1,55 @@
> +#!/bin/bash
> +#
> +# Project Kimchi
> +#
> +# Copyright IBM, Corp. 2013-2015
> +#
> +# This library is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU Lesser General Public
> +# License as published by the Free Software Foundation; either
> +# version 2.1 of the License, or (at your option) any later version.
> +#
> +# This library is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +# Lesser General Public License for more details.
> +#
> +# You should have received a copy of the GNU Lesser General Public
> +# License along with this library; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301 USA
> +
> +HAVE_UNITTEST=@HAVE_PYMOD_UNITTEST@
> +PYTHON_VER=@PYTHON_VERSION@
> +
> +if [ "$1" = "-v" ]; then
> + OPTS="-v"
> + shift
> +else
> + OPTS=""
> +fi
> +
> +if [ $# -ne 0 ]; then
> + ARGS="$@"
> +else
> + ARGS=`find -name "test_*.py" | xargs -I @ basename @ .py`
> +fi
> +
> +if [ "$HAVE_UNITTEST" != "yes" -o "$PYTHON_VER" ==
"2.6" ]; then
> + CMD="unit2"
> +else
> + CMD="python -m unittest"
> +fi
> +
> +LIST=($ARGS)
> +MODEL_LIST=()
> +MOCK_LIST=()
> +for ((i=0;i<${#LIST[@]};i++)); do
> +
> + if [[ ${LIST[$i]} == test_model* ]]; then
> + MODEL_LIST+=(${LIST[$i]})
> + else
> + MOCK_LIST+=(${LIST[$i]})
> + fi
> +done
> +
> +PYTHONPATH=../plugins:../src:../ $CMD $OPTS ${MODEL_LIST[@]}
> ${MOCK_LIST[@]}
> diff --git a/plugins/gingerbase/tests/test_config.py.in
> b/plugins/gingerbase/tests/test_config.py.in
> new file mode 100644
> index 0000000..4ffe919
> --- /dev/null
> +++ b/plugins/gingerbase/tests/test_config.py.in
> @@ -0,0 +1,154 @@
> +#
> +# Project Kimchi
> +#
> +# Copyright IBM, Corp. 2014-2015
> +#
> +# This library is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU Lesser General Public
> +# License as published by the Free Software Foundation; either
> +# version 2.1 of the License, or (at your option) any later version.
> +#
> +# This library is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +# Lesser General Public License for more details.
> +#
> +# You should have received a copy of the GNU Lesser General Public
> +# License along with this library; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301 USA
> +
> +import unittest
> +from cherrypy.lib.reprconf import Parser
> +from wok.config import Paths
> +
> +from wok.plugins.gingerbase.config import KimchiPaths
> +
> +
> +
> +get_prefix = None
> +
> +
> +def setUpModule():
> + global get_prefix
> + get_prefix = Paths.get_prefix
> +
> +
> +def tearDownModule():
> + Paths.get_prefix = KimchiPaths.get_prefix = get_prefix
> +
> +
> +class ConfigTests(unittest.TestCase):
> + def assertInstalledPath(self, actual, expected):
> + if '@pkgdatadir@' != '/usr/share/gingerbase':
> + usr_local = '/usr/local'
> + if not expected.startswith('/usr'):
> + expected = usr_local + expected
> + self.assertEquals(actual, expected)
> +
> + def test_installed_paths(self):
> + Paths.get_prefix = lambda self: '@datadir@/wok'
> + paths = Paths()
> + self.assertInstalledPath(paths.state_dir, '/var/lib/wok')
> + self.assertInstalledPath(paths.log_dir, '/var/log/wok')
> + self.assertInstalledPath(paths.conf_dir, '/etc/wok')
> + self.assertInstalledPath(paths.src_dir, '@wokdir@')
> + self.assertInstalledPath(paths.plugins_dir, '@wokdir@/plugins')
> + self.assertInstalledPath(paths.ui_dir, '@datadir@/wok/ui')
> + self.assertInstalledPath(paths.mo_dir, '@prefix@/share/locale')
> +
> + def test_uninstalled_paths(self):
> + Paths.get_prefix = lambda self: '/home/user/wok'
> + paths = Paths()
> + self.assertEquals(paths.state_dir, '/home/user/wok/data')
> + self.assertEquals(paths.log_dir, '/home/user/wok/log')
> + self.assertEquals(paths.conf_dir, '/home/user/wok/src')
> + self.assertEquals(paths.src_dir, '/home/user/wok/src/wok')
> + self.assertEquals(paths.plugins_dir, '/home/user/wok/plugins')
> + self.assertEquals(paths.ui_dir, '/home/user/wok/ui')
> + self.assertEquals(paths.mo_dir, '/home/user/wok/mo')
> +
> + def test_installed_plugin_paths(self):
> + KimchiPaths.get_prefix = lambda self: '@datadir@/wok'
> + paths = KimchiPaths()
> + self.assertInstalledPath(paths.conf_dir, '/etc/wok/plugins.d')
> + self.assertInstalledPath(paths.conf_file,
> + '/etc/wok/plugins.d/gingerbase.conf')
> + self.assertInstalledPath(paths.src_dir,
> '@wokdir@/plugins/ginger-base')
> + self.assertInstalledPath(paths.ui_dir,
> + '@datadir@/wok/plugins/gingerbase/ui')
> + self.assertInstalledPath(paths.mo_dir, '@prefix@/share/locale')
> +
> + def test_uninstalled_plugin_paths(self):
> + KimchiPaths.get_prefix = lambda self: '/home/user/wok'
> + paths = KimchiPaths()
> + self.assertEquals(paths.conf_dir,
> '/home/user/wok/plugins/gingerbase')
> + self.assertEquals(
> + paths.conf_file,
> '/home/user/wok/plugins/gingerbase/gingerbase.conf')
> + self.assertEquals(paths.src_dir,
> '/home/user/wok/plugins/gingerbase')
> + self.assertEquals(paths.ui_dir,
> '/home/user/wok/plugins/gingerbase/ui')
> + self.assertEquals(paths.mo_dir,
> '/home/user/wok/plugins/gingerbase/mo')
> +
> + def test_kimchi_config(self):
> + Paths.get_prefix = KimchiPaths.get_prefix = get_prefix
> + CACHEEXPIRES = 31536000
> + SESSIONSTIMEOUT = 10
> + configObj = {
> + '/': {
> + 'tools.trailing_slash.on': False,
> + 'request.methods_with_bodies': ('POST',
'PUT'),
> + 'tools.nocache.on': True,
> + 'tools.proxy.on': True,
> + 'tools.sessions.on': True,
> + 'tools.sessions.name': 'wok',
> + 'tools.sessions.secure': True,
> + 'tools.sessions.httponly': True,
> + 'tools.sessions.locking': 'explicit',
> + 'tools.sessions.storage_type': 'ram',
> + 'tools.sessions.timeout': SESSIONSTIMEOUT,
> + 'tools.wokauth.on': False
> + },
> + '/css': {
> + 'tools.staticdir.on': True,
> + 'tools.staticdir.dir': '%s/ui/css' %
> KimchiPaths().prefix,
> + 'tools.expires.on': True,
> + 'tools.expires.secs': CACHEEXPIRES,
> + 'tools.nocache.on': False
> + },
> + '/js': {
> + 'tools.staticdir.on': True,
> + 'tools.staticdir.dir': '%s/ui/js' %
> KimchiPaths().prefix,
> + 'tools.expires.on': True,
> + 'tools.expires.secs': CACHEEXPIRES,
> + 'tools.nocache.on': False
> + },
> + '/libs': {
> + 'tools.staticdir.on': True,
> + 'tools.staticdir.dir': '%s/ui/libs' %
> KimchiPaths().prefix,
> + 'tools.expires.on': True,
> + 'tools.expires.secs': CACHEEXPIRES,
> + 'tools.nocache.on': False,
> + },
> + '/images': {
> + 'tools.staticdir.on': True,
> + 'tools.staticdir.dir': '%s/ui/images' %
> KimchiPaths().prefix,
> + 'tools.nocache.on': False
> + },
> + '/favicon.ico': {
> + 'tools.staticfile.on': True,
> + 'tools.staticfile.filename':
> + '%s/images/logo.ico' % KimchiPaths().ui_dir
> + },
> + '/robots.txt': {
> + 'tools.staticfile.on': True,
> + 'tools.staticfile.filename': '%s/robots.txt' %
> KimchiPaths().ui_dir
> + },
> + '/help': {
> + 'tools.staticdir.on': True,
> + 'tools.staticdir.dir': '%s/ui/pages/help' %
> KimchiPaths().prefix,
> + 'tools.staticdir.index': 'en_US/index.html',
> + 'tools.nocache.on': True
> + }
> + }
> +
> + kimchi_config =
> Parser().dict_from_file(KimchiPaths().conf_file)
> + self.assertEquals(kimchi_config, configObj)
> diff --git a/plugins/gingerbase/tests/test_host.py
> b/plugins/gingerbase/tests/test_host.py
> new file mode 100644
> index 0000000..78765c0
> --- /dev/null
> +++ b/plugins/gingerbase/tests/test_host.py
> @@ -0,0 +1,152 @@
> +# -*- coding: utf-8 -*-
> +#
> +# Project Kimchi
> +#
> +# Copyright IBM, Corp. 2015
> +#
> +# This library is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU Lesser General Public
> +# License as published by the Free Software Foundation; either
> +# version 2.1 of the License, or (at your option) any later version.
> +#
> +# This library is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +# Lesser General Public License for more details.
> +#
> +# You should have received a copy of the GNU Lesser General Public
> +# License along with this library; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301 USA
> +
> +import json
> +import os
> +import platform
> +import psutil
> +import tempfile
> +import time
> +import unittest
> +from functools import partial
> +
> +from wok.plugins.gingerbase.mockmodel import MockModel
> +from utils import get_free_port, patch_auth, request, run_server,
> wait_task
> +
> +
> +test_server = None
> +model = None
> +host = None
> +ssl_port = None
> +tmpfile = None
> +
> +
> +def setUpModule():
> + global test_server, model, host, ssl_port, tmpfile
> +
> + patch_auth()
> + tmpfile = tempfile.mktemp()
> + model = MockModel(tmpfile)
> + host = '127.0.0.1'
> + port = get_free_port('http')
> + ssl_port = get_free_port('https')
> + cherrypy_port = get_free_port('cherrypy_port')
> + test_server = run_server(host, port, ssl_port, test_mode=True,
> + cherrypy_port=cherrypy_port, model=model)
> +
> +
> +def tearDownModule():
> + test_server.stop()
> + os.unlink(tmpfile)
> +
> +
> +class HostTests(unittest.TestCase):
> + def setUp(self):
> + self.request = partial(request, host, ssl_port)
> +
> + def test_hostinfo(self):
> + resp = self.request('/plugins/gingerbase/host').read()
> + info = json.loads(resp)
> + keys = ['os_distro', 'os_version', 'os_codename',
'cpu_model',
> + 'memory', 'cpus']
> + self.assertEquals(sorted(keys), sorted(info.keys()))
> +
> + distro, version, codename = platform.linux_distribution()
> + self.assertEquals(distro, info['os_distro'])
> + self.assertEquals(version, info['os_version'])
> + self.assertEquals(unicode(codename, "utf-8"),
> info['os_codename'])
> + self.assertEquals(psutil.TOTAL_PHYMEM, info['memory'])
> +
> + def test_hoststats(self):
> + time.sleep(1)
> + stats_keys = ['cpu_utilization', 'memory',
'disk_read_rate',
> + 'disk_write_rate', 'net_recv_rate',
> 'net_sent_rate']
> + resp = self.request('/plugins/gingerbase/host/stats').read()
> + stats = json.loads(resp)
> + self.assertEquals(sorted(stats_keys), sorted(stats.keys()))
> +
> + cpu_utilization = stats['cpu_utilization']
> + self.assertIsInstance(cpu_utilization, float)
> + self.assertGreaterEqual(cpu_utilization, 0.0)
> + self.assertTrue(cpu_utilization <= 100.0)
> +
> + memory_stats = stats['memory']
> + self.assertIn('total', memory_stats)
> + self.assertIn('free', memory_stats)
> + self.assertIn('cached', memory_stats)
> + self.assertIn('buffers', memory_stats)
> + self.assertIn('avail', memory_stats)
> +
> + resp =
> self.request('/plugins/gingerbase/host/stats/history').read()
> + history = json.loads(resp)
> + self.assertEquals(sorted(stats_keys), sorted(history.keys()))
> +
> + def test_host_actions(self):
> + def _task_lookup(taskid):
> + return
> json.loads(self.request('/plugins/gingerbase/tasks/%s' %
> + taskid).read())
> +
> + resp = self.request('/plugins/gingerbase/host/shutdown',
> '{}', 'POST')
> + self.assertEquals(200, resp.status)
> + resp = self.request('/plugins/gingerbase/host/reboot', '{}',
> 'POST')
> + self.assertEquals(200, resp.status)
> +
> + # Test system update
> + resp =
> self.request('/plugins/gingerbase/host/packagesupdate', None, 'GET')
> + pkgs = json.loads(resp.read())
> + self.assertEquals(3, len(pkgs))
> +
> + pkg_keys = ['package_name', 'repository', 'arch',
'version']
> + for p in pkgs:
> + name = p['package_name']
> + resp =
> self.request('/plugins/gingerbase/host/packagesupdate/' + name,
> + None, 'GET')
> + info = json.loads(resp.read())
> + self.assertEquals(sorted(pkg_keys), sorted(info.keys()))
> +
> + resp = self.request('/plugins/gingerbase/host/swupdate',
> '{}', 'POST')
> + task = json.loads(resp.read())
> + task_params = [u'id', u'message', u'status',
u'target_uri']
> + self.assertEquals(sorted(task_params), sorted(task.keys()))
> +
> + resp = self.request('/plugins/gingerbase/tasks/' +
> task[u'id'], None, 'GET')
> + task_info = json.loads(resp.read())
> + self.assertEquals(task_info['status'], 'running')
> + wait_task(_task_lookup, task_info['id'])
> + resp = self.request('/plugins/gingerbase/tasks/' +
> task[u'id'], None, 'GET')
> + task_info = json.loads(resp.read())
> + self.assertEquals(task_info['status'], 'finished')
> + self.assertIn(u'All packages updated',
task_info['message'])
> + pkgs = model.packagesupdate_get_list()
> + self.assertEquals(0, len(pkgs))
> +
> + def test_host_partitions(self):
> + resp = self.request('/plugins/gingerbase/host/partitions')
> + self.assertEquals(200, resp.status)
> + partitions = json.loads(resp.read())
> +
> + keys = ['name', 'path', 'type', 'fstype',
'size', 'mountpoint',
> + 'available']
> + for item in partitions:
> + resp =
> self.request('/plugins/gingerbase/host/partitions/%s' %
> + item['name'])
> + info = json.loads(resp.read())
> + self.assertEquals(sorted(info.keys()), sorted(keys))
> +
> diff --git a/plugins/gingerbase/tests/test_model.py
> b/plugins/gingerbase/tests/test_model.py
> new file mode 100644
> index 0000000..d471ce5
> --- /dev/null
> +++ b/plugins/gingerbase/tests/test_model.py
> @@ -0,0 +1,280 @@
> +# -*- coding: utf-8 -*-
> +#
> +# Project Kimchi
> +#
> +# Copyright IBM, Corp. 2013-2015
> +#
> +# This library is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU Lesser General Public
> +# License as published by the Free Software Foundation; either
> +# version 2.1 of the License, or (at your option) any later version.
> +#
> +# This library is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +# Lesser General Public License for more details.
> +#
> +# You should have received a copy of the GNU Lesser General Public
> +# License along with this library; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301 USA
> +
> +import os
> +import shutil
> +import unittest
> +
> +import wok.objectstore
> +from wok.basemodel import Singleton
> +from wok.exception import InvalidParameter, NotFoundError,
> OperationFailed
> +from wok.rollbackcontext import RollbackContext
> +from wok.plugins.gingerbase.model import model
> +
> +# import iso_gen
> +# import utils
> +
> +
> +invalid_repository_urls = ['www.fedora.org', # missing protocol
> + '://www.fedora.org', # missing protocol
> + 'http://www.fedora', # invalid domain
> name
> + 'file:///home/foobar'] # invalid path
> +
> +TMP_DIR = '/var/lib/kimchi/tests/'
> +# UBUNTU_ISO = TMP_DIR + 'ubuntu14.04.iso'
> +
> +
> +def setUpModule():
> + if not os.path.exists(TMP_DIR):
> + os.makedirs(TMP_DIR)
> +
> + # iso_gen.construct_fake_iso(UBUNTU_ISO, True, '14.04',
'ubuntu')
> +
> + # Some FeatureTests functions depend on server to validate their
> result.
> + # As CapabilitiesModel is a Singleton class it will get the
> first result
> + # from FeatureTests which may be wrong when using the Model
> instance
> + # directly - the case of this test_model.py
> + # So clean Singleton instances to make sure to get the right
> result when
> + # running the following tests.
> + Singleton._instances = {}
> +
> +
> +def tearDownModule():
> + shutil.rmtree(TMP_DIR)
> +
> +
> +class ModelTests(unittest.TestCase):
> + def setUp(self):
> + self.tmp_store = '/tmp/kimchi-store-test'
> +
> + def tearDown(self):
> + # FIXME: Tests using 'test:///default' URI should be moved to
> + # test_rest or test_mockmodel to avoid overriding problems
> + # LibvirtConnection._connections['test:///default'] = {}
> +
> + os.unlink(self.tmp_store)
> +
> + def test_repository_create(self):
> + inst = model.Model(objstore_loc=self.tmp_store)
> +
> + yum_repos = [{'repo_id': 'fedora-fake',
> + 'baseurl': 'http://www.fedora.org'},
> + {'repo_id': 'fedora-updates-fake',
> + 'config':
> + {'mirrorlist':
'http://www.fedoraproject.org'}}]
> +
> + deb_repos = [{'baseurl':
'http://archive.ubuntu.com/ubuntu/',
> + 'config': {'dist': 'quantal'}},
> + {'baseurl':
'http://archive.ubuntu.com/ubuntu/',
> + 'config': {'dist': 'quantal',
'comps':
> ['main']}}]
> +
> + yum_invalid_repos = []
> + deb_invalid_repos = []
> +
> + for url in invalid_repository_urls:
> + wrong_baseurl = {'repo_id': 'wrong-id',
'baseurl': url}
> + wrong_mirrorlist = {'repo_id': 'wrong-id',
> + 'baseurl': 'www.example.com',
> + 'config': {'mirrorlist': url}}
> + wrong_config_item = {
> + 'repo_id': 'wrong-id',
> + 'baseurl': 'www.example.com',
> + 'config': {
> + 'gpgkey':
> 'file:///tmp/KEY-fedora-updates-fake-19'}}
> +
> + yum_invalid_repos.append(wrong_baseurl)
> + yum_invalid_repos.append(wrong_mirrorlist)
> + yum_invalid_repos.append(wrong_config_item)
> +
> + wrong_baseurl['config'] = {'dist': 'tasty'}
> + wrong_config = {'baseurl': deb_repos[0]['baseurl'],
> + 'config': {
> + 'unsupported_item':
> "a_unsupported_item"}}
> + deb_invalid_repos.append(wrong_baseurl)
> + deb_invalid_repos.append(wrong_config)
> +
> + repo_type = inst.capabilities_lookup()['repo_mngt_tool']
> + if repo_type == 'yum':
> + test_repos = yum_repos
> + invalid_repos = yum_invalid_repos
> + elif repo_type == 'deb':
> + test_repos = deb_repos
> + invalid_repos = deb_invalid_repos
> + else:
> + # repository management tool was not recognized by Kimchi
> + # skip test case
> + return
> +
> + # create repositories with invalid data
> + for repo in invalid_repos:
> + self.assertRaises(InvalidParameter,
> inst.repositories_create, repo)
> +
> + for repo in test_repos:
> + system_host_repos = len(inst.repositories_get_list())
> + repo_id = inst.repositories_create(repo)
> + host_repos = inst.repositories_get_list()
> + self.assertEquals(system_host_repos + 1, len(host_repos))
> +
> + repo_info = inst.repository_lookup(repo_id)
> + self.assertEquals(repo_id, repo_info['repo_id'])
> + self.assertEquals(True, repo_info.get('enabled'))
> + self.assertEquals(repo.get('baseurl', ''),
> + repo_info.get('baseurl'))
> +
> + original_config = repo.get('config', {})
> + config_info = repo_info.get('config', {})
> +
> + if repo_type == 'yum':
> + self.assertEquals(original_config.get('mirrorlist', ''),
> + config_info.get('mirrorlist',
''))
> + self.assertEquals(True, config_info['gpgcheck'])
> + else:
> + self.assertEquals(original_config['dist'],
> config_info['dist'])
> + self.assertEquals(original_config.get('comps', []),
> + config_info.get('comps', []))
> +
> + inst.repository_delete(repo_id)
> + self.assertRaises(NotFoundError, inst.repository_lookup,
> repo_id)
> +
> + self.assertRaises(NotFoundError, inst.repository_lookup,
> 'google')
> +
> + def test_repository_update(self):
> + inst = model.Model(objstore_loc=self.tmp_store)
> +
> + yum_repo = {'repo_id': 'fedora-fake',
> + 'baseurl': 'http://www.fedora.org'}
> + yum_new_repo = {'baseurl': 'http://www.fedoraproject.org'}
> +
> + deb_repo = {'baseurl': 'http://archive.ubuntu.com/ubuntu/',
> + 'config': {'dist': 'quantal'}}
> + deb_new_repo = {'baseurl':
> 'http://br.archive.canonical.com/ubuntu/',
> + 'config': {'dist': 'utopic'}}
> +
> + yum_invalid_repos = []
> + deb_invalid_repos = []
> +
> + for url in invalid_repository_urls:
> + wrong_baseurl = {'baseurl': url}
> + wrong_mirrorlist = {'baseurl': 'www.example.com',
> + 'config': {'mirrorlist': url}}
> +
> + yum_invalid_repos.append(wrong_baseurl)
> + yum_invalid_repos.append(wrong_mirrorlist)
> +
> + wrong_baseurl['config'] = {'dist': 'tasty'}
> + deb_invalid_repos.append(wrong_baseurl)
> +
> + repo_type = inst.capabilities_lookup()['repo_mngt_tool']
> + if repo_type == 'yum':
> + repo = yum_repo
> + new_repo = yum_new_repo
> + invalid_repos = yum_invalid_repos
> + elif repo_type == 'deb':
> + repo = deb_repo
> + new_repo = deb_new_repo
> + invalid_repos = deb_invalid_repos
> + else:
> + # repository management tool was not recognized by Kimchi
> + # skip test case
> + return
> +
> + system_host_repos = len(inst.repositories_get_list())
> +
> + with RollbackContext() as rollback:
> + repo_id = inst.repositories_create(repo)
> + rollback.prependDefer(inst.repository_delete, repo_id)
> +
> + host_repos = inst.repositories_get_list()
> + self.assertEquals(system_host_repos + 1, len(host_repos))
> +
> + # update repositories with invalid data
> + for tmp_repo in invalid_repos:
> + self.assertRaises(InvalidParameter,
> inst.repository_update,
> + repo_id, tmp_repo)
> +
> + new_repo_id = inst.repository_update(repo_id, new_repo)
> + repo_info = inst.repository_lookup(new_repo_id)
> +
> + self.assertEquals(new_repo_id, repo_info['repo_id'])
> + self.assertEquals(new_repo['baseurl'],
> repo_info['baseurl'])
> + self.assertEquals(True, repo_info['enabled'])
> + inst.repository_update(new_repo_id, repo)
> +
> + def test_repository_disable_enable(self):
> + inst = model.Model(objstore_loc=self.tmp_store)
> +
> + yum_repo = {'repo_id': 'fedora-fake',
> + 'baseurl': 'http://www.fedora.org'}
> + deb_repo = {'baseurl': 'http://archive.ubuntu.com/ubuntu/',
> + 'config': {'dist': 'quantal'}}
> +
> + repo_type = inst.capabilities_lookup()['repo_mngt_tool']
> + if repo_type == 'yum':
> + repo = yum_repo
> + elif repo_type == 'deb':
> + repo = deb_repo
> + else:
> + # repository management tool was not recognized by Kimchi
> + # skip test case
> + return
> +
> + system_host_repos = len(inst.repositories_get_list())
> +
> + repo_id = inst.repositories_create(repo)
> +
> + host_repos = inst.repositories_get_list()
> + self.assertEquals(system_host_repos + 1, len(host_repos))
> +
> + repo_info = inst.repository_lookup(repo_id)
> + self.assertEquals(True, repo_info['enabled'])
> +
> + inst.repository_disable(repo_id)
> + repo_info = inst.repository_lookup(repo_id)
> + self.assertEquals(False, repo_info['enabled'])
> +
> + inst.repository_enable(repo_id)
> + repo_info = inst.repository_lookup(repo_id)
> + self.assertEquals(True, repo_info['enabled'])
> +
> + # remove files creates
> + inst.repository_delete(repo_id)
> +
> +
> +class BaseModelTests(unittest.TestCase):
> + class FoosModel(object):
> + def __init__(self):
> + self.data = {}
> +
> + def create(self, params):
> + self.data.update(params)
> +
> + def get_list(self):
> + return list(self.data)
> +
> + class TestModel(wok.basemodel.BaseModel):
> + def __init__(self):
> + foo = BaseModelTests.FoosModel()
> + super(BaseModelTests.TestModel, self).__init__([foo])
> +
> + def test_root_model(self):
> + t = BaseModelTests.TestModel()
> + t.foos_create({'item1': 10})
> + self.assertEquals(t.foos_get_list(), ['item1'])
> +
> diff --git a/plugins/gingerbase/tests/utils.py
> b/plugins/gingerbase/tests/utils.py
> new file mode 100644
> index 0000000..986d91b
> --- /dev/null
> +++ b/plugins/gingerbase/tests/utils.py
> @@ -0,0 +1,261 @@
> +#
> +# Project Kimchi
> +#
> +# Copyright IBM, Corp. 2013-2015
> +#
> +# This library is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU Lesser General Public
> +# License as published by the Free Software Foundation; either
> +# version 2.1 of the License, or (at your option) any later version.
> +#
> +# This library is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +# Lesser General Public License for more details.
> +#
> +# You should have received a copy of the GNU Lesser General Public
> +# License along with this library; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301 USA
> +#
> +
> +import base64
> +import cherrypy
> +import grp
> +import httplib
> +import inspect
> +import json
> +import os
> +import socket
> +import ssl
> +import sys
> +import threading
> +import time
> +import unittest
> +from contextlib import closing
> +from lxml import etree
> +
> +import wok.server
> +from wok.config import config, PluginPaths
> +from wok.auth import User, USER_NAME, USER_GROUPS, USER_ROLES, tabs
> +from wok.exception import NotFoundError, OperationFailed
> +from wok.utils import wok_log
> +
> +from wok.plugins.gingerbase import mockmodel
> +
> +
> +_ports = {}
> +
> +# provide missing unittest decorators and API for python 2.6; these
> decorators
> +# do not actually work, just avoid the syntax failure
> +if sys.version_info[:2] == (2, 6):
> + def skipUnless(condition, reason):
> + if not condition:
> + sys.stderr.write('[expected failure] ')
> + raise Exception(reason)
> + return lambda obj: obj
> +
> + unittest.skipUnless = skipUnless
> + unittest.expectedFailure = lambda obj: obj
> +
> + def assertGreater(self, a, b, msg=None):
> + if not a > b:
> + self.fail('%s not greater than %s' % (repr(a), repr(b)))
> +
> + def assertGreaterEqual(self, a, b, msg=None):
> + if not a >= b:
> + self.fail('%s not greater than or equal to %s'
> + % (repr(a), repr(b)))
> +
> + def assertIsInstance(self, obj, cls, msg=None):
> + if not isinstance(obj, cls):
> + self.fail('%s is not an instance of %r' % (repr(obj), cls))
> +
> + def assertIn(self, a, b, msg=None):
> + if a not in b:
> + self.fail("%s is not in %b" % (repr(a), repr(b)))
> +
> + def assertNotIn(self, a, b, msg=None):
> + if a in b:
> + self.fail("%s is in %b" % (repr(a), repr(b)))
> +
> + unittest.TestCase.assertGreaterEqual = assertGreaterEqual
> + unittest.TestCase.assertGreater = assertGreater
> + unittest.TestCase.assertIsInstance = assertIsInstance
> + unittest.TestCase.assertIn = assertIn
> + unittest.TestCase.assertNotIn = assertNotIn
> +
> +
> +def get_free_port(name='http'):
> + global _ports
> + if _ports.get(name) is not None:
> + return _ports[name]
> + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> + with closing(sock):
> + try:
> + sock.bind(("0.0.0.0", 0))
> + except:
> + raise Exception("Could not find a free port")
> + _ports[name] = sock.getsockname()[1]
> + return _ports[name]
> +
> +
> +def run_server(host, port, ssl_port, test_mode, cherrypy_port=None,
> + model=None, environment='development'):
> +
> + if cherrypy_port is None:
> + cherrypy_port = get_free_port('cherrypy_port')
> +
> + if ssl_port is None:
> + ssl_port = get_free_port('https')
> +
> + args = type('_', (object,),
> + {'host': host, 'port': port, 'ssl_port':
ssl_port,
> + 'cherrypy_port': cherrypy_port, 'max_body_size':
> '4*1024',
> + 'ssl_cert': '', 'ssl_key': '',
> + 'test': test_mode, 'access_log':
'/dev/null',
> + 'error_log': '/dev/null', 'environment':
environment,
> + 'log_level': 'debug'})()
> + if model is not None:
> + setattr(args, 'model', model)
> +
> + s = wok.server.Server(args)
> + t = threading.Thread(target=s.start)
> + t.setDaemon(True)
> + t.start()
> + cherrypy.engine.wait(cherrypy.engine.states.STARTED)
> + return s
> +
> +
> +def silence_server():
> + """
> + Silence server status messages on stdout
> + """
> + cherrypy.config.update({"environment": "embedded"})
> +
> +
> +def running_as_root():
> + return os.geteuid() == 0
> +
> +
> +def _request(conn, path, data, method, headers):
> + if headers is None:
> + headers = {'Content-Type': 'application/json',
> + 'Accept': 'application/json'}
> + if 'AUTHORIZATION' not in headers.keys():
> + user, pw = mockmodel.fake_user.items()[0]
> + hdr = "Basic " + base64.b64encode("%s:%s" % (user, pw))
> + headers['AUTHORIZATION'] = hdr
> + conn.request(method, path, data, headers)
> + return conn.getresponse()
> +
> +
> +def request(host, port, path, data=None, method='GET', headers=None):
> + # verify if HTTPSConnection has context parameter
> + if "context" in
> inspect.getargspec(httplib.HTTPSConnection.__init__).args:
> + context = ssl._create_unverified_context()
> + conn = httplib.HTTPSConnection(host, port, context=context)
> + else:
> + conn = httplib.HTTPSConnection(host, port)
> +
> + return _request(conn, path, data, method, headers)
> +
> +
> +def get_remote_iso_path():
> + """
> + Get a remote iso with the right arch from the distro files shipped
> + with kimchi.
> + """
> + host_arch = os.uname()[4]
> + remote_path = ''
> + with open(os.path.join(PluginPaths('kimchi').conf_dir,
> 'distros.d', 'fedora.json')) \
> + as fedora_isos:
> + # Get a list of dicts
> + json_isos_list = json.load(fedora_isos)
> + for iso in json_isos_list:
> + if (iso.get('os_arch')) == host_arch:
> + remote_path = iso.get('path')
> + break
> +
> + return remote_path
> +
> +
> +class FakeUser(User):
> + auth_type = "fake"
> + sudo = True
> +
> + def __init__(self, username):
> + self.user = {}
> + self.user[USER_NAME] = username
> + self.user[USER_GROUPS] = None
> + self.user[USER_ROLES] = dict.fromkeys(tabs, 'user')
> +
> + def get_groups(self):
> + return sorted([group.gr_name for group in grp.getgrall()])[0:3]
> +
> + def get_roles(self):
> + if self.sudo:
> + self.user[USER_ROLES] = dict.fromkeys(tabs, 'admin')
> + return self.user[USER_ROLES]
> +
> + def get_user(self):
> + return self.user
> +
> + @staticmethod
> + def authenticate(username, password, service="passwd"):
> + try:
> + return mockmodel.fake_user[username] == password
> + except KeyError, e:
> + raise OperationFailed("GGBAUTH0001E", {'username':
> 'username',
> + 'code': e.message})
> +
> +
> +def patch_auth(sudo=True):
> + """
> + Override the authenticate function with a simple test against an
> + internal dict of users and passwords.
> + """
> + config.set("authentication", "method", "fake")
> + FakeUser.sudo = sudo
> +
> +
> +def normalize_xml(xml_str):
> + return etree.tostring(etree.fromstring(xml_str,
> + etree.XMLParser(remove_blank_text=True)))
> +
> +
> +def wait_task(task_lookup, taskid, timeout=10):
> + for i in range(0, timeout):
> + task_info = task_lookup(taskid)
> + if task_info['status'] == "running":
> + wok_log.info("Waiting task %s, message: %s",
> + taskid, task_info['message'])
> + time.sleep(1)
> + else:
> + return
> + wok_log.error("Timeout while process long-run task, "
> + "try to increase timeout value.")
> +
> +
> +# The action functions in the model backend raise a NotFoundError
> +# exception if the element is not found. In some tests, these functions
> +# are called after the element has already been deleted (when the test
> +# finishes correctly), so NotFoundError is raised and the rollback breaks.
> +# To avoid that, this wrapper ignores NotFoundError.
> +def rollback_wrapper(func, resource, *args):
> + try:
> + func(resource, *args)
> + except NotFoundError:
> + # VM has been deleted already
> + return
> +
> +
> +# This function is used to test storage volume upload.
> +# If we used self.request, we would have to encode the multipart form data
> +# ourselves; the requests lib takes care of the encoding, so use it instead.
> +def fake_auth_header():
> + headers = {'Accept': 'application/json'}
> + user, pw = mockmodel.fake_user.items()[0]
> + hdr = "Basic " + base64.b64encode("%s:%s" % (user, pw))
> + headers['AUTHORIZATION'] = hdr
> + return headers
> +
> diff --git a/plugins/kimchi/tests/test_host.py
> b/plugins/kimchi/tests/test_host.py
> deleted file mode 100644
> index e2aa196..0000000
> --- a/plugins/kimchi/tests/test_host.py
> +++ /dev/null
> @@ -1,200 +0,0 @@
> -# -*- coding: utf-8 -*-
> -#
> -# Project Kimchi
> -#
> -# Copyright IBM, Corp. 2015
> -#
> -# This library is free software; you can redistribute it and/or
> -# modify it under the terms of the GNU Lesser General Public
> -# License as published by the Free Software Foundation; either
> -# version 2.1 of the License, or (at your option) any later version.
> -#
> -# This library is distributed in the hope that it will be useful,
> -# but WITHOUT ANY WARRANTY; without even the implied warranty of
> -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> -# Lesser General Public License for more details.
> -#
> -# You should have received a copy of the GNU Lesser General Public
> -# License along with this library; if not, write to the Free Software
> -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301 USA
> -
> -import json
> -import os
> -import platform
> -import psutil
> -import tempfile
> -import time
> -import unittest
> -from functools import partial
> -
> -from wok.plugins.kimchi.mockmodel import MockModel
> -
> -from utils import get_free_port, patch_auth, request, run_server,
> wait_task
> -
> -
> -test_server = None
> -model = None
> -host = None
> -ssl_port = None
> -tmpfile = None
> -
> -
> -def setUpModule():
> - global test_server, model, host, ssl_port, tmpfile
> -
> - patch_auth()
> - tmpfile = tempfile.mktemp()
> - model = MockModel(tmpfile)
> - host = '127.0.0.1'
> - port = get_free_port('http')
> - ssl_port = get_free_port('https')
> - cherrypy_port = get_free_port('cherrypy_port')
> - test_server = run_server(host, port, ssl_port, test_mode=True,
> - cherrypy_port=cherrypy_port, model=model)
> -
> -
> -def tearDownModule():
> - test_server.stop()
> - os.unlink(tmpfile)
> -
> -
> -class HostTests(unittest.TestCase):
> - def setUp(self):
> - self.request = partial(request, host, ssl_port)
> -
> - def test_hostinfo(self):
> - resp = self.request('/plugins/kimchi/host').read()
> - info = json.loads(resp)
> - keys = ['os_distro', 'os_version', 'os_codename',
'cpu_model',
> - 'memory', 'cpus']
> - self.assertEquals(sorted(keys), sorted(info.keys()))
> -
> - distro, version, codename = platform.linux_distribution()
> - self.assertEquals(distro, info['os_distro'])
> - self.assertEquals(version, info['os_version'])
> - self.assertEquals(unicode(codename, "utf-8"),
> info['os_codename'])
> - self.assertEquals(psutil.TOTAL_PHYMEM, info['memory'])
> -
> - def test_hoststats(self):
> - time.sleep(1)
> - stats_keys = ['cpu_utilization', 'memory',
'disk_read_rate',
> - 'disk_write_rate', 'net_recv_rate',
> 'net_sent_rate']
> - resp = self.request('/plugins/kimchi/host/stats').read()
> - stats = json.loads(resp)
> - self.assertEquals(sorted(stats_keys), sorted(stats.keys()))
> -
> - cpu_utilization = stats['cpu_utilization']
> - self.assertIsInstance(cpu_utilization, float)
> - self.assertGreaterEqual(cpu_utilization, 0.0)
> - self.assertTrue(cpu_utilization <= 100.0)
> -
> - memory_stats = stats['memory']
> - self.assertIn('total', memory_stats)
> - self.assertIn('free', memory_stats)
> - self.assertIn('cached', memory_stats)
> - self.assertIn('buffers', memory_stats)
> - self.assertIn('avail', memory_stats)
> -
> - resp =
> self.request('/plugins/kimchi/host/stats/history').read()
> - history = json.loads(resp)
> - self.assertEquals(sorted(stats_keys), sorted(history.keys()))
> -
> - def test_host_actions(self):
> - def _task_lookup(taskid):
> - return json.loads(self.request('/plugins/kimchi/tasks/%s' %
> - taskid).read())
> -
> - resp = self.request('/plugins/kimchi/host/shutdown', '{}',
> 'POST')
> - self.assertEquals(200, resp.status)
> - resp = self.request('/plugins/kimchi/host/reboot', '{}',
> 'POST')
> - self.assertEquals(200, resp.status)
> -
> - # Test system update
> - resp = self.request('/plugins/kimchi/host/packagesupdate',
> None, 'GET')
> - pkgs = json.loads(resp.read())
> - self.assertEquals(3, len(pkgs))
> -
> - pkg_keys = ['package_name', 'repository', 'arch',
'version']
> - for p in pkgs:
> - name = p['package_name']
> - resp =
> self.request('/plugins/kimchi/host/packagesupdate/' + name,
> - None, 'GET')
> - info = json.loads(resp.read())
> - self.assertEquals(sorted(pkg_keys), sorted(info.keys()))
> -
> - resp = self.request('/plugins/kimchi/host/swupdate', '{}',
> 'POST')
> - task = json.loads(resp.read())
> - task_params = [u'id', u'message', u'status',
u'target_uri']
> - self.assertEquals(sorted(task_params), sorted(task.keys()))
> -
> - resp = self.request('/plugins/kimchi/tasks/' + task[u'id'],
> None, 'GET')
> - task_info = json.loads(resp.read())
> - self.assertEquals(task_info['status'], 'running')
> - wait_task(_task_lookup, task_info['id'])
> - resp = self.request('/plugins/kimchi/tasks/' + task[u'id'],
> None, 'GET')
> - task_info = json.loads(resp.read())
> - self.assertEquals(task_info['status'], 'finished')
> - self.assertIn(u'All packages updated',
task_info['message'])
> - pkgs = model.packagesupdate_get_list()
> - self.assertEquals(0, len(pkgs))
> -
> - def test_host_partitions(self):
> - resp = self.request('/plugins/kimchi/host/partitions')
> - self.assertEquals(200, resp.status)
> - partitions = json.loads(resp.read())
> -
> - keys = ['name', 'path', 'type', 'fstype',
'size', 'mountpoint',
> - 'available']
> - for item in partitions:
> - resp = self.request('/plugins/kimchi/host/partitions/%s' %
> - item['name'])
> - info = json.loads(resp.read())
> - self.assertEquals(sorted(info.keys()), sorted(keys))
> -
> - def test_host_devices(self):
> - def asset_devices_type(devices, dev_type):
> - for dev in devices:
> - self.assertEquals(dev['device_type'], dev_type)
> -
> - resp =
> self.request('/plugins/kimchi/host/devices?_cap=scsi_host')
> - nodedevs = json.loads(resp.read())
> - # Mockmodel brings 3 preconfigured scsi fc_host
> - self.assertEquals(3, len(nodedevs))
> -
> - nodedev = json.loads(self.request(
> - '/plugins/kimchi/host/devices/scsi_host2').read())
> - # Mockmodel generates random wwpn and wwnn
> - self.assertEquals('scsi_host2', nodedev['name'])
> - self.assertEquals('fc_host',
nodedev['adapter']['type'])
> - self.assertEquals(16, len(nodedev['adapter']['wwpn']))
> - self.assertEquals(16, len(nodedev['adapter']['wwnn']))
> -
> - devs =
> json.loads(self.request('/plugins/kimchi/host/devices').read())
> - dev_names = [dev['name'] for dev in devs]
> - for dev_type in ('pci', 'usb_device', 'scsi'):
> - resp =
> self.request('/plugins/kimchi/host/devices?_cap=%s' %
> - dev_type)
> - devsByType = json.loads(resp.read())
> - names = [dev['name'] for dev in devsByType]
> - self.assertTrue(set(names) <= set(dev_names))
> - asset_devices_type(devsByType, dev_type)
> -
> - resp =
> self.request('/plugins/kimchi/host/devices?_passthrough=true')
> - passthru_devs = [dev['name'] for dev in
> json.loads(resp.read())]
> - self.assertTrue(set(passthru_devs) <= set(dev_names))
> -
> - for dev_type in ('pci', 'usb_device', 'scsi'):
> - resp = self.request(
> - '/plugins/kimchi/host/devices?_cap=%s&_passthrough=true' %
> - dev_type)
> - filteredDevs = json.loads(resp.read())
> - filteredNames = [dev['name'] for dev in filteredDevs]
> - self.assertTrue(set(filteredNames) <= set(dev_names))
> - asset_devices_type(filteredDevs, dev_type)
> -
> - for dev in passthru_devs:
> - resp = self.request(
> - '/plugins/kimchi/host/devices?_passthrough_affected_by=%s' %
> - dev)
> - affected_devs = [dev['name'] for dev in
> json.loads(resp.read())]
> - self.assertTrue(set(affected_devs) <= set(dev_names))