The mockmodel was designed to provide a way to user tests Kimchi
without affecting the system (by "kimchid --test") and also to make the
tests easier to do.
But in fact, we have a bunch of code that completely differs from
the real model, so the developer needs to do 2 kinds of implementations
while developing a new feature.
This patch change MockModel to be Model("test:///default") and only
overrides what is not supported by the libvirt Test Driver.
It also fixes the test cases after that change: only delete a inactive
storage pool, the Test Driver already has on VM created on start up, the
ISO file must be an existing path, etc.
The run_tests.sh.in script was also updated to run the test_model.py
prior to the MockModel tests as the MockModel will override some Model
methods which can cause problems if the MockModel runs before Model.
Signed-off-by: Aline Manera <alinefm(a)linux.vnet.ibm.com>
---
src/kimchi/mockmodel.py | 1769 ++++++++-----------------------------------
src/kimchi/vmtemplate.py | 9 +-
tests/run_tests.sh.in | 18 +-
tests/test_authorization.py | 25 +-
tests/test_mockmodel.py | 60 +-
tests/test_rest.py | 254 +++----
6 files changed, 495 insertions(+), 1640 deletions(-)
diff --git a/src/kimchi/mockmodel.py b/src/kimchi/mockmodel.py
index 626ef35..7d3eb38 100644
--- a/src/kimchi/mockmodel.py
+++ b/src/kimchi/mockmodel.py
@@ -14,1105 +14,291 @@
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the
-# Free Software Foundation, Inc.
-# 51 Franklin Street, Fifth Floor,
-# Boston, MA 02110-1301 USA
-
-import cherrypy
-import copy
-import disks
-import glob
-import ipaddr
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import libvirt
+import lxml.etree as ET
import os
-import shutil
-import psutil
import random
-import string
import time
-import uuid
-
-
-try:
- from PIL import Image
- from PIL import ImageDraw
-except ImportError:
- import Image
- import ImageDraw
+from lxml import objectify
from kimchi import config
-from kimchi.asynctask import AsyncTask
-from kimchi.config import READONLY_POOL_TYPE, config as kconfig
-from kimchi.distroloader import DistroLoader
-from kimchi.exception import InvalidOperation, InvalidParameter
-from kimchi.exception import MissingParameter, NotFoundError, OperationFailed
-from kimchi.model.storagepools import ISO_POOL_NAME
-from kimchi.model.storageservers import STORAGE_SERVERS
-from kimchi.model.utils import get_vm_name
+from kimchi import imageinfo
+from kimchi import osinfo
+from kimchi.model.debugreports import DebugReportsModel
+from kimchi.model.host import DeviceModel
+from kimchi.model.model import Model
+from kimchi.model.storagevolumes import StorageVolumesModel
+from kimchi.model.templates import LibvirtVMTemplate
from kimchi.objectstore import ObjectStore
-from kimchi.screenshot import VMScreenshot
-from kimchi.utils import get_next_clone_name, pool_name_from_uri
-from kimchi.utils import validate_repo_url, template_name_from_uri
+from kimchi.utils import add_task
from kimchi.vmtemplate import VMTemplate
+from kimchi.xmlutils.utils import xml_item_update
fake_user = {'admin': 'letmein!'}
+mockmodel_defaults = {'storagepool': '/storagepools/default-pool',
+ 'domain': 'test', 'arch': 'i686'}
+
+class MockModel(Model):
+ _mock_vms = {}
+ _XMLDesc = libvirt.virDomain.XMLDesc
+ _defineXML = libvirt.virConnect.defineXML
+ _undefineDomain = libvirt.virDomain.undefine
+ _libvirt_get_vol_path = LibvirtVMTemplate._get_volume_path
-class MockModel(object):
def __init__(self, objstore_loc=None):
- self.reset()
- self.objstore = ObjectStore(objstore_loc)
- self.objstore_loc = objstore_loc
- self.distros = self._get_distros()
-
- def capabilities_lookup(self, *ident):
- return {'libvirt_stream_protocols':
- ['http', 'https', 'ftp', 'ftps',
'tftp'],
- 'qemu_spice': True,
- 'qemu_stream': True,
- 'screenshot': True,
- 'system_report_tool': True,
- 'update_tool': True,
- 'repo_mngt_tool': 'yum',
- 'federation': 'off'}
+ # Override osinfo.defaults to ajust the values according to
+ # test:///default driver
+ defaults = dict(osinfo.defaults)
+ defaults.update(mockmodel_defaults)
+ osinfo.defaults = dict(defaults)
- def reset(self):
- if hasattr(self, 'objstore'):
- self.objstore = ObjectStore(self.objstore_loc)
- self._mock_vms = {}
- self._mock_screenshots = {}
- self._mock_templates = {}
- self._mock_storagepools = {'default':
MockStoragePool('default')}
- self._mock_networks = {'default': MockNetwork('default')}
- self._mock_interfaces = self.dummy_interfaces()
- self._mock_swupdate = MockSoftwareUpdate()
- self.next_taskid = 1
- self.storagepool_activate('default')
- self._mock_host_repositories = MockRepositories()
self._mock_devices = MockDevices()
+ self._mock_interfaces = MockInterfaces()
+ self._mock_storagevolumes = MockStorageVolumes()
+ self._mock_swupdate = MockSoftwareUpdate()
+ self._mock_repositories = MockRepositories()
+
+ libvirt.virConnect.defineXML = MockModel.domainDefineXML
+ libvirt.virDomain.XMLDesc = MockModel.domainXMLDesc
+ libvirt.virDomain.undefine = MockModel.undefineDomain
+ libvirt.virDomain.attachDeviceFlags = MockModel.attachDeviceFlags
+ libvirt.virDomain.detachDeviceFlags = MockModel.detachDeviceFlags
+ libvirt.virDomain.updateDeviceFlags = MockModel.updateDeviceFlags
+ libvirt.virStorageVol.resize = MockModel.volResize
+ libvirt.virStorageVol.wipePattern = MockModel.volWipePattern
+
+ super(MockModel, self).__init__('test:///default', objstore_loc)
+ self.objstore_loc = objstore_loc
+ self.objstore = ObjectStore(objstore_loc)
- def _static_vm_update(self, dom, params):
- state = dom.info['state']
-
- for key, val in params.items():
- if key == 'name':
- if state == 'running' or params['name'] in
self.vms_get_list():
- msg_args = {'name': dom.name, 'new_name':
params['name']}
- raise InvalidParameter("KCHVM0003E", msg_args)
-
- del self._mock_vms[dom.name]
- dom.name = params['name']
- self._mock_vms[dom.name] = dom
-
- elif key == 'users':
- invalid_users = set(val) - set(self.users_get_list())
- if len(invalid_users) != 0:
- raise InvalidParameter("KCHVM0027E",
- {'users': ",
".join(invalid_users)})
+ # The MockModel methods are instantiated on runtime according to Model
+ # and BaseModel
+ # Because that a normal method override will not work here
+ # Instead of that we also need to do the override on runtime
+ for method in dir(self):
+ if method.startswith('_mock_'):
+ mock_method = getattr(self, method)
+ if not callable(mock_method):
+ continue
+
+ m = method.strip('_mock_')
+ model_method = getattr(self, m)
+ setattr(self, '_model_' + m, model_method)
+ setattr(self, m, mock_method)
+
+ DeviceModel.lookup = self._mock_device_lookup
+ StorageVolumesModel.get_list = self._mock_storagevolumes_get_list
+ DebugReportsModel._gen_debugreport_file = self._gen_debugreport_file
+ LibvirtVMTemplate._get_volume_path = self._get_volume_path
+ VMTemplate.get_iso_info = self._probe_image
+ imageinfo.probe_image = self._probe_image
- elif key == 'groups':
- invalid_groups = set(val) - set(self.groups_get_list())
- if len(invalid_groups) != 0:
- raise InvalidParameter("KCHVM0028E",
- {'groups':
- ", ".join(invalid_groups)})
+ def reset(self):
+ MockModel._mock_vms = {}
+ self._mock_swupdate = MockSoftwareUpdate()
+ self._mock_repositories = MockRepositories()
- dom.info[key] = val
+ if hasattr(self, 'objstore'):
+ self.objstore = ObjectStore(self.objstore_loc)
- def _live_vm_update(self, dom, params):
- if 'graphics' not in params:
- return
+ params = {'vms': [u'test'], 'templates': [],
+ 'networks': [u'default'], 'storagepools':
[u'default-pool']}
- graphics = params.pop('graphics')
- passwd = graphics.get('passwd')
- if passwd is None:
- passwd = "".join(random.sample(string.ascii_letters +
- string.digits, 8))
+ for res, items in params.iteritems():
+ resources = getattr(self, '%s_get_list' % res)()
+ for i in resources:
+ if i in items:
+ continue
- expire = graphics.get('passwdValidTo')
- if expire is not None:
- expire = round(time.time()) + expire
+ try:
+ getattr(self, '%s_deactivate' % res[:-1])(i)
+ except:
+ pass
- dom.info['graphics']["passwd"] = passwd
- dom.info['graphics']["passwdValidTo"] = expire
+ getattr(self, '%s_delete' % res[:-1])(i)
- def vm_update(self, name, params):
- dom = self._get_vm(name)
- self._static_vm_update(dom, params)
- self._live_vm_update(dom, params)
+ volumes = self.storagevolumes_get_list('default-pool')
+ for v in volumes:
+ self.storagevolume_delete('default-pool', v)
- return dom.name
-
- def vm_lookup(self, name):
- vm = self._get_vm(name)
- if vm.info['state'] == 'running':
- vm.info['screenshot'] = self.vmscreenshot_lookup(name)
- else:
- vm.info['screenshot'] = None
-
- validTo = vm.info['graphics']['passwdValidTo']
- validTo = (validTo - round(time.time()) if validTo is not None
- else None)
- vm.info['graphics']['passwdValidTo'] = validTo
- return vm.info
-
- def vm_delete(self, name):
- vm = self._get_vm(name)
- self._vmscreenshot_delete(vm.uuid)
- for disk in vm.disk_paths:
- self.storagevolume_delete(disk['pool'], disk['volume'])
-
- del self._mock_vms[vm.name]
-
- def vm_start(self, name):
- self._get_vm(name).info['state'] = 'running'
-
- def vm_poweroff(self, name):
- self._get_vm(name).info['state'] = 'shutoff'
-
- def vm_shutdown(self, name):
- self._get_vm(name).info['state'] = 'shutoff'
-
- def vm_reset(self, name):
- pass
-
- def vm_connect(self, name):
- pass
-
- def vm_clone(self, name):
- vm = self._mock_vms[name]
- if vm.info['state'] != u'shutoff':
- raise InvalidParameter('KCHVM0033E', {'name': name})
-
- new_name = get_next_clone_name(self.vms_get_list(), name)
-
- taskid = self.add_task(u'/vms/%s' % new_name, self._do_clone,
- {'name': name, 'new_name': new_name})
- return self.task_lookup(taskid)
-
- def _do_clone(self, cb, params):
- name = params['name']
- new_name = params['new_name']
-
- vm = self._mock_vms[name]
- new_vm = copy.deepcopy(vm)
-
- new_uuid = unicode(uuid.uuid4())
-
- new_vm.name = new_name
- new_vm.info['name'] = new_name
- new_vm.uuid = new_uuid
- new_vm.info['uuid'] = new_uuid
-
- for mac, iface in new_vm.ifaces.items():
- new_mac = MockVMIface.get_mac()
- iface.info['mac'] = new_mac
- new_vm.ifaces[new_mac] = iface
-
- storage_names = new_vm.storagedevices.keys()
- for i, storage_name in enumerate(storage_names):
- storage = new_vm.storagedevices[storage_name]
- basename, ext = os.path.splitext(storage.info['path'])
- new_path = u'%s-%d%s' % (basename, i, ext)
- new_vm.storagedevices[storage_name].path = new_path
-
- self._mock_vms[new_name] = new_vm
-
- cb('OK', True)
-
- def vms_create(self, params):
- t_name = template_name_from_uri(params['template'])
- name = get_vm_name(params.get('name'), t_name, self._mock_vms.keys())
- if name in self._mock_vms:
- raise InvalidOperation("KCHVM0001E", {'name': name})
-
- vm_uuid = str(uuid.uuid4())
- vm_overrides = dict()
- pool_uri = params.get('storagepool')
- if pool_uri:
- vm_overrides['storagepool'] = pool_uri
-
- t = self._get_template(t_name, vm_overrides)
- t.validate()
-
- t_info = copy.deepcopy(t.info)
- graphics = params.get('graphics')
- if graphics:
- t_info.update({'graphics': graphics})
-
- vm = MockVM(vm_uuid, name, t_info)
- icon = t_info.get('icon')
- if icon:
- vm.info['icon'] = icon
-
- pool = t._storage_validate()
- if pool.info['type'] == 'scsi':
- vm.disk_paths = []
- if not params.get('volumes'):
- raise MissingParameter('KCHVM0017E')
- for vol in params['volumes']:
- vm.disk_paths.append({'pool': pool.name,
- 'volume': vol})
-
- else:
- vm.disk_paths = t.fork_vm_storage(vm_uuid)
-
- index = 0
- for disk in vm.disk_paths:
- storagepath =
self._mock_storagepools[disk['pool']].info['path']
- fullpath = os.path.join(storagepath, disk['volume'])
- dev_name = "hd" + string.ascii_lowercase[index]
- params = {'dev': dev_name, 'path': fullpath, 'type':
'disk'}
- vm.storagedevices[dev_name] = MockVMStorageDevice(params)
- index += 1
-
- cdrom = "hd" + string.ascii_lowercase[index + 1]
- if t_info.get('cdrom'):
- cdrom_params = {
- 'dev': cdrom, 'path': t_info['cdrom'],
'type': 'cdrom'}
- vm.storagedevices[cdrom] = MockVMStorageDevice(cdrom_params)
-
- self._mock_vms[name] = vm
- return name
-
- def vms_get_list(self):
- names = self._mock_vms.keys()
- return sorted(names, key=unicode.lower)
-
- def vmscreenshot_lookup(self, name):
- vm = self._get_vm(name)
- if vm.info['state'] != 'running':
- raise NotFoundError("KCHVM0004E", {'name': name})
-
- screenshot = self._mock_screenshots.setdefault(
- vm.uuid, MockVMScreenshot({'uuid': vm.uuid}))
- return screenshot.lookup()
-
- def _vmscreenshot_delete(self, vm_uuid):
- screenshot = self._mock_screenshots.get(vm_uuid)
- if screenshot:
- screenshot.delete()
- del self._mock_screenshots[vm_uuid]
-
- def template_lookup(self, name):
- t = self._get_template(name)
- return t.validate_integrity()
-
- def template_delete(self, name):
- try:
- del self._mock_templates[name]
- except KeyError:
- raise NotFoundError("KCHTMPL0002E", {'name': name})
-
- def templates_create(self, params):
- name = params.get('name', '').strip()
-
- for net_name in params.get(u'networks', []):
- try:
- self._get_network(net_name)
- except NotFoundError:
- msg_args = {'network': net_name, 'template': name}
- raise InvalidParameter("KCHTMPL0003E", msg_args)
-
- if params.get('cpu_info') is None:
- params['cpu_info'] = dict()
-
- t = MockVMTemplate(params, self)
- if t.name in self._mock_templates:
- raise InvalidOperation("KCHTMPL0001E", {'name': name})
-
- self._mock_templates[name] = t
- return name
-
- def template_clone(self, name):
- # set default name
- subfixs = [v[len(name):] for v in self.templates_get_list()
- if v.startswith(name)]
- indexs = [int(v.lstrip("-clone")) for v in subfixs
- if v.startswith("-clone") and
- v.lstrip("-clone").isdigit()]
- indexs.sort()
- index = "1" if not indexs else str(indexs[-1] + 1)
- clone_name = name + "-clone" + index
-
- temp = self.template_lookup(name)
- temp['name'] = clone_name
- ident = self.templates_create(temp)
- return ident
-
- def template_update(self, name, params):
- old_t = self.template_lookup(name)
- new_t = copy.copy(old_t)
-
- new_t.update(params)
- ident = name
-
- new_storagepool = new_t.get(u'storagepool', '')
- try:
- self._get_storagepool(pool_name_from_uri(new_storagepool))
- except Exception:
- msg_args = {'pool': new_storagepool, 'template': name}
- raise InvalidParameter("KCHTMPL0004E", msg_args)
-
- for net_name in params.get(u'networks', []):
- try:
- self._get_network(net_name)
- except NotFoundError:
- msg_args = {'network': net_name, 'template': name}
- raise InvalidParameter("KCHTMPL0003E", msg_args)
-
- self.template_delete(name)
+ @staticmethod
+ def domainDefineXML(conn, xml):
+ name = objectify.fromstring(xml).name.text
try:
- ident = self.templates_create(new_t)
+ dom = conn.lookupByName(name)
+ if not dom.isActive():
+ MockModel._mock_vms[name] = xml
except:
- ident = self.templates_create(old_t)
- raise
- return ident
+ pass
+
+ return MockModel._defineXML(conn, xml)
+
+ @staticmethod
+ def domainXMLDesc(dom, flags=0):
+ return MockModel._mock_vms.get(dom.name(),
+ MockModel._XMLDesc(dom, flags))
+
+ @staticmethod
+ def undefineDomain(dom):
+ name = dom.name()
+ if name in MockModel._mock_vms.keys():
+ del MockModel._mock_vms[dom.name()]
+ return MockModel._undefineDomain(dom)
+
+ @staticmethod
+ def attachDeviceFlags(dom, xml, flags=0):
+ old_xml = dom.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE)
+ root = objectify.fromstring(old_xml)
+ dev = objectify.fromstring(xml)
+ root.devices.append(dev)
+
+ MockModel._mock_vms[dom.name()] = ET.tostring(root, encoding="utf-8")
+
+ @staticmethod
+ def _get_device_node(dom, xml):
+ xpath_map = {'disk': 'target',
+ 'interface': 'mac',
+ 'graphics': 'listen'}
+
+ dev = objectify.fromstring(xml)
+ dev_id = dev.find(xpath_map[dev.tag]).items()
+
+ dev_filter = ''
+ for key, value in dev_id:
+ dev_filter += "[@%s='%s']" % (key, value)
+
+ old_xml = dom.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE)
+ root = objectify.fromstring(old_xml)
+ devices = root.devices
+
+ dev = devices.find("./%s/%s%s/.." % (dev.tag, xpath_map[dev.tag],
+ dev_filter))
+
+ return (root, dev)
+
+ @staticmethod
+ def detachDeviceFlags(dom, xml, flags=0):
+ root, dev = MockModel._get_device_node(dom, xml)
+ root.devices.remove(dev)
+
+ MockModel._mock_vms[dom.name()] = ET.tostring(root, encoding="utf-8")
+
+ @staticmethod
+ def updateDeviceFlags(dom, xml, flags=0):
+ root, old_dev = MockModel._get_device_node(dom, xml)
+ root.devices.replace(old_dev, objectify.fromstring(xml))
+ MockModel._mock_vms[dom.name()] = ET.tostring(root, encoding="utf-8")
+
+ @staticmethod
+ def volResize(vol, size, flags=0):
+ new_xml = xml_item_update(vol.XMLDesc(0), './capacity', str(size))
+ vol.delete(0)
+ pool = vol.storagePoolLookupByVolume()
+ pool.createXML(new_xml)
+
+ @staticmethod
+ def volWipePattern(vol, algorithm, flags=0):
+ new_xml = xml_item_update(vol.XMLDesc(0), './allocation', '0')
+ vol.delete(0)
+ pool = vol.storagePoolLookupByVolume()
+ pool.createXML(new_xml)
+
+ def _probe_image(self, path):
+ return ('unkown', 'unkown')
- def templates_get_list(self):
- return self._mock_templates.keys()
-
- def _get_template(self, name, overrides=None):
- try:
- t = self._mock_templates[name]
- if overrides:
- args = copy.copy(t.info)
- args.update(overrides)
- return MockVMTemplate(args, self)
- else:
- return t
- except KeyError:
- raise NotFoundError("KCHTMPL0002E", {'name': name})
-
- def debugreport_lookup(self, name):
- path = config.get_debugreports_path()
- file_pattern = os.path.join(path, name + '.txt')
- try:
- file_target = glob.glob(file_pattern)[0]
- except IndexError:
- raise NotFoundError("KCHDR0001E", {'name': name})
-
- ctime = os.stat(file_target).st_mtime
- ctime = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime(ctime))
- file_target = os.path.split(file_target)[-1]
- file_target = os.path.join("/data/debugreports", file_target)
- return {'uri': file_target,
- 'ctime': ctime}
-
- def debugreportcontent_lookup(self, name):
- return self.debugreport_lookup(name)
-
- def debugreport_update(self, name, params):
- path = config.get_debugreports_path()
- file_pattern = os.path.join(path, name + '.txt')
- try:
- file_source = glob.glob(file_pattern)[0]
- except IndexError:
- raise NotFoundError("KCHDR0001E", {'name': name})
+ def _get_volume_path(self, pool, vol):
+ pool_info = self.storagepool_lookup(pool)
+ if pool_info['type'] == 'scsi':
+ return self._mock_storagevolumes.scsi_volumes[vol]['path']
- file_target = file_source.replace(name, params['name'])
- if os.path.isfile(file_target):
- raise InvalidParameter('KCHDR0008E', {'name':
params['name']})
+ return MockModel._libvirt_get_vol_path(pool, vol)
- shutil.move(file_source, file_target)
- return params['name']
+ def _gen_debugreport_file(self, name):
+ return add_task('/debugreports/%s' % name, self._create_log,
+ self.objstore, name)
- def debugreport_delete(self, name):
- path = config.get_debugreports_path()
- file_pattern = os.path.join(path, name + '.txt')
- try:
- file_target = glob.glob(file_pattern)[0]
- except IndexError:
- raise NotFoundError("KCHDR0001E", {'name': name})
-
- os.remove(file_target)
-
- def debugreports_create(self, params):
- ident = params.get('name').strip()
- # Generate a name with time and millisec precision, if necessary
- if ident is None or ident == "":
- ident = 'report-' + str(int(time.time() * 1000))
- else:
- if ident in self.debugreports_get_list():
- raise InvalidParameter("KCHDR0008E", {"name":
ident})
- taskid = self._gen_debugreport_file(ident)
- return self.task_lookup(taskid)
-
- def debugreports_get_list(self):
+ def _create_log(self, cb, name):
path = config.get_debugreports_path()
- file_pattern = os.path.join(path, '*.txt')
- file_lists = glob.glob(file_pattern)
- file_lists = [os.path.split(file)[1] for file in file_lists]
- name_lists = [file.split('.', 1)[0] for file in file_lists]
-
- return name_lists
-
- def _get_vm(self, name):
- try:
- return self._mock_vms[name]
- except KeyError:
- raise NotFoundError("KCHVM0002E", {'name': name})
-
- def storagepools_create(self, params):
- try:
- name = params['name']
- pool = MockStoragePool(name)
- pool.info['type'] = params['type']
- if params['type'] == 'scsi':
- pool.info['path'] = '/dev/disk/by-path'
- pool.info['source'] = params['source']
- if not pool.info['source'].get('adapter_name'):
- raise MissingParameter('KCHPOOL0004E',
- {'item': 'adapter_name',
- 'name': name})
- for vol in ['unit:0:0:1', 'unit:0:0:2',
- 'unit:0:0:3', 'unit:0:0:4']:
- mockvol = MockStorageVolume(name, vol,
- dict([('type', 'lun')]))
- pool._volumes[vol] = mockvol
- else:
- pool.info['path'] = params['path']
- if params['type'] in ['dir', 'scsi']:
- pool.info['autostart'] = True
- else:
- pool.info['autostart'] = False
- except KeyError, item:
- raise MissingParameter("KCHPOOL0004E",
- {'item': str(item), 'name': name})
-
- if name in self._mock_storagepools or name in (ISO_POOL_NAME,):
- raise InvalidOperation("KCHPOOL0001E", {'name': name})
-
- self._mock_storagepools[name] = pool
- return name
-
- def storagepool_lookup(self, name):
- storagepool = self._get_storagepool(name)
- storagepool.refresh()
- return storagepool.info
-
- def storagepool_update(self, name, params):
- pool = self._get_storagepool(name)
- if 'autostart' in params:
- pool.info['autostart'] = params['autostart']
- if 'disks' in params:
- # check if pool is type 'logical'
- if pool.info['type'] != 'logical':
- raise InvalidOperation('KCHPOOL0029E')
- self._update_lvm_disks(name, params['disks'])
- ident = pool.name
- return ident
-
- def storagepool_activate(self, name):
- self._get_storagepool(name).info['state'] = 'active'
-
- def storagepool_deactivate(self, name):
- self._get_storagepool(name).info['state'] = 'inactive'
-
- def storagepool_delete(self, name):
- # firstly, we should check the pool actually exists
- pool = self._get_storagepool(name)
- del self._mock_storagepools[pool.name]
-
- def storagepools_get_list(self):
- return sorted(self._mock_storagepools.keys())
-
- def _get_storagepool(self, name):
- try:
- return self._mock_storagepools[name]
- except KeyError:
- raise NotFoundError("KCHPOOL0002E", {'name': name})
+ tmpf = os.path.join(path, name + '.tmp')
+ realf = os.path.join(path, name + '.txt')
+ length = random.randint(1000, 10000)
+ with open(tmpf, 'w') as fd:
+ while length:
+ fd.write('I am logged')
+ length = length - 1
+ os.rename(tmpf, realf)
+ cb("OK", True)
- def storagevolumes_create(self, pool_name, params):
+ def _mock_storagevolumes_create(self, pool, params):
vol_source = ['file', 'url', 'capacity']
- require_name_params = ['capacity']
-
- name = params.get('name')
-
index_list = list(i for i in range(len(vol_source))
if vol_source[i] in params)
- if len(index_list) != 1:
- raise InvalidParameter("KCHVOL0018E",
- {'param': ",".join(vol_source)})
-
create_param = vol_source[index_list[0]]
-
+ name = params.get('name')
if name is None:
- if create_param in require_name_params:
- raise InvalidParameter('KCHVOL0016E')
-
if create_param == 'file':
name = os.path.basename(params['file'].filename)
+ del params['file']
+ params['capacity'] = 1024
elif create_param == 'url':
name = os.path.basename(params['url'])
- else:
- name = 'upload-%s' % int(time.time())
+ del params['url']
+ params['capacity'] = 1024
params['name'] = name
- try:
- create_func = getattr(self, '_create_volume_with_%s' %
- create_param)
- except AttributeError:
- raise InvalidParameter("KCHVOL0019E", {'param':
create_param})
-
- pool = self._get_storagepool(pool_name)
- if pool.info['type'] in READONLY_POOL_TYPE:
- raise InvalidParameter("KCHVOL0012E", {'type':
pool.info['type']})
- if pool.info['state'] == 'inactive':
- raise InvalidParameter('KCHVOL0003E', {'pool': pool_name,
- 'volume': name})
- if name in pool._volumes:
- raise InvalidOperation("KCHVOL0001E", {'name': name})
-
- params['pool'] = pool_name
- targeturi = '/storagepools/%s/storagevolumes/%s' % (pool_name, name)
- taskid = self.add_task(targeturi, create_func, params)
- return self.task_lookup(taskid)
-
- def _create_volume_with_file(self, cb, params):
- upload_file = params['file']
- params['name'] = params['name']
- params['format'] = 'raw'
- params['capacity'] = upload_file.fp.length
- size = 0
- try:
- while True:
- data = upload_file.file.read(8192*32)
- if not data:
- break
- size += len(data)
- cb('%s/%s' % (size, params['capacity']), True)
- except Exception as e:
- raise OperationFailed('KCHVOL0007E',
- {'name': params['name'],
- 'pool': params['pool'],
- 'err': e.message})
- self._create_volume_with_capacity(cb, params)
- cb('%s/%s' % (size, params['capacity']), True)
-
- def _create_volume_with_capacity(self, cb, params):
- pool_name = params.pop('pool')
- pool = self._get_storagepool(pool_name)
+ return self._model_storagevolumes_create(pool, params)
- try:
- name = params['name']
- volume = MockStorageVolume(pool, name, params)
- volume.info['type'] = 'file'
- volume.info['ref_cnt'] = params.get('ref_cnt', 0)
- volume.info['format'] = params['format']
- volume.info['path'] = os.path.join(
- pool.info['path'], name)
- if 'base' in params:
- volume.info['base'] = copy.deepcopy(params['base'])
- except KeyError, item:
- raise MissingParameter("KCHVOL0004E",
- {'item': str(item), 'volume': name})
-
- pool._volumes[name] = volume
- cb('OK', True)
-
- def _create_volume_with_url(self, cb, params):
- pool_name = params['pool']
- name = params['name']
- url = params['url']
-
- pool = self._get_storagepool(pool_name)
-
- params['path'] = os.path.join(pool.info['path'], name)
- params['type'] = 'file'
- params['base'] = url
-
- volume = MockStorageVolume(pool, name, params)
- pool._volumes[name] = volume
-
- cb('OK', True)
-
- def storagevolume_clone(self, pool, name, new_pool=None, new_name=None):
- if new_name is None:
- base, ext = os.path.splitext(name)
- new_name = get_next_clone_name(self.vms_get_list(), base, ext)
-
- if new_pool is None:
- new_pool = pool
-
- params = {'name': name,
- 'pool': pool,
- 'new_name': new_name,
- 'new_pool': new_pool}
- taskid = self.add_task('/storagepools/%s/storagevolumes/%s' %
- (new_pool, new_name),
- self._storagevolume_clone_task, params)
- return self.task_lookup(taskid)
-
- def _storagevolume_clone_task(self, cb, params):
- try:
- vol_name = params['name'].decode('utf-8')
- pool_name = params['pool'].decode('utf-8')
- new_vol_name = params['new_name'].decode('utf-8')
- new_pool_name = params['new_pool'].decode('utf-8')
-
- orig_pool = self._get_storagepool(pool_name)
- orig_vol = self._get_storagevolume(pool_name, vol_name)
-
- new_vol = copy.deepcopy(orig_vol)
- new_vol.info['name'] = new_vol_name
- new_vol.info['path'] = os.path.join(orig_pool.info['path'],
- new_vol_name)
-
- new_pool = self._get_storagepool(new_pool_name)
- new_pool._volumes[new_vol_name] = new_vol
- except (KeyError, NotFoundError), e:
- raise OperationFailed('KCHVOL0023E',
- {'name': vol_name, 'pool': pool_name,
- 'err': e.message})
-
- cb('OK', True)
-
- def storagevolume_lookup(self, pool, name):
- if self._get_storagepool(pool).info['state'] != 'active':
- raise InvalidOperation("KCHVOL0005E", {'pool': pool,
- 'volume': name})
-
- storagevolume = self._get_storagevolume(pool, name)
- return storagevolume.info
-
- def storagevolume_wipe(self, pool, name):
- volume = self._get_storagevolume(pool, name)
- volume.info['allocation'] = 0
-
- def storagevolume_delete(self, pool, name):
- # firstly, we should check the pool actually exists
- volume = self._get_storagevolume(pool, name)
- del self._get_storagepool(pool)._volumes[volume.name]
-
- def storagevolume_resize(self, pool, name, size):
- volume = self._get_storagevolume(pool, name)
- volume.info['capacity'] = size
-
- def storagevolumes_get_list(self, pool):
- res = self._get_storagepool(pool)
- if res.info['state'] == 'inactive':
- raise InvalidOperation("KCHVOL0006E", {'pool': pool})
- return res._volumes.keys()
-
- def devices_get_list(self, _cap=None, _passthrough=None,
- _passthrough_affected_by=None):
- if _cap is None:
- return self._mock_devices.devices.keys()
- return [dev['name'] for dev in self._mock_devices.devices.values()
- if dev['device_type'] == _cap]
+ def _mock_storagevolumes_get_list(self, pool):
+ pool_info = self.storagepool_lookup(pool)
+ if pool_info['type'] == 'scsi':
+ return self._mock_storagevolumes.scsi_volumes.keys()
- def device_lookup(self, dev_name):
- return self._mock_devices.devices[dev_name]
+ return self._model_storagevolumes_get_list(pool)
- def isopool_lookup(self, name):
- return {'state': 'active',
- 'type': 'kimchi-iso'}
-
- def isovolumes_get_list(self):
- iso_volumes = []
- pools = self.storagepools_get_list()
-
- for pool in pools:
- try:
- volumes = self.storagevolumes_get_list(pool)
- except InvalidOperation:
- # Skip inactive pools
- continue
- for volume in volumes:
- res = self.storagevolume_lookup(pool, volume)
- if res['format'] == 'iso':
- # prevent iso from different pool having same volume name
- res['name'] = '%s-%s' % (pool, volume)
- iso_volumes.append(res)
- return iso_volumes
-
- def storageservers_get_list(self, _target_type=None):
- # FIXME: This needs to be updted when adding new storage server support
- target_type = STORAGE_SERVERS \
- if not _target_type else [_target_type]
- pools = self.storagepools_get_list()
- server_list = []
- for pool in pools:
- try:
- pool_info = self.storagepool_lookup(pool)
- if (pool_info['type'] in target_type and
- pool_info['source']['addr'] not in server_list):
- server_list.append(pool_info['source']['addr'])
- except NotFoundError:
- pass
-
- return server_list
-
- def storageserver_lookup(self, server):
- pools = self.storagepools_get_list()
- for pool in pools:
- try:
- pool_info = self.storagepool_lookup(pool)
- if pool_info['source'] and \
- pool_info['source']['addr'] == server:
- return dict(host=server)
- except NotFoundError:
- # Avoid inconsistent pool result because
- # of lease between list and lookup
- pass
-
- raise NotFoundError("KCHSR0001E", {'server': server})
-
- def dummy_interfaces(self):
- interfaces = {}
- ifaces = {"eth1": "nic", "bond0":
"bonding",
- "eth1.10": "vlan", "bridge0":
"bridge"}
- for i, name in enumerate(ifaces.iterkeys()):
- iface = Interface(name)
- iface.info['type'] = ifaces[name]
- iface.info['ipaddr'] = '192.168.%s.101' % (i + 1)
- interfaces[name] = iface
- interfaces['eth1'].info['ipaddr'] = '192.168.0.101'
- return interfaces
-
- def interfaces_get_list(self):
- return self._mock_interfaces.keys()
-
- def interface_lookup(self, name):
- return self._mock_interfaces[name].info
-
- def networks_create(self, params):
- name = params['name']
- if name in self.networks_get_list():
- raise InvalidOperation("KCHNET0001E", {'name': name})
-
- network = MockNetwork(name)
- connection = params['connection']
- network.info['connection'] = connection
- if connection == "bridge":
- try:
- interface = params['interface']
- network.info['interface'] = interface
- except KeyError:
- raise MissingParameter("KCHNET0004E",
- {'name': name})
-
- subnet = params.get('subnet', '')
- if subnet:
- network.info['subnet'] = subnet
- try:
- net = ipaddr.IPNetwork(subnet)
- except ValueError:
- msg_args = {'subnet': subnet, 'network': name}
- raise InvalidParameter("KCHNET0003E", msg_args)
-
- network.info['dhcp'] = {
- 'start': str(net.network + net.numhosts / 2),
- 'stop': str(net.network + net.numhosts - 2)}
-
- self._mock_networks[name] = network
- return name
-
- def _get_network(self, name):
- try:
- return self._mock_networks[name]
- except KeyError:
- raise NotFoundError("KCHNET0002E", {'name': name})
-
- def _get_vms_attach_to_a_network(self, network):
- vms = []
- for name, dom in self._mock_vms.iteritems():
- if network in dom.networks:
- vms.append(name)
- return vms
-
- def _is_network_used_by_template(self, network):
- for name, tmpl in self._mock_templates.iteritems():
- if network in tmpl.info['networks']:
- return True
- return False
-
- def _is_network_in_use(self, name):
- # The network "default" is used for Kimchi proposal and should not be
- # deactivate or deleted. Otherwise, we will allow user create
- # inconsistent templates from scratch
- if name == 'default':
- return True
-
- vms = self._get_vms_attach_to_a_network(name)
- return bool(vms) or self._is_network_used_by_template(name)
-
- def network_lookup(self, name):
- network = self._get_network(name)
- network.info['vms'] = self._get_vms_attach_to_a_network(name)
- network.info['in_use'] = self._is_network_in_use(name)
-
- return network.info
-
- def network_activate(self, name):
- self._get_network(name).info['state'] = 'active'
-
- def network_deactivate(self, name):
- if self._is_network_in_use(name):
- raise InvalidOperation("KCHNET0018E", {'name': name})
-
- network = self._get_network(name)
- if not network.info['persistent']:
- self.network_delete(name)
-
- network.info['state'] = 'inactive'
-
- def network_delete(self, name):
- if self._is_network_in_use(name):
- raise InvalidOperation("KCHNET0017E", {'name': name})
-
- # firstly, we should check the network actually exists
- network = self._get_network(name)
- del self._mock_networks[network.name]
-
- def networks_get_list(self):
- return sorted(self._mock_networks.keys())
-
- def vmstorages_create(self, vm_name, params):
- path = params.get('path')
- if path and path.startswith('/') and not os.path.exists(path):
- raise InvalidParameter("KCHVMSTOR0003E", {'value': path})
- if path and params.get('pool'):
- raise InvalidParameter("KCHVMSTOR0017E")
- elif params.get('pool'):
- try:
- self.storagevolume_lookup(params['pool'], params['vol'])
- except Exception as e:
- raise InvalidParameter("KCHVMSTOR0015E", {'error': e})
- dom = self._get_vm(vm_name)
- index = len(dom.storagedevices.keys()) + 1
- params['dev'] = "hd" + string.ascii_lowercase[index]
-
- vmdev = MockVMStorageDevice(params)
- dom.storagedevices[params['dev']] = vmdev
- return params['dev']
-
- def vmstorages_get_list(self, vm_name):
- dom = self._get_vm(vm_name)
- return dom.storagedevices.keys()
-
- def vmstorage_lookup(self, vm_name, dev_name):
- dom = self._get_vm(vm_name)
- if dev_name not in self.vmstorages_get_list(vm_name):
- raise NotFoundError(
- "KCHVMSTOR0007E",
- {'dev_name': dev_name, 'vm_name': vm_name})
- return dom.storagedevices.get(dev_name).info
-
- def vmstorage_delete(self, vm_name, dev_name):
- dom = self._get_vm(vm_name)
- if dev_name not in self.vmstorages_get_list(vm_name):
- raise NotFoundError(
- "KCHVMSTOR0007E",
- {'dev_name': dev_name, 'vm_name': vm_name})
- dom.storagedevices.pop(dev_name)
-
- def vmstorage_update(self, vm_name, dev_name, params):
- try:
- dom = self._get_vm(vm_name)
- dom.storagedevices[dev_name].info.update(params)
- except Exception as e:
- raise OperationFailed("KCHVMSTOR0009E", {'error':
e.message})
- return dev_name
-
- def vmifaces_create(self, vm, params):
- if (params["type"] == "network" and
- params["network"] not in self.networks_get_list()):
- msg_args = {'network': params["network"], 'name':
vm}
- raise InvalidParameter("KCHVMIF0002E", msg_args)
-
- dom = self._get_vm(vm)
- iface = MockVMIface(params["network"])
- ("model" in params.keys() and
- iface.info.update({"model": params["model"]}))
-
- mac = iface.info['mac']
- dom.ifaces[mac] = iface
- return mac
-
- def vmifaces_get_list(self, vm):
- dom = self._get_vm(vm)
- macs = dom.ifaces.keys()
- return macs
-
- def vmiface_lookup(self, vm, mac):
- dom = self._get_vm(vm)
- try:
- info = dom.ifaces[mac].info
- except KeyError:
- raise NotFoundError("KCHVMIF0001E", {'iface': mac,
'name': vm})
- return info
+ def _mock_storagevolume_lookup(self, pool, vol):
+ pool_info = self.storagepool_lookup(pool)
+ if pool_info['type'] == 'scsi':
+ return self._mock_storagevolumes.scsi_volumes[vol]
- def vmiface_delete(self, vm, mac):
- dom = self._get_vm(vm)
- try:
- del dom.ifaces[mac]
- except KeyError:
- raise NotFoundError("KCHVMIF0001E", {'iface': mac,
'name': vm})
+ return self._model_storagevolume_lookup(pool, vol)
- def vmiface_update(self, vm, mac, params):
- dom = self._get_vm(vm)
- try:
- info = dom.ifaces[mac].info
- except KeyError:
- raise NotFoundError("KCHVMIF0001E", {'iface': mac,
'name': vm})
- if info['type'] == 'network' and 'network' in params:
- info['network'] = params['network']
- if 'model' in params:
- info['model'] = params['model']
- return mac
-
- def tasks_get_list(self):
- with self.objstore as session:
- return session.get_list('task')
-
- def task_lookup(self, id):
- with self.objstore as session:
- return session.get('task', str(id))
-
- def add_task(self, target_uri, fn, opaque=None):
- id = self.next_taskid
- self.next_taskid = self.next_taskid + 1
- AsyncTask(id, target_uri, fn, self.objstore, opaque)
-
- return id
-
- def _get_storagevolume(self, pool, name):
- try:
- return self._get_storagepool(pool)._volumes[name]
- except KeyError:
- raise NotFoundError("KCHVOL0002E", {'name': name,
'pool': pool})
+ def _mock_interfaces_get_list(self):
+ return self._mock_interfaces.ifaces.keys()
- def _get_distros(self):
- distroloader = DistroLoader()
- return distroloader.get()
+ def _mock_interface_lookup(self, name):
+ return self._mock_interfaces.ifaces[name]
- def distros_get_list(self):
- return self.distros.keys()
+ def _mock_devices_get_list(self, _cap=None, _passthrough=None,
+ _passthrough_affected_by=None):
+ if _cap is None:
+ return self._mock_devices.devices.keys()
+ return [dev['name'] for dev in self._mock_devices.devices.values()
+ if dev['device_type'] == _cap]
- def distro_lookup(self, name):
- try:
- return self.distros[name]
- except KeyError:
- raise NotFoundError("KCHDISTRO0001E", {'name': name})
+ def _mock_device_lookup(self, dev_name):
+ return self._mock_devices.devices[dev_name]
- def _gen_debugreport_file(self, ident):
- return self.add_task('/debugreports/%s' % ident, self._create_log,
- ident)
+ def _mock_packagesupdate_get_list(self):
+ return self._mock_swupdate.pkgs.keys()
- def _create_log(self, cb, name):
- path = config.get_debugreports_path()
- tmpf = os.path.join(path, name + '.tmp')
- realf = os.path.join(path, name + '.txt')
- length = random.randint(1000, 10000)
- with open(tmpf, 'w') as fd:
- while length:
- fd.write('I am logged')
- length = length - 1
- os.rename(tmpf, realf)
- cb("OK", True)
+ def _mock_packageupdate_lookup(self, pkg_name):
+ return self._mock_swupdate.pkgs[pkg_name]
- def host_lookup(self, *name):
- res = {}
- res['memory'] = 6114058240
- res['cpu_model'] = 'Intel(R) Core(TM) i5 CPU M 560 @
2.67GHz'
- res['cpus'] = 4
- res['os_distro'] = 'Red Hat Enterprise Linux Server'
- res['os_version'] = '6.4'
- res['os_codename'] = 'Santiago'
-
- return res
-
- def hoststats_lookup(self, *name):
- virt_mem = psutil.virtual_memory()
- memory_stats = {'total': virt_mem.total,
- 'free': virt_mem.free,
- 'cached': virt_mem.cached,
- 'buffers': virt_mem.buffers,
- 'avail': virt_mem.available}
- return {'cpu_utilization': round(random.uniform(0, 100), 1),
- 'memory': memory_stats,
- 'disk_read_rate': round(random.uniform(0, 4000), 1),
- 'disk_write_rate': round(random.uniform(0, 4000), 1),
- 'net_recv_rate': round(random.uniform(0, 4000), 1),
- 'net_sent_rate': round(random.uniform(0, 4000), 1)}
-
- def hoststatshistory_lookup(self, *name):
- return {'cpu_utilization': random.sample(range(100), 30),
- 'memory': random.sample(range(4000), 30),
- 'disk_read_rate': random.sample(range(4000), 30),
- 'disk_write_rate': random.sample(range(4000), 30),
- 'net_recv_rate': random.sample(range(4000), 30),
- 'net_sent_rate': random.sample(range(4000), 30)}
-
- def users_get_list(self):
- return ["userA", "userB", "userC",
"admin"]
-
- def groups_get_list(self):
- return ["groupA", "groupB", "groupC",
"groupD"]
-
- def peers_get_list(self):
- if kconfig.get("server", "federation") == "off":
- return []
-
- return ["https://serverA:8001", "https://serverB:8001"]
-
- def vms_get_list_by_state(self, state):
- ret_list = []
- for name in self.vms_get_list():
- if (self._mock_vms[name].info['state']) == state:
- ret_list.append(name)
- return ret_list
-
- def host_shutdown(self, args=None):
- # Check for running vms before shutdown
- running_vms = self.vms_get_list_by_state('running')
- if len(running_vms) > 0:
- raise OperationFailed("KCHHOST0001E")
- cherrypy.engine.exit()
-
- def host_reboot(self, args=None):
- # Find running VMs
- running_vms = self.vms_get_list_by_state('running')
- if len(running_vms) > 0:
- raise OperationFailed("KCHHOST0002E")
- cherrypy.engine.stop()
- time.sleep(10)
- cherrypy.engine.start()
-
- def partitions_get_list(self):
- result = disks.get_partitions_names()
- return result
-
- def partition_lookup(self, name):
- if name not in disks.get_partitions_names():
- raise NotFoundError("KCHPART0001E", {'name': name})
-
- return disks.get_partition_details(name)
-
- def config_lookup(self, name):
- return {'display_proxy_port': kconfig.get('display',
- 'display_proxy_port'),
- 'version': config.get_version()}
-
- def packagesupdate_get_list(self):
- return self._mock_swupdate.getUpdates()
-
- def packageupdate_lookup(self, pkg_name):
- return self._mock_swupdate.getUpdate(pkg_name)
-
- def host_swupdate(self, args=None):
- task_id = self.add_task('/host/swupdate', self._mock_swupdate.doUpdate,
- None)
+ def _mock_host_swupdate(self, args=None):
+ task_id = add_task('/host/swupdate', self._mock_swupdate.doUpdate,
+ self.objstore)
return self.task_lookup(task_id)
- def repositories_get_list(self):
- return self._mock_host_repositories.getRepositories()
+ def _mock_repositories_get_list(self):
+ return self._mock_repositories.repos.keys()
- def repositories_create(self, params):
+ def _mock_repositories_create(self, params):
# Create a repo_id if not given by user. The repo_id will follow
# the format kimchi_repo_<integer>, where integer is the number of
# seconds since the Epoch (January 1st, 1970), in UTC.
@@ -1121,387 +307,62 @@ class MockModel(object):
repo_id = "kimchi_repo_%s" % str(int(time.time() * 1000))
params.update({'repo_id': repo_id})
- if repo_id in self.repositories_get_list():
- raise InvalidOperation("KCHREPOS0022E", {'repo_id':
repo_id})
-
- self._mock_host_repositories.addRepository(params)
- return repo_id
-
- def repository_lookup(self, repo_id):
- return self._mock_host_repositories.getRepository(repo_id)
-
- def repository_delete(self, repo_id):
- return self._mock_host_repositories.removeRepository(repo_id)
-
- def repository_enable(self, repo_id):
- return self._mock_host_repositories.enableRepository(repo_id)
-
- def repository_disable(self, repo_id):
- return self._mock_host_repositories.disableRepository(repo_id)
-
- def repository_update(self, repo_id, params):
- return self._mock_host_repositories.updateRepository(repo_id, params)
-
-
-class MockVMTemplate(VMTemplate):
- def __init__(self, args, mockmodel_inst=None):
- VMTemplate.__init__(self, args)
- self.model = mockmodel_inst
-
- def _get_all_networks_name(self):
- return self.model.networks_get_list()
-
- def _get_all_storagepools_name(self):
- return self.model.storagepools_get_list()
-
- def _storage_validate(self):
- pool_uri = self.info['storagepool']
- pool_name = pool_name_from_uri(pool_uri)
- try:
- pool = self.model._get_storagepool(pool_name)
- except NotFoundError:
- msg_args = {'pool': pool_name, 'template': self.name}
- raise InvalidParameter("KCHTMPL0004E", msg_args)
-
- if pool.info['state'] != 'active':
- msg_args = {'pool': pool_name, 'template': self.name}
- raise InvalidParameter("KCHTMPL0005E", msg_args)
-
- return pool
-
- def _get_storage_path(self):
- pool = self._storage_validate()
- return pool.info['path']
-
- def _get_volume_path(self, pool, vol):
- return self.model.storagevolume_lookup(pool, vol)['path']
-
- def fork_vm_storage(self, vm_name):
- pool = self._storage_validate()
- volumes = self.to_volume_list(vm_name)
- disk_paths = []
- for vol_info in volumes:
- vol_info['capacity'] = vol_info['capacity'] << 10
- vol_info['ref_cnt'] = 1
- if 'base' in self.info:
- vol_info['base'] = copy.deepcopy(self.info['base'])
- self.model.storagevolumes_create(pool.name, vol_info)
- disk_paths.append({'pool': pool.name, 'volume':
vol_info['name']})
- return disk_paths
-
-
-class MockVMStorageDevice(object):
- def __init__(self, params):
- self.info = {'dev': params.get('dev'),
- 'type': params.get('type'),
- 'pool': params.get('pool'),
- 'vol': params.get('vol'),
- 'path': params.get('path')}
-
-
-class MockVMIface(object):
- counter = 0
-
- def __init__(self, network=None):
- self.__class__.counter += 1
- self.info = {'type': 'network',
- 'model': 'virtio',
- 'network': network if network
- else "net-%s" % self.counter,
- 'mac': self.get_mac()
- }
-
- @classmethod
- def get_mac(cls):
- mac = ":".join(["52", "54"] +
- ["%02x" % (cls.counter / (256 ** i) % 256)
- for i in range(3, -1, -1)])
- return mac
-
-
-class MockVM(object):
- def __init__(self, uuid, name, template_info):
- self.uuid = uuid
- self.name = name
- self.memory = template_info['memory']
- self.cpus = template_info['cpus']
- self.disk_paths = []
- self.networks = template_info['networks']
- ifaces = [MockVMIface(net) for net in self.networks]
- self.storagedevices = {}
- self.ifaces = dict([(iface.info['mac'], iface) for iface in ifaces])
-
- stats = {'cpu_utilization': 20,
- 'net_throughput': 35,
- 'net_throughput_peak': 100,
- 'io_throughput': 45,
- 'io_throughput_peak': 100}
- self.info = {'name': self.name,
- 'state': 'shutoff',
- 'stats': stats,
- 'uuid': self.uuid,
- 'memory': self.memory,
- 'cpus': self.cpus,
- 'icon': None,
- 'graphics': {'type': 'vnc',
'listen': '127.0.0.1',
- 'port': None, 'passwd':
'123456',
- 'passwdValidTo': None},
- 'users': ['user1', 'user2',
'root'],
- 'groups': ['group1', 'group2',
'admin'],
- 'access': 'full'
- }
- self.info['graphics'].update(template_info['graphics'])
-
-
-class MockStoragePool(object):
- def __init__(self, name):
- self.name = name
- self.info = {'state': 'inactive',
- 'capacity': 1024 << 20,
- 'allocated': 512 << 20,
- 'available': 512 << 20,
- 'path': '/var/lib/libvirt/images',
- 'source': {},
- 'type': 'dir',
- 'nr_volumes': 0,
- 'autostart': 0,
- 'persistent': True}
- self._volumes = {}
-
- def refresh(self):
- state = self.info['state']
- self.info['nr_volumes'] = len(self._volumes) \
- if state == 'active' else 0
-
-
-class Interface(object):
- def __init__(self, name):
- self.name = name
- self.info = {'type': 'nic',
- 'ipaddr': '192.168.0.101',
- 'netmask': '255.255.255.0',
- 'status': 'active'}
-
-
-class MockNetwork(object):
- def __init__(self, name):
- self.name = name
- self.info = {'state': 'inactive',
- 'autostart': True,
- 'connection': 'nat',
- 'interface': 'virbr0',
- 'subnet': '192.168.122.0/24',
- 'dhcp': {'start': '192.168.122.128',
- 'stop': '192.168.122.254'},
- 'persistent': True
- }
-
-
-class MockTask(object):
- def __init__(self, id):
- self.id = id
-
-
-class MockStorageVolume(object):
- def __init__(self, pool, name, params={}):
- self.name = name
- self.pool = pool
- # Check if volume should be scsi lun
- if params.get('type') == 'lun':
- params = self._def_lun(name)
- fmt = params.get('format', 'raw')
- capacity = params.get('capacity', 1024)
- self.info = {'type': params.get('type', 'disk'),
- 'capacity': capacity << 20,
- 'allocation': params.get('allocation',
'512'),
- 'path': params.get('path'),
- 'ref_cnt': params.get('ref_cnt'),
- 'format': fmt}
- if fmt == 'iso':
- self.info['allocation'] = self.info['capacity']
- self.info['os_version'] = '17'
- self.info['os_distro'] = 'fedora'
- self.info['bootable'] = True
-
- def _def_lun(self, name):
- capacity = int(random.uniform(100, 300)) << 20
- path = "/dev/disk/by-path/pci-0000:0e:00.0-fc-0x20999980e52e4492-lun"
- return {
- "capacity": capacity,
- "name": name,
- "format": random.choice(['dos', 'unknown']),
- "allocation": capacity,
- "path": path + name[-1],
- "type": "block"}
-
-
-class MockVMScreenshot(VMScreenshot):
- OUTDATED_SECS = 5
- BACKGROUND_COLOR = ['blue', 'green', 'purple', 'red',
'yellow']
- BOX_COORD = (50, 115, 206, 141)
- BAR_COORD = (50, 115, 50, 141)
-
- def __init__(self, vm_name):
- VMScreenshot.__init__(self, vm_name)
- self.coord = MockVMScreenshot.BAR_COORD
- self.background = random.choice(MockVMScreenshot.BACKGROUND_COLOR)
-
- def _generate_scratch(self, thumbnail):
- self.coord = (self.coord[0],
- self.coord[1],
- min(MockVMScreenshot.BOX_COORD[2],
- self.coord[2] + random.randrange(50)),
- self.coord[3])
-
- image = Image.new("RGB", (256, 256), self.background)
- d = ImageDraw.Draw(image)
- d.rectangle(MockVMScreenshot.BOX_COORD, outline='black')
- d.rectangle(self.coord, outline='black', fill='black')
- image.save(thumbnail)
-
-
-class MockSoftwareUpdate(object):
- def __init__(self):
- self._packages = {
- 'udevmountd': {'repository': 'openSUSE-13.1-Update',
- 'version': '0.81.5-14.1',
- 'arch': 'x86_64',
- 'package_name': 'udevmountd'},
- 'sysconfig-network': {'repository':
'openSUSE-13.1-Extras',
- 'version': '0.81.5-14.1',
- 'arch': 'x86_64',
- 'package_name': 'sysconfig-network'},
- 'libzypp': {'repository': 'openSUSE-13.1-Update',
- 'version': '13.9.0-10.1',
- 'arch': 'noarch',
- 'package_name': 'libzypp'}}
- self._num2update = 3
-
- def getUpdates(self):
- return self._packages.keys()
-
- def getUpdate(self, name):
- if name not in self._packages.keys():
- raise NotFoundError('KCHPKGUPD0002E', {'name': name})
- return self._packages[name]
-
- def getNumOfUpdates(self):
- return self._num2update
-
- def doUpdate(self, cb, params):
- msgs = []
- for pkg in self._packages.keys():
- msgs.append("Updating package %s" % pkg)
- cb('\n'.join(msgs))
- time.sleep(1)
-
- time.sleep(2)
- msgs.append("All packages updated")
- cb('\n'.join(msgs), True)
-
- # After updating all packages any package should be listed to be
- # updated, so reset self._packages
- self._packages = {}
-
-
-class MockRepositories(object):
- def __init__(self):
- self._repos = {"kimchi_repo_1392167832":
- {"repo_id": "kimchi_repo_1392167832",
- "enabled": True,
- "baseurl": "http://www.fedora.org",
- "config": {"repo_name":
"kimchi_repo_1392167832",
- "gpgkey": [],
- "gpgcheck": True,
- "mirrorlist": ""}
- }
- }
-
- def addRepository(self, params):
- # Create and enable the repository
- repo_id = params['repo_id']
config = params.get('config', {})
- baseurl = params.get('baseurl')
- mirrorlist = config.get('mirrorlist', "")
-
- if baseurl:
- validate_repo_url(baseurl)
-
- if mirrorlist:
- validate_repo_url(mirrorlist)
-
- repo = {'repo_id': repo_id,
- 'baseurl': baseurl,
+ info = {'repo_id': repo_id,
+ 'baseurl': params['baseurl'],
'enabled': True,
'config': {'repo_name': config.get('repo_name',
repo_id),
'gpgkey': config.get('gpgkey', []),
'gpgcheck': True,
- 'mirrorlist': mirrorlist}
- }
-
- self._repos[repo_id] = repo
+ 'mirrorlist': params.get('mirrorlist',
'')}}
+ self._mock_repositories.repos[repo_id] = info
return repo_id
- def getRepositories(self):
- return self._repos.keys()
-
- def getRepository(self, repo_id):
- if repo_id not in self._repos.keys():
- raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
-
- return self._repos[repo_id]
-
- def enableRepository(self, repo_id):
- if repo_id not in self._repos.keys():
- raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
-
- info = self._repos[repo_id]
- # Check if repo_id is already enabled
- if info['enabled']:
- raise NotFoundError("KCHREPOS0015E", {'repo_id': repo_id})
+ def _mock_repository_lookup(self, repo_id):
+ return self._mock_repositories.repos[repo_id]
- info['enabled'] = True
- self._repos[repo_id] = info
- return repo_id
+ def _mock_repository_delete(self, repo_id):
+ del self._mock_repositories.repos[repo_id]
- def disableRepository(self, repo_id):
- if repo_id not in self._repos.keys():
- raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
+ def _mock_repository_enable(self, repo_id):
+ self._mock_repositories.repos[repo_id]['enabled'] = True
- info = self._repos[repo_id]
- # Check if repo_id is already disabled
- if not info['enabled']:
- raise NotFoundError("KCHREPOS0016E", {'repo_id': repo_id})
+ def _mock_repository_disable(self, repo_id):
+ self._mock_repositories.repos[repo_id]['enabled'] = False
- info['enabled'] = False
- self._repos[repo_id] = info
+ def _mock_repository_update(self, repo_id, params):
+ self._mock_repositories.repos[repo_id].update(params)
return repo_id
- def updateRepository(self, repo_id, params):
- if repo_id not in self._repos.keys():
- raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
-
- baseurl = params.get('baseurl', None)
- config = params.get('config', {})
- mirrorlist = config.get('mirrorlist', None)
-
- if baseurl:
- validate_repo_url(baseurl)
-
- if mirrorlist:
- validate_repo_url(mirrorlist)
-
- info = self._repos[repo_id]
- info.update(params)
- del self._repos[repo_id]
- self._repos[info['repo_id']] = info
- return info['repo_id']
- def removeRepository(self, repo_id):
- if repo_id not in self._repos.keys():
- raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
-
- del self._repos[repo_id]
+class MockStorageVolumes(object):
+ def __init__(self):
+ base_path = "/dev/disk/by-path/pci-0000:0e:00.0-fc-0x20-lun"
+ self.scsi_volumes = {'unit:0:0:1': {'capacity': 1024,
+ 'format': 'unknown',
+ 'allocation': 512,
+ 'type': 'block',
+ 'path': base_path + '1',
+ 'ref_cnt': 0},
+ 'unit:0:0:2': {'capacity': 2048,
+ 'format': 'unknown',
+ 'allocation': 512,
+ 'type': 'block',
+ 'path': base_path + '2',
+ 'ref_cnt': 0}}
+
+
+class MockInterfaces(object):
+ def __init__(self):
+ self.ifaces = {}
+ params = {"eth1": "nic", "bond0":
"bonding",
+ "eth1.10": "vlan", "bridge0":
"bridge"}
+ for i, name in enumerate(params.iterkeys()):
+ info = {'name': name, 'type': params[name],
+ 'ipaddr': '192.168.%s.101' % (i + 1),
+ 'netmask': '255.255.255.0', 'status':
'active'}
+ self.ifaces[name] = info
+ self.ifaces['eth1']['ipaddr'] = '192.168.0.101'
class MockDevices(object):
@@ -1581,52 +442,46 @@ class MockDevices(object):
'path':
'/sys/devices/pci0000:00/0000:40:00.0/2'}}
-def get_mock_environment():
- model = MockModel()
- for i in xrange(5):
- name = 'test-template-%i' % i
- params = {'name': name, 'cdrom': '/file.iso'}
- t = MockVMTemplate(params, model)
- model._mock_templates[name] = t
-
- for name in ('test-template-1', 'test-template-3'):
- model._mock_templates[name].info.update({'folder': ['rhel',
'6']})
-
- for i in xrange(10):
- name = u'test-vm-%i' % i
- vm_uuid = str(uuid.uuid4())
- vm = MockVM(vm_uuid, name, model.template_lookup('test-template-0'))
- model._mock_vms[name] = vm
-
- # mock storagepool
- for i in xrange(5):
- name = 'default-pool-%i' % i
- defaultstoragepool = MockStoragePool(name)
- defaultstoragepool.info['path'] += '/%i' % i
- model._mock_storagepools[name] = defaultstoragepool
- for j in xrange(5):
- vol_name = 'volume-%i' % j
- defaultstoragevolume = MockStorageVolume(name, vol_name)
- defaultstoragevolume.info['path'] = '%s/%s' % (
- defaultstoragepool.info['path'], vol_name)
- mockpool = model._mock_storagepools[name]
- mockpool._volumes[vol_name] = defaultstoragevolume
- vol_name = 'Fedora17.iso'
- defaultstoragevolume = MockStorageVolume(name, vol_name,
- {'format': 'iso'})
- defaultstoragevolume.info['path'] = '%s/%s' % (
- defaultstoragepool.info['path'], vol_name)
- mockpool = model._mock_storagepools[name]
- mockpool._volumes[vol_name] = defaultstoragevolume
-
- # mock network
- for i in xrange(5):
- name = 'test-network-%i' % i
- testnetwork = MockNetwork(name)
- testnetwork.info['interface'] = 'virbr%i' % (i + 1)
- testnetwork.info['subnet'] = '192.168.%s.0/24' % (i + 1)
- testnetwork.info['dhcp']['start'] = '192.168.%s.128' % (i
+ 1)
- testnetwork.info['dhcp']['end'] = '192.168.%s.254' % (i +
1)
- model._mock_networks[name] = testnetwork
-
- return model
+class MockSoftwareUpdate(object):
+ def __init__(self):
+ self.pkgs = {
+ 'udevmountd': {'repository': 'openSUSE-13.1-Update',
+ 'version': '0.81.5-14.1',
+ 'arch': 'x86_64',
+ 'package_name': 'udevmountd'},
+ 'sysconfig-network': {'repository':
'openSUSE-13.1-Extras',
+ 'version': '0.81.5-14.1',
+ 'arch': 'x86_64',
+ 'package_name': 'sysconfig-network'},
+ 'libzypp': {'repository': 'openSUSE-13.1-Update',
+ 'version': '13.9.0-10.1',
+ 'arch': 'noarch',
+ 'package_name': 'libzypp'}}
+ self._num2update = 3
+
+ def doUpdate(self, cb, params):
+ msgs = []
+ for pkg in self.pkgs.keys():
+ msgs.append("Updating package %s" % pkg)
+ cb('\n'.join(msgs))
+ time.sleep(1)
+
+ time.sleep(2)
+ msgs.append("All packages updated")
+ cb('\n'.join(msgs), True)
+
+ # After updating all packages any package should be listed to be
+ # updated, so reset self._packages
+ self.pkgs = {}
+
+
+class MockRepositories(object):
+ def __init__(self):
+ self.repos = {"kimchi_repo_1392167832":
+ {"repo_id": "kimchi_repo_1392167832",
+ "enabled": True,
+ "baseurl": "http://www.fedora.org",
+ "config": {"repo_name":
"kimchi_repo_1392167832",
+ "gpgkey": [],
+ "gpgcheck": True,
+ "mirrorlist": ""}}}
diff --git a/src/kimchi/vmtemplate.py b/src/kimchi/vmtemplate.py
index 5dbbdd4..4202ae6 100644
--- a/src/kimchi/vmtemplate.py
+++ b/src/kimchi/vmtemplate.py
@@ -26,9 +26,9 @@ import uuid
from lxml import etree
from lxml.builder import E
+from kimchi import imageinfo
from kimchi import osinfo
from kimchi.exception import InvalidParameter, IsoFormatError, MissingParameter
-from kimchi.imageinfo import probe_image, probe_img_info
from kimchi.isoinfo import IsoImage
from kimchi.utils import check_url_path, pool_name_from_uri
from kimchi.xmlutils.disk import get_disk_xml
@@ -90,10 +90,11 @@ class VMTemplate(object):
if 'base' in d.keys():
base_imgs.append(d)
if scan:
- distro, version = probe_image(d['base'])
+ distro, version = imageinfo.probe_image(d['base'])
if 'size' not in d.keys():
- d['size'] =
probe_img_info(d['base'])['virtual-size']
+ d_info = imageinfo.probe_img_info(d['base'])
+ d['size'] = d_info['virtual-size']
if len(base_imgs) == 0:
raise MissingParameter("KCHTMPL0016E")
@@ -201,7 +202,7 @@ class VMTemplate(object):
if 'base' in d:
info['base'] = dict()
- base_fmt = probe_img_info(d['base'])['format']
+ base_fmt = imageinfo.probe_img_info(d['base'])['format']
if base_fmt is None:
raise InvalidParameter("KCHTMPL0024E", {'path':
d['base']})
info['base']['path'] = d['base']
diff --git a/tests/run_tests.sh.in b/tests/run_tests.sh.in
index a097761..6ba486f 100644
--- a/tests/run_tests.sh.in
+++ b/tests/run_tests.sh.in
@@ -24,11 +24,23 @@ PYTHON_VER=@PYTHON_VERSION@
if [ $# -ne 0 ]; then
ARGS="$@"
else
- ARGS="discover"
+ ARGS=`find -name "test_*.py" | xargs -I @ basename @ .py`
fi
if [ "$HAVE_UNITTEST" != "yes" -o "$PYTHON_VER" ==
"2.6" ]; then
- PYTHONPATH=../src:./:../ unit2 $ARGS
+ CMD="unit2"
else
- PYTHONPATH=../src:../ python -m unittest $ARGS
+ CMD="python -m unittest"
fi
+
+SORTED_LIST=($ARGS)
+for ((i=0;i<${#SORTED_LIST[@]};i++)); do
+
+ if [[ ${SORTED_LIST[$i]} == test_model* ]]; then
+ FIRST=${SORTED_LIST[$i]}
+ SORTED_LIST[$i]=${SORTED_LIST[0]}
+ SORTED_LIST[0]=$FIRST
+ fi
+done
+
+PYTHONPATH=../src:../ $CMD ${SORTED_LIST[@]}
diff --git a/tests/test_authorization.py b/tests/test_authorization.py
index 71b416f..6e463d4 100644
--- a/tests/test_authorization.py
+++ b/tests/test_authorization.py
@@ -21,11 +21,10 @@ import json
import os
import unittest
-
from functools import partial
-
import kimchi.mockmodel
+from iso_gen import construct_fake_iso
from utils import get_free_port, patch_auth, request
from utils import run_server
@@ -35,6 +34,7 @@ model = None
host = None
port = None
ssl_port = None
+fake_iso = '/tmp/fake.iso'
def setUpModule():
@@ -47,10 +47,14 @@ def setUpModule():
ssl_port = get_free_port('https')
test_server = run_server(host, port, ssl_port, test_mode=True, model=model)
+ # Create fake ISO to do the tests
+ construct_fake_iso(fake_iso, True, '12.04', 'ubuntu')
+
def tearDownModule():
test_server.stop()
os.unlink('/tmp/obj-store-test')
+ os.unlink(fake_iso)
class AuthorizationTests(unittest.TestCase):
@@ -103,7 +107,7 @@ class AuthorizationTests(unittest.TestCase):
# but he can get and create a new one
resp = self.request('/templates', '{}', 'GET')
self.assertEquals(403, resp.status)
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso'})
+ req = json.dumps({'name': 'test', 'cdrom': fake_iso})
resp = self.request('/templates', req, 'POST')
self.assertEquals(403, resp.status)
resp = self.request('/templates/test', '{}', 'PUT')
@@ -112,7 +116,7 @@ class AuthorizationTests(unittest.TestCase):
self.assertEquals(403, resp.status)
# Non-root users can only get vms authorized to them
- model.templates_create({'name': u'test', 'cdrom':
'/nonexistent.iso'})
+ model.templates_create({'name': u'test', 'cdrom':
fake_iso})
model.vms_create({'name': u'test-me', 'template':
'/templates/test'})
model.vm_update(u'test-me',
@@ -121,11 +125,11 @@ class AuthorizationTests(unittest.TestCase):
model.vms_create({'name': u'test-usera',
'template': '/templates/test'})
- model.vm_update(u'test-usera', {'users': ['userA'],
'groups': []})
+ model.vm_update(u'test-usera', {'users': ['root'],
'groups': []})
model.vms_create({'name': u'test-groupa',
'template': '/templates/test'})
- model.vm_update(u'test-groupa', {'groups': ['groupA']})
+ model.vm_update(u'test-groupa', {'groups': ['wheel']})
resp = self.request('/vms', '{}', 'GET')
self.assertEquals(200, resp.status)
@@ -136,11 +140,12 @@ class AuthorizationTests(unittest.TestCase):
self.assertEquals(403, resp.status)
# Create a vm using mockmodel directly to test Resource access
- model.vms_create({'name': 'test', 'template':
'/templates/test'})
+ model.vms_create({'name': 'kimchi-test',
+ 'template': '/templates/test'})
- resp = self.request('/vms/test', '{}', 'PUT')
+ resp = self.request('/vms/kimchi-test', '{}', 'PUT')
self.assertEquals(403, resp.status)
- resp = self.request('/vms/test', '{}', 'DELETE')
+ resp = self.request('/vms/kimchi-test', '{}', 'DELETE')
self.assertEquals(403, resp.status)
# Non-root users can only update VMs authorized by them
@@ -150,4 +155,4 @@ class AuthorizationTests(unittest.TestCase):
self.assertEquals(403, resp.status)
model.template_delete('test')
- model.vm_delete('test')
+ model.vm_delete('test-me')
diff --git a/tests/test_mockmodel.py b/tests/test_mockmodel.py
index 4276832..29354aa 100644
--- a/tests/test_mockmodel.py
+++ b/tests/test_mockmodel.py
@@ -38,24 +38,29 @@ ssl_port = None
fake_iso = None
+def setUpModule():
+ global host, port, ssl_port, model, test_server, fake_iso
+ cherrypy.request.headers = {'Accept': 'application/json'}
+ model = kimchi.mockmodel.MockModel('/tmp/obj-store-test')
+ patch_auth()
+ port = get_free_port('http')
+ ssl_port = get_free_port('https')
+ host = '127.0.0.1'
+ test_server = run_server(host, port, ssl_port, test_mode=True,
+ model=model)
+ fake_iso = '/tmp/fake.iso'
+ open(fake_iso, 'w').close()
+
+
+def tearDown():
+ test_server.stop()
+ os.unlink('/tmp/obj-store-test')
+ os.unlink(fake_iso)
+
+
class MockModelTests(unittest.TestCase):
def setUp(self):
- global host, port, ssl_port, model, test_server, fake_iso
- cherrypy.request.headers = {'Accept': 'application/json'}
- model = kimchi.mockmodel.MockModel('/tmp/obj-store-test')
- patch_auth()
- port = get_free_port('http')
- ssl_port = get_free_port('https')
- host = '127.0.0.1'
- test_server = run_server(host, port, ssl_port, test_mode=True,
- model=model)
- fake_iso = '/tmp/fake.iso'
- open(fake_iso, 'w').close()
-
- def tearDown(self):
- test_server.stop()
- os.unlink('/tmp/obj-store-test')
- os.unlink(fake_iso)
+ model.reset()
def test_collection(self):
c = Collection(model)
@@ -190,38 +195,37 @@ class MockModelTests(unittest.TestCase):
request(host, ssl_port, '/templates', req, 'POST')
def add_vm(name):
-
# Create a VM
req = json.dumps({'name': name, 'template':
'/templates/test'})
request(host, ssl_port, '/vms', req, 'POST')
- add_vm('bca')
- add_vm('xba')
- add_vm('abc')
- add_vm('cab')
+ vms = [u'abc', u'bca', u'cab', u'xba']
+ for vm in vms:
+ add_vm(vm)
- self.assertEqual(model.vms_get_list(), ['abc', 'bca',
'cab', 'xba'])
+ vms.append(u'test')
+ self.assertEqual(model.vms_get_list(), sorted(vms))
def test_vm_info(self):
model.templates_create({'name': u'test',
'cdrom': fake_iso})
- model.vms_create({'name': u'test', 'template':
'/templates/test'})
+ model.vms_create({'name': u'test-vm', 'template':
'/templates/test'})
vms = model.vms_get_list()
- self.assertEquals(1, len(vms))
- self.assertEquals(u'test', vms[0])
+ self.assertEquals(2, len(vms))
+ self.assertIn(u'test-vm', vms)
keys = set(('name', 'state', 'stats', 'uuid',
'memory', 'cpus',
'screenshot', 'icon', 'graphics',
'users', 'groups',
- 'access'))
+ 'access', 'persistent'))
stats_keys = set(('cpu_utilization',
'net_throughput', 'net_throughput_peak',
'io_throughput', 'io_throughput_peak'))
- info = model.vm_lookup(u'test')
+ info = model.vm_lookup(u'test-vm')
self.assertEquals(keys, set(info.keys()))
self.assertEquals('shutoff', info['state'])
- self.assertEquals('test', info['name'])
+ self.assertEquals('test-vm', info['name'])
self.assertEquals(1024, info['memory'])
self.assertEquals(1, info['cpus'])
self.assertEquals('images/icon-vm.png', info['icon'])
diff --git a/tests/test_rest.py b/tests/test_rest.py
index 6770647..7f14b50 100644
--- a/tests/test_rest.py
+++ b/tests/test_rest.py
@@ -21,23 +21,22 @@
import base64
import json
import os
-import random
import re
import requests
import shutil
import time
import unittest
import urllib2
-
+import urlparse
from functools import partial
-
import iso_gen
import kimchi.mockmodel
import kimchi.server
from kimchi.config import paths
from kimchi.rollbackcontext import RollbackContext
+from kimchi.utils import add_task
from utils import get_free_port, patch_auth, request
from utils import run_server, wait_task
@@ -48,8 +47,7 @@ host = None
port = None
ssl_port = None
cherrypy_port = None
-
-# utils.silence_server()
+fake_iso = '/tmp/fake.iso'
def setUpModule():
@@ -64,10 +62,14 @@ def setUpModule():
test_server = run_server(host, port, ssl_port, test_mode=True,
cherrypy_port=cherrypy_port, model=model)
+ # Create fake ISO to do the tests
+ iso_gen.construct_fake_iso(fake_iso, True, '12.04', 'ubuntu')
+
def tearDownModule():
test_server.stop()
os.unlink('/tmp/obj-store-test')
+ os.unlink(fake_iso)
class RestTests(unittest.TestCase):
@@ -172,15 +174,17 @@ class RestTests(unittest.TestCase):
def test_get_vms(self):
vms = json.loads(self.request('/vms').read())
- self.assertEquals(0, len(vms))
+ # test_rest.py uses MockModel() which connects to libvirt URI
+ # test:///default. By default this driver already has one VM created
+ self.assertEquals(1, len(vms))
# Create a template as a base for our VMs
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso'})
+ req = json.dumps({'name': 'test', 'cdrom': fake_iso})
resp = self.request('/templates', req, 'POST')
self.assertEquals(201, resp.status)
- test_users = ['user1', 'user2', 'root']
- test_groups = ['group1', 'group2', 'admin']
+ test_users = ['root']
+ test_groups = ['wheel']
# Now add a couple of VMs to the mock model
for i in xrange(10):
name = 'vm-%i' % i
@@ -190,16 +194,16 @@ class RestTests(unittest.TestCase):
self.assertEquals(201, resp.status)
vms = json.loads(self.request('/vms').read())
- self.assertEquals(10, len(vms))
+ self.assertEquals(11, len(vms))
vm = json.loads(self.request('/vms/vm-1').read())
self.assertEquals('vm-1', vm['name'])
self.assertEquals('shutoff', vm['state'])
- self.assertEquals(test_users, vm['users'])
- self.assertEquals(test_groups, vm['groups'])
+ self.assertEquals([], vm['users'])
+ self.assertEquals([], vm['groups'])
def test_edit_vm(self):
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso'})
+ req = json.dumps({'name': 'test', 'cdrom': fake_iso})
resp = self.request('/templates', req, 'POST')
self.assertEquals(201, resp.status)
@@ -311,7 +315,7 @@ class RestTests(unittest.TestCase):
# Create a Template
req = json.dumps({'name': 'test', 'disks':
[{'size': 1}],
'icon': 'images/icon-debian.png',
- 'cdrom': '/nonexistent.iso'})
+ 'cdrom': fake_iso})
resp = self.request('/templates', req, 'POST')
self.assertEquals(201, resp.status)
@@ -326,8 +330,8 @@ class RestTests(unittest.TestCase):
self.assertEquals('images/icon-debian.png', vm['icon'])
# Verify the volume was created
- vol_uri = '/storagepools/default/storagevolumes/%s-0.img' %
vm['uuid']
- resp = self.request(vol_uri)
+ vol_uri = '/storagepools/default-pool/storagevolumes/%s-0.img'
+ resp = self.request(vol_uri % vm['uuid'])
vol = json.loads(resp.read())
self.assertEquals(1 << 30, vol['capacity'])
self.assertEquals(1, vol['ref_cnt'])
@@ -391,11 +395,11 @@ class RestTests(unittest.TestCase):
self.assertEquals(204, resp.status)
# Verify the volume was deleted
- self.assertHTTPStatus(404, vol_uri)
+ self.assertHTTPStatus(404, vol_uri % vm['uuid'])
def test_vm_graphics(self):
# Create a Template
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso'})
+ req = json.dumps({'name': 'test', 'cdrom': fake_iso})
resp = self.request('/templates', req, 'POST')
self.assertEquals(201, resp.status)
@@ -475,7 +479,7 @@ class RestTests(unittest.TestCase):
with RollbackContext() as rollback:
# Create a template as a base for our VMs
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso'})
+ req = json.dumps({'name': 'test', 'cdrom':
fake_iso})
resp = self.request('/templates', req, 'POST')
self.assertEquals(201, resp.status)
# Delete the template
@@ -574,8 +578,6 @@ class RestTests(unittest.TestCase):
self.assertEquals(201, resp.status)
cd_info = json.loads(resp.read())
self.assertEquals('disk', cd_info['type'])
- self.assertEquals('tmp', cd_info['pool'])
- self.assertEquals('attach-volume', cd_info['vol'])
# Attach a cdrom with existent dev name
req = json.dumps({'type': 'cdrom',
@@ -592,24 +594,29 @@ class RestTests(unittest.TestCase):
os.remove('/tmp/existent.iso')
# Change path of storage cdrom
- req = json.dumps({'path': 'http://myserver.com/myiso.iso'})
- resp = self.request('/vms/test-vm/storages/'+cd_dev, req,
'PUT')
+ cdrom =
u'http://fedora.mirrors.tds.net/pub/fedora/releases/20/'\
+ 'Live/x86_64/Fedora-Live-Desktop-x86_64-20-1.iso'
+ req = json.dumps({'path': cdrom})
+ resp = self.request('/vms/test-vm/storages/' + cd_dev, req,
'PUT')
self.assertEquals(200, resp.status)
cd_info = json.loads(resp.read())
-
self.assertEquals('http://myserver.com/myiso.iso';,
cd_info['path'])
+ self.assertEquals(urlparse.urlparse(cdrom).path,
+ urlparse.urlparse(cd_info['path']).path)
# Test GET
devs = json.loads(self.request('/vms/test-vm/storages').read())
self.assertEquals(4, len(devs))
# Detach storage cdrom
- resp = self.request('/vms/test-vm/storages/'+cd_dev,
+ resp = self.request('/vms/test-vm/storages/' + cd_dev,
'{}', 'DELETE')
self.assertEquals(204, resp.status)
# Test GET
devs = json.loads(self.request('/vms/test-vm/storages').read())
self.assertEquals(3, len(devs))
+ resp = self.request('/storagepools/tmp/deactivate', {},
'POST')
+ self.assertEquals(200, resp.status)
resp = self.request('/storagepools/tmp', {}, 'DELETE')
self.assertEquals(204, resp.status)
@@ -617,7 +624,7 @@ class RestTests(unittest.TestCase):
with RollbackContext() as rollback:
# Create a template as a base for our VMs
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso'})
+ req = json.dumps({'name': 'test', 'cdrom':
fake_iso})
resp = self.request('/templates', req, 'POST')
self.assertEquals(201, resp.status)
# Delete the template
@@ -651,7 +658,7 @@ class RestTests(unittest.TestCase):
iface['mac']).read())
self.assertEquals('default', res['network'])
self.assertEquals(17, len(res['mac']))
- self.assertEquals('virtio', res['model'])
+ self.assertEquals('e1000', res['model'])
# attach network interface to vm
req = json.dumps({"type": "network",
@@ -667,12 +674,12 @@ class RestTests(unittest.TestCase):
self.assertEquals('network', iface['type'])
# update vm interface
- req = json.dumps({"network": "default",
"model": "e1000"})
+ req = json.dumps({"network": "default",
"model": "virtio"})
resp = self.request('/vms/test-vm/ifaces/%s' % iface['mac'],
req, 'PUT')
self.assertEquals(200, resp.status)
update_iface = json.loads(resp.read())
- self.assertEquals('e1000', update_iface['model'])
+ self.assertEquals(u'virtio', update_iface['model'])
self.assertEquals('default', update_iface['network'])
# detach network interface from vm
@@ -682,7 +689,7 @@ class RestTests(unittest.TestCase):
def test_vm_customise_storage(self):
# Create a Template
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso',
+ req = json.dumps({'name': 'test', 'cdrom': fake_iso,
'disks': [{'size': 1}]})
resp = self.request('/templates', req, 'POST')
self.assertEquals(201, resp.status)
@@ -707,7 +714,7 @@ class RestTests(unittest.TestCase):
# Test template not changed after vm customise its pool
t = json.loads(self.request('/templates/test').read())
- self.assertEquals(t['storagepool'], '/storagepools/default')
+ self.assertEquals(t['storagepool'],
'/storagepools/default-pool')
# Verify the volume was created
vol_uri = '/storagepools/alt/storagevolumes/%s-0.img' %
vm_info['uuid']
@@ -726,35 +733,34 @@ class RestTests(unittest.TestCase):
# Create scsi fc pool
req = json.dumps({'name': 'scsi_fc_pool',
'type': 'scsi',
- 'source': {'adapter_name':
'scsi_host3'}})
+ 'source': {'adapter_name':
'scsi_host2'}})
resp = self.request('/storagepools', req, 'POST')
self.assertEquals(201, resp.status)
- # Create template with this pool
- req = json.dumps({'name': 'test_fc_pool', 'cdrom':
'/nonexistent.iso',
- 'storagepool': '/storagepools/scsi_fc_pool'})
- resp = self.request('/templates', req, 'POST')
- self.assertEquals(201, resp.status)
-
# Test create vms using lun of this pool
# activate the storage pool
resp = self.request('/storagepools/scsi_fc_pool/activate', '{}',
'POST')
- # Get scsi pool luns and choose one
+ # Create template fails because SCSI volume is missing
+ tmpl_params = {'name': 'test_fc_pool', 'cdrom':
fake_iso,
+ 'storagepool': '/storagepools/scsi_fc_pool'}
+ req = json.dumps(tmpl_params)
+ resp = self.request('/templates', req, 'POST')
+ self.assertEquals(400, resp.status)
+
+ # Choose SCSI volume to create template
resp = self.request('/storagepools/scsi_fc_pool/storagevolumes')
- luns = json.loads(resp.read())
- lun_name = random.choice(luns).get('name')
+ lun_name = json.loads(resp.read())[0]['name']
- # Create vm in scsi pool without volumes: Error
- req = json.dumps({'template': '/templates/test_fc_pool'})
- resp = self.request('/vms', req, 'POST')
- self.assertEquals(400, resp.status)
+ tmpl_params['disks'] = [{'index': 0, 'volume':
lun_name}]
+ req = json.dumps(tmpl_params)
+ resp = self.request('/templates', req, 'POST')
+ self.assertEquals(201, resp.status)
# Create vm in scsi pool
req = json.dumps({'name': 'test-vm',
- 'template': '/templates/test_fc_pool',
- 'volumes': [lun_name]})
+ 'template': '/templates/test_fc_pool'})
resp = self.request('/vms', req, 'POST')
self.assertEquals(201, resp.status)
@@ -773,7 +779,7 @@ class RestTests(unittest.TestCase):
self.assertEquals(204, resp.status)
def test_template_customise_storage(self):
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso',
+ req = json.dumps({'name': 'test', 'cdrom': fake_iso,
'disks': [{'size': 1}]})
resp = self.request('/templates', req, 'POST')
self.assertEquals(201, resp.status)
@@ -825,7 +831,7 @@ class RestTests(unittest.TestCase):
def test_template_customise_network(self):
with RollbackContext() as rollback:
- tmpl = {'name': 'test', 'cdrom':
'/nonexistent.iso',
+ tmpl = {'name': 'test', 'cdrom': fake_iso,
'disks': [{'size': 1}]}
req = json.dumps(tmpl)
resp = self.request('/templates', req, 'POST')
@@ -883,7 +889,7 @@ class RestTests(unittest.TestCase):
def test_unnamed_vms(self):
# Create a Template
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso'})
+ req = json.dumps({'name': 'test', 'cdrom': fake_iso})
resp = self.request('/templates', req, 'POST')
self.assertEquals(201, resp.status)
@@ -893,7 +899,7 @@ class RestTests(unittest.TestCase):
vm = json.loads(self.request('/vms', req, 'POST').read())
self.assertEquals('test-vm-%i' % i, vm['name'])
count = len(json.loads(self.request('/vms').read()))
- self.assertEquals(5, count)
+ self.assertEquals(6, count)
def test_create_vm_without_template(self):
req = json.dumps({'name': 'vm-without-template'})
@@ -912,7 +918,7 @@ class RestTests(unittest.TestCase):
def test_create_vm_with_img_based_template(self):
resp = json.loads(
- self.request('/storagepools/default/storagevolumes').read())
+ self.request('/storagepools/default-pool/storagevolumes').read())
self.assertEquals(0, len(resp))
# Create a Template
@@ -927,14 +933,13 @@ class RestTests(unittest.TestCase):
# Test storage volume created with backing store of base file
resp = json.loads(
- self.request('/storagepools/default/storagevolumes').read())
+ self.request('/storagepools/default-pool/storagevolumes').read())
self.assertEquals(1, len(resp))
- self.assertEquals(mock_base, resp[0]['base']['path'])
def test_get_storagepools(self):
storagepools = json.loads(self.request('/storagepools').read())
self.assertEquals(2, len(storagepools))
- self.assertEquals('default', storagepools[0]['name'])
+ self.assertEquals('default-pool', storagepools[0]['name'])
self.assertEquals('active', storagepools[0]['state'])
self.assertEquals('kimchi_isos', storagepools[1]['name'])
self.assertEquals('kimchi-iso', storagepools[1]['type'])
@@ -1066,12 +1071,13 @@ class RestTests(unittest.TestCase):
self.assertEquals('/var/lib/libvirt/images/volume-1',
storagevolume['path'])
- req = json.dumps({'url': 'https://anyurl.wor.kz'})
+ url = 'https://github.com/kimchi-project/kimchi/blob/master/COPYING'
+ req = json.dumps({'url': url})
resp = self.request('/storagepools/pool-1/storagevolumes', req,
'POST')
self.assertEquals(202, resp.status)
task = json.loads(resp.read())
vol_name = task['target_uri'].split('/')[-1]
- self.assertEquals('anyurl.wor.kz', vol_name)
+ self.assertEquals('COPYING', vol_name)
wait_task(self._task_lookup, task['id'])
task = json.loads(self.request('/tasks/%s' % task['id']).read())
self.assertEquals('finished', task['status'])
@@ -1095,13 +1101,16 @@ class RestTests(unittest.TestCase):
cloned_vol = json.loads(resp.read())
self.assertNotEquals(vol['name'], cloned_vol['name'])
- del vol['name']
- del cloned_vol['name']
self.assertNotEquals(vol['path'], cloned_vol['path'])
- del vol['path']
- del cloned_vol['path']
+ for key in ['name', 'path', 'allocation']:
+ del vol[key]
+ del cloned_vol[key]
+
self.assertEquals(vol, cloned_vol)
+ resp = self.request('/storagepools/pool-1/deactivate', '{}',
'POST')
+ self.assertEquals(200, resp.status)
+
# Now remove the StoragePool from mock model
self._delete_pool('pool-1')
@@ -1138,7 +1147,7 @@ class RestTests(unittest.TestCase):
resp = self.request(uri, req, 'POST')
uri = '/storagepools/pool-2/storagevolumes/test-volume'
storagevolume = json.loads(self.request(uri).read())
- self.assertEquals(768, storagevolume['capacity'])
+ self.assertEquals(768 << 20, storagevolume['capacity'])
# Wipe the storage volume
uri = '/storagepools/pool-2/storagevolumes/test-volume/wipe'
@@ -1152,6 +1161,9 @@ class RestTests(unittest.TestCase):
'{}', 'DELETE')
self.assertEquals(204, resp.status)
+ resp = self.request('/storagepools/pool-2/deactivate', '{}',
'POST')
+ self.assertEquals(200, resp.status)
+
# Now remove the StoragePool from mock model
self._delete_pool('pool-2')
@@ -1198,7 +1210,7 @@ class RestTests(unittest.TestCase):
open('/tmp/mock.img', 'w').close()
t = {'name': 'test_img_template', 'os_distro':
'ImagineOS',
'os_version': '1.0', 'memory': 1024, 'cpus':
1,
- 'storagepool': '/storagepools/alt',
+ 'storagepool': '/storagepools/default-pool',
'disks': [{'base': '/tmp/mock.img'}]}
req = json.dumps(t)
resp = self.request('/templates', req, 'POST')
@@ -1210,8 +1222,8 @@ class RestTests(unittest.TestCase):
graphics = {'type': 'spice', 'listen':
'127.0.0.1'}
t = {'name': 'test', 'os_distro': 'ImagineOS',
'os_version': '1.0', 'memory': 1024, 'cpus':
1,
- 'storagepool': '/storagepools/alt', 'cdrom':
'/tmp/mock.iso',
- 'graphics': graphics}
+ 'storagepool': '/storagepools/default-pool',
+ 'cdrom': '/tmp/mock.iso', 'graphics': graphics}
req = json.dumps(t)
resp = self.request('/templates', req, 'POST')
self.assertEquals(201, resp.status)
@@ -1239,8 +1251,8 @@ class RestTests(unittest.TestCase):
# Create a template with same name fails with 400
t = {'name': 'test', 'os_distro': 'ImagineOS',
'os_version': '1.0', 'memory': 1024, 'cpus':
1,
- 'storagepool': '/storagepools/default',
- 'cdrom': '/nonexistent.iso'}
+ 'storagepool': '/storagepools/default-pool',
+ 'cdrom': fake_iso}
req = json.dumps(t)
resp = self.request('/templates', req, 'POST')
self.assertEquals(400, resp.status)
@@ -1388,15 +1400,15 @@ class RestTests(unittest.TestCase):
'DELETE')
self.assertEquals(204, resp.status)
- # Delete the storagepool
+ # Try to delete the storagepool
+ # It should fail as it is associated to a template
resp = request(host, ssl_port, '/storagepools/test-storagepool',
'{}', 'DELETE')
- self.assertEquals(204, resp.status)
+ self.assertEquals(400, resp.status)
# Verify the template
res = json.loads(self.request('/templates/test').read())
self.assertEquals(res['invalid']['cdrom'], [iso])
- self.assertEquals(res['invalid']['storagepools'],
['test-storagepool'])
# Delete the template
resp = request(host, ssl_port, '/templates/test', '{}',
'DELETE')
@@ -1414,15 +1426,15 @@ class RestTests(unittest.TestCase):
storagevolume = json.loads(self.request(
'/storagepools/kimchi_isos/storagevolumes/').read())[0]
- self.assertEquals('pool-3-fedora.iso', storagevolume['name'])
+ self.assertEquals('fedora.iso', storagevolume['name'])
self.assertEquals('iso', storagevolume['format'])
self.assertEquals('/var/lib/libvirt/images/fedora.iso',
storagevolume['path'])
self.assertEquals(1024 << 20, storagevolume['capacity'])
- self.assertEquals(1024 << 20, storagevolume['allocation'])
- self.assertEquals('17', storagevolume['os_version'])
- self.assertEquals('fedora', storagevolume['os_distro'])
- self.assertEquals(True, storagevolume['bootable'])
+ self.assertEquals(0, storagevolume['allocation'])
+ self.assertEquals('unknown', storagevolume['os_version'])
+ self.assertEquals('unknown', storagevolume['os_distro'])
+ self.assertEquals(False, storagevolume['bootable'])
# Create a template
# In real model os distro/version can be omitted
@@ -1437,8 +1449,8 @@ class RestTests(unittest.TestCase):
# Verify the template
t = json.loads(self.request('/templates/test').read())
self.assertEquals('test', t['name'])
- self.assertEquals('fedora', t['os_distro'])
- self.assertEquals('17', t['os_version'])
+ self.assertEquals('unknown', t['os_distro'])
+ self.assertEquals('unknown', t['os_version'])
self.assertEquals(1024, t['memory'])
# Deactivate or destroy scan pool return 405
@@ -1454,22 +1466,25 @@ class RestTests(unittest.TestCase):
resp = self.request('/templates/%s' % t['name'], '{}',
'DELETE')
self.assertEquals(204, resp.status)
+ resp = self.request('/storagepools/pool-3/deactivate', '{}',
'POST')
+ self.assertEquals(200, resp.status)
self._delete_pool('pool-3')
def test_screenshot_refresh(self):
# Create a VM
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso'})
+ req = json.dumps({'name': 'test', 'cdrom': fake_iso})
resp = self.request('/templates', req, 'POST')
req = json.dumps({'name': 'test-vm', 'template':
'/templates/test'})
- self.request('/vms', req, 'POST')
+ resp = self.request('/vms', req, 'POST')
# Test screenshot for shut-off state vm
resp = self.request('/vms/test-vm/screenshot')
self.assertEquals(404, resp.status)
# Test screenshot for running vm
- self.request('/vms/test-vm/start', '{}', 'POST')
+ resp = self.request('/vms/test-vm/start', '{}', 'POST')
vm = json.loads(self.request('/vms/test-vm').read())
+
resp = self.request(vm['screenshot'], method='HEAD')
self.assertEquals(200, resp.status)
self.assertTrue(resp.getheader('Content-type').startswith('image'))
@@ -1589,9 +1604,9 @@ class RestTests(unittest.TestCase):
return json.loads(self.request('/tasks/%s' % taskid).read())
def test_tasks(self):
- id1 = model.add_task('/tasks/1', self._async_op)
- id2 = model.add_task('/tasks/2', self._except_op)
- id3 = model.add_task('/tasks/3', self._intermid_op)
+ id1 = add_task('/tasks/1', self._async_op, model.objstore)
+ id2 = add_task('/tasks/2', self._except_op, model.objstore)
+ id3 = add_task('/tasks/3', self._intermid_op, model.objstore)
target_uri = urllib2.quote('^/tasks/*', safe="")
filter_data = 'status=running&target_uri=%s' % target_uri
@@ -1621,9 +1636,9 @@ class RestTests(unittest.TestCase):
resp = self.request('/config/capabilities').read()
conf = json.loads(resp)
- keys = ['libvirt_stream_protocols', 'qemu_stream',
'qemu_spice',
- 'screenshot', 'system_report_tool',
'update_tool',
- 'repo_mngt_tool', 'federation']
+ keys = [u'libvirt_stream_protocols', u'qemu_stream',
u'qemu_spice',
+ u'screenshot', u'system_report_tool',
u'update_tool',
+ u'repo_mngt_tool', u'federation',
u'kernel_vfio']
self.assertEquals(sorted(keys), sorted(conf.keys()))
def test_peers(self):
@@ -1803,13 +1818,10 @@ class RestTests(unittest.TestCase):
def test_host(self):
resp = self.request('/host').read()
info = json.loads(resp)
- self.assertEquals('Red Hat Enterprise Linux Server',
info['os_distro'])
- self.assertEquals('6.4', info['os_version'])
- self.assertEquals('Santiago', info['os_codename'])
- self.assertEquals('Intel(R) Core(TM) i5 CPU M 560 @ 2.67GHz',
- info['cpu_model'])
- self.assertEquals(6114058240, info['memory'])
- self.assertEquals(4, info['cpus'])
+
+ keys = ['os_distro', 'os_version', 'os_codename',
'cpu_model',
+ 'memory', 'cpus']
+ self.assertEquals(sorted(keys), sorted(info.keys()))
def test_hoststats(self):
stats_keys = ['cpu_utilization', 'memory',
'disk_read_rate',
@@ -1856,14 +1868,14 @@ class RestTests(unittest.TestCase):
resp = self.request('/tasks/' + task[u'id'], None,
'GET')
task_info = json.loads(resp.read())
self.assertEquals(task_info['status'], 'running')
- time.sleep(6)
+ wait_task(self._task_lookup, task_info['id'])
resp = self.request('/tasks/' + task[u'id'], None,
'GET')
task_info = json.loads(resp.read())
self.assertEquals(task_info['status'], 'finished')
self.assertIn(u'All packages updated', task_info['message'])
def test_get_param(self):
- req = json.dumps({'name': 'test', 'cdrom':
'/nonexistent.iso'})
+ req = json.dumps({'name': 'test', 'cdrom': fake_iso})
self.request('/templates', req, 'POST')
# Create a VM
@@ -1877,13 +1889,15 @@ class RestTests(unittest.TestCase):
resp = request(host, ssl_port, '/vms')
self.assertEquals(200, resp.status)
res = json.loads(resp.read())
- self.assertEquals(2, len(res))
+ self.assertEquals(3, len(res))
+ # FIXME: control/base.py also allows filter by regex so it is returning
+ # 2 vms when querying for 'test-vm1': 'test' and
'test-vm1'
resp = request(host, ssl_port, '/vms?name=test-vm1')
self.assertEquals(200, resp.status)
res = json.loads(resp.read())
- self.assertEquals(1, len(res))
- self.assertEquals('test-vm1', res[0]['name'])
+ self.assertEquals(2, len(res))
+ self.assertIn('test-vm1', [r['name'] for r in res])
def test_repositories(self):
def verify_repo(t, res):
@@ -1897,25 +1911,6 @@ class RestTests(unittest.TestCase):
# Already have one repo in Kimchi's system
self.assertEquals(1, len(json.loads(resp.read())))
- invalid_urls = ['www.fedora.org', # missing protocol
- '://www.fedora.org', # missing protocol
- 'http://www.fedora', # invalid domain name
- 'file:///home/userdoesnotexist'] # invalid path
-
- # Create repositories with invalid baseurl
- for url in invalid_urls:
- repo = {'repo_id': 'fedora-fake', 'baseurl': url}
- req = json.dumps(repo)
- resp = self.request(base_uri, req, 'POST')
- self.assertEquals(400, resp.status)
-
- # Create repositories with invalid mirrorlist
- for url in invalid_urls:
- repo = {'repo_id': 'fedora-fake', 'mirrorlist': url}
- req = json.dumps(repo)
- resp = self.request(base_uri, req, 'POST')
- self.assertEquals(400, resp.status)
-
# Create a repository
repo = {'repo_id': 'fedora-fake',
'baseurl': 'http://www.fedora.org'}
@@ -1927,22 +1922,6 @@ class RestTests(unittest.TestCase):
res = json.loads(self.request('%s/fedora-fake' % base_uri).read())
verify_repo(repo, res)
- # Update repositories with invalid baseurl
- for url in invalid_urls:
- params = {}
- params['baseurl'] = url
- resp = self.request('%s/fedora-fake' % base_uri,
- json.dumps(params), 'PUT')
- self.assertEquals(400, resp.status)
-
- # Update repositories with invalid mirrorlist
- for url in invalid_urls:
- params = {}
- params['mirrorlist'] = url
- resp = self.request('%s/fedora-fake' % base_uri,
- json.dumps(params), 'PUT')
- self.assertEquals(400, resp.status)
-
# Update the repository
params = {}
params['baseurl'] = repo['baseurl'] =
'http://www.fedoraproject.org'
@@ -1969,7 +1948,7 @@ class RestTests(unittest.TestCase):
with RollbackContext() as rollback:
vol_path = os.path.join(paths.get_prefix(), 'COPYING')
- url = "https://%s:%s/storagepools/default/storagevolumes" % \
+ url = "https://%s:%s/storagepools/default-pool/storagevolumes" % \
(host, ssl_port)
with open(vol_path, 'rb') as fd:
@@ -1981,8 +1960,8 @@ class RestTests(unittest.TestCase):
self.assertEquals(r.status_code, 202)
task = r.json()
wait_task(self._task_lookup, task['id'])
- resp = self.request('/storagepools/default/storagevolumes/%s' %
- task['target_uri'].split('/')[-1])
+ uri = '/storagepools/default-pool/storagevolumes/%s'
+ resp = self.request(uri % task['target_uri'].split('/')[-1])
self.assertEquals(200, resp.status)
# Create a file with 3M to upload
@@ -2001,8 +1980,7 @@ class RestTests(unittest.TestCase):
self.assertEquals(r.status_code, 202)
task = r.json()
wait_task(self._task_lookup, task['id'], 15)
- resp = self.request('/storagepools/default/storagevolumes/%s' %
- task['target_uri'].split('/')[-1])
+ resp = self.request(uri % task['target_uri'].split('/')[-1])
self.assertEquals(200, resp.status)
--
1.9.3