[Kimchi-devel] [PATCH] Host tests

Aline Manera alinefm at linux.vnet.ibm.com
Mon Apr 20 21:14:42 UTC 2015


This patch creates a new test file (test_host.py) to handle all the Host
Resource related tests.
It also updates the docs/API.md to reflect the right data set returned
for /host/partitions API.

Signed-off-by: Aline Manera <alinefm at linux.vnet.ibm.com>
---
 docs/API.md             |   1 +
 src/kimchi/mockmodel.py |   8 +-
 tests/test_host.py      | 192 ++++++++++++++++++++++++++++++++++++++++++++++++
 tests/test_mockmodel.py |  17 -----
 tests/test_model.py     |  73 ------------------
 tests/test_rest.py      |  72 ------------------
 6 files changed, 200 insertions(+), 163 deletions(-)
 create mode 100644 tests/test_host.py

diff --git a/docs/API.md b/docs/API.md
index 3f7925f..922ded1 100644
--- a/docs/API.md
+++ b/docs/API.md
@@ -985,6 +985,7 @@ stats history
     * size: The total size of the partition, in bytes
     * mountpoint: If the partition is mounted, represents the mountpoint.
       Otherwise blank.
+    * available: false, if the partition is in use by system; true, otherwise.
 
 ### Collection: Devices
 
diff --git a/src/kimchi/mockmodel.py b/src/kimchi/mockmodel.py
index 413ac5d..3ebddca 100644
--- a/src/kimchi/mockmodel.py
+++ b/src/kimchi/mockmodel.py
@@ -42,7 +42,7 @@ from kimchi.model.templates import LibvirtVMTemplate
 from kimchi.model.users import PAMUsersModel
 from kimchi.model.groups import PAMGroupsModel
 from kimchi.objectstore import ObjectStore
-from kimchi.utils import add_task, get_next_clone_name
+from kimchi.utils import add_task, get_next_clone_name, kimchi_log
 from kimchi.vmtemplate import VMTemplate
 from kimchi.xmlutils.utils import xml_item_update
 
@@ -273,6 +273,12 @@ class MockModel(Model):
 
         conn.storagePoolDefineXML(ET.tostring(root), 0)
 
