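This patch creates a new test file (tests/test_host.py) that gathers all the
Host Resource related tests previously spread across tests/test_mockmodel.py,
tests/test_model.py and tests/test_rest.py.
It also updates docs/API.md to document the 'available' field returned by the
/host/partitions API, and adds mock host shutdown/reboot handlers to MockModel.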
Signed-off-by: Aline Manera <alinefm(a)linux.vnet.ibm.com>
---
docs/API.md | 1 +
src/kimchi/mockmodel.py | 8 +-
tests/test_host.py | 192 ++++++++++++++++++++++++++++++++++++++++++++++++
tests/test_mockmodel.py | 17 -----
tests/test_model.py | 73 ------------------
tests/test_rest.py | 72 ------------------
6 files changed, 200 insertions(+), 163 deletions(-)
create mode 100644 tests/test_host.py
diff --git a/docs/API.md b/docs/API.md
index 3f7925f..922ded1 100644
--- a/docs/API.md
+++ b/docs/API.md
@@ -985,6 +985,7 @@ stats history
* size: The total size of the partition, in bytes
* mountpoint: If the partition is mounted, represents the mountpoint.
Otherwise blank.
+ * available: false if the partition is in use by the system; true otherwise.
### Collection: Devices
diff --git a/src/kimchi/mockmodel.py b/src/kimchi/mockmodel.py
index 413ac5d..3ebddca 100644
--- a/src/kimchi/mockmodel.py
+++ b/src/kimchi/mockmodel.py
@@ -42,7 +42,7 @@ from kimchi.model.templates import LibvirtVMTemplate
from kimchi.model.users import PAMUsersModel
from kimchi.model.groups import PAMGroupsModel
from kimchi.objectstore import ObjectStore
-from kimchi.utils import add_task, get_next_clone_name
+from kimchi.utils import add_task, get_next_clone_name, kimchi_log
from kimchi.vmtemplate import VMTemplate
from kimchi.xmlutils.utils import xml_item_update
@@ -273,6 +273,12 @@ class MockModel(Model):
conn.storagePoolDefineXML(ET.tostring(root), 0)
+ def _mock_host_shutdown(self, *name):
+ kimchi_log.info("The host system will be shutted down")
+
+ def _mock_host_reboot(self, *name):
+ kimchi_log.info("The host system will be rebooted")
+
def _mock_storagevolumes_create(self, pool, params):
vol_source = ['file', 'url', 'capacity']
index_list = list(i for i in range(len(vol_source))
diff --git a/tests/test_host.py b/tests/test_host.py
new file mode 100644
index 0000000..1273457
--- /dev/null
+++ b/tests/test_host.py
@@ -0,0 +1,192 @@
+# -*- coding: utf-8 -*-
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2015
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import json
+import os
+import platform
+import psutil
+import tempfile
+import time
+import unittest
+
+from functools import partial
+
+from kimchi.mockmodel import MockModel
+from utils import get_free_port, patch_auth, request, run_server, wait_task
+
+test_server = None
+model = None
+host = None
+ssl_port = None
+tmpfile = None
+
+
+def setUpModule():
+ global test_server, model, host, ssl_port, tmpfile
+
+ patch_auth()
+ tmpfile = tempfile.mktemp()
+ model = MockModel(tmpfile)
+ host = '127.0.0.1'
+ port = get_free_port('http')
+ ssl_port = get_free_port('https')
+ cherrypy_port = get_free_port('cherrypy_port')
+ test_server = run_server(host, port, ssl_port, test_mode=True,
+ cherrypy_port=cherrypy_port, model=model)
+
+
+def tearDownModule():
+ test_server.stop()
+ os.unlink(tmpfile)
+
+
+class HostTests(unittest.TestCase):
+ def setUp(self):
+ self.request = partial(request, host, ssl_port)
+
+ def test_hostinfo(self):
+ resp = self.request('/host').read()
+ info = json.loads(resp)
+ keys = ['os_distro', 'os_version', 'os_codename',
'cpu_model',
+ 'memory', 'cpus']
+ self.assertEquals(sorted(keys), sorted(info.keys()))
+
+ distro, version, codename = platform.linux_distribution()
+ self.assertEquals(distro, info['os_distro'])
+ self.assertEquals(version, info['os_version'])
+ self.assertEquals(unicode(codename, "utf-8"),
info['os_codename'])
+ self.assertEquals(psutil.TOTAL_PHYMEM, info['memory'])
+
+ def test_hoststats(self):
+ time.sleep(1)
+ stats_keys = ['cpu_utilization', 'memory',
'disk_read_rate',
+ 'disk_write_rate', 'net_recv_rate',
'net_sent_rate']
+ resp = self.request('/host/stats').read()
+ stats = json.loads(resp)
+ self.assertEquals(sorted(stats_keys), sorted(stats.keys()))
+
+ cpu_utilization = stats['cpu_utilization']
+ self.assertIsInstance(cpu_utilization, float)
+ self.assertGreaterEqual(cpu_utilization, 0.0)
+ self.assertTrue(cpu_utilization <= 100.0)
+
+ memory_stats = stats['memory']
+ self.assertIn('total', memory_stats)
+ self.assertIn('free', memory_stats)
+ self.assertIn('cached', memory_stats)
+ self.assertIn('buffers', memory_stats)
+ self.assertIn('avail', memory_stats)
+
+ resp = self.request('/host/stats/history').read()
+ history = json.loads(resp)
+ self.assertEquals(sorted(stats_keys), sorted(history.keys()))
+
+ def test_host_actions(self):
+ def _task_lookup(taskid):
+ return json.loads(self.request('/tasks/%s' % taskid).read())
+
+ resp = self.request('/host/shutdown', '{}', 'POST')
+ self.assertEquals(200, resp.status)
+ resp = self.request('/host/reboot', '{}', 'POST')
+ self.assertEquals(200, resp.status)
+
+ # Test system update
+ resp = self.request('/host/packagesupdate', None, 'GET')
+ pkgs = json.loads(resp.read())
+ self.assertEquals(3, len(pkgs))
+
+ pkg_keys = ['package_name', 'repository', 'arch',
'version']
+ for p in pkgs:
+ name = p['package_name']
+ resp = self.request('/host/packagesupdate/' + name, None,
'GET')
+ info = json.loads(resp.read())
+ self.assertEquals(sorted(pkg_keys), sorted(info.keys()))
+
+ resp = self.request('/host/swupdate', '{}', 'POST')
+ task = json.loads(resp.read())
+ task_params = [u'id', u'message', u'status',
u'target_uri']
+ self.assertEquals(sorted(task_params), sorted(task.keys()))
+
+ resp = self.request('/tasks/' + task[u'id'], None,
'GET')
+ task_info = json.loads(resp.read())
+ self.assertEquals(task_info['status'], 'running')
+ wait_task(_task_lookup, task_info['id'])
+ resp = self.request('/tasks/' + task[u'id'], None,
'GET')
+ task_info = json.loads(resp.read())
+ self.assertEquals(task_info['status'], 'finished')
+ self.assertIn(u'All packages updated', task_info['message'])
+ pkgs = model.packagesupdate_get_list()
+ self.assertEquals(0, len(pkgs))
+
+ def test_host_partitions(self):
+ resp = self.request('/host/partitions')
+ self.assertEquals(200, resp.status)
+ partitions = json.loads(resp.read())
+
+ keys = ['name', 'path', 'type', 'fstype',
'size', 'mountpoint',
+ 'available']
+ for item in partitions:
+ resp = self.request('/host/partitions/%s' % item['name'])
+ info = json.loads(resp.read())
+ self.assertEquals(sorted(info.keys()), sorted(keys))
+
+ def test_host_devices(self):
+ def asset_devices_type(devices, dev_type):
+ for dev in devices:
+ self.assertEquals(dev['device_type'], dev_type)
+
+ resp = self.request('/host/devices?_cap=scsi_host')
+ nodedevs = json.loads(resp.read())
+ # Mockmodel brings 3 preconfigured scsi fc_host
+ self.assertEquals(3, len(nodedevs))
+
+ nodedev = json.loads(self.request('/host/devices/scsi_host2').read())
+ # Mockmodel generates random wwpn and wwnn
+ self.assertEquals('scsi_host2', nodedev['name'])
+ self.assertEquals('fc_host', nodedev['adapter']['type'])
+ self.assertEquals(16, len(nodedev['adapter']['wwpn']))
+ self.assertEquals(16, len(nodedev['adapter']['wwnn']))
+
+ devs = json.loads(self.request('/host/devices').read())
+ dev_names = [dev['name'] for dev in devs]
+ for dev_type in ('pci', 'usb_device', 'scsi'):
+ resp = self.request('/host/devices?_cap=%s' % dev_type)
+ devsByType = json.loads(resp.read())
+ names = [dev['name'] for dev in devsByType]
+ self.assertTrue(set(names) <= set(dev_names))
+ asset_devices_type(devsByType, dev_type)
+
+ resp = self.request('/host/devices?_passthrough=true')
+ passthru_devs = [dev['name'] for dev in json.loads(resp.read())]
+ self.assertTrue(set(passthru_devs) <= set(dev_names))
+
+ for dev_type in ('pci', 'usb_device', 'scsi'):
+ resp = self.request('/host/devices?_cap=%s&_passthrough=true' %
+ dev_type)
+ filteredDevs = json.loads(resp.read())
+ filteredNames = [dev['name'] for dev in filteredDevs]
+ self.assertTrue(set(filteredNames) <= set(dev_names))
+ asset_devices_type(filteredDevs, dev_type)
+
+ for dev in passthru_devs:
+ resp = self.request('/host/devices?_passthrough_affected_by=%s' %
+ dev)
+ affected_devs = [dev['name'] for dev in json.loads(resp.read())]
+ self.assertTrue(set(affected_devs) <= set(dev_names))
diff --git a/tests/test_mockmodel.py b/tests/test_mockmodel.py
index c7e733e..aa48dd1 100644
--- a/tests/test_mockmodel.py
+++ b/tests/test_mockmodel.py
@@ -26,7 +26,6 @@ import unittest
import kimchi.mockmodel
from utils import get_free_port, patch_auth, request, run_server
-from utils import wait_task
from kimchi.osinfo import get_template_default
@@ -131,19 +130,3 @@ class MockModelTests(unittest.TestCase):
self.assertEquals(stats_keys, set(info['stats'].keys()))
self.assertEquals('vnc', info['graphics']['type'])
self.assertEquals('127.0.0.1',
info['graphics']['listen'])
-
- def test_packages_update(self):
- pkgs = model.packagesupdate_get_list()
- self.assertEquals(3, len(pkgs))
-
- for pkg_name in pkgs:
- pkgupdate = model.packageupdate_lookup(pkg_name)
- self.assertIn('package_name', pkgupdate.keys())
- self.assertIn('repository', pkgupdate.keys())
- self.assertIn('arch', pkgupdate.keys())
- self.assertIn('version', pkgupdate.keys())
-
- task = model.host_swupdate()
- task_params = [u'id', u'message', u'status',
u'target_uri']
- self.assertEquals(sorted(task_params), sorted(task.keys()))
- wait_task(model.task_lookup, task['id'])
diff --git a/tests/test_model.py b/tests/test_model.py
index c25929f..210adfd 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -20,8 +20,6 @@
import grp
import os
-import platform
-import psutil
import pwd
import re
import shutil
@@ -725,37 +723,6 @@ class ModelTests(unittest.TestCase):
self.assertIn('ipaddr', iface)
self.assertIn('netmask', iface)
- @unittest.skipUnless(utils.running_as_root(), 'Must be run as root')
- def test_get_devices(self):
- def asset_devices_type(devices, dev_type):
- for dev in devices:
- self.assertEquals(dev['device_type'], dev_type)
-
- inst = model.Model('qemu:///system',
- objstore_loc=self.tmp_store)
-
- devs = inst.devices_get_list()
-
- for dev_type in ('pci', 'usb_device', 'scsi'):
- names = inst.devices_get_list(_cap=dev_type)
- self.assertTrue(set(names) <= set(devs))
- infos = [inst.device_lookup(name) for name in names]
- asset_devices_type(infos, dev_type)
-
- passthru_devs = inst.devices_get_list(_passthrough='true')
- self.assertTrue(set(passthru_devs) <= set(devs))
-
- for dev_type in ('pci', 'usb_device', 'scsi'):
- names = inst.devices_get_list(_cap=dev_type, _passthrough='true')
- self.assertTrue(set(names) <= set(devs))
- infos = [inst.device_lookup(name) for name in names]
- asset_devices_type(infos, dev_type)
-
- for dev_name in passthru_devs:
- affected_devs = inst.devices_get_list(
- _passthrough_affected_by=dev_name)
- self.assertTrue(set(affected_devs) <= set(devs))
-
def test_async_tasks(self):
class task_except(Exception):
pass
@@ -983,46 +950,6 @@ class ModelTests(unittest.TestCase):
self.assertIn('path', distro)
@unittest.skipUnless(utils.running_as_root(), 'Must be run as root')
- def test_get_hostinfo(self):
- inst = model.Model(None,
- objstore_loc=self.tmp_store)
- info = inst.host_lookup()
- distro, version, codename = platform.linux_distribution()
- self.assertIn('cpu_model', info)
- self.assertIn('cpus', info)
- self.assertEquals(distro, info['os_distro'])
- self.assertEquals(version, info['os_version'])
- self.assertEquals(unicode(codename, "utf-8"),
info['os_codename'])
- self.assertEquals(psutil.TOTAL_PHYMEM, info['memory'])
-
- def test_get_hoststats(self):
- inst = model.Model('test:///default',
- objstore_loc=self.tmp_store)
- time.sleep(1.5)
- stats = inst.hoststats_lookup()
- stats_keys = ['cpu_utilization', 'memory',
'disk_read_rate',
- 'disk_write_rate', 'net_recv_rate',
'net_sent_rate']
- self.assertEquals(sorted(stats_keys), sorted(stats.keys()))
- cpu_utilization = stats['cpu_utilization']
- # cpu_utilization is set int 0, after first stats sample
- # the cpu_utilization is float in range [0.0, 100.0]
- self.assertIsInstance(cpu_utilization, float)
- self.assertGreaterEqual(cpu_utilization, 0.0)
- self.assertTrue(cpu_utilization <= 100.0)
-
- memory_stats = stats['memory']
- self.assertIn('total', memory_stats)
- self.assertIn('free', memory_stats)
- self.assertIn('cached', memory_stats)
- self.assertIn('buffers', memory_stats)
- self.assertIn('avail', memory_stats)
-
- history = inst.hoststatshistory_lookup()
- self.assertEquals(sorted(stats_keys), sorted(history.keys()))
- for key, value in history.iteritems():
- self.assertEquals(type(value), list)
-
- @unittest.skipUnless(utils.running_as_root(), 'Must be run as root')
def test_deep_scan(self):
inst = model.Model(None,
objstore_loc=self.tmp_store)
diff --git a/tests/test_rest.py b/tests/test_rest.py
index ee350b2..4ade722 100644
--- a/tests/test_rest.py
+++ b/tests/test_rest.py
@@ -93,19 +93,6 @@ class RestTests(unittest.TestCase):
resp = self.request(*args)
self.assertEquals(code, resp.status)
- def test_host_devices(self):
- resp = self.request('/host/devices?_cap=scsi_host')
- nodedevs = json.loads(resp.read())
- # Mockmodel brings 3 preconfigured scsi fc_host
- self.assertEquals(3, len(nodedevs))
-
- nodedev = json.loads(self.request('/host/devices/scsi_host2').read())
- # Mockmodel generates random wwpn and wwnn
- self.assertEquals('scsi_host2', nodedev['name'])
- self.assertEquals('fc_host', nodedev['adapter']['type'])
- self.assertEquals(16, len(nodedev['adapter']['wwpn']))
- self.assertEquals(16, len(nodedev['adapter']['wwnn']))
-
def test_get_vms(self):
vms = json.loads(self.request('/vms').read())
# test_rest.py uses MockModel() which connects to libvirt URI
@@ -1129,65 +1116,6 @@ class RestTests(unittest.TestCase):
resp = request(host, ssl_port, debugre['uri'])
self.assertEquals(200, resp.status)
- def test_host(self):
- resp = self.request('/host').read()
- info = json.loads(resp)
-
- keys = ['os_distro', 'os_version', 'os_codename',
'cpu_model',
- 'memory', 'cpus']
- self.assertEquals(sorted(keys), sorted(info.keys()))
-
- def test_hoststats(self):
- stats_keys = ['cpu_utilization', 'memory',
'disk_read_rate',
- 'disk_write_rate', 'net_recv_rate',
'net_sent_rate']
- resp = self.request('/host/stats').read()
- stats = json.loads(resp)
- self.assertEquals(sorted(stats_keys), sorted(stats.keys()))
-
- cpu_utilization = stats['cpu_utilization']
- self.assertIsInstance(cpu_utilization, float)
- self.assertGreaterEqual(cpu_utilization, 0.0)
- self.assertTrue(cpu_utilization <= 100.0)
-
- memory_stats = stats['memory']
- self.assertIn('total', memory_stats)
- self.assertIn('free', memory_stats)
- self.assertIn('cached', memory_stats)
- self.assertIn('buffers', memory_stats)
- self.assertIn('avail', memory_stats)
-
- resp = self.request('/host/stats/history').read()
- history = json.loads(resp)
- self.assertEquals(sorted(stats_keys), sorted(history.keys()))
-
- def test_packages_update(self):
- resp = self.request('/host/packagesupdate', None, 'GET')
- pkgs = json.loads(resp.read())
- self.assertEquals(3, len(pkgs))
-
- for p in pkgs:
- name = p['package_name']
- resp = self.request('/host/packagesupdate/' + name, None,
'GET')
- info = json.loads(resp.read())
- self.assertIn('package_name', info.keys())
- self.assertIn('repository', info.keys())
- self.assertIn('arch', info.keys())
- self.assertIn('version', info.keys())
-
- resp = self.request('/host/swupdate', '{}', 'POST')
- task = json.loads(resp.read())
- task_params = [u'id', u'message', u'status',
u'target_uri']
- self.assertEquals(sorted(task_params), sorted(task.keys()))
-
- resp = self.request('/tasks/' + task[u'id'], None,
'GET')
- task_info = json.loads(resp.read())
- self.assertEquals(task_info['status'], 'running')
- wait_task(self._task_lookup, task_info['id'])
- resp = self.request('/tasks/' + task[u'id'], None,
'GET')
- task_info = json.loads(resp.read())
- self.assertEquals(task_info['status'], 'finished')
- self.assertIn(u'All packages updated', task_info['message'])
-
def test_repositories(self):
def verify_repo(t, res):
for field in ('repo_id', 'enabled', 'baseurl',
'config'):
--
2.1.0