[PATCH V3] New MockModel

V2 -> V3: - Fix "make check-local" errors - Fix typo V1 -> V2: - Fix some tests cases - Update volune ref_cnt when VM is deleted Aline Manera (1): MockModel refactor: Create MockModel based on Model("test:///default") src/kimchi/i18n.py | 2 - src/kimchi/mockmodel.py | 1771 ++++++++----------------------------------- src/kimchi/vmtemplate.py | 9 +- tests/run_tests.sh.in | 18 +- tests/test_authorization.py | 27 +- tests/test_mockmodel.py | 60 +- tests/test_model.py | 5 + tests/test_rest.py | 254 +++---- 8 files changed, 503 insertions(+), 1643 deletions(-) -- 1.9.3

The mockmodel was designed to provide a way to user tests Kimchi without affecting the system (by "kimchid --test") and also to make the tests easier to do. But in fact, we have a bunch of code that completely differs from the real model, so the developer needs to do 2 kinds of implementations while developing a new feature. This patch change MockModel to be Model("test:///default") and only overrides what is not supported by the libvirt Test Driver. It also fixes the test cases after that change: only delete a inactive storage pool, the Test Driver already has on VM created on start up, the ISO file must be an existing path, etc. The run_tests.sh.in script was also updated to run the test_model.py prior to the MockModel tests as the MockModel will override some Model methods which can cause problems if the MockModel runs before Model. Signed-off-by: Aline Manera <alinefm@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 2 - src/kimchi/mockmodel.py | 1771 ++++++++----------------------------------- src/kimchi/vmtemplate.py | 9 +- tests/run_tests.sh.in | 18 +- tests/test_authorization.py | 27 +- tests/test_mockmodel.py | 60 +- tests/test_model.py | 5 + tests/test_rest.py | 254 +++---- 8 files changed, 503 insertions(+), 1643 deletions(-) diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index e823f2b..4cec189 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -124,7 +124,6 @@ messages = { "KCHVMIF0008E": _("Specify type and network to update a virtual machine interface"), "KCHTMPL0001E": _("Template %(name)s already exists"), - "KCHTMPL0002E": _("Template %(name)s does not exist"), "KCHTMPL0003E": _("Network '%(network)s' specified for template %(template)s does not exist"), "KCHTMPL0004E": _("Storage pool %(pool)s specified for template %(template)s does not exist"), "KCHTMPL0005E": _("Storage pool %(pool)s specified for template %(template)s is not active"), @@ -192,7 +191,6 @@ messages = { "KCHVOL0002E": _("Storage volume %(name)s does not exist in storage pool %(pool)s"), "KCHVOL0003E": _("Unable to create storage volume %(volume)s because storage pool %(pool)s is not active"), "KCHVOL0004E": _("Specify %(item)s in order to create storage volume %(volume)s"), - "KCHVOL0005E": _("Unable to retrieve storage volume %(volume)s because storage pool %(pool)s is not active"), "KCHVOL0006E": _("Unable to list storage volumes because storage pool %(pool)s is not active"), "KCHVOL0007E": _("Unable to create storage volume %(name)s in storage pool %(pool)s. Details: %(err)s"), "KCHVOL0008E": _("Unable to list storage volumes in storage pool %(pool)s. Details: %(err)s"), diff --git a/src/kimchi/mockmodel.py b/src/kimchi/mockmodel.py index 626ef35..c0c82b8 100644 --- a/src/kimchi/mockmodel.py +++ b/src/kimchi/mockmodel.py @@ -14,1105 +14,291 @@ # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the -# Free Software Foundation, Inc. -# 51 Franklin Street, Fifth Floor, -# Boston, MA 02110-1301 USA - -import cherrypy -import copy -import disks -import glob -import ipaddr +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import libvirt +import lxml.etree as ET import os -import shutil -import psutil import random -import string import time -import uuid - - -try: - from PIL import Image - from PIL import ImageDraw -except ImportError: - import Image - import ImageDraw +from lxml import objectify from kimchi import config -from kimchi.asynctask import AsyncTask -from kimchi.config import READONLY_POOL_TYPE, config as kconfig -from kimchi.distroloader import DistroLoader -from kimchi.exception import InvalidOperation, InvalidParameter -from kimchi.exception import MissingParameter, NotFoundError, OperationFailed -from kimchi.model.storagepools import ISO_POOL_NAME -from kimchi.model.storageservers import STORAGE_SERVERS -from kimchi.model.utils import get_vm_name +from kimchi import imageinfo +from kimchi import osinfo +from kimchi.model.debugreports import DebugReportsModel +from kimchi.model.host import DeviceModel +from kimchi.model.model import Model +from kimchi.model.storagevolumes import StorageVolumesModel +from kimchi.model.templates import LibvirtVMTemplate from kimchi.objectstore import ObjectStore -from kimchi.screenshot import VMScreenshot -from kimchi.utils import get_next_clone_name, pool_name_from_uri -from kimchi.utils import validate_repo_url, template_name_from_uri +from kimchi.utils import add_task from kimchi.vmtemplate import VMTemplate +from kimchi.xmlutils.utils import xml_item_update + +fake_user = {'root': 'letmein!'} +mockmodel_defaults = {'storagepool': '/storagepools/default-pool', + 'domain': 'test', 'arch': 'i686'} -fake_user = {'admin': 'letmein!'} +class MockModel(Model): + _mock_vms = {} + _XMLDesc = libvirt.virDomain.XMLDesc + _defineXML = libvirt.virConnect.defineXML + _undefineDomain = libvirt.virDomain.undefine + _libvirt_get_vol_path = LibvirtVMTemplate._get_volume_path -class MockModel(object): def __init__(self, objstore_loc=None): - self.reset() - self.objstore = ObjectStore(objstore_loc) - self.objstore_loc = objstore_loc - self.distros = self._get_distros() - - def capabilities_lookup(self, *ident): - return {'libvirt_stream_protocols': - ['http', 'https', 'ftp', 'ftps', 'tftp'], - 'qemu_spice': True, - 'qemu_stream': True, - 'screenshot': True, - 'system_report_tool': True, - 'update_tool': True, - 'repo_mngt_tool': 'yum', - 'federation': 'off'} + # Override osinfo.defaults to ajust the values according to + # test:///default driver + defaults = dict(osinfo.defaults) + defaults.update(mockmodel_defaults) + osinfo.defaults = dict(defaults) - def reset(self): - if hasattr(self, 'objstore'): - self.objstore = ObjectStore(self.objstore_loc) - self._mock_vms = {} - self._mock_screenshots = {} - self._mock_templates = {} - self._mock_storagepools = {'default': MockStoragePool('default')} - self._mock_networks = {'default': MockNetwork('default')} - self._mock_interfaces = self.dummy_interfaces() - self._mock_swupdate = MockSoftwareUpdate() - self.next_taskid = 1 - self.storagepool_activate('default') - self._mock_host_repositories = MockRepositories() self._mock_devices = MockDevices() + self._mock_interfaces = MockInterfaces() + self._mock_storagevolumes = MockStorageVolumes() + self._mock_swupdate = MockSoftwareUpdate() + self._mock_repositories = MockRepositories() + + libvirt.virConnect.defineXML = MockModel.domainDefineXML + libvirt.virDomain.XMLDesc = MockModel.domainXMLDesc + libvirt.virDomain.undefine = MockModel.undefineDomain + libvirt.virDomain.attachDeviceFlags = MockModel.attachDeviceFlags + libvirt.virDomain.detachDeviceFlags = MockModel.detachDeviceFlags + libvirt.virDomain.updateDeviceFlags = MockModel.updateDeviceFlags + libvirt.virStorageVol.resize = MockModel.volResize + libvirt.virStorageVol.wipePattern = MockModel.volWipePattern + + super(MockModel, self).__init__('test:///default', objstore_loc) + self.objstore_loc = objstore_loc + self.objstore = ObjectStore(objstore_loc) - def _static_vm_update(self, dom, params): - state = dom.info['state'] - - for key, val in params.items(): - if key == 'name': - if state == 'running' or params['name'] in self.vms_get_list(): - msg_args = {'name': dom.name, 'new_name': params['name']} - raise InvalidParameter("KCHVM0003E", msg_args) - - del self._mock_vms[dom.name] - dom.name = params['name'] - self._mock_vms[dom.name] = dom - - elif key == 'users': - invalid_users = set(val) - set(self.users_get_list()) - if len(invalid_users) != 0: - raise InvalidParameter("KCHVM0027E", - {'users': ", ".join(invalid_users)}) + # The MockModel methods are instantiated on runtime according to Model + # and BaseModel + # Because that a normal method override will not work here + # Instead of that we also need to do the override on runtime + for method in dir(self): + if method.startswith('_mock_'): + mock_method = getattr(self, method) + if not callable(mock_method): + continue + + m = method.strip('_mock_') + model_method = getattr(self, m) + setattr(self, '_model_' + m, model_method) + setattr(self, m, mock_method) + + DeviceModel.lookup = self._mock_device_lookup + StorageVolumesModel.get_list = self._mock_storagevolumes_get_list + DebugReportsModel._gen_debugreport_file = self._gen_debugreport_file + LibvirtVMTemplate._get_volume_path = self._get_volume_path + VMTemplate.get_iso_info = self._probe_image + imageinfo.probe_image = self._probe_image - elif key == 'groups': - invalid_groups = set(val) - set(self.groups_get_list()) - if len(invalid_groups) != 0: - raise InvalidParameter("KCHVM0028E", - {'groups': - ", ".join(invalid_groups)}) + def reset(self): + MockModel._mock_vms = {} + self._mock_swupdate = MockSoftwareUpdate() + self._mock_repositories = MockRepositories() - dom.info[key] = val + if hasattr(self, 'objstore'): + self.objstore = ObjectStore(self.objstore_loc) - def _live_vm_update(self, dom, params): - if 'graphics' not in params: - return + params = {'vms': [u'test'], 'templates': [], + 'networks': [u'default'], 'storagepools': [u'default-pool']} - graphics = params.pop('graphics') - passwd = graphics.get('passwd') - if passwd is None: - passwd = "".join(random.sample(string.ascii_letters + - string.digits, 8)) + for res, items in params.iteritems(): + resources = getattr(self, '%s_get_list' % res)() + for i in resources: + if i in items: + continue - expire = graphics.get('passwdValidTo') - if expire is not None: - expire = round(time.time()) + expire + try: + getattr(self, '%s_deactivate' % res[:-1])(i) + except: + pass - dom.info['graphics']["passwd"] = passwd - dom.info['graphics']["passwdValidTo"] = expire + getattr(self, '%s_delete' % res[:-1])(i) - def vm_update(self, name, params): - dom = self._get_vm(name) - self._static_vm_update(dom, params) - self._live_vm_update(dom, params) + volumes = self.storagevolumes_get_list('default-pool') + for v in volumes: + self.storagevolume_delete('default-pool', v) - return dom.name - - def vm_lookup(self, name): - vm = self._get_vm(name) - if vm.info['state'] == 'running': - vm.info['screenshot'] = self.vmscreenshot_lookup(name) - else: - vm.info['screenshot'] = None - - validTo = vm.info['graphics']['passwdValidTo'] - validTo = (validTo - round(time.time()) if validTo is not None - else None) - vm.info['graphics']['passwdValidTo'] = validTo - return vm.info - - def vm_delete(self, name): - vm = self._get_vm(name) - self._vmscreenshot_delete(vm.uuid) - for disk in vm.disk_paths: - self.storagevolume_delete(disk['pool'], disk['volume']) - - del self._mock_vms[vm.name] - - def vm_start(self, name): - self._get_vm(name).info['state'] = 'running' - - def vm_poweroff(self, name): - self._get_vm(name).info['state'] = 'shutoff' - - def vm_shutdown(self, name): - self._get_vm(name).info['state'] = 'shutoff' - - def vm_reset(self, name): - pass - - def vm_connect(self, name): - pass - - def vm_clone(self, name): - vm = self._mock_vms[name] - if vm.info['state'] != u'shutoff': - raise InvalidParameter('KCHVM0033E', {'name': name}) - - new_name = get_next_clone_name(self.vms_get_list(), name) - - taskid = self.add_task(u'/vms/%s' % new_name, self._do_clone, - {'name': name, 'new_name': new_name}) - return self.task_lookup(taskid) - - def _do_clone(self, cb, params): - name = params['name'] - new_name = params['new_name'] - - vm = self._mock_vms[name] - new_vm = copy.deepcopy(vm) - - new_uuid = unicode(uuid.uuid4()) - - new_vm.name = new_name - new_vm.info['name'] = new_name - new_vm.uuid = new_uuid - new_vm.info['uuid'] = new_uuid - - for mac, iface in new_vm.ifaces.items(): - new_mac = MockVMIface.get_mac() - iface.info['mac'] = new_mac - new_vm.ifaces[new_mac] = iface - - storage_names = new_vm.storagedevices.keys() - for i, storage_name in enumerate(storage_names): - storage = new_vm.storagedevices[storage_name] - basename, ext = os.path.splitext(storage.info['path']) - new_path = u'%s-%d%s' % (basename, i, ext) - new_vm.storagedevices[storage_name].path = new_path - - self._mock_vms[new_name] = new_vm - - cb('OK', True) - - def vms_create(self, params): - t_name = template_name_from_uri(params['template']) - name = get_vm_name(params.get('name'), t_name, self._mock_vms.keys()) - if name in self._mock_vms: - raise InvalidOperation("KCHVM0001E", {'name': name}) - - vm_uuid = str(uuid.uuid4()) - vm_overrides = dict() - pool_uri = params.get('storagepool') - if pool_uri: - vm_overrides['storagepool'] = pool_uri - - t = self._get_template(t_name, vm_overrides) - t.validate() - - t_info = copy.deepcopy(t.info) - graphics = params.get('graphics') - if graphics: - t_info.update({'graphics': graphics}) - - vm = MockVM(vm_uuid, name, t_info) - icon = t_info.get('icon') - if icon: - vm.info['icon'] = icon - - pool = t._storage_validate() - if pool.info['type'] == 'scsi': - vm.disk_paths = [] - if not params.get('volumes'): - raise MissingParameter('KCHVM0017E') - for vol in params['volumes']: - vm.disk_paths.append({'pool': pool.name, - 'volume': vol}) - - else: - vm.disk_paths = t.fork_vm_storage(vm_uuid) - - index = 0 - for disk in vm.disk_paths: - storagepath = self._mock_storagepools[disk['pool']].info['path'] - fullpath = os.path.join(storagepath, disk['volume']) - dev_name = "hd" + string.ascii_lowercase[index] - params = {'dev': dev_name, 'path': fullpath, 'type': 'disk'} - vm.storagedevices[dev_name] = MockVMStorageDevice(params) - index += 1 - - cdrom = "hd" + string.ascii_lowercase[index + 1] - if t_info.get('cdrom'): - cdrom_params = { - 'dev': cdrom, 'path': t_info['cdrom'], 'type': 'cdrom'} - vm.storagedevices[cdrom] = MockVMStorageDevice(cdrom_params) - - self._mock_vms[name] = vm - return name - - def vms_get_list(self): - names = self._mock_vms.keys() - return sorted(names, key=unicode.lower) - - def vmscreenshot_lookup(self, name): - vm = self._get_vm(name) - if vm.info['state'] != 'running': - raise NotFoundError("KCHVM0004E", {'name': name}) - - screenshot = self._mock_screenshots.setdefault( - vm.uuid, MockVMScreenshot({'uuid': vm.uuid})) - return screenshot.lookup() - - def _vmscreenshot_delete(self, vm_uuid): - screenshot = self._mock_screenshots.get(vm_uuid) - if screenshot: - screenshot.delete() - del self._mock_screenshots[vm_uuid] - - def template_lookup(self, name): - t = self._get_template(name) - return t.validate_integrity() - - def template_delete(self, name): - try: - del self._mock_templates[name] - except KeyError: - raise NotFoundError("KCHTMPL0002E", {'name': name}) - - def templates_create(self, params): - name = params.get('name', '').strip() - - for net_name in params.get(u'networks', []): - try: - self._get_network(net_name) - except NotFoundError: - msg_args = {'network': net_name, 'template': name} - raise InvalidParameter("KCHTMPL0003E", msg_args) - - if params.get('cpu_info') is None: - params['cpu_info'] = dict() - - t = MockVMTemplate(params, self) - if t.name in self._mock_templates: - raise InvalidOperation("KCHTMPL0001E", {'name': name}) - - self._mock_templates[name] = t - return name - - def template_clone(self, name): - # set default name - subfixs = [v[len(name):] for v in self.templates_get_list() - if v.startswith(name)] - indexs = [int(v.lstrip("-clone")) for v in subfixs - if v.startswith("-clone") and - v.lstrip("-clone").isdigit()] - indexs.sort() - index = "1" if not indexs else str(indexs[-1] + 1) - clone_name = name + "-clone" + index - - temp = self.template_lookup(name) - temp['name'] = clone_name - ident = self.templates_create(temp) - return ident - - def template_update(self, name, params): - old_t = self.template_lookup(name) - new_t = copy.copy(old_t) - - new_t.update(params) - ident = name - - new_storagepool = new_t.get(u'storagepool', '') - try: - self._get_storagepool(pool_name_from_uri(new_storagepool)) - except Exception: - msg_args = {'pool': new_storagepool, 'template': name} - raise InvalidParameter("KCHTMPL0004E", msg_args) - - for net_name in params.get(u'networks', []): - try: - self._get_network(net_name) - except NotFoundError: - msg_args = {'network': net_name, 'template': name} - raise InvalidParameter("KCHTMPL0003E", msg_args) - - self.template_delete(name) + @staticmethod + def domainDefineXML(conn, xml): + name = objectify.fromstring(xml).name.text try: - ident = self.templates_create(new_t) + dom = conn.lookupByName(name) + if not dom.isActive(): + MockModel._mock_vms[name] = xml except: - ident = self.templates_create(old_t) - raise - return ident + pass + + return MockModel._defineXML(conn, xml) + + @staticmethod + def domainXMLDesc(dom, flags=0): + return MockModel._mock_vms.get(dom.name(), + MockModel._XMLDesc(dom, flags)) + + @staticmethod + def undefineDomain(dom): + name = dom.name() + if name in MockModel._mock_vms.keys(): + del MockModel._mock_vms[dom.name()] + return MockModel._undefineDomain(dom) + + @staticmethod + def attachDeviceFlags(dom, xml, flags=0): + old_xml = dom.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE) + root = objectify.fromstring(old_xml) + dev = objectify.fromstring(xml) + root.devices.append(dev) + + MockModel._mock_vms[dom.name()] = ET.tostring(root, encoding="utf-8") + + @staticmethod + def _get_device_node(dom, xml): + xpath_map = {'disk': 'target', + 'interface': 'mac', + 'graphics': 'listen'} + + dev = objectify.fromstring(xml) + dev_id = dev.find(xpath_map[dev.tag]).items() + + dev_filter = '' + for key, value in dev_id: + dev_filter += "[@%s='%s']" % (key, value) + + old_xml = dom.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE) + root = objectify.fromstring(old_xml) + devices = root.devices + + dev = devices.find("./%s/%s%s/.." % (dev.tag, xpath_map[dev.tag], + dev_filter)) + + return (root, dev) + + @staticmethod + def detachDeviceFlags(dom, xml, flags=0): + root, dev = MockModel._get_device_node(dom, xml) + root.devices.remove(dev) + + MockModel._mock_vms[dom.name()] = ET.tostring(root, encoding="utf-8") + + @staticmethod + def updateDeviceFlags(dom, xml, flags=0): + root, old_dev = MockModel._get_device_node(dom, xml) + root.devices.replace(old_dev, objectify.fromstring(xml)) + MockModel._mock_vms[dom.name()] = ET.tostring(root, encoding="utf-8") + + @staticmethod + def volResize(vol, size, flags=0): + new_xml = xml_item_update(vol.XMLDesc(0), './capacity', str(size)) + vol.delete(0) + pool = vol.storagePoolLookupByVolume() + pool.createXML(new_xml) + + @staticmethod + def volWipePattern(vol, algorithm, flags=0): + new_xml = xml_item_update(vol.XMLDesc(0), './allocation', '0') + vol.delete(0) + pool = vol.storagePoolLookupByVolume() + pool.createXML(new_xml) + + def _probe_image(self, path): + return ('unknown', 'unknown') - def templates_get_list(self): - return self._mock_templates.keys() - - def _get_template(self, name, overrides=None): - try: - t = self._mock_templates[name] - if overrides: - args = copy.copy(t.info) - args.update(overrides) - return MockVMTemplate(args, self) - else: - return t - except KeyError: - raise NotFoundError("KCHTMPL0002E", {'name': name}) - - def debugreport_lookup(self, name): - path = config.get_debugreports_path() - file_pattern = os.path.join(path, name + '.txt') - try: - file_target = glob.glob(file_pattern)[0] - except IndexError: - raise NotFoundError("KCHDR0001E", {'name': name}) - - ctime = os.stat(file_target).st_mtime - ctime = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime(ctime)) - file_target = os.path.split(file_target)[-1] - file_target = os.path.join("/data/debugreports", file_target) - return {'uri': file_target, - 'ctime': ctime} - - def debugreportcontent_lookup(self, name): - return self.debugreport_lookup(name) - - def debugreport_update(self, name, params): - path = config.get_debugreports_path() - file_pattern = os.path.join(path, name + '.txt') - try: - file_source = glob.glob(file_pattern)[0] - except IndexError: - raise NotFoundError("KCHDR0001E", {'name': name}) + def _get_volume_path(self, pool, vol): + pool_info = self.storagepool_lookup(pool) + if pool_info['type'] == 'scsi': + return self._mock_storagevolumes.scsi_volumes[vol]['path'] - file_target = file_source.replace(name, params['name']) - if os.path.isfile(file_target): - raise InvalidParameter('KCHDR0008E', {'name': params['name']}) + return MockModel._libvirt_get_vol_path(pool, vol) - shutil.move(file_source, file_target) - return params['name'] + def _gen_debugreport_file(self, name): + return add_task('/debugreports/%s' % name, self._create_log, + self.objstore, name) - def debugreport_delete(self, name): - path = config.get_debugreports_path() - file_pattern = os.path.join(path, name + '.txt') - try: - file_target = glob.glob(file_pattern)[0] - except IndexError: - raise NotFoundError("KCHDR0001E", {'name': name}) - - os.remove(file_target) - - def debugreports_create(self, params): - ident = params.get('name').strip() - # Generate a name with time and millisec precision, if necessary - if ident is None or ident == "": - ident = 'report-' + str(int(time.time() * 1000)) - else: - if ident in self.debugreports_get_list(): - raise InvalidParameter("KCHDR0008E", {"name": ident}) - taskid = self._gen_debugreport_file(ident) - return self.task_lookup(taskid) - - def debugreports_get_list(self): + def _create_log(self, cb, name): path = config.get_debugreports_path() - file_pattern = os.path.join(path, '*.txt') - file_lists = glob.glob(file_pattern) - file_lists = [os.path.split(file)[1] for file in file_lists] - name_lists = [file.split('.', 1)[0] for file in file_lists] - - return name_lists - - def _get_vm(self, name): - try: - return self._mock_vms[name] - except KeyError: - raise NotFoundError("KCHVM0002E", {'name': name}) - - def storagepools_create(self, params): - try: - name = params['name'] - pool = MockStoragePool(name) - pool.info['type'] = params['type'] - if params['type'] == 'scsi': - pool.info['path'] = '/dev/disk/by-path' - pool.info['source'] = params['source'] - if not pool.info['source'].get('adapter_name'): - raise MissingParameter('KCHPOOL0004E', - {'item': 'adapter_name', - 'name': name}) - for vol in ['unit:0:0:1', 'unit:0:0:2', - 'unit:0:0:3', 'unit:0:0:4']: - mockvol = MockStorageVolume(name, vol, - dict([('type', 'lun')])) - pool._volumes[vol] = mockvol - else: - pool.info['path'] = params['path'] - if params['type'] in ['dir', 'scsi']: - pool.info['autostart'] = True - else: - pool.info['autostart'] = False - except KeyError, item: - raise MissingParameter("KCHPOOL0004E", - {'item': str(item), 'name': name}) - - if name in self._mock_storagepools or name in (ISO_POOL_NAME,): - raise InvalidOperation("KCHPOOL0001E", {'name': name}) - - self._mock_storagepools[name] = pool - return name - - def storagepool_lookup(self, name): - storagepool = self._get_storagepool(name) - storagepool.refresh() - return storagepool.info - - def storagepool_update(self, name, params): - pool = self._get_storagepool(name) - if 'autostart' in params: - pool.info['autostart'] = params['autostart'] - if 'disks' in params: - # check if pool is type 'logical' - if pool.info['type'] != 'logical': - raise InvalidOperation('KCHPOOL0029E') - self._update_lvm_disks(name, params['disks']) - ident = pool.name - return ident - - def storagepool_activate(self, name): - self._get_storagepool(name).info['state'] = 'active' - - def storagepool_deactivate(self, name): - self._get_storagepool(name).info['state'] = 'inactive' - - def storagepool_delete(self, name): - # firstly, we should check the pool actually exists - pool = self._get_storagepool(name) - del self._mock_storagepools[pool.name] - - def storagepools_get_list(self): - return sorted(self._mock_storagepools.keys()) - - def _get_storagepool(self, name): - try: - return self._mock_storagepools[name] - except KeyError: - raise NotFoundError("KCHPOOL0002E", {'name': name}) + tmpf = os.path.join(path, name + '.tmp') + realf = os.path.join(path, name + '.txt') + length = random.randint(1000, 10000) + with open(tmpf, 'w') as fd: + while length: + fd.write('I am logged') + length = length - 1 + os.rename(tmpf, realf) + cb("OK", True) - def storagevolumes_create(self, pool_name, params): + def _mock_storagevolumes_create(self, pool, params): vol_source = ['file', 'url', 'capacity'] - require_name_params = ['capacity'] - - name = params.get('name') - index_list = list(i for i in range(len(vol_source)) if vol_source[i] in params) - if len(index_list) != 1: - raise InvalidParameter("KCHVOL0018E", - {'param': ",".join(vol_source)}) - create_param = vol_source[index_list[0]] - + name = params.get('name') if name is None: - if create_param in require_name_params: - raise InvalidParameter('KCHVOL0016E') - if create_param == 'file': name = os.path.basename(params['file'].filename) + del params['file'] + params['capacity'] = 1024 elif create_param == 'url': name = os.path.basename(params['url']) - else: - name = 'upload-%s' % int(time.time()) + del params['url'] + params['capacity'] = 1024 params['name'] = name - try: - create_func = getattr(self, '_create_volume_with_%s' % - create_param) - except AttributeError: - raise InvalidParameter("KCHVOL0019E", {'param': create_param}) - - pool = self._get_storagepool(pool_name) - if pool.info['type'] in READONLY_POOL_TYPE: - raise InvalidParameter("KCHVOL0012E", {'type': pool.info['type']}) - if pool.info['state'] == 'inactive': - raise InvalidParameter('KCHVOL0003E', {'pool': pool_name, - 'volume': name}) - if name in pool._volumes: - raise InvalidOperation("KCHVOL0001E", {'name': name}) - - params['pool'] = pool_name - targeturi = '/storagepools/%s/storagevolumes/%s' % (pool_name, name) - taskid = self.add_task(targeturi, create_func, params) - return self.task_lookup(taskid) - - def _create_volume_with_file(self, cb, params): - upload_file = params['file'] - params['name'] = params['name'] - params['format'] = 'raw' - params['capacity'] = upload_file.fp.length - size = 0 - try: - while True: - data = upload_file.file.read(8192*32) - if not data: - break - size += len(data) - cb('%s/%s' % (size, params['capacity']), True) - except Exception as e: - raise OperationFailed('KCHVOL0007E', - {'name': params['name'], - 'pool': params['pool'], - 'err': e.message}) - self._create_volume_with_capacity(cb, params) - cb('%s/%s' % (size, params['capacity']), True) - - def _create_volume_with_capacity(self, cb, params): - pool_name = params.pop('pool') - pool = self._get_storagepool(pool_name) + return self._model_storagevolumes_create(pool, params) - try: - name = params['name'] - volume = MockStorageVolume(pool, name, params) - volume.info['type'] = 'file' - volume.info['ref_cnt'] = params.get('ref_cnt', 0) - volume.info['format'] = params['format'] - volume.info['path'] = os.path.join( - pool.info['path'], name) - if 'base' in params: - volume.info['base'] = copy.deepcopy(params['base']) - except KeyError, item: - raise MissingParameter("KCHVOL0004E", - {'item': str(item), 'volume': name}) - - pool._volumes[name] = volume - cb('OK', True) - - def _create_volume_with_url(self, cb, params): - pool_name = params['pool'] - name = params['name'] - url = params['url'] - - pool = self._get_storagepool(pool_name) - - params['path'] = os.path.join(pool.info['path'], name) - params['type'] = 'file' - params['base'] = url - - volume = MockStorageVolume(pool, name, params) - pool._volumes[name] = volume - - cb('OK', True) - - def storagevolume_clone(self, pool, name, new_pool=None, new_name=None): - if new_name is None: - base, ext = os.path.splitext(name) - new_name = get_next_clone_name(self.vms_get_list(), base, ext) - - if new_pool is None: - new_pool = pool - - params = {'name': name, - 'pool': pool, - 'new_name': new_name, - 'new_pool': new_pool} - taskid = self.add_task('/storagepools/%s/storagevolumes/%s' % - (new_pool, new_name), - self._storagevolume_clone_task, params) - return self.task_lookup(taskid) - - def _storagevolume_clone_task(self, cb, params): - try: - vol_name = params['name'].decode('utf-8') - pool_name = params['pool'].decode('utf-8') - new_vol_name = params['new_name'].decode('utf-8') - new_pool_name = params['new_pool'].decode('utf-8') - - orig_pool = self._get_storagepool(pool_name) - orig_vol = self._get_storagevolume(pool_name, vol_name) - - new_vol = copy.deepcopy(orig_vol) - new_vol.info['name'] = new_vol_name - new_vol.info['path'] = os.path.join(orig_pool.info['path'], - new_vol_name) - - new_pool = self._get_storagepool(new_pool_name) - new_pool._volumes[new_vol_name] = new_vol - except (KeyError, NotFoundError), e: - raise OperationFailed('KCHVOL0023E', - {'name': vol_name, 'pool': pool_name, - 'err': e.message}) - - cb('OK', True) - - def storagevolume_lookup(self, pool, name): - if self._get_storagepool(pool).info['state'] != 'active': - raise InvalidOperation("KCHVOL0005E", {'pool': pool, - 'volume': name}) - - storagevolume = self._get_storagevolume(pool, name) - return storagevolume.info - - def storagevolume_wipe(self, pool, name): - volume = self._get_storagevolume(pool, name) - volume.info['allocation'] = 0 - - def storagevolume_delete(self, pool, name): - # firstly, we should check the pool actually exists - volume = self._get_storagevolume(pool, name) - del self._get_storagepool(pool)._volumes[volume.name] - - def storagevolume_resize(self, pool, name, size): - volume = self._get_storagevolume(pool, name) - volume.info['capacity'] = size - - def storagevolumes_get_list(self, pool): - res = self._get_storagepool(pool) - if res.info['state'] == 'inactive': - raise InvalidOperation("KCHVOL0006E", {'pool': pool}) - return res._volumes.keys() - - def devices_get_list(self, _cap=None, _passthrough=None, - _passthrough_affected_by=None): - if _cap is None: - return self._mock_devices.devices.keys() - return [dev['name'] for dev in self._mock_devices.devices.values() - if dev['device_type'] == _cap] + def _mock_storagevolumes_get_list(self, pool): + pool_info = self.storagepool_lookup(pool) + if pool_info['type'] == 'scsi': + return self._mock_storagevolumes.scsi_volumes.keys() - def device_lookup(self, dev_name): - return self._mock_devices.devices[dev_name] + return self._model_storagevolumes_get_list(pool) - def isopool_lookup(self, name): - return {'state': 'active', - 'type': 'kimchi-iso'} - - def isovolumes_get_list(self): - iso_volumes = [] - pools = self.storagepools_get_list() - - for pool in pools: - try: - volumes = self.storagevolumes_get_list(pool) - except InvalidOperation: - # Skip inactive pools - continue - for volume in volumes: - res = self.storagevolume_lookup(pool, volume) - if res['format'] == 'iso': - # prevent iso from different pool having same volume name - res['name'] = '%s-%s' % (pool, volume) - iso_volumes.append(res) - return iso_volumes - - def storageservers_get_list(self, _target_type=None): - # FIXME: This needs to be updted when adding new storage server support - target_type = STORAGE_SERVERS \ - if not _target_type else [_target_type] - pools = self.storagepools_get_list() - server_list = [] - for pool in pools: - try: - pool_info = self.storagepool_lookup(pool) - if (pool_info['type'] in target_type and - pool_info['source']['addr'] not in server_list): - server_list.append(pool_info['source']['addr']) - except NotFoundError: - pass - - return server_list - - def storageserver_lookup(self, server): - pools = self.storagepools_get_list() - for pool in pools: - try: - pool_info = self.storagepool_lookup(pool) - if pool_info['source'] and \ - pool_info['source']['addr'] == server: - return dict(host=server) - except NotFoundError: - # Avoid inconsistent pool result because - # of lease between list and lookup - pass - - raise NotFoundError("KCHSR0001E", {'server': server}) - - def dummy_interfaces(self): - interfaces = {} - ifaces = {"eth1": "nic", "bond0": "bonding", - "eth1.10": "vlan", "bridge0": "bridge"} - for i, name in enumerate(ifaces.iterkeys()): - iface = Interface(name) - iface.info['type'] = ifaces[name] - iface.info['ipaddr'] = '192.168.%s.101' % (i + 1) - interfaces[name] = iface - interfaces['eth1'].info['ipaddr'] = '192.168.0.101' - return interfaces - - def interfaces_get_list(self): - return self._mock_interfaces.keys() - - def interface_lookup(self, name): - return self._mock_interfaces[name].info - - def networks_create(self, params): - name = params['name'] - if name in self.networks_get_list(): - raise InvalidOperation("KCHNET0001E", {'name': name}) - - network = MockNetwork(name) - connection = params['connection'] - network.info['connection'] = connection - if connection == "bridge": - try: - interface = params['interface'] - network.info['interface'] = interface - except KeyError: - raise MissingParameter("KCHNET0004E", - {'name': name}) - - subnet = params.get('subnet', '') - if subnet: - network.info['subnet'] = subnet - try: - net = ipaddr.IPNetwork(subnet) - except ValueError: - msg_args = {'subnet': subnet, 'network': name} - raise InvalidParameter("KCHNET0003E", msg_args) - - network.info['dhcp'] = { - 'start': str(net.network + net.numhosts / 2), - 'stop': str(net.network + net.numhosts - 2)} - - self._mock_networks[name] = network - return name - - def _get_network(self, name): - try: - return self._mock_networks[name] - except KeyError: - raise NotFoundError("KCHNET0002E", {'name': name}) - - def _get_vms_attach_to_a_network(self, network): - vms = [] - for name, dom in self._mock_vms.iteritems(): - if network in dom.networks: - vms.append(name) - return vms - - def _is_network_used_by_template(self, network): - for name, tmpl in self._mock_templates.iteritems(): - if network in tmpl.info['networks']: - return True - return False - - def _is_network_in_use(self, name): - # The network "default" is used for Kimchi proposal and should not be - # deactivate or deleted. Otherwise, we will allow user create - # inconsistent templates from scratch - if name == 'default': - return True - - vms = self._get_vms_attach_to_a_network(name) - return bool(vms) or self._is_network_used_by_template(name) - - def network_lookup(self, name): - network = self._get_network(name) - network.info['vms'] = self._get_vms_attach_to_a_network(name) - network.info['in_use'] = self._is_network_in_use(name) - - return network.info - - def network_activate(self, name): - self._get_network(name).info['state'] = 'active' - - def network_deactivate(self, name): - if self._is_network_in_use(name): - raise InvalidOperation("KCHNET0018E", {'name': name}) - - network = self._get_network(name) - if not network.info['persistent']: - self.network_delete(name) - - network.info['state'] = 'inactive' - - def network_delete(self, name): - if self._is_network_in_use(name): - raise InvalidOperation("KCHNET0017E", {'name': name}) - - # firstly, we should check the network actually exists - network = self._get_network(name) - del self._mock_networks[network.name] - - def networks_get_list(self): - return sorted(self._mock_networks.keys()) - - def vmstorages_create(self, vm_name, params): - path = params.get('path') - if path and path.startswith('/') and not os.path.exists(path): - raise InvalidParameter("KCHVMSTOR0003E", {'value': path}) - if path and params.get('pool'): - raise InvalidParameter("KCHVMSTOR0017E") - elif params.get('pool'): - try: - self.storagevolume_lookup(params['pool'], params['vol']) - except Exception as e: - raise InvalidParameter("KCHVMSTOR0015E", {'error': e}) - dom = self._get_vm(vm_name) - index = len(dom.storagedevices.keys()) + 1 - params['dev'] = "hd" + string.ascii_lowercase[index] - - vmdev = MockVMStorageDevice(params) - dom.storagedevices[params['dev']] = vmdev - return params['dev'] - - def vmstorages_get_list(self, vm_name): - dom = self._get_vm(vm_name) - return dom.storagedevices.keys() - - def vmstorage_lookup(self, vm_name, dev_name): - dom = self._get_vm(vm_name) - if dev_name not in self.vmstorages_get_list(vm_name): - raise NotFoundError( - "KCHVMSTOR0007E", - {'dev_name': dev_name, 'vm_name': vm_name}) - return dom.storagedevices.get(dev_name).info - - def vmstorage_delete(self, vm_name, dev_name): - dom = self._get_vm(vm_name) - if dev_name not in self.vmstorages_get_list(vm_name): - raise NotFoundError( - "KCHVMSTOR0007E", - {'dev_name': dev_name, 'vm_name': vm_name}) - dom.storagedevices.pop(dev_name) - - def vmstorage_update(self, vm_name, dev_name, params): - try: - dom = self._get_vm(vm_name) - dom.storagedevices[dev_name].info.update(params) - except Exception as e: - raise OperationFailed("KCHVMSTOR0009E", {'error': e.message}) - return dev_name - - def vmifaces_create(self, vm, params): - if (params["type"] == "network" and - params["network"] not in self.networks_get_list()): - msg_args = {'network': params["network"], 'name': vm} - raise InvalidParameter("KCHVMIF0002E", msg_args) - - dom = self._get_vm(vm) - iface = MockVMIface(params["network"]) - ("model" in params.keys() and - iface.info.update({"model": params["model"]})) - - mac = iface.info['mac'] - dom.ifaces[mac] = iface - return mac - - def vmifaces_get_list(self, vm): - dom = self._get_vm(vm) - macs = dom.ifaces.keys() - return macs - - def vmiface_lookup(self, vm, mac): - dom = self._get_vm(vm) - try: - info = dom.ifaces[mac].info - except KeyError: - raise NotFoundError("KCHVMIF0001E", {'iface': mac, 'name': vm}) - return info + def _mock_storagevolume_lookup(self, pool, vol): + pool_info = self.storagepool_lookup(pool) + if pool_info['type'] == 'scsi': + return self._mock_storagevolumes.scsi_volumes[vol] - def vmiface_delete(self, vm, mac): - dom = self._get_vm(vm) - try: - del dom.ifaces[mac] - except KeyError: - raise NotFoundError("KCHVMIF0001E", {'iface': mac, 'name': vm}) + return self._model_storagevolume_lookup(pool, vol) - def vmiface_update(self, vm, mac, params): - dom = self._get_vm(vm) - try: - info = dom.ifaces[mac].info - except KeyError: - raise NotFoundError("KCHVMIF0001E", {'iface': mac, 'name': vm}) - if info['type'] == 'network' and 'network' in params: - info['network'] = params['network'] - if 'model' in params: - info['model'] = params['model'] - return mac - - def tasks_get_list(self): - with self.objstore as session: - return session.get_list('task') - - def task_lookup(self, id): - with self.objstore as session: - return session.get('task', str(id)) - - def add_task(self, target_uri, fn, opaque=None): - id = self.next_taskid - self.next_taskid = self.next_taskid + 1 - AsyncTask(id, target_uri, fn, self.objstore, opaque) - - return id - - def _get_storagevolume(self, pool, name): - try: - return self._get_storagepool(pool)._volumes[name] - except KeyError: - raise NotFoundError("KCHVOL0002E", {'name': name, 'pool': pool}) + def _mock_interfaces_get_list(self): + return self._mock_interfaces.ifaces.keys() - def _get_distros(self): - distroloader = DistroLoader() - return distroloader.get() + def _mock_interface_lookup(self, name): + return self._mock_interfaces.ifaces[name] - def distros_get_list(self): - return self.distros.keys() + def _mock_devices_get_list(self, _cap=None, _passthrough=None, + _passthrough_affected_by=None): + if _cap is None: + return self._mock_devices.devices.keys() + return [dev['name'] for dev in self._mock_devices.devices.values() + if dev['device_type'] == _cap] - def distro_lookup(self, name): - try: - return self.distros[name] - except KeyError: - raise NotFoundError("KCHDISTRO0001E", {'name': name}) + def _mock_device_lookup(self, dev_name): + return self._mock_devices.devices[dev_name] - def _gen_debugreport_file(self, ident): - return self.add_task('/debugreports/%s' % ident, self._create_log, - ident) + def _mock_packagesupdate_get_list(self): + return self._mock_swupdate.pkgs.keys() - def _create_log(self, cb, name): - path = config.get_debugreports_path() - tmpf = os.path.join(path, name + '.tmp') - realf = os.path.join(path, name + '.txt') - length = random.randint(1000, 10000) - with open(tmpf, 'w') as fd: - while length: - fd.write('I am logged') - length = length - 1 - os.rename(tmpf, realf) - cb("OK", True) + def _mock_packageupdate_lookup(self, pkg_name): + return self._mock_swupdate.pkgs[pkg_name] - def host_lookup(self, *name): - res = {} - res['memory'] = 6114058240 - res['cpu_model'] = 'Intel(R) Core(TM) i5 CPU M 560 @ 2.67GHz' - res['cpus'] = 4 - res['os_distro'] = 'Red Hat Enterprise Linux Server' - res['os_version'] = '6.4' - res['os_codename'] = 'Santiago' - - return res - - def hoststats_lookup(self, *name): - virt_mem = psutil.virtual_memory() - memory_stats = {'total': virt_mem.total, - 'free': virt_mem.free, - 'cached': virt_mem.cached, - 'buffers': virt_mem.buffers, - 'avail': virt_mem.available} - return {'cpu_utilization': round(random.uniform(0, 100), 1), - 'memory': memory_stats, - 'disk_read_rate': round(random.uniform(0, 4000), 1), - 'disk_write_rate': round(random.uniform(0, 4000), 1), - 'net_recv_rate': round(random.uniform(0, 4000), 1), - 'net_sent_rate': round(random.uniform(0, 4000), 1)} - - def hoststatshistory_lookup(self, *name): - return {'cpu_utilization': random.sample(range(100), 30), - 'memory': random.sample(range(4000), 30), - 'disk_read_rate': random.sample(range(4000), 30), - 'disk_write_rate': random.sample(range(4000), 30), - 'net_recv_rate': random.sample(range(4000), 30), - 'net_sent_rate': random.sample(range(4000), 30)} - - def users_get_list(self): - return ["userA", "userB", "userC", "admin"] - - def groups_get_list(self): - return ["groupA", "groupB", "groupC", "groupD"] - - def peers_get_list(self): - if kconfig.get("server", "federation") == "off": - return [] - - return ["https://serverA:8001", "https://serverB:8001"] - - def vms_get_list_by_state(self, state): - ret_list = [] - for name in self.vms_get_list(): - if (self._mock_vms[name].info['state']) == state: - ret_list.append(name) - return ret_list - - def host_shutdown(self, args=None): - # Check for running vms before shutdown - running_vms = self.vms_get_list_by_state('running') - if len(running_vms) > 0: - raise OperationFailed("KCHHOST0001E") - cherrypy.engine.exit() - - def host_reboot(self, args=None): - # Find running VMs - running_vms = self.vms_get_list_by_state('running') - if len(running_vms) > 0: - raise OperationFailed("KCHHOST0002E") - cherrypy.engine.stop() - time.sleep(10) - cherrypy.engine.start() - - def partitions_get_list(self): - result = disks.get_partitions_names() - return result - - def partition_lookup(self, name): - if name not in disks.get_partitions_names(): - raise NotFoundError("KCHPART0001E", {'name': name}) - - return disks.get_partition_details(name) - - def config_lookup(self, name): - return {'display_proxy_port': kconfig.get('display', - 'display_proxy_port'), - 'version': config.get_version()} - - def packagesupdate_get_list(self): - return self._mock_swupdate.getUpdates() - - def packageupdate_lookup(self, pkg_name): - return self._mock_swupdate.getUpdate(pkg_name) - - def host_swupdate(self, args=None): - task_id = self.add_task('/host/swupdate', self._mock_swupdate.doUpdate, - None) + def _mock_host_swupdate(self, args=None): + task_id = add_task('/host/swupdate', self._mock_swupdate.doUpdate, + self.objstore) return self.task_lookup(task_id) - def repositories_get_list(self): - return self._mock_host_repositories.getRepositories() + def _mock_repositories_get_list(self): + return self._mock_repositories.repos.keys() - def repositories_create(self, params): + def _mock_repositories_create(self, params): # Create a repo_id if not given by user. The repo_id will follow # the format kimchi_repo_<integer>, where integer is the number of # seconds since the Epoch (January 1st, 1970), in UTC. @@ -1121,387 +307,62 @@ class MockModel(object): repo_id = "kimchi_repo_%s" % str(int(time.time() * 1000)) params.update({'repo_id': repo_id}) - if repo_id in self.repositories_get_list(): - raise InvalidOperation("KCHREPOS0022E", {'repo_id': repo_id}) - - self._mock_host_repositories.addRepository(params) - return repo_id - - def repository_lookup(self, repo_id): - return self._mock_host_repositories.getRepository(repo_id) - - def repository_delete(self, repo_id): - return self._mock_host_repositories.removeRepository(repo_id) - - def repository_enable(self, repo_id): - return self._mock_host_repositories.enableRepository(repo_id) - - def repository_disable(self, repo_id): - return self._mock_host_repositories.disableRepository(repo_id) - - def repository_update(self, repo_id, params): - return self._mock_host_repositories.updateRepository(repo_id, params) - - -class MockVMTemplate(VMTemplate): - def __init__(self, args, mockmodel_inst=None): - VMTemplate.__init__(self, args) - self.model = mockmodel_inst - - def _get_all_networks_name(self): - return self.model.networks_get_list() - - def _get_all_storagepools_name(self): - return self.model.storagepools_get_list() - - def _storage_validate(self): - pool_uri = self.info['storagepool'] - pool_name = pool_name_from_uri(pool_uri) - try: - pool = self.model._get_storagepool(pool_name) - except NotFoundError: - msg_args = {'pool': pool_name, 'template': self.name} - raise InvalidParameter("KCHTMPL0004E", msg_args) - - if pool.info['state'] != 'active': - msg_args = {'pool': pool_name, 'template': self.name} - raise InvalidParameter("KCHTMPL0005E", msg_args) - - return pool - - def _get_storage_path(self): - pool = self._storage_validate() - return pool.info['path'] - - def _get_volume_path(self, pool, vol): - return self.model.storagevolume_lookup(pool, vol)['path'] - - def fork_vm_storage(self, vm_name): - pool = self._storage_validate() - volumes = self.to_volume_list(vm_name) - disk_paths = [] - for vol_info in volumes: - vol_info['capacity'] = vol_info['capacity'] << 10 - vol_info['ref_cnt'] = 1 - if 'base' in self.info: - vol_info['base'] = copy.deepcopy(self.info['base']) - self.model.storagevolumes_create(pool.name, vol_info) - disk_paths.append({'pool': pool.name, 'volume': vol_info['name']}) - return disk_paths - - -class MockVMStorageDevice(object): - def __init__(self, params): - self.info = {'dev': params.get('dev'), - 'type': params.get('type'), - 'pool': params.get('pool'), - 'vol': params.get('vol'), - 'path': params.get('path')} - - -class MockVMIface(object): - counter = 0 - - def __init__(self, network=None): - self.__class__.counter += 1 - self.info = {'type': 'network', - 'model': 'virtio', - 'network': network if network - else "net-%s" % self.counter, - 'mac': self.get_mac() - } - - @classmethod - def get_mac(cls): - mac = ":".join(["52", "54"] + - ["%02x" % (cls.counter / (256 ** i) % 256) - for i in range(3, -1, -1)]) - return mac - - -class MockVM(object): - def __init__(self, uuid, name, template_info): - self.uuid = uuid - self.name = name - self.memory = template_info['memory'] - self.cpus = template_info['cpus'] - self.disk_paths = [] - self.networks = template_info['networks'] - ifaces = [MockVMIface(net) for net in self.networks] - self.storagedevices = {} - self.ifaces = dict([(iface.info['mac'], iface) for iface in ifaces]) - - stats = {'cpu_utilization': 20, - 'net_throughput': 35, - 'net_throughput_peak': 100, - 'io_throughput': 45, - 'io_throughput_peak': 100} - self.info = {'name': self.name, - 'state': 'shutoff', - 'stats': stats, - 'uuid': self.uuid, - 'memory': self.memory, - 'cpus': self.cpus, - 'icon': None, - 'graphics': {'type': 'vnc', 'listen': '127.0.0.1', - 'port': None, 'passwd': '123456', - 'passwdValidTo': None}, - 'users': ['user1', 'user2', 'root'], - 'groups': ['group1', 'group2', 'admin'], - 'access': 'full' - } - self.info['graphics'].update(template_info['graphics']) - - -class MockStoragePool(object): - def __init__(self, name): - self.name = name - self.info = {'state': 'inactive', - 'capacity': 1024 << 20, - 'allocated': 512 << 20, - 'available': 512 << 20, - 'path': '/var/lib/libvirt/images', - 'source': {}, - 'type': 'dir', - 'nr_volumes': 0, - 'autostart': 0, - 'persistent': True} - self._volumes = {} - - def refresh(self): - state = self.info['state'] - self.info['nr_volumes'] = len(self._volumes) \ - if state == 'active' else 0 - - -class Interface(object): - def __init__(self, name): - self.name = name - self.info = {'type': 'nic', - 'ipaddr': '192.168.0.101', - 'netmask': '255.255.255.0', - 'status': 'active'} - - -class MockNetwork(object): - def __init__(self, name): - self.name = name - self.info = {'state': 'inactive', - 'autostart': True, - 'connection': 'nat', - 'interface': 'virbr0', - 'subnet': '192.168.122.0/24', - 'dhcp': {'start': '192.168.122.128', - 'stop': '192.168.122.254'}, - 'persistent': True - } - - -class MockTask(object): - def __init__(self, id): - self.id = id - - -class MockStorageVolume(object): - def __init__(self, pool, name, params={}): - self.name = name - self.pool = pool - # Check if volume should be scsi lun - if params.get('type') == 'lun': - params = self._def_lun(name) - fmt = params.get('format', 'raw') - capacity = params.get('capacity', 1024) - self.info = {'type': params.get('type', 'disk'), - 'capacity': capacity << 20, - 'allocation': params.get('allocation', '512'), - 'path': params.get('path'), - 'ref_cnt': params.get('ref_cnt'), - 'format': fmt} - if fmt == 'iso': - self.info['allocation'] = self.info['capacity'] - self.info['os_version'] = '17' - self.info['os_distro'] = 'fedora' - self.info['bootable'] = True - - def _def_lun(self, name): - capacity = int(random.uniform(100, 300)) << 20 - path = "/dev/disk/by-path/pci-0000:0e:00.0-fc-0x20999980e52e4492-lun" - return { - "capacity": capacity, - "name": name, - "format": random.choice(['dos', 'unknown']), - "allocation": capacity, - "path": path + name[-1], - "type": "block"} - - -class MockVMScreenshot(VMScreenshot): - OUTDATED_SECS = 5 - BACKGROUND_COLOR = ['blue', 'green', 'purple', 'red', 'yellow'] - BOX_COORD = (50, 115, 206, 141) - BAR_COORD = (50, 115, 50, 141) - - def __init__(self, vm_name): - VMScreenshot.__init__(self, vm_name) - self.coord = MockVMScreenshot.BAR_COORD - self.background = random.choice(MockVMScreenshot.BACKGROUND_COLOR) - - def _generate_scratch(self, thumbnail): - self.coord = (self.coord[0], - self.coord[1], - min(MockVMScreenshot.BOX_COORD[2], - self.coord[2] + random.randrange(50)), - self.coord[3]) - - image = Image.new("RGB", (256, 256), self.background) - d = ImageDraw.Draw(image) - d.rectangle(MockVMScreenshot.BOX_COORD, outline='black') - d.rectangle(self.coord, outline='black', fill='black') - image.save(thumbnail) - - -class MockSoftwareUpdate(object): - def __init__(self): - self._packages = { - 'udevmountd': {'repository': 'openSUSE-13.1-Update', - 'version': '0.81.5-14.1', - 'arch': 'x86_64', - 'package_name': 'udevmountd'}, - 'sysconfig-network': {'repository': 'openSUSE-13.1-Extras', - 'version': '0.81.5-14.1', - 'arch': 'x86_64', - 'package_name': 'sysconfig-network'}, - 'libzypp': {'repository': 'openSUSE-13.1-Update', - 'version': '13.9.0-10.1', - 'arch': 'noarch', - 'package_name': 'libzypp'}} - self._num2update = 3 - - def getUpdates(self): - return self._packages.keys() - - def getUpdate(self, name): - if name not in self._packages.keys(): - raise NotFoundError('KCHPKGUPD0002E', {'name': name}) - return self._packages[name] - - def getNumOfUpdates(self): - return self._num2update - - def doUpdate(self, cb, params): - msgs = [] - for pkg in self._packages.keys(): - msgs.append("Updating package %s" % pkg) - cb('\n'.join(msgs)) - time.sleep(1) - - time.sleep(2) - msgs.append("All packages updated") - cb('\n'.join(msgs), True) - - # After updating all packages any package should be listed to be - # updated, so reset self._packages - self._packages = {} - - -class MockRepositories(object): - def __init__(self): - self._repos = {"kimchi_repo_1392167832": - {"repo_id": "kimchi_repo_1392167832", - "enabled": True, - "baseurl": "http://www.fedora.org", - "config": {"repo_name": "kimchi_repo_1392167832", - "gpgkey": [], - "gpgcheck": True, - "mirrorlist": ""} - } - } - - def addRepository(self, params): - # Create and enable the repository - repo_id = params['repo_id'] config = params.get('config', {}) - baseurl = params.get('baseurl') - mirrorlist = config.get('mirrorlist', "") - - if baseurl: - validate_repo_url(baseurl) - - if mirrorlist: - validate_repo_url(mirrorlist) - - repo = {'repo_id': repo_id, - 'baseurl': baseurl, + info = {'repo_id': repo_id, + 'baseurl': params['baseurl'], 'enabled': True, 'config': {'repo_name': config.get('repo_name', repo_id), 'gpgkey': config.get('gpgkey', []), 'gpgcheck': True, - 'mirrorlist': mirrorlist} - } - - self._repos[repo_id] = repo + 'mirrorlist': params.get('mirrorlist', '')}} + self._mock_repositories.repos[repo_id] = info return repo_id - def getRepositories(self): - return self._repos.keys() - - def getRepository(self, repo_id): - if repo_id not in self._repos.keys(): - raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id}) - - return self._repos[repo_id] - - def enableRepository(self, repo_id): - if repo_id not in self._repos.keys(): - raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id}) - - info = self._repos[repo_id] - # Check if repo_id is already enabled - if info['enabled']: - raise NotFoundError("KCHREPOS0015E", {'repo_id': repo_id}) + def _mock_repository_lookup(self, repo_id): + return self._mock_repositories.repos[repo_id] - info['enabled'] = True - self._repos[repo_id] = info - return repo_id + def _mock_repository_delete(self, repo_id): + del self._mock_repositories.repos[repo_id] - def disableRepository(self, repo_id): - if repo_id not in self._repos.keys(): - raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id}) + def _mock_repository_enable(self, repo_id): + self._mock_repositories.repos[repo_id]['enabled'] = True - info = self._repos[repo_id] - # Check if repo_id is already disabled - if not info['enabled']: - raise NotFoundError("KCHREPOS0016E", {'repo_id': repo_id}) + def _mock_repository_disable(self, repo_id): + self._mock_repositories.repos[repo_id]['enabled'] = False - info['enabled'] = False - self._repos[repo_id] = info + def _mock_repository_update(self, repo_id, params): + self._mock_repositories.repos[repo_id].update(params) return repo_id - def updateRepository(self, repo_id, params): - if repo_id not in self._repos.keys(): - raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id}) - - baseurl = params.get('baseurl', None) - config = params.get('config', {}) - mirrorlist = config.get('mirrorlist', None) - - if baseurl: - validate_repo_url(baseurl) - - if mirrorlist: - validate_repo_url(mirrorlist) - - info = self._repos[repo_id] - info.update(params) - del self._repos[repo_id] - self._repos[info['repo_id']] = info - return info['repo_id'] - def removeRepository(self, repo_id): - if repo_id not in self._repos.keys(): - raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id}) - - del self._repos[repo_id] +class MockStorageVolumes(object): + def __init__(self): + base_path = "/dev/disk/by-path/pci-0000:0e:00.0-fc-0x20-lun" + self.scsi_volumes = {'unit:0:0:1': {'capacity': 1024, + 'format': 'unknown', + 'allocation': 512, + 'type': 'block', + 'path': base_path + '1', + 'ref_cnt': 0}, + 'unit:0:0:2': {'capacity': 2048, + 'format': 'unknown', + 'allocation': 512, + 'type': 'block', + 'path': base_path + '2', + 'ref_cnt': 0}} + + +class MockInterfaces(object): + def __init__(self): + self.ifaces = {} + params = {"eth1": "nic", "bond0": "bonding", + "eth1.10": "vlan", "bridge0": "bridge"} + for i, name in enumerate(params.iterkeys()): + info = {'name': name, 'type': params[name], + 'ipaddr': '192.168.%s.101' % (i + 1), + 'netmask': '255.255.255.0', 'status': 'active'} + self.ifaces[name] = info + self.ifaces['eth1']['ipaddr'] = '192.168.0.101' class MockDevices(object): @@ -1581,52 +442,46 @@ class MockDevices(object): 'path': '/sys/devices/pci0000:00/0000:40:00.0/2'}} -def get_mock_environment(): - model = MockModel() - for i in xrange(5): - name = 'test-template-%i' % i - params = {'name': name, 'cdrom': '/file.iso'} - t = MockVMTemplate(params, model) - model._mock_templates[name] = t - - for name in ('test-template-1', 'test-template-3'): - model._mock_templates[name].info.update({'folder': ['rhel', '6']}) - - for i in xrange(10): - name = u'test-vm-%i' % i - vm_uuid = str(uuid.uuid4()) - vm = MockVM(vm_uuid, name, model.template_lookup('test-template-0')) - model._mock_vms[name] = vm - - # mock storagepool - for i in xrange(5): - name = 'default-pool-%i' % i - defaultstoragepool = MockStoragePool(name) - defaultstoragepool.info['path'] += '/%i' % i - model._mock_storagepools[name] = defaultstoragepool - for j in xrange(5): - vol_name = 'volume-%i' % j - defaultstoragevolume = MockStorageVolume(name, vol_name) - defaultstoragevolume.info['path'] = '%s/%s' % ( - defaultstoragepool.info['path'], vol_name) - mockpool = model._mock_storagepools[name] - mockpool._volumes[vol_name] = defaultstoragevolume - vol_name = 'Fedora17.iso' - defaultstoragevolume = MockStorageVolume(name, vol_name, - {'format': 'iso'}) - defaultstoragevolume.info['path'] = '%s/%s' % ( - defaultstoragepool.info['path'], vol_name) - mockpool = model._mock_storagepools[name] - mockpool._volumes[vol_name] = defaultstoragevolume - - # mock network - for i in xrange(5): - name = 'test-network-%i' % i - testnetwork = MockNetwork(name) - testnetwork.info['interface'] = 'virbr%i' % (i + 1) - testnetwork.info['subnet'] = '192.168.%s.0/24' % (i + 1) - testnetwork.info['dhcp']['start'] = '192.168.%s.128' % (i + 1) - testnetwork.info['dhcp']['end'] = '192.168.%s.254' % (i + 1) - model._mock_networks[name] = testnetwork - - return model +class MockSoftwareUpdate(object): + def __init__(self): + self.pkgs = { + 'udevmountd': {'repository': 'openSUSE-13.1-Update', + 'version': '0.81.5-14.1', + 'arch': 'x86_64', + 'package_name': 'udevmountd'}, + 'sysconfig-network': {'repository': 'openSUSE-13.1-Extras', + 'version': '0.81.5-14.1', + 'arch': 'x86_64', + 'package_name': 'sysconfig-network'}, + 'libzypp': {'repository': 'openSUSE-13.1-Update', + 'version': '13.9.0-10.1', + 'arch': 'noarch', + 'package_name': 'libzypp'}} + self._num2update = 3 + + def doUpdate(self, cb, params): + msgs = [] + for pkg in self.pkgs.keys(): + msgs.append("Updating package %s" % pkg) + cb('\n'.join(msgs)) + time.sleep(1) + + time.sleep(2) + msgs.append("All packages updated") + cb('\n'.join(msgs), True) + + # After updating all packages any package should be listed to be + # updated, so reset self._packages + self.pkgs = {} + + +class MockRepositories(object): + def __init__(self): + self.repos = {"kimchi_repo_1392167832": + {"repo_id": "kimchi_repo_1392167832", + "enabled": True, + "baseurl": "http://www.fedora.org", + "config": {"repo_name": "kimchi_repo_1392167832", + "gpgkey": [], + "gpgcheck": True, + "mirrorlist": ""}}} diff --git a/src/kimchi/vmtemplate.py b/src/kimchi/vmtemplate.py index 5dbbdd4..4202ae6 100644 --- a/src/kimchi/vmtemplate.py +++ b/src/kimchi/vmtemplate.py @@ -26,9 +26,9 @@ import uuid from lxml import etree from lxml.builder import E +from kimchi import imageinfo from kimchi import osinfo from kimchi.exception import InvalidParameter, IsoFormatError, MissingParameter -from kimchi.imageinfo import probe_image, probe_img_info from kimchi.isoinfo import IsoImage from kimchi.utils import check_url_path, pool_name_from_uri from kimchi.xmlutils.disk import get_disk_xml @@ -90,10 +90,11 @@ class VMTemplate(object): if 'base' in d.keys(): base_imgs.append(d) if scan: - distro, version = probe_image(d['base']) + distro, version = imageinfo.probe_image(d['base']) if 'size' not in d.keys(): - d['size'] = probe_img_info(d['base'])['virtual-size'] + d_info = imageinfo.probe_img_info(d['base']) + d['size'] = d_info['virtual-size'] if len(base_imgs) == 0: raise MissingParameter("KCHTMPL0016E") @@ -201,7 +202,7 @@ class VMTemplate(object): if 'base' in d: info['base'] = dict() - base_fmt = probe_img_info(d['base'])['format'] + base_fmt = imageinfo.probe_img_info(d['base'])['format'] if base_fmt is None: raise InvalidParameter("KCHTMPL0024E", {'path': d['base']}) info['base']['path'] = d['base'] diff --git a/tests/run_tests.sh.in b/tests/run_tests.sh.in index a097761..6ba486f 100644 --- a/tests/run_tests.sh.in +++ b/tests/run_tests.sh.in @@ -24,11 +24,23 @@ PYTHON_VER=@PYTHON_VERSION@ if [ $# -ne 0 ]; then ARGS="$@" else - ARGS="discover" + ARGS=`find -name "test_*.py" | xargs -I @ basename @ .py` fi if [ "$HAVE_UNITTEST" != "yes" -o "$PYTHON_VER" == "2.6" ]; then - PYTHONPATH=../src:./:../ unit2 $ARGS + CMD="unit2" else - PYTHONPATH=../src:../ python -m unittest $ARGS + CMD="python -m unittest" fi + +SORTED_LIST=($ARGS) +for ((i=0;i<${#SORTED_LIST[@]};i++)); do + + if [[ ${SORTED_LIST[$i]} == test_model* ]]; then + FIRST=${SORTED_LIST[$i]} + SORTED_LIST[$i]=${SORTED_LIST[0]} + SORTED_LIST[0]=$FIRST + fi +done + +PYTHONPATH=../src:../ $CMD ${SORTED_LIST[@]} diff --git a/tests/test_authorization.py b/tests/test_authorization.py index 71b416f..a8a94fe 100644 --- a/tests/test_authorization.py +++ b/tests/test_authorization.py @@ -21,11 +21,10 @@ import json import os import unittest - from functools import partial - import kimchi.mockmodel +from iso_gen import construct_fake_iso from utils import get_free_port, patch_auth, request from utils import run_server @@ -35,6 +34,7 @@ model = None host = None port = None ssl_port = None +fake_iso = '/tmp/fake.iso' def setUpModule(): @@ -47,10 +47,14 @@ def setUpModule(): ssl_port = get_free_port('https') test_server = run_server(host, port, ssl_port, test_mode=True, model=model) + # Create fake ISO to do the tests + construct_fake_iso(fake_iso, True, '12.04', 'ubuntu') + def tearDownModule(): test_server.stop() os.unlink('/tmp/obj-store-test') + os.unlink(fake_iso) class AuthorizationTests(unittest.TestCase): @@ -103,7 +107,7 @@ class AuthorizationTests(unittest.TestCase): # but he can get and create a new one resp = self.request('/templates', '{}', 'GET') self.assertEquals(403, resp.status) - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'}) + req = json.dumps({'name': 'test', 'cdrom': fake_iso}) resp = self.request('/templates', req, 'POST') self.assertEquals(403, resp.status) resp = self.request('/templates/test', '{}', 'PUT') @@ -112,7 +116,7 @@ class AuthorizationTests(unittest.TestCase): self.assertEquals(403, resp.status) # Non-root users can only get vms authorized to them - model.templates_create({'name': u'test', 'cdrom': '/nonexistent.iso'}) + model.templates_create({'name': u'test', 'cdrom': fake_iso}) model.vms_create({'name': u'test-me', 'template': '/templates/test'}) model.vm_update(u'test-me', @@ -121,11 +125,13 @@ class AuthorizationTests(unittest.TestCase): model.vms_create({'name': u'test-usera', 'template': '/templates/test'}) - model.vm_update(u'test-usera', {'users': ['userA'], 'groups': []}) + + non_root = list(set(model.users_get_list()) - set(['root']))[0] + model.vm_update(u'test-usera', {'users': [non_root], 'groups': []}) model.vms_create({'name': u'test-groupa', 'template': '/templates/test'}) - model.vm_update(u'test-groupa', {'groups': ['groupA']}) + model.vm_update(u'test-groupa', {'groups': ['wheel']}) resp = self.request('/vms', '{}', 'GET') self.assertEquals(200, resp.status) @@ -136,11 +142,12 @@ class AuthorizationTests(unittest.TestCase): self.assertEquals(403, resp.status) # Create a vm using mockmodel directly to test Resource access - model.vms_create({'name': 'test', 'template': '/templates/test'}) + model.vms_create({'name': 'kimchi-test', + 'template': '/templates/test'}) - resp = self.request('/vms/test', '{}', 'PUT') + resp = self.request('/vms/kimchi-test', '{}', 'PUT') self.assertEquals(403, resp.status) - resp = self.request('/vms/test', '{}', 'DELETE') + resp = self.request('/vms/kimchi-test', '{}', 'DELETE') self.assertEquals(403, resp.status) # Non-root users can only update VMs authorized by them @@ -150,4 +157,4 @@ class AuthorizationTests(unittest.TestCase): self.assertEquals(403, resp.status) model.template_delete('test') - model.vm_delete('test') + model.vm_delete('test-me') diff --git a/tests/test_mockmodel.py b/tests/test_mockmodel.py index 4276832..29354aa 100644 --- a/tests/test_mockmodel.py +++ b/tests/test_mockmodel.py @@ -38,24 +38,29 @@ ssl_port = None fake_iso = None +def setUpModule(): + global host, port, ssl_port, model, test_server, fake_iso + cherrypy.request.headers = {'Accept': 'application/json'} + model = kimchi.mockmodel.MockModel('/tmp/obj-store-test') + patch_auth() + port = get_free_port('http') + ssl_port = get_free_port('https') + host = '127.0.0.1' + test_server = run_server(host, port, ssl_port, test_mode=True, + model=model) + fake_iso = '/tmp/fake.iso' + open(fake_iso, 'w').close() + + +def tearDown(): + test_server.stop() + os.unlink('/tmp/obj-store-test') + os.unlink(fake_iso) + + class MockModelTests(unittest.TestCase): def setUp(self): - global host, port, ssl_port, model, test_server, fake_iso - cherrypy.request.headers = {'Accept': 'application/json'} - model = kimchi.mockmodel.MockModel('/tmp/obj-store-test') - patch_auth() - port = get_free_port('http') - ssl_port = get_free_port('https') - host = '127.0.0.1' - test_server = run_server(host, port, ssl_port, test_mode=True, - model=model) - fake_iso = '/tmp/fake.iso' - open(fake_iso, 'w').close() - - def tearDown(self): - test_server.stop() - os.unlink('/tmp/obj-store-test') - os.unlink(fake_iso) + model.reset() def test_collection(self): c = Collection(model) @@ -190,38 +195,37 @@ class MockModelTests(unittest.TestCase): request(host, ssl_port, '/templates', req, 'POST') def add_vm(name): - # Create a VM req = json.dumps({'name': name, 'template': '/templates/test'}) request(host, ssl_port, '/vms', req, 'POST') - add_vm('bca') - add_vm('xba') - add_vm('abc') - add_vm('cab') + vms = [u'abc', u'bca', u'cab', u'xba'] + for vm in vms: + add_vm(vm) - self.assertEqual(model.vms_get_list(), ['abc', 'bca', 'cab', 'xba']) + vms.append(u'test') + self.assertEqual(model.vms_get_list(), sorted(vms)) def test_vm_info(self): model.templates_create({'name': u'test', 'cdrom': fake_iso}) - model.vms_create({'name': u'test', 'template': '/templates/test'}) + model.vms_create({'name': u'test-vm', 'template': '/templates/test'}) vms = model.vms_get_list() - self.assertEquals(1, len(vms)) - self.assertEquals(u'test', vms[0]) + self.assertEquals(2, len(vms)) + self.assertIn(u'test-vm', vms) keys = set(('name', 'state', 'stats', 'uuid', 'memory', 'cpus', 'screenshot', 'icon', 'graphics', 'users', 'groups', - 'access')) + 'access', 'persistent')) stats_keys = set(('cpu_utilization', 'net_throughput', 'net_throughput_peak', 'io_throughput', 'io_throughput_peak')) - info = model.vm_lookup(u'test') + info = model.vm_lookup(u'test-vm') self.assertEquals(keys, set(info.keys())) self.assertEquals('shutoff', info['state']) - self.assertEquals('test', info['name']) + self.assertEquals('test-vm', info['name']) self.assertEquals(1024, info['memory']) self.assertEquals(1, info['cpus']) self.assertEquals('images/icon-vm.png', info['icon']) diff --git a/tests/test_model.py b/tests/test_model.py index f4d842f..cb83e78 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -43,6 +43,7 @@ from kimchi.exception import ImageFormatError, InvalidOperation from kimchi.exception import InvalidParameter, NotFoundError, OperationFailed from kimchi.iscsi import TargetClient from kimchi.model import model +from kimchi.model.libvirtconnection import LibvirtConnection from kimchi.rollbackcontext import RollbackContext from kimchi.utils import add_task @@ -63,6 +64,10 @@ class ModelTests(unittest.TestCase): iso_gen.construct_fake_iso(self.kimchi_iso, True, '12.04', 'ubuntu') def tearDown(self): + # FIXME: Tests using 'test:///default' URI should be moved to + # test_rest or test_mockmodel to avoid overriding problems + LibvirtConnection._connections['test:///default'] = {} + os.unlink(self.tmp_store) shutil.rmtree(self.iso_path) diff --git a/tests/test_rest.py b/tests/test_rest.py index 6770647..7f14b50 100644 --- a/tests/test_rest.py +++ b/tests/test_rest.py @@ -21,23 +21,22 @@ import base64 import json import os -import random import re import requests import shutil import time import unittest import urllib2 - +import urlparse from functools import partial - import iso_gen import kimchi.mockmodel import kimchi.server from kimchi.config import paths from kimchi.rollbackcontext import RollbackContext +from kimchi.utils import add_task from utils import get_free_port, patch_auth, request from utils import run_server, wait_task @@ -48,8 +47,7 @@ host = None port = None ssl_port = None cherrypy_port = None - -# utils.silence_server() +fake_iso = '/tmp/fake.iso' def setUpModule(): @@ -64,10 +62,14 @@ def setUpModule(): test_server = run_server(host, port, ssl_port, test_mode=True, cherrypy_port=cherrypy_port, model=model) + # Create fake ISO to do the tests + iso_gen.construct_fake_iso(fake_iso, True, '12.04', 'ubuntu') + def tearDownModule(): test_server.stop() os.unlink('/tmp/obj-store-test') + os.unlink(fake_iso) class RestTests(unittest.TestCase): @@ -172,15 +174,17 @@ class RestTests(unittest.TestCase): def test_get_vms(self): vms = json.loads(self.request('/vms').read()) - self.assertEquals(0, len(vms)) + # test_rest.py uses MockModel() which connects to libvirt URI + # test:///default. By default this driver already has one VM created + self.assertEquals(1, len(vms)) # Create a template as a base for our VMs - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'}) + req = json.dumps({'name': 'test', 'cdrom': fake_iso}) resp = self.request('/templates', req, 'POST') self.assertEquals(201, resp.status) - test_users = ['user1', 'user2', 'root'] - test_groups = ['group1', 'group2', 'admin'] + test_users = ['root'] + test_groups = ['wheel'] # Now add a couple of VMs to the mock model for i in xrange(10): name = 'vm-%i' % i @@ -190,16 +194,16 @@ class RestTests(unittest.TestCase): self.assertEquals(201, resp.status) vms = json.loads(self.request('/vms').read()) - self.assertEquals(10, len(vms)) + self.assertEquals(11, len(vms)) vm = json.loads(self.request('/vms/vm-1').read()) self.assertEquals('vm-1', vm['name']) self.assertEquals('shutoff', vm['state']) - self.assertEquals(test_users, vm['users']) - self.assertEquals(test_groups, vm['groups']) + self.assertEquals([], vm['users']) + self.assertEquals([], vm['groups']) def test_edit_vm(self): - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'}) + req = json.dumps({'name': 'test', 'cdrom': fake_iso}) resp = self.request('/templates', req, 'POST') self.assertEquals(201, resp.status) @@ -311,7 +315,7 @@ class RestTests(unittest.TestCase): # Create a Template req = json.dumps({'name': 'test', 'disks': [{'size': 1}], 'icon': 'images/icon-debian.png', - 'cdrom': '/nonexistent.iso'}) + 'cdrom': fake_iso}) resp = self.request('/templates', req, 'POST') self.assertEquals(201, resp.status) @@ -326,8 +330,8 @@ class RestTests(unittest.TestCase): self.assertEquals('images/icon-debian.png', vm['icon']) # Verify the volume was created - vol_uri = '/storagepools/default/storagevolumes/%s-0.img' % vm['uuid'] - resp = self.request(vol_uri) + vol_uri = '/storagepools/default-pool/storagevolumes/%s-0.img' + resp = self.request(vol_uri % vm['uuid']) vol = json.loads(resp.read()) self.assertEquals(1 << 30, vol['capacity']) self.assertEquals(1, vol['ref_cnt']) @@ -391,11 +395,11 @@ class RestTests(unittest.TestCase): self.assertEquals(204, resp.status) # Verify the volume was deleted - self.assertHTTPStatus(404, vol_uri) + self.assertHTTPStatus(404, vol_uri % vm['uuid']) def test_vm_graphics(self): # Create a Template - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'}) + req = json.dumps({'name': 'test', 'cdrom': fake_iso}) resp = self.request('/templates', req, 'POST') self.assertEquals(201, resp.status) @@ -475,7 +479,7 @@ class RestTests(unittest.TestCase): with RollbackContext() as rollback: # Create a template as a base for our VMs - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'}) + req = json.dumps({'name': 'test', 'cdrom': fake_iso}) resp = self.request('/templates', req, 'POST') self.assertEquals(201, resp.status) # Delete the template @@ -574,8 +578,6 @@ class RestTests(unittest.TestCase): self.assertEquals(201, resp.status) cd_info = json.loads(resp.read()) self.assertEquals('disk', cd_info['type']) - self.assertEquals('tmp', cd_info['pool']) - self.assertEquals('attach-volume', cd_info['vol']) # Attach a cdrom with existent dev name req = json.dumps({'type': 'cdrom', @@ -592,24 +594,29 @@ class RestTests(unittest.TestCase): os.remove('/tmp/existent.iso') # Change path of storage cdrom - req = json.dumps({'path': 'http://myserver.com/myiso.iso'}) - resp = self.request('/vms/test-vm/storages/'+cd_dev, req, 'PUT') + cdrom = u'http://fedora.mirrors.tds.net/pub/fedora/releases/20/'\ + 'Live/x86_64/Fedora-Live-Desktop-x86_64-20-1.iso' + req = json.dumps({'path': cdrom}) + resp = self.request('/vms/test-vm/storages/' + cd_dev, req, 'PUT') self.assertEquals(200, resp.status) cd_info = json.loads(resp.read()) - self.assertEquals('http://myserver.com/myiso.iso', cd_info['path']) + self.assertEquals(urlparse.urlparse(cdrom).path, + urlparse.urlparse(cd_info['path']).path) # Test GET devs = json.loads(self.request('/vms/test-vm/storages').read()) self.assertEquals(4, len(devs)) # Detach storage cdrom - resp = self.request('/vms/test-vm/storages/'+cd_dev, + resp = self.request('/vms/test-vm/storages/' + cd_dev, '{}', 'DELETE') self.assertEquals(204, resp.status) # Test GET devs = json.loads(self.request('/vms/test-vm/storages').read()) self.assertEquals(3, len(devs)) + resp = self.request('/storagepools/tmp/deactivate', {}, 'POST') + self.assertEquals(200, resp.status) resp = self.request('/storagepools/tmp', {}, 'DELETE') self.assertEquals(204, resp.status) @@ -617,7 +624,7 @@ class RestTests(unittest.TestCase): with RollbackContext() as rollback: # Create a template as a base for our VMs - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'}) + req = json.dumps({'name': 'test', 'cdrom': fake_iso}) resp = self.request('/templates', req, 'POST') self.assertEquals(201, resp.status) # Delete the template @@ -651,7 +658,7 @@ class RestTests(unittest.TestCase): iface['mac']).read()) self.assertEquals('default', res['network']) self.assertEquals(17, len(res['mac'])) - self.assertEquals('virtio', res['model']) + self.assertEquals('e1000', res['model']) # attach network interface to vm req = json.dumps({"type": "network", @@ -667,12 +674,12 @@ class RestTests(unittest.TestCase): self.assertEquals('network', iface['type']) # update vm interface - req = json.dumps({"network": "default", "model": "e1000"}) + req = json.dumps({"network": "default", "model": "virtio"}) resp = self.request('/vms/test-vm/ifaces/%s' % iface['mac'], req, 'PUT') self.assertEquals(200, resp.status) update_iface = json.loads(resp.read()) - self.assertEquals('e1000', update_iface['model']) + self.assertEquals(u'virtio', update_iface['model']) self.assertEquals('default', update_iface['network']) # detach network interface from vm @@ -682,7 +689,7 @@ class RestTests(unittest.TestCase): def test_vm_customise_storage(self): # Create a Template - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso', + req = json.dumps({'name': 'test', 'cdrom': fake_iso, 'disks': [{'size': 1}]}) resp = self.request('/templates', req, 'POST') self.assertEquals(201, resp.status) @@ -707,7 +714,7 @@ class RestTests(unittest.TestCase): # Test template not changed after vm customise its pool t = json.loads(self.request('/templates/test').read()) - self.assertEquals(t['storagepool'], '/storagepools/default') + self.assertEquals(t['storagepool'], '/storagepools/default-pool') # Verify the volume was created vol_uri = '/storagepools/alt/storagevolumes/%s-0.img' % vm_info['uuid'] @@ -726,35 +733,34 @@ class RestTests(unittest.TestCase): # Create scsi fc pool req = json.dumps({'name': 'scsi_fc_pool', 'type': 'scsi', - 'source': {'adapter_name': 'scsi_host3'}}) + 'source': {'adapter_name': 'scsi_host2'}}) resp = self.request('/storagepools', req, 'POST') self.assertEquals(201, resp.status) - # Create template with this pool - req = json.dumps({'name': 'test_fc_pool', 'cdrom': '/nonexistent.iso', - 'storagepool': '/storagepools/scsi_fc_pool'}) - resp = self.request('/templates', req, 'POST') - self.assertEquals(201, resp.status) - # Test create vms using lun of this pool # activate the storage pool resp = self.request('/storagepools/scsi_fc_pool/activate', '{}', 'POST') - # Get scsi pool luns and choose one + # Create template fails because SCSI volume is missing + tmpl_params = {'name': 'test_fc_pool', 'cdrom': fake_iso, + 'storagepool': '/storagepools/scsi_fc_pool'} + req = json.dumps(tmpl_params) + resp = self.request('/templates', req, 'POST') + self.assertEquals(400, resp.status) + + # Choose SCSI volume to create template resp = self.request('/storagepools/scsi_fc_pool/storagevolumes') - luns = json.loads(resp.read()) - lun_name = random.choice(luns).get('name') + lun_name = json.loads(resp.read())[0]['name'] - # Create vm in scsi pool without volumes: Error - req = json.dumps({'template': '/templates/test_fc_pool'}) - resp = self.request('/vms', req, 'POST') - self.assertEquals(400, resp.status) + tmpl_params['disks'] = [{'index': 0, 'volume': lun_name}] + req = json.dumps(tmpl_params) + resp = self.request('/templates', req, 'POST') + self.assertEquals(201, resp.status) # Create vm in scsi pool req = json.dumps({'name': 'test-vm', - 'template': '/templates/test_fc_pool', - 'volumes': [lun_name]}) + 'template': '/templates/test_fc_pool'}) resp = self.request('/vms', req, 'POST') self.assertEquals(201, resp.status) @@ -773,7 +779,7 @@ class RestTests(unittest.TestCase): self.assertEquals(204, resp.status) def test_template_customise_storage(self): - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso', + req = json.dumps({'name': 'test', 'cdrom': fake_iso, 'disks': [{'size': 1}]}) resp = self.request('/templates', req, 'POST') self.assertEquals(201, resp.status) @@ -825,7 +831,7 @@ class RestTests(unittest.TestCase): def test_template_customise_network(self): with RollbackContext() as rollback: - tmpl = {'name': 'test', 'cdrom': '/nonexistent.iso', + tmpl = {'name': 'test', 'cdrom': fake_iso, 'disks': [{'size': 1}]} req = json.dumps(tmpl) resp = self.request('/templates', req, 'POST') @@ -883,7 +889,7 @@ class RestTests(unittest.TestCase): def test_unnamed_vms(self): # Create a Template - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'}) + req = json.dumps({'name': 'test', 'cdrom': fake_iso}) resp = self.request('/templates', req, 'POST') self.assertEquals(201, resp.status) @@ -893,7 +899,7 @@ class RestTests(unittest.TestCase): vm = json.loads(self.request('/vms', req, 'POST').read()) self.assertEquals('test-vm-%i' % i, vm['name']) count = len(json.loads(self.request('/vms').read())) - self.assertEquals(5, count) + self.assertEquals(6, count) def test_create_vm_without_template(self): req = json.dumps({'name': 'vm-without-template'}) @@ -912,7 +918,7 @@ class RestTests(unittest.TestCase): def test_create_vm_with_img_based_template(self): resp = json.loads( - self.request('/storagepools/default/storagevolumes').read()) + self.request('/storagepools/default-pool/storagevolumes').read()) self.assertEquals(0, len(resp)) # Create a Template @@ -927,14 +933,13 @@ class RestTests(unittest.TestCase): # Test storage volume created with backing store of base file resp = json.loads( - self.request('/storagepools/default/storagevolumes').read()) + self.request('/storagepools/default-pool/storagevolumes').read()) self.assertEquals(1, len(resp)) - self.assertEquals(mock_base, resp[0]['base']['path']) def test_get_storagepools(self): storagepools = json.loads(self.request('/storagepools').read()) self.assertEquals(2, len(storagepools)) - self.assertEquals('default', storagepools[0]['name']) + self.assertEquals('default-pool', storagepools[0]['name']) self.assertEquals('active', storagepools[0]['state']) self.assertEquals('kimchi_isos', storagepools[1]['name']) self.assertEquals('kimchi-iso', storagepools[1]['type']) @@ -1066,12 +1071,13 @@ class RestTests(unittest.TestCase): self.assertEquals('/var/lib/libvirt/images/volume-1', storagevolume['path']) - req = json.dumps({'url': 'https://anyurl.wor.kz'}) + url = 'https://github.com/kimchi-project/kimchi/blob/master/COPYING' + req = json.dumps({'url': url}) resp = self.request('/storagepools/pool-1/storagevolumes', req, 'POST') self.assertEquals(202, resp.status) task = json.loads(resp.read()) vol_name = task['target_uri'].split('/')[-1] - self.assertEquals('anyurl.wor.kz', vol_name) + self.assertEquals('COPYING', vol_name) wait_task(self._task_lookup, task['id']) task = json.loads(self.request('/tasks/%s' % task['id']).read()) self.assertEquals('finished', task['status']) @@ -1095,13 +1101,16 @@ class RestTests(unittest.TestCase): cloned_vol = json.loads(resp.read()) self.assertNotEquals(vol['name'], cloned_vol['name']) - del vol['name'] - del cloned_vol['name'] self.assertNotEquals(vol['path'], cloned_vol['path']) - del vol['path'] - del cloned_vol['path'] + for key in ['name', 'path', 'allocation']: + del vol[key] + del cloned_vol[key] + self.assertEquals(vol, cloned_vol) + resp = self.request('/storagepools/pool-1/deactivate', '{}', 'POST') + self.assertEquals(200, resp.status) + # Now remove the StoragePool from mock model self._delete_pool('pool-1') @@ -1138,7 +1147,7 @@ class RestTests(unittest.TestCase): resp = self.request(uri, req, 'POST') uri = '/storagepools/pool-2/storagevolumes/test-volume' storagevolume = json.loads(self.request(uri).read()) - self.assertEquals(768, storagevolume['capacity']) + self.assertEquals(768 << 20, storagevolume['capacity']) # Wipe the storage volume uri = '/storagepools/pool-2/storagevolumes/test-volume/wipe' @@ -1152,6 +1161,9 @@ class RestTests(unittest.TestCase): '{}', 'DELETE') self.assertEquals(204, resp.status) + resp = self.request('/storagepools/pool-2/deactivate', '{}', 'POST') + self.assertEquals(200, resp.status) + # Now remove the StoragePool from mock model self._delete_pool('pool-2') @@ -1198,7 +1210,7 @@ class RestTests(unittest.TestCase): open('/tmp/mock.img', 'w').close() t = {'name': 'test_img_template', 'os_distro': 'ImagineOS', 'os_version': '1.0', 'memory': 1024, 'cpus': 1, - 'storagepool': '/storagepools/alt', + 'storagepool': '/storagepools/default-pool', 'disks': [{'base': '/tmp/mock.img'}]} req = json.dumps(t) resp = self.request('/templates', req, 'POST') @@ -1210,8 +1222,8 @@ class RestTests(unittest.TestCase): graphics = {'type': 'spice', 'listen': '127.0.0.1'} t = {'name': 'test', 'os_distro': 'ImagineOS', 'os_version': '1.0', 'memory': 1024, 'cpus': 1, - 'storagepool': '/storagepools/alt', 'cdrom': '/tmp/mock.iso', - 'graphics': graphics} + 'storagepool': '/storagepools/default-pool', + 'cdrom': '/tmp/mock.iso', 'graphics': graphics} req = json.dumps(t) resp = self.request('/templates', req, 'POST') self.assertEquals(201, resp.status) @@ -1239,8 +1251,8 @@ class RestTests(unittest.TestCase): # Create a template with same name fails with 400 t = {'name': 'test', 'os_distro': 'ImagineOS', 'os_version': '1.0', 'memory': 1024, 'cpus': 1, - 'storagepool': '/storagepools/default', - 'cdrom': '/nonexistent.iso'} + 'storagepool': '/storagepools/default-pool', + 'cdrom': fake_iso} req = json.dumps(t) resp = self.request('/templates', req, 'POST') self.assertEquals(400, resp.status) @@ -1388,15 +1400,15 @@ class RestTests(unittest.TestCase): 'DELETE') self.assertEquals(204, resp.status) - # Delete the storagepool + # Try to delete the storagepool + # It should fail as it is associated to a template resp = request(host, ssl_port, '/storagepools/test-storagepool', '{}', 'DELETE') - self.assertEquals(204, resp.status) + self.assertEquals(400, resp.status) # Verify the template res = json.loads(self.request('/templates/test').read()) self.assertEquals(res['invalid']['cdrom'], [iso]) - self.assertEquals(res['invalid']['storagepools'], ['test-storagepool']) # Delete the template resp = request(host, ssl_port, '/templates/test', '{}', 'DELETE') @@ -1414,15 +1426,15 @@ class RestTests(unittest.TestCase): storagevolume = json.loads(self.request( '/storagepools/kimchi_isos/storagevolumes/').read())[0] - self.assertEquals('pool-3-fedora.iso', storagevolume['name']) + self.assertEquals('fedora.iso', storagevolume['name']) self.assertEquals('iso', storagevolume['format']) self.assertEquals('/var/lib/libvirt/images/fedora.iso', storagevolume['path']) self.assertEquals(1024 << 20, storagevolume['capacity']) - self.assertEquals(1024 << 20, storagevolume['allocation']) - self.assertEquals('17', storagevolume['os_version']) - self.assertEquals('fedora', storagevolume['os_distro']) - self.assertEquals(True, storagevolume['bootable']) + self.assertEquals(0, storagevolume['allocation']) + self.assertEquals('unknown', storagevolume['os_version']) + self.assertEquals('unknown', storagevolume['os_distro']) + self.assertEquals(False, storagevolume['bootable']) # Create a template # In real model os distro/version can be omitted @@ -1437,8 +1449,8 @@ class RestTests(unittest.TestCase): # Verify the template t = json.loads(self.request('/templates/test').read()) self.assertEquals('test', t['name']) - self.assertEquals('fedora', t['os_distro']) - self.assertEquals('17', t['os_version']) + self.assertEquals('unknown', t['os_distro']) + self.assertEquals('unknown', t['os_version']) self.assertEquals(1024, t['memory']) # Deactivate or destroy scan pool return 405 @@ -1454,22 +1466,25 @@ class RestTests(unittest.TestCase): resp = self.request('/templates/%s' % t['name'], '{}', 'DELETE') self.assertEquals(204, resp.status) + resp = self.request('/storagepools/pool-3/deactivate', '{}', 'POST') + self.assertEquals(200, resp.status) self._delete_pool('pool-3') def test_screenshot_refresh(self): # Create a VM - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'}) + req = json.dumps({'name': 'test', 'cdrom': fake_iso}) resp = self.request('/templates', req, 'POST') req = json.dumps({'name': 'test-vm', 'template': '/templates/test'}) - self.request('/vms', req, 'POST') + resp = self.request('/vms', req, 'POST') # Test screenshot for shut-off state vm resp = self.request('/vms/test-vm/screenshot') self.assertEquals(404, resp.status) # Test screenshot for running vm - self.request('/vms/test-vm/start', '{}', 'POST') + resp = self.request('/vms/test-vm/start', '{}', 'POST') vm = json.loads(self.request('/vms/test-vm').read()) + resp = self.request(vm['screenshot'], method='HEAD') self.assertEquals(200, resp.status) self.assertTrue(resp.getheader('Content-type').startswith('image')) @@ -1589,9 +1604,9 @@ class RestTests(unittest.TestCase): return json.loads(self.request('/tasks/%s' % taskid).read()) def test_tasks(self): - id1 = model.add_task('/tasks/1', self._async_op) - id2 = model.add_task('/tasks/2', self._except_op) - id3 = model.add_task('/tasks/3', self._intermid_op) + id1 = add_task('/tasks/1', self._async_op, model.objstore) + id2 = add_task('/tasks/2', self._except_op, model.objstore) + id3 = add_task('/tasks/3', self._intermid_op, model.objstore) target_uri = urllib2.quote('^/tasks/*', safe="") filter_data = 'status=running&target_uri=%s' % target_uri @@ -1621,9 +1636,9 @@ class RestTests(unittest.TestCase): resp = self.request('/config/capabilities').read() conf = json.loads(resp) - keys = ['libvirt_stream_protocols', 'qemu_stream', 'qemu_spice', - 'screenshot', 'system_report_tool', 'update_tool', - 'repo_mngt_tool', 'federation'] + keys = [u'libvirt_stream_protocols', u'qemu_stream', u'qemu_spice', + u'screenshot', u'system_report_tool', u'update_tool', + u'repo_mngt_tool', u'federation', u'kernel_vfio'] self.assertEquals(sorted(keys), sorted(conf.keys())) def test_peers(self): @@ -1803,13 +1818,10 @@ class RestTests(unittest.TestCase): def test_host(self): resp = self.request('/host').read() info = json.loads(resp) - self.assertEquals('Red Hat Enterprise Linux Server', info['os_distro']) - self.assertEquals('6.4', info['os_version']) - self.assertEquals('Santiago', info['os_codename']) - self.assertEquals('Intel(R) Core(TM) i5 CPU M 560 @ 2.67GHz', - info['cpu_model']) - self.assertEquals(6114058240, info['memory']) - self.assertEquals(4, info['cpus']) + + keys = ['os_distro', 'os_version', 'os_codename', 'cpu_model', + 'memory', 'cpus'] + self.assertEquals(sorted(keys), sorted(info.keys())) def test_hoststats(self): stats_keys = ['cpu_utilization', 'memory', 'disk_read_rate', @@ -1856,14 +1868,14 @@ class RestTests(unittest.TestCase): resp = self.request('/tasks/' + task[u'id'], None, 'GET') task_info = json.loads(resp.read()) self.assertEquals(task_info['status'], 'running') - time.sleep(6) + wait_task(self._task_lookup, task_info['id']) resp = self.request('/tasks/' + task[u'id'], None, 'GET') task_info = json.loads(resp.read()) self.assertEquals(task_info['status'], 'finished') self.assertIn(u'All packages updated', task_info['message']) def test_get_param(self): - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'}) + req = json.dumps({'name': 'test', 'cdrom': fake_iso}) self.request('/templates', req, 'POST') # Create a VM @@ -1877,13 +1889,15 @@ class RestTests(unittest.TestCase): resp = request(host, ssl_port, '/vms') self.assertEquals(200, resp.status) res = json.loads(resp.read()) - self.assertEquals(2, len(res)) + self.assertEquals(3, len(res)) + # FIXME: control/base.py also allows filter by regex so it is returning + # 2 vms when querying for 'test-vm1': 'test' and 'test-vm1' resp = request(host, ssl_port, '/vms?name=test-vm1') self.assertEquals(200, resp.status) res = json.loads(resp.read()) - self.assertEquals(1, len(res)) - self.assertEquals('test-vm1', res[0]['name']) + self.assertEquals(2, len(res)) + self.assertIn('test-vm1', [r['name'] for r in res]) def test_repositories(self): def verify_repo(t, res): @@ -1897,25 +1911,6 @@ class RestTests(unittest.TestCase): # Already have one repo in Kimchi's system self.assertEquals(1, len(json.loads(resp.read()))) - invalid_urls = ['www.fedora.org', # missing protocol - '://www.fedora.org', # missing protocol - 'http://www.fedora', # invalid domain name - 'file:///home/userdoesnotexist'] # invalid path - - # Create repositories with invalid baseurl - for url in invalid_urls: - repo = {'repo_id': 'fedora-fake', 'baseurl': url} - req = json.dumps(repo) - resp = self.request(base_uri, req, 'POST') - self.assertEquals(400, resp.status) - - # Create repositories with invalid mirrorlist - for url in invalid_urls: - repo = {'repo_id': 'fedora-fake', 'mirrorlist': url} - req = json.dumps(repo) - resp = self.request(base_uri, req, 'POST') - self.assertEquals(400, resp.status) - # Create a repository repo = {'repo_id': 'fedora-fake', 'baseurl': 'http://www.fedora.org'} @@ -1927,22 +1922,6 @@ class RestTests(unittest.TestCase): res = json.loads(self.request('%s/fedora-fake' % base_uri).read()) verify_repo(repo, res) - # Update repositories with invalid baseurl - for url in invalid_urls: - params = {} - params['baseurl'] = url - resp = self.request('%s/fedora-fake' % base_uri, - json.dumps(params), 'PUT') - self.assertEquals(400, resp.status) - - # Update repositories with invalid mirrorlist - for url in invalid_urls: - params = {} - params['mirrorlist'] = url - resp = self.request('%s/fedora-fake' % base_uri, - json.dumps(params), 'PUT') - self.assertEquals(400, resp.status) - # Update the repository params = {} params['baseurl'] = repo['baseurl'] = 'http://www.fedoraproject.org' @@ -1969,7 +1948,7 @@ class RestTests(unittest.TestCase): with RollbackContext() as rollback: vol_path = os.path.join(paths.get_prefix(), 'COPYING') - url = "https://%s:%s/storagepools/default/storagevolumes" % \ + url = "https://%s:%s/storagepools/default-pool/storagevolumes" % \ (host, ssl_port) with open(vol_path, 'rb') as fd: @@ -1981,8 +1960,8 @@ class RestTests(unittest.TestCase): self.assertEquals(r.status_code, 202) task = r.json() wait_task(self._task_lookup, task['id']) - resp = self.request('/storagepools/default/storagevolumes/%s' % - task['target_uri'].split('/')[-1]) + uri = '/storagepools/default-pool/storagevolumes/%s' + resp = self.request(uri % task['target_uri'].split('/')[-1]) self.assertEquals(200, resp.status) # Create a file with 3M to upload @@ -2001,8 +1980,7 @@ class RestTests(unittest.TestCase): self.assertEquals(r.status_code, 202) task = r.json() wait_task(self._task_lookup, task['id'], 15) - resp = self.request('/storagepools/default/storagevolumes/%s' % - task['target_uri'].split('/')[-1]) + resp = self.request(uri % task['target_uri'].split('/')[-1]) self.assertEquals(200, resp.status) -- 1.9.3

Reviewed-by: Crístian Viana <vianac@linux.vnet.ibm.com> On 14-11-2014 17:26, Aline Manera wrote:
V2 -> V3: - Fix "make check-local" errors - Fix typo
V1 -> V2: - Fix some tests cases - Update volune ref_cnt when VM is deleted
Aline Manera (1): MockModel refactor: Create MockModel based on Model("test:///default")
src/kimchi/i18n.py | 2 - src/kimchi/mockmodel.py | 1771 ++++++++----------------------------------- src/kimchi/vmtemplate.py | 9 +- tests/run_tests.sh.in | 18 +- tests/test_authorization.py | 27 +- tests/test_mockmodel.py | 60 +- tests/test_model.py | 5 + tests/test_rest.py | 254 +++---- 8 files changed, 503 insertions(+), 1643 deletions(-)
participants (2)
-
Aline Manera
-
Crístian Viana