+    def _mock_host_shutdown(self, *name):
+        kimchi_log.info("The host system will be shutted down")
+
+    def _mock_host_reboot(self, *name):
+        kimchi_log.info("The host system will be rebooted")
+
     def _mock_storagevolumes_create(self, pool, params):
         vol_source = ['file', 'url', 'capacity']
         index_list = list(i for i in range(len(vol_source))
diff --git a/tests/test_host.py b/tests/test_host.py
new file mode 100644
index 0000000..1273457
--- /dev/null
+++ b/tests/test_host.py
@@ -0,0 +1,192 @@
+# -*- coding: utf-8 -*-
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2015
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+
+import json
+import os
+import platform
+import psutil
+import tempfile
+import time
+import unittest
+
+from functools import partial
+
+from kimchi.mockmodel import MockModel
+from utils import get_free_port, patch_auth, request, run_server, wait_task
+
+test_server = None
+model = None
+host = None
+ssl_port = None
+tmpfile = None
+
+
+def setUpModule():
+    global test_server, model, host, ssl_port, tmpfile
+
+    patch_auth()
+    tmpfile = tempfile.mktemp()
+    model = MockModel(tmpfile)
+    host = '127.0.0.1'
+    port = get_free_port('http')
+    ssl_port = get_free_port('https')
+    cherrypy_port = get_free_port('cherrypy_port')
+    test_server = run_server(host, port, ssl_port, test_mode=True,
+                             cherrypy_port=cherrypy_port, model=model)
+
+
+def tearDownModule():
+    test_server.stop()
+    os.unlink(tmpfile)
+
+
+class HostTests(unittest.TestCase):
+    def setUp(self):
+        self.request = partial(request, host, ssl_port)
+
+    def test_hostinfo(self):
+        resp = self.request('/host').read()
+        info = json.loads(resp)
+        keys = ['os_distro', 'os_version', 'os_codename', 'cpu_model',
+                'memory', 'cpus']
+        self.assertEquals(sorted(keys), sorted(info.keys()))
+
+        distro, version, codename = platform.linux_distribution()
+        self.assertEquals(distro, info['os_distro'])
+        self.assertEquals(version, info['os_version'])
+        self.assertEquals(unicode(codename, "utf-8"), info['os_codename'])
+        self.assertEquals(psutil.TOTAL_PHYMEM, info['memory'])
+
+    def test_hoststats(self):
+        time.sleep(1)
+        stats_keys = ['cpu_utilization', 'memory', 'disk_read_rate',
+                      'disk_write_rate', 'net_recv_rate', 'net_sent_rate']
+        resp = self.request('/host/stats').read()
+        stats = json.loads(resp)
+        self.assertEquals(sorted(stats_keys), sorted(stats.keys()))
+
+        cpu_utilization = stats['cpu_utilization']
+        self.assertIsInstance(cpu_utilization, float)
+        self.assertGreaterEqual(cpu_utilization, 0.0)
+        self.assertTrue(cpu_utilization <= 100.0)
+
+        memory_stats = stats['memory']
+        self.assertIn('total', memory_stats)
+        self.assertIn('free', memory_stats)
+        self.assertIn('cached', memory_stats)
+        self.assertIn('buffers', memory_stats)
+        self.assertIn('avail', memory_stats)
+
+        resp = self.request('/host/stats/history').read()
+        history = json.loads(resp)
+        self.assertEquals(sorted(stats_keys), sorted(history.keys()))
+
+    def test_host_actions(self):
+        def _task_lookup(taskid):
+            return json.loads(self.request('/tasks/%s' % taskid).read())
+
+        resp = self.request('/host/shutdown', '{}', 'POST')
+        self.assertEquals(200, resp.status)
+        resp = self.request('/host/reboot', '{}', 'POST')
+        self.assertEquals(200, resp.status)
+
+        # Test system update
+        resp = self.request('/host/packagesupdate', None, 'GET')
+        pkgs = json.loads(resp.read())
+        self.assertEquals(3, len(pkgs))
+
+        pkg_keys = ['package_name', 'repository', 'arch', 'version']
+        for p in pkgs:
+            name = p['package_name']
+            resp = self.request('/host/packagesupdate/' + name, None, 'GET')
+            info = json.loads(resp.read())
+            self.assertEquals(sorted(pkg_keys), sorted(info.keys()))
+
+        resp = self.request('/host/swupdate', '{}', 'POST')
+        task = json.loads(resp.read())
+        task_params = [u'id', u'message', u'status', u'target_uri']
+        self.assertEquals(sorted(task_params), sorted(task.keys()))
+
+        resp = self.request('/tasks/' + task[u'id'], None, 'GET')
+        task_info = json.loads(resp.read())
+        self.assertEquals(task_info['status'], 'running')
+        wait_task(_task_lookup, task_info['id'])
+        resp = self.request('/tasks/' + task[u'id'], None, 'GET')
+        task_info = json.loads(resp.read())
+        self.assertEquals(task_info['status'], 'finished')
+        self.assertIn(u'All packages updated', task_info['message'])
+        pkgs = model.packagesupdate_get_list()
+        self.assertEquals(0, len(pkgs))
+
+    def test_host_partitions(self):
+        resp = self.request('/host/partitions')
+        self.assertEquals(200, resp.status)
+        partitions = json.loads(resp.read())
+
+        keys = ['name', 'path', 'type', 'fstype', 'size', 'mountpoint',
+                'available']
+        for item in partitions:
+            resp = self.request('/host/partitions/%s' % item['name'])
+            info = json.loads(resp.read())
+            self.assertEquals(sorted(info.keys()), sorted(keys))
+
+    def test_host_devices(self):
+        def asset_devices_type(devices, dev_type):
+            for dev in devices:
+                self.assertEquals(dev['device_type'], dev_type)
+
+        resp = self.request('/host/devices?_cap=scsi_host')
+        nodedevs = json.loads(resp.read())
+        # Mockmodel brings 3 preconfigured scsi fc_host
+        self.assertEquals(3, len(nodedevs))
+
+        nodedev = json.loads(self.request('/host/devices/scsi_host2').read())
+        # Mockmodel generates random wwpn and wwnn
+        self.assertEquals('scsi_host2', nodedev['name'])
+        self.assertEquals('fc_host', nodedev['adapter']['type'])
+        self.assertEquals(16, len(nodedev['adapter']['wwpn']))
+        self.assertEquals(16, len(nodedev['adapter']['wwnn']))
+
+        devs = json.loads(self.request('/host/devices').read())
+        dev_names = [dev['name'] for dev in devs]
+        for dev_type in ('pci', 'usb_device', 'scsi'):
+            resp = self.request('/host/devices?_cap=%s' % dev_type)
+            devsByType = json.loads(resp.read())
+            names = [dev['name'] for dev in devsByType]
+            self.assertTrue(set(names) <= set(dev_names))
+            asset_devices_type(devsByType, dev_type)
+
+        resp = self.request('/host/devices?_passthrough=true')
+        passthru_devs = [dev['name'] for dev in json.loads(resp.read())]
+        self.assertTrue(set(passthru_devs) <= set(dev_names))
+
+        for dev_type in ('pci', 'usb_device', 'scsi'):
+            resp = self.request('/host/devices?_cap=%s&_passthrough=true' %
+                                dev_type)
+            filteredDevs = json.loads(resp.read())
+            filteredNames = [dev['name'] for dev in filteredDevs]
+            self.assertTrue(set(filteredNames) <= set(dev_names))
+            asset_devices_type(filteredDevs, dev_type)
+
+        for dev in passthru_devs:
+            resp = self.request('/host/devices?_passthrough_affected_by=%s' %
+                                dev)
+            affected_devs = [dev['name'] for dev in json.loads(resp.read())]
+            self.assertTrue(set(affected_devs) <= set(dev_names))
diff --git a/tests/test_mockmodel.py b/tests/test_mockmodel.py
index c7e733e..aa48dd1 100644
--- a/tests/test_mockmodel.py
+++ b/tests/test_mockmodel.py
@@ -26,7 +26,6 @@ import unittest
 
 import kimchi.mockmodel
 from utils import get_free_port, patch_auth, request, run_server
-from utils import wait_task
 from kimchi.osinfo import get_template_default
 
 
@@ -131,19 +130,3 @@ class MockModelTests(unittest.TestCase):
         self.assertEquals(stats_keys, set(info['stats'].keys()))
         self.assertEquals('vnc', info['graphics']['type'])
         self.assertEquals('127.0.0.1', info['graphics']['listen'])
-
-    def test_packages_update(self):
-        pkgs = model.packagesupdate_get_list()
-        self.assertEquals(3, len(pkgs))
-
-        for pkg_name in pkgs:
-            pkgupdate = model.packageupdate_lookup(pkg_name)
-            self.assertIn('package_name', pkgupdate.keys())
-            self.assertIn('repository', pkgupdate.keys())
-            self.assertIn('arch', pkgupdate.keys())
-            self.assertIn('version', pkgupdate.keys())
-
-        task = model.host_swupdate()
-        task_params = [u'id', u'message', u'status', u'target_uri']
-        self.assertEquals(sorted(task_params), sorted(task.keys()))
-        wait_task(model.task_lookup, task['id'])
diff --git a/tests/test_model.py b/tests/test_model.py
index c25929f..210adfd 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -20,8 +20,6 @@
 
 import grp
 import os
-import platform
-import psutil
 import pwd
 import re
 import shutil
@@ -725,37 +723,6 @@ class ModelTests(unittest.TestCase):
             self.assertIn('ipaddr', iface)
             self.assertIn('netmask', iface)
 
-    @unittest.skipUnless(utils.running_as_root(), 'Must be run as root')
-    def test_get_devices(self):
-        def asset_devices_type(devices, dev_type):
-            for dev in devices:
-                self.assertEquals(dev['device_type'], dev_type)
-
-        inst = model.Model('qemu:///system',
-                           objstore_loc=self.tmp_store)
-
-        devs = inst.devices_get_list()
-
-        for dev_type in ('pci', 'usb_device', 'scsi'):
-            names = inst.devices_get_list(_cap=dev_type)
-            self.assertTrue(set(names) <= set(devs))
-            infos = [inst.device_lookup(name) for name in names]
-            asset_devices_type(infos, dev_type)
-
-        passthru_devs = inst.devices_get_list(_passthrough='true')
-        self.assertTrue(set(passthru_devs) <= set(devs))
-
-        for dev_type in ('pci', 'usb_device', 'scsi'):
-            names = inst.devices_get_list(_cap=dev_type, _passthrough='true')
-            self.assertTrue(set(names) <= set(devs))
-            infos = [inst.device_lookup(name) for name in names]
-            asset_devices_type(infos, dev_type)
-
-        for dev_name in passthru_devs:
-            affected_devs = inst.devices_get_list(
-                _passthrough_affected_by=dev_name)
-            self.assertTrue(set(affected_devs) <= set(devs))
-
     def test_async_tasks(self):
         class task_except(Exception):
             pass
@@ -983,46 +950,6 @@ class ModelTests(unittest.TestCase):
             self.assertIn('path', distro)
 
     @unittest.skipUnless(utils.running_as_root(), 'Must be run as root')
-    def test_get_hostinfo(self):
-        inst = model.Model(None,
-                           objstore_loc=self.tmp_store)
-        info = inst.host_lookup()
-        distro, version, codename = platform.linux_distribution()
-        self.assertIn('cpu_model', info)
-        self.assertIn('cpus', info)
-        self.assertEquals(distro, info['os_distro'])
-        self.assertEquals(version, info['os_version'])
-        self.assertEquals(unicode(codename, "utf-8"), info['os_codename'])
-        self.assertEquals(psutil.TOTAL_PHYMEM, info['memory'])
-
-    def test_get_hoststats(self):
-        inst = model.Model('test:///default',
-                           objstore_loc=self.tmp_store)
-        time.sleep(1.5)
-        stats = inst.hoststats_lookup()
-        stats_keys = ['cpu_utilization', 'memory', 'disk_read_rate',
-                      'disk_write_rate', 'net_recv_rate', 'net_sent_rate']
-        self.assertEquals(sorted(stats_keys), sorted(stats.keys()))
-        cpu_utilization = stats['cpu_utilization']
-        # cpu_utilization is set int 0, after first stats sample
-        # the cpu_utilization is float in range [0.0, 100.0]
-        self.assertIsInstance(cpu_utilization, float)
-        self.assertGreaterEqual(cpu_utilization, 0.0)
-        self.assertTrue(cpu_utilization <= 100.0)
-
-        memory_stats = stats['memory']
-        self.assertIn('total', memory_stats)
-        self.assertIn('free', memory_stats)
-        self.assertIn('cached', memory_stats)
-        self.assertIn('buffers', memory_stats)
-        self.assertIn('avail', memory_stats)
-
-        history = inst.hoststatshistory_lookup()
-        self.assertEquals(sorted(stats_keys), sorted(history.keys()))
-        for key, value in history.iteritems():
-            self.assertEquals(type(value), list)
-
-    @unittest.skipUnless(utils.running_as_root(), 'Must be run as root')
     def test_deep_scan(self):
         inst = model.Model(None,
                            objstore_loc=self.tmp_store)
diff --git a/tests/test_rest.py b/tests/test_rest.py
index ee350b2..4ade722 100644
--- a/tests/test_rest.py
+++ b/tests/test_rest.py
@@ -93,19 +93,6 @@ class RestTests(unittest.TestCase):
         resp = self.request(*args)
         self.assertEquals(code, resp.status)
 
-    def test_host_devices(self):
-        resp = self.request('/host/devices?_cap=scsi_host')
-        nodedevs = json.loads(resp.read())
-        # Mockmodel brings 3 preconfigured scsi fc_host
-        self.assertEquals(3, len(nodedevs))
-
-        nodedev = json.loads(self.request('/host/devices/scsi_host2').read())
-        # Mockmodel generates random wwpn and wwnn
-        self.assertEquals('scsi_host2', nodedev['name'])
-        self.assertEquals('fc_host', nodedev['adapter']['type'])
-        self.assertEquals(16, len(nodedev['adapter']['wwpn']))
-        self.assertEquals(16, len(nodedev['adapter']['wwnn']))
-
     def test_get_vms(self):
         vms = json.loads(self.request('/vms').read())
         # test_rest.py uses MockModel() which connects to libvirt URI
@@ -1129,65 +1116,6 @@ class RestTests(unittest.TestCase):
             resp = request(host, ssl_port, debugre['uri'])
             self.assertEquals(200, resp.status)
 
-    def test_host(self):
-        resp = self.request('/host').read()
-        info = json.loads(resp)
-
-        keys = ['os_distro', 'os_version', 'os_codename', 'cpu_model',
-                'memory', 'cpus']
-        self.assertEquals(sorted(keys), sorted(info.keys()))
-
-    def test_hoststats(self):
-        stats_keys = ['cpu_utilization', 'memory', 'disk_read_rate',
-                      'disk_write_rate', 'net_recv_rate', 'net_sent_rate']
-        resp = self.request('/host/stats').read()
-        stats = json.loads(resp)
-        self.assertEquals(sorted(stats_keys), sorted(stats.keys()))
-
-        cpu_utilization = stats['cpu_utilization']
-        self.assertIsInstance(cpu_utilization, float)
-        self.assertGreaterEqual(cpu_utilization, 0.0)
-        self.assertTrue(cpu_utilization <= 100.0)
-
-        memory_stats = stats['memory']
-        self.assertIn('total', memory_stats)
-        self.assertIn('free', memory_stats)
-        self.assertIn('cached', memory_stats)
-        self.assertIn('buffers', memory_stats)
-        self.assertIn('avail', memory_stats)
-
-        resp = self.request('/host/stats/history').read()
-        history = json.loads(resp)
-        self.assertEquals(sorted(stats_keys), sorted(history.keys()))
-
-    def test_packages_update(self):
-        resp = self.request('/host/packagesupdate', None, 'GET')
-        pkgs = json.loads(resp.read())
-        self.assertEquals(3, len(pkgs))
-
-        for p in pkgs:
-            name = p['package_name']
-            resp = self.request('/host/packagesupdate/' + name, None, 'GET')
-            info = json.loads(resp.read())
-            self.assertIn('package_name', info.keys())
-            self.assertIn('repository', info.keys())
-            self.assertIn('arch', info.keys())
-            self.assertIn('version', info.keys())
-
-        resp = self.request('/host/swupdate', '{}', 'POST')
-        task = json.loads(resp.read())
-        task_params = [u'id', u'message', u'status', u'target_uri']
-        self.assertEquals(sorted(task_params), sorted(task.keys()))
-
-        resp = self.request('/tasks/' + task[u'id'], None, 'GET')
-        task_info = json.loads(resp.read())
-        self.assertEquals(task_info['status'], 'running')
-        wait_task(self._task_lookup, task_info['id'])
-        resp = self.request('/tasks/' + task[u'id'], None, 'GET')
-        task_info = json.loads(resp.read())
-        self.assertEquals(task_info['status'], 'finished')
-        self.assertIn(u'All packages updated', task_info['message'])
-
     def test_repositories(self):
         def verify_repo(t, res):
             for field in ('repo_id', 'enabled', 'baseurl', 'config'):
-- 
2.1.0




More information about the Kimchi-devel mailing list