[Kimchi-devel] [PATCH 6/6] MockModel refactor: Create MockModel based on Model("test:///default")
Daniel Henrique Barboza
danielhb at linux.vnet.ibm.com
Thu Nov 13 15:58:29 UTC 2014
Reviewed-by: Daniel Barboza <danielhb at linux.vnet.ibm.com>
On 11/11/2014 06:46 PM, Aline Manera wrote:
> The mockmodel was designed to provide a way to user tests Kimchi
> without affecting the system (by "kimchid --test") and also to make the
> tests easier to do.
> But in fact, we have a bunch of code that completely differs from
> the real model, so the developer needs to do 2 kinds of implementations
> while developing a new feature.
>
> This patch change MockModel to be Model("test:///default") and only
> overrides what is not supported by the libvirt Test Driver.
>
> It also fixes the test cases after that change: only delete a inactive
> storage pool, the Test Driver already has on VM created on start up, the
> ISO file must be an existing path, etc.
>
> The run_tests.sh.in script was also updated to run the test_model.py
> prior to the MockModel tests as the MockModel will override some Model
> methods which can cause problems if the MockModel runs before Model.
>
> Signed-off-by: Aline Manera <alinefm at linux.vnet.ibm.com>
> ---
> src/kimchi/mockmodel.py | 1769 ++++++++-----------------------------------
> src/kimchi/vmtemplate.py | 9 +-
> tests/run_tests.sh.in | 18 +-
> tests/test_authorization.py | 25 +-
> tests/test_mockmodel.py | 60 +-
> tests/test_rest.py | 254 +++----
> 6 files changed, 495 insertions(+), 1640 deletions(-)
>
> diff --git a/src/kimchi/mockmodel.py b/src/kimchi/mockmodel.py
> index 626ef35..7d3eb38 100644
> --- a/src/kimchi/mockmodel.py
> +++ b/src/kimchi/mockmodel.py
> @@ -14,1105 +14,291 @@
> # Lesser General Public License for more details.
> #
> # You should have received a copy of the GNU Lesser General Public
> -# License along with this library; if not, write to the
> -# Free Software Foundation, Inc.
> -# 51 Franklin Street, Fifth Floor,
> -# Boston, MA 02110-1301 USA
> -
> -import cherrypy
> -import copy
> -import disks
> -import glob
> -import ipaddr
> +# License along with this library; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> +
> +import libvirt
> +import lxml.etree as ET
> import os
> -import shutil
> -import psutil
> import random
> -import string
> import time
> -import uuid
> -
> -
> -try:
> - from PIL import Image
> - from PIL import ImageDraw
> -except ImportError:
> - import Image
> - import ImageDraw
>
> +from lxml import objectify
>
> from kimchi import config
> -from kimchi.asynctask import AsyncTask
> -from kimchi.config import READONLY_POOL_TYPE, config as kconfig
> -from kimchi.distroloader import DistroLoader
> -from kimchi.exception import InvalidOperation, InvalidParameter
> -from kimchi.exception import MissingParameter, NotFoundError, OperationFailed
> -from kimchi.model.storagepools import ISO_POOL_NAME
> -from kimchi.model.storageservers import STORAGE_SERVERS
> -from kimchi.model.utils import get_vm_name
> +from kimchi import imageinfo
> +from kimchi import osinfo
> +from kimchi.model.debugreports import DebugReportsModel
> +from kimchi.model.host import DeviceModel
> +from kimchi.model.model import Model
> +from kimchi.model.storagevolumes import StorageVolumesModel
> +from kimchi.model.templates import LibvirtVMTemplate
> from kimchi.objectstore import ObjectStore
> -from kimchi.screenshot import VMScreenshot
> -from kimchi.utils import get_next_clone_name, pool_name_from_uri
> -from kimchi.utils import validate_repo_url, template_name_from_uri
> +from kimchi.utils import add_task
> from kimchi.vmtemplate import VMTemplate
> +from kimchi.xmlutils.utils import xml_item_update
>
>
> fake_user = {'admin': 'letmein!'}
> +mockmodel_defaults = {'storagepool': '/storagepools/default-pool',
> + 'domain': 'test', 'arch': 'i686'}
> +
>
> +class MockModel(Model):
> + _mock_vms = {}
> + _XMLDesc = libvirt.virDomain.XMLDesc
> + _defineXML = libvirt.virConnect.defineXML
> + _undefineDomain = libvirt.virDomain.undefine
> + _libvirt_get_vol_path = LibvirtVMTemplate._get_volume_path
>
> -class MockModel(object):
> def __init__(self, objstore_loc=None):
> - self.reset()
> - self.objstore = ObjectStore(objstore_loc)
> - self.objstore_loc = objstore_loc
> - self.distros = self._get_distros()
> -
> - def capabilities_lookup(self, *ident):
> - return {'libvirt_stream_protocols':
> - ['http', 'https', 'ftp', 'ftps', 'tftp'],
> - 'qemu_spice': True,
> - 'qemu_stream': True,
> - 'screenshot': True,
> - 'system_report_tool': True,
> - 'update_tool': True,
> - 'repo_mngt_tool': 'yum',
> - 'federation': 'off'}
> + # Override osinfo.defaults to ajust the values according to
> + # test:///default driver
> + defaults = dict(osinfo.defaults)
> + defaults.update(mockmodel_defaults)
> + osinfo.defaults = dict(defaults)
>
> - def reset(self):
> - if hasattr(self, 'objstore'):
> - self.objstore = ObjectStore(self.objstore_loc)
> - self._mock_vms = {}
> - self._mock_screenshots = {}
> - self._mock_templates = {}
> - self._mock_storagepools = {'default': MockStoragePool('default')}
> - self._mock_networks = {'default': MockNetwork('default')}
> - self._mock_interfaces = self.dummy_interfaces()
> - self._mock_swupdate = MockSoftwareUpdate()
> - self.next_taskid = 1
> - self.storagepool_activate('default')
> - self._mock_host_repositories = MockRepositories()
> self._mock_devices = MockDevices()
> + self._mock_interfaces = MockInterfaces()
> + self._mock_storagevolumes = MockStorageVolumes()
> + self._mock_swupdate = MockSoftwareUpdate()
> + self._mock_repositories = MockRepositories()
> +
> + libvirt.virConnect.defineXML = MockModel.domainDefineXML
> + libvirt.virDomain.XMLDesc = MockModel.domainXMLDesc
> + libvirt.virDomain.undefine = MockModel.undefineDomain
> + libvirt.virDomain.attachDeviceFlags = MockModel.attachDeviceFlags
> + libvirt.virDomain.detachDeviceFlags = MockModel.detachDeviceFlags
> + libvirt.virDomain.updateDeviceFlags = MockModel.updateDeviceFlags
> + libvirt.virStorageVol.resize = MockModel.volResize
> + libvirt.virStorageVol.wipePattern = MockModel.volWipePattern
> +
> + super(MockModel, self).__init__('test:///default', objstore_loc)
> + self.objstore_loc = objstore_loc
> + self.objstore = ObjectStore(objstore_loc)
>
> - def _static_vm_update(self, dom, params):
> - state = dom.info['state']
> -
> - for key, val in params.items():
> - if key == 'name':
> - if state == 'running' or params['name'] in self.vms_get_list():
> - msg_args = {'name': dom.name, 'new_name': params['name']}
> - raise InvalidParameter("KCHVM0003E", msg_args)
> -
> - del self._mock_vms[dom.name]
> - dom.name = params['name']
> - self._mock_vms[dom.name] = dom
> -
> - elif key == 'users':
> - invalid_users = set(val) - set(self.users_get_list())
> - if len(invalid_users) != 0:
> - raise InvalidParameter("KCHVM0027E",
> - {'users': ", ".join(invalid_users)})
> + # The MockModel methods are instantiated on runtime according to Model
> + # and BaseModel
> + # Because that a normal method override will not work here
> + # Instead of that we also need to do the override on runtime
> + for method in dir(self):
> + if method.startswith('_mock_'):
> + mock_method = getattr(self, method)
> + if not callable(mock_method):
> + continue
> +
> + m = method.strip('_mock_')
> + model_method = getattr(self, m)
> + setattr(self, '_model_' + m, model_method)
> + setattr(self, m, mock_method)
> +
> + DeviceModel.lookup = self._mock_device_lookup
> + StorageVolumesModel.get_list = self._mock_storagevolumes_get_list
> + DebugReportsModel._gen_debugreport_file = self._gen_debugreport_file
> + LibvirtVMTemplate._get_volume_path = self._get_volume_path
> + VMTemplate.get_iso_info = self._probe_image
> + imageinfo.probe_image = self._probe_image
>
> - elif key == 'groups':
> - invalid_groups = set(val) - set(self.groups_get_list())
> - if len(invalid_groups) != 0:
> - raise InvalidParameter("KCHVM0028E",
> - {'groups':
> - ", ".join(invalid_groups)})
> + def reset(self):
> + MockModel._mock_vms = {}
> + self._mock_swupdate = MockSoftwareUpdate()
> + self._mock_repositories = MockRepositories()
>
> - dom.info[key] = val
> + if hasattr(self, 'objstore'):
> + self.objstore = ObjectStore(self.objstore_loc)
>
> - def _live_vm_update(self, dom, params):
> - if 'graphics' not in params:
> - return
> + params = {'vms': [u'test'], 'templates': [],
> + 'networks': [u'default'], 'storagepools': [u'default-pool']}
>
> - graphics = params.pop('graphics')
> - passwd = graphics.get('passwd')
> - if passwd is None:
> - passwd = "".join(random.sample(string.ascii_letters +
> - string.digits, 8))
> + for res, items in params.iteritems():
> + resources = getattr(self, '%s_get_list' % res)()
> + for i in resources:
> + if i in items:
> + continue
>
> - expire = graphics.get('passwdValidTo')
> - if expire is not None:
> - expire = round(time.time()) + expire
> + try:
> + getattr(self, '%s_deactivate' % res[:-1])(i)
> + except:
> + pass
>
> - dom.info['graphics']["passwd"] = passwd
> - dom.info['graphics']["passwdValidTo"] = expire
> + getattr(self, '%s_delete' % res[:-1])(i)
>
> - def vm_update(self, name, params):
> - dom = self._get_vm(name)
> - self._static_vm_update(dom, params)
> - self._live_vm_update(dom, params)
> + volumes = self.storagevolumes_get_list('default-pool')
> + for v in volumes:
> + self.storagevolume_delete('default-pool', v)
>
> - return dom.name
> -
> - def vm_lookup(self, name):
> - vm = self._get_vm(name)
> - if vm.info['state'] == 'running':
> - vm.info['screenshot'] = self.vmscreenshot_lookup(name)
> - else:
> - vm.info['screenshot'] = None
> -
> - validTo = vm.info['graphics']['passwdValidTo']
> - validTo = (validTo - round(time.time()) if validTo is not None
> - else None)
> - vm.info['graphics']['passwdValidTo'] = validTo
> - return vm.info
> -
> - def vm_delete(self, name):
> - vm = self._get_vm(name)
> - self._vmscreenshot_delete(vm.uuid)
> - for disk in vm.disk_paths:
> - self.storagevolume_delete(disk['pool'], disk['volume'])
> -
> - del self._mock_vms[vm.name]
> -
> - def vm_start(self, name):
> - self._get_vm(name).info['state'] = 'running'
> -
> - def vm_poweroff(self, name):
> - self._get_vm(name).info['state'] = 'shutoff'
> -
> - def vm_shutdown(self, name):
> - self._get_vm(name).info['state'] = 'shutoff'
> -
> - def vm_reset(self, name):
> - pass
> -
> - def vm_connect(self, name):
> - pass
> -
> - def vm_clone(self, name):
> - vm = self._mock_vms[name]
> - if vm.info['state'] != u'shutoff':
> - raise InvalidParameter('KCHVM0033E', {'name': name})
> -
> - new_name = get_next_clone_name(self.vms_get_list(), name)
> -
> - taskid = self.add_task(u'/vms/%s' % new_name, self._do_clone,
> - {'name': name, 'new_name': new_name})
> - return self.task_lookup(taskid)
> -
> - def _do_clone(self, cb, params):
> - name = params['name']
> - new_name = params['new_name']
> -
> - vm = self._mock_vms[name]
> - new_vm = copy.deepcopy(vm)
> -
> - new_uuid = unicode(uuid.uuid4())
> -
> - new_vm.name = new_name
> - new_vm.info['name'] = new_name
> - new_vm.uuid = new_uuid
> - new_vm.info['uuid'] = new_uuid
> -
> - for mac, iface in new_vm.ifaces.items():
> - new_mac = MockVMIface.get_mac()
> - iface.info['mac'] = new_mac
> - new_vm.ifaces[new_mac] = iface
> -
> - storage_names = new_vm.storagedevices.keys()
> - for i, storage_name in enumerate(storage_names):
> - storage = new_vm.storagedevices[storage_name]
> - basename, ext = os.path.splitext(storage.info['path'])
> - new_path = u'%s-%d%s' % (basename, i, ext)
> - new_vm.storagedevices[storage_name].path = new_path
> -
> - self._mock_vms[new_name] = new_vm
> -
> - cb('OK', True)
> -
> - def vms_create(self, params):
> - t_name = template_name_from_uri(params['template'])
> - name = get_vm_name(params.get('name'), t_name, self._mock_vms.keys())
> - if name in self._mock_vms:
> - raise InvalidOperation("KCHVM0001E", {'name': name})
> -
> - vm_uuid = str(uuid.uuid4())
> - vm_overrides = dict()
> - pool_uri = params.get('storagepool')
> - if pool_uri:
> - vm_overrides['storagepool'] = pool_uri
> -
> - t = self._get_template(t_name, vm_overrides)
> - t.validate()
> -
> - t_info = copy.deepcopy(t.info)
> - graphics = params.get('graphics')
> - if graphics:
> - t_info.update({'graphics': graphics})
> -
> - vm = MockVM(vm_uuid, name, t_info)
> - icon = t_info.get('icon')
> - if icon:
> - vm.info['icon'] = icon
> -
> - pool = t._storage_validate()
> - if pool.info['type'] == 'scsi':
> - vm.disk_paths = []
> - if not params.get('volumes'):
> - raise MissingParameter('KCHVM0017E')
> - for vol in params['volumes']:
> - vm.disk_paths.append({'pool': pool.name,
> - 'volume': vol})
> -
> - else:
> - vm.disk_paths = t.fork_vm_storage(vm_uuid)
> -
> - index = 0
> - for disk in vm.disk_paths:
> - storagepath = self._mock_storagepools[disk['pool']].info['path']
> - fullpath = os.path.join(storagepath, disk['volume'])
> - dev_name = "hd" + string.ascii_lowercase[index]
> - params = {'dev': dev_name, 'path': fullpath, 'type': 'disk'}
> - vm.storagedevices[dev_name] = MockVMStorageDevice(params)
> - index += 1
> -
> - cdrom = "hd" + string.ascii_lowercase[index + 1]
> - if t_info.get('cdrom'):
> - cdrom_params = {
> - 'dev': cdrom, 'path': t_info['cdrom'], 'type': 'cdrom'}
> - vm.storagedevices[cdrom] = MockVMStorageDevice(cdrom_params)
> -
> - self._mock_vms[name] = vm
> - return name
> -
> - def vms_get_list(self):
> - names = self._mock_vms.keys()
> - return sorted(names, key=unicode.lower)
> -
> - def vmscreenshot_lookup(self, name):
> - vm = self._get_vm(name)
> - if vm.info['state'] != 'running':
> - raise NotFoundError("KCHVM0004E", {'name': name})
> -
> - screenshot = self._mock_screenshots.setdefault(
> - vm.uuid, MockVMScreenshot({'uuid': vm.uuid}))
> - return screenshot.lookup()
> -
> - def _vmscreenshot_delete(self, vm_uuid):
> - screenshot = self._mock_screenshots.get(vm_uuid)
> - if screenshot:
> - screenshot.delete()
> - del self._mock_screenshots[vm_uuid]
> -
> - def template_lookup(self, name):
> - t = self._get_template(name)
> - return t.validate_integrity()
> -
> - def template_delete(self, name):
> - try:
> - del self._mock_templates[name]
> - except KeyError:
> - raise NotFoundError("KCHTMPL0002E", {'name': name})
> -
> - def templates_create(self, params):
> - name = params.get('name', '').strip()
> -
> - for net_name in params.get(u'networks', []):
> - try:
> - self._get_network(net_name)
> - except NotFoundError:
> - msg_args = {'network': net_name, 'template': name}
> - raise InvalidParameter("KCHTMPL0003E", msg_args)
> -
> - if params.get('cpu_info') is None:
> - params['cpu_info'] = dict()
> -
> - t = MockVMTemplate(params, self)
> - if t.name in self._mock_templates:
> - raise InvalidOperation("KCHTMPL0001E", {'name': name})
> -
> - self._mock_templates[name] = t
> - return name
> -
> - def template_clone(self, name):
> - # set default name
> - subfixs = [v[len(name):] for v in self.templates_get_list()
> - if v.startswith(name)]
> - indexs = [int(v.lstrip("-clone")) for v in subfixs
> - if v.startswith("-clone") and
> - v.lstrip("-clone").isdigit()]
> - indexs.sort()
> - index = "1" if not indexs else str(indexs[-1] + 1)
> - clone_name = name + "-clone" + index
> -
> - temp = self.template_lookup(name)
> - temp['name'] = clone_name
> - ident = self.templates_create(temp)
> - return ident
> -
> - def template_update(self, name, params):
> - old_t = self.template_lookup(name)
> - new_t = copy.copy(old_t)
> -
> - new_t.update(params)
> - ident = name
> -
> - new_storagepool = new_t.get(u'storagepool', '')
> - try:
> - self._get_storagepool(pool_name_from_uri(new_storagepool))
> - except Exception:
> - msg_args = {'pool': new_storagepool, 'template': name}
> - raise InvalidParameter("KCHTMPL0004E", msg_args)
> -
> - for net_name in params.get(u'networks', []):
> - try:
> - self._get_network(net_name)
> - except NotFoundError:
> - msg_args = {'network': net_name, 'template': name}
> - raise InvalidParameter("KCHTMPL0003E", msg_args)
> -
> - self.template_delete(name)
> + @staticmethod
> + def domainDefineXML(conn, xml):
> + name = objectify.fromstring(xml).name.text
> try:
> - ident = self.templates_create(new_t)
> + dom = conn.lookupByName(name)
> + if not dom.isActive():
> + MockModel._mock_vms[name] = xml
> except:
> - ident = self.templates_create(old_t)
> - raise
> - return ident
> + pass
> +
> + return MockModel._defineXML(conn, xml)
> +
> + @staticmethod
> + def domainXMLDesc(dom, flags=0):
> + return MockModel._mock_vms.get(dom.name(),
> + MockModel._XMLDesc(dom, flags))
> +
> + @staticmethod
> + def undefineDomain(dom):
> + name = dom.name()
> + if name in MockModel._mock_vms.keys():
> + del MockModel._mock_vms[dom.name()]
> + return MockModel._undefineDomain(dom)
> +
> + @staticmethod
> + def attachDeviceFlags(dom, xml, flags=0):
> + old_xml = dom.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE)
> + root = objectify.fromstring(old_xml)
> + dev = objectify.fromstring(xml)
> + root.devices.append(dev)
> +
> + MockModel._mock_vms[dom.name()] = ET.tostring(root, encoding="utf-8")
> +
> + @staticmethod
> + def _get_device_node(dom, xml):
> + xpath_map = {'disk': 'target',
> + 'interface': 'mac',
> + 'graphics': 'listen'}
> +
> + dev = objectify.fromstring(xml)
> + dev_id = dev.find(xpath_map[dev.tag]).items()
> +
> + dev_filter = ''
> + for key, value in dev_id:
> + dev_filter += "[@%s='%s']" % (key, value)
> +
> + old_xml = dom.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE)
> + root = objectify.fromstring(old_xml)
> + devices = root.devices
> +
> + dev = devices.find("./%s/%s%s/.." % (dev.tag, xpath_map[dev.tag],
> + dev_filter))
> +
> + return (root, dev)
> +
> + @staticmethod
> + def detachDeviceFlags(dom, xml, flags=0):
> + root, dev = MockModel._get_device_node(dom, xml)
> + root.devices.remove(dev)
> +
> + MockModel._mock_vms[dom.name()] = ET.tostring(root, encoding="utf-8")
> +
> + @staticmethod
> + def updateDeviceFlags(dom, xml, flags=0):
> + root, old_dev = MockModel._get_device_node(dom, xml)
> + root.devices.replace(old_dev, objectify.fromstring(xml))
> + MockModel._mock_vms[dom.name()] = ET.tostring(root, encoding="utf-8")
> +
> + @staticmethod
> + def volResize(vol, size, flags=0):
> + new_xml = xml_item_update(vol.XMLDesc(0), './capacity', str(size))
> + vol.delete(0)
> + pool = vol.storagePoolLookupByVolume()
> + pool.createXML(new_xml)
> +
> + @staticmethod
> + def volWipePattern(vol, algorithm, flags=0):
> + new_xml = xml_item_update(vol.XMLDesc(0), './allocation', '0')
> + vol.delete(0)
> + pool = vol.storagePoolLookupByVolume()
> + pool.createXML(new_xml)
> +
> + def _probe_image(self, path):
> + return ('unkown', 'unkown')
>
> - def templates_get_list(self):
> - return self._mock_templates.keys()
> -
> - def _get_template(self, name, overrides=None):
> - try:
> - t = self._mock_templates[name]
> - if overrides:
> - args = copy.copy(t.info)
> - args.update(overrides)
> - return MockVMTemplate(args, self)
> - else:
> - return t
> - except KeyError:
> - raise NotFoundError("KCHTMPL0002E", {'name': name})
> -
> - def debugreport_lookup(self, name):
> - path = config.get_debugreports_path()
> - file_pattern = os.path.join(path, name + '.txt')
> - try:
> - file_target = glob.glob(file_pattern)[0]
> - except IndexError:
> - raise NotFoundError("KCHDR0001E", {'name': name})
> -
> - ctime = os.stat(file_target).st_mtime
> - ctime = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime(ctime))
> - file_target = os.path.split(file_target)[-1]
> - file_target = os.path.join("/data/debugreports", file_target)
> - return {'uri': file_target,
> - 'ctime': ctime}
> -
> - def debugreportcontent_lookup(self, name):
> - return self.debugreport_lookup(name)
> -
> - def debugreport_update(self, name, params):
> - path = config.get_debugreports_path()
> - file_pattern = os.path.join(path, name + '.txt')
> - try:
> - file_source = glob.glob(file_pattern)[0]
> - except IndexError:
> - raise NotFoundError("KCHDR0001E", {'name': name})
> + def _get_volume_path(self, pool, vol):
> + pool_info = self.storagepool_lookup(pool)
> + if pool_info['type'] == 'scsi':
> + return self._mock_storagevolumes.scsi_volumes[vol]['path']
>
> - file_target = file_source.replace(name, params['name'])
> - if os.path.isfile(file_target):
> - raise InvalidParameter('KCHDR0008E', {'name': params['name']})
> + return MockModel._libvirt_get_vol_path(pool, vol)
>
> - shutil.move(file_source, file_target)
> - return params['name']
> + def _gen_debugreport_file(self, name):
> + return add_task('/debugreports/%s' % name, self._create_log,
> + self.objstore, name)
>
> - def debugreport_delete(self, name):
> - path = config.get_debugreports_path()
> - file_pattern = os.path.join(path, name + '.txt')
> - try:
> - file_target = glob.glob(file_pattern)[0]
> - except IndexError:
> - raise NotFoundError("KCHDR0001E", {'name': name})
> -
> - os.remove(file_target)
> -
> - def debugreports_create(self, params):
> - ident = params.get('name').strip()
> - # Generate a name with time and millisec precision, if necessary
> - if ident is None or ident == "":
> - ident = 'report-' + str(int(time.time() * 1000))
> - else:
> - if ident in self.debugreports_get_list():
> - raise InvalidParameter("KCHDR0008E", {"name": ident})
> - taskid = self._gen_debugreport_file(ident)
> - return self.task_lookup(taskid)
> -
> - def debugreports_get_list(self):
> + def _create_log(self, cb, name):
> path = config.get_debugreports_path()
> - file_pattern = os.path.join(path, '*.txt')
> - file_lists = glob.glob(file_pattern)
> - file_lists = [os.path.split(file)[1] for file in file_lists]
> - name_lists = [file.split('.', 1)[0] for file in file_lists]
> -
> - return name_lists
> -
> - def _get_vm(self, name):
> - try:
> - return self._mock_vms[name]
> - except KeyError:
> - raise NotFoundError("KCHVM0002E", {'name': name})
> -
> - def storagepools_create(self, params):
> - try:
> - name = params['name']
> - pool = MockStoragePool(name)
> - pool.info['type'] = params['type']
> - if params['type'] == 'scsi':
> - pool.info['path'] = '/dev/disk/by-path'
> - pool.info['source'] = params['source']
> - if not pool.info['source'].get('adapter_name'):
> - raise MissingParameter('KCHPOOL0004E',
> - {'item': 'adapter_name',
> - 'name': name})
> - for vol in ['unit:0:0:1', 'unit:0:0:2',
> - 'unit:0:0:3', 'unit:0:0:4']:
> - mockvol = MockStorageVolume(name, vol,
> - dict([('type', 'lun')]))
> - pool._volumes[vol] = mockvol
> - else:
> - pool.info['path'] = params['path']
> - if params['type'] in ['dir', 'scsi']:
> - pool.info['autostart'] = True
> - else:
> - pool.info['autostart'] = False
> - except KeyError, item:
> - raise MissingParameter("KCHPOOL0004E",
> - {'item': str(item), 'name': name})
> -
> - if name in self._mock_storagepools or name in (ISO_POOL_NAME,):
> - raise InvalidOperation("KCHPOOL0001E", {'name': name})
> -
> - self._mock_storagepools[name] = pool
> - return name
> -
> - def storagepool_lookup(self, name):
> - storagepool = self._get_storagepool(name)
> - storagepool.refresh()
> - return storagepool.info
> -
> - def storagepool_update(self, name, params):
> - pool = self._get_storagepool(name)
> - if 'autostart' in params:
> - pool.info['autostart'] = params['autostart']
> - if 'disks' in params:
> - # check if pool is type 'logical'
> - if pool.info['type'] != 'logical':
> - raise InvalidOperation('KCHPOOL0029E')
> - self._update_lvm_disks(name, params['disks'])
> - ident = pool.name
> - return ident
> -
> - def storagepool_activate(self, name):
> - self._get_storagepool(name).info['state'] = 'active'
> -
> - def storagepool_deactivate(self, name):
> - self._get_storagepool(name).info['state'] = 'inactive'
> -
> - def storagepool_delete(self, name):
> - # firstly, we should check the pool actually exists
> - pool = self._get_storagepool(name)
> - del self._mock_storagepools[pool.name]
> -
> - def storagepools_get_list(self):
> - return sorted(self._mock_storagepools.keys())
> -
> - def _get_storagepool(self, name):
> - try:
> - return self._mock_storagepools[name]
> - except KeyError:
> - raise NotFoundError("KCHPOOL0002E", {'name': name})
> + tmpf = os.path.join(path, name + '.tmp')
> + realf = os.path.join(path, name + '.txt')
> + length = random.randint(1000, 10000)
> + with open(tmpf, 'w') as fd:
> + while length:
> + fd.write('I am logged')
> + length = length - 1
> + os.rename(tmpf, realf)
> + cb("OK", True)
>
> - def storagevolumes_create(self, pool_name, params):
> + def _mock_storagevolumes_create(self, pool, params):
> vol_source = ['file', 'url', 'capacity']
> - require_name_params = ['capacity']
> -
> - name = params.get('name')
> -
> index_list = list(i for i in range(len(vol_source))
> if vol_source[i] in params)
> - if len(index_list) != 1:
> - raise InvalidParameter("KCHVOL0018E",
> - {'param': ",".join(vol_source)})
> -
> create_param = vol_source[index_list[0]]
> -
> + name = params.get('name')
> if name is None:
> - if create_param in require_name_params:
> - raise InvalidParameter('KCHVOL0016E')
> -
> if create_param == 'file':
> name = os.path.basename(params['file'].filename)
> + del params['file']
> + params['capacity'] = 1024
> elif create_param == 'url':
> name = os.path.basename(params['url'])
> - else:
> - name = 'upload-%s' % int(time.time())
> + del params['url']
> + params['capacity'] = 1024
> params['name'] = name
>
> - try:
> - create_func = getattr(self, '_create_volume_with_%s' %
> - create_param)
> - except AttributeError:
> - raise InvalidParameter("KCHVOL0019E", {'param': create_param})
> -
> - pool = self._get_storagepool(pool_name)
> - if pool.info['type'] in READONLY_POOL_TYPE:
> - raise InvalidParameter("KCHVOL0012E", {'type': pool.info['type']})
> - if pool.info['state'] == 'inactive':
> - raise InvalidParameter('KCHVOL0003E', {'pool': pool_name,
> - 'volume': name})
> - if name in pool._volumes:
> - raise InvalidOperation("KCHVOL0001E", {'name': name})
> -
> - params['pool'] = pool_name
> - targeturi = '/storagepools/%s/storagevolumes/%s' % (pool_name, name)
> - taskid = self.add_task(targeturi, create_func, params)
> - return self.task_lookup(taskid)
> -
> - def _create_volume_with_file(self, cb, params):
> - upload_file = params['file']
> - params['name'] = params['name']
> - params['format'] = 'raw'
> - params['capacity'] = upload_file.fp.length
> - size = 0
> - try:
> - while True:
> - data = upload_file.file.read(8192*32)
> - if not data:
> - break
> - size += len(data)
> - cb('%s/%s' % (size, params['capacity']), True)
> - except Exception as e:
> - raise OperationFailed('KCHVOL0007E',
> - {'name': params['name'],
> - 'pool': params['pool'],
> - 'err': e.message})
> - self._create_volume_with_capacity(cb, params)
> - cb('%s/%s' % (size, params['capacity']), True)
> -
> - def _create_volume_with_capacity(self, cb, params):
> - pool_name = params.pop('pool')
> - pool = self._get_storagepool(pool_name)
> + return self._model_storagevolumes_create(pool, params)
>
> - try:
> - name = params['name']
> - volume = MockStorageVolume(pool, name, params)
> - volume.info['type'] = 'file'
> - volume.info['ref_cnt'] = params.get('ref_cnt', 0)
> - volume.info['format'] = params['format']
> - volume.info['path'] = os.path.join(
> - pool.info['path'], name)
> - if 'base' in params:
> - volume.info['base'] = copy.deepcopy(params['base'])
> - except KeyError, item:
> - raise MissingParameter("KCHVOL0004E",
> - {'item': str(item), 'volume': name})
> -
> - pool._volumes[name] = volume
> - cb('OK', True)
> -
> - def _create_volume_with_url(self, cb, params):
> - pool_name = params['pool']
> - name = params['name']
> - url = params['url']
> -
> - pool = self._get_storagepool(pool_name)
> -
> - params['path'] = os.path.join(pool.info['path'], name)
> - params['type'] = 'file'
> - params['base'] = url
> -
> - volume = MockStorageVolume(pool, name, params)
> - pool._volumes[name] = volume
> -
> - cb('OK', True)
> -
> - def storagevolume_clone(self, pool, name, new_pool=None, new_name=None):
> - if new_name is None:
> - base, ext = os.path.splitext(name)
> - new_name = get_next_clone_name(self.vms_get_list(), base, ext)
> -
> - if new_pool is None:
> - new_pool = pool
> -
> - params = {'name': name,
> - 'pool': pool,
> - 'new_name': new_name,
> - 'new_pool': new_pool}
> - taskid = self.add_task('/storagepools/%s/storagevolumes/%s' %
> - (new_pool, new_name),
> - self._storagevolume_clone_task, params)
> - return self.task_lookup(taskid)
> -
> - def _storagevolume_clone_task(self, cb, params):
> - try:
> - vol_name = params['name'].decode('utf-8')
> - pool_name = params['pool'].decode('utf-8')
> - new_vol_name = params['new_name'].decode('utf-8')
> - new_pool_name = params['new_pool'].decode('utf-8')
> -
> - orig_pool = self._get_storagepool(pool_name)
> - orig_vol = self._get_storagevolume(pool_name, vol_name)
> -
> - new_vol = copy.deepcopy(orig_vol)
> - new_vol.info['name'] = new_vol_name
> - new_vol.info['path'] = os.path.join(orig_pool.info['path'],
> - new_vol_name)
> -
> - new_pool = self._get_storagepool(new_pool_name)
> - new_pool._volumes[new_vol_name] = new_vol
> - except (KeyError, NotFoundError), e:
> - raise OperationFailed('KCHVOL0023E',
> - {'name': vol_name, 'pool': pool_name,
> - 'err': e.message})
> -
> - cb('OK', True)
> -
> - def storagevolume_lookup(self, pool, name):
> - if self._get_storagepool(pool).info['state'] != 'active':
> - raise InvalidOperation("KCHVOL0005E", {'pool': pool,
> - 'volume': name})
> -
> - storagevolume = self._get_storagevolume(pool, name)
> - return storagevolume.info
> -
> - def storagevolume_wipe(self, pool, name):
> - volume = self._get_storagevolume(pool, name)
> - volume.info['allocation'] = 0
> -
> - def storagevolume_delete(self, pool, name):
> - # firstly, we should check the pool actually exists
> - volume = self._get_storagevolume(pool, name)
> - del self._get_storagepool(pool)._volumes[volume.name]
> -
> - def storagevolume_resize(self, pool, name, size):
> - volume = self._get_storagevolume(pool, name)
> - volume.info['capacity'] = size
> -
> - def storagevolumes_get_list(self, pool):
> - res = self._get_storagepool(pool)
> - if res.info['state'] == 'inactive':
> - raise InvalidOperation("KCHVOL0006E", {'pool': pool})
> - return res._volumes.keys()
> -
> - def devices_get_list(self, _cap=None, _passthrough=None,
> - _passthrough_affected_by=None):
> - if _cap is None:
> - return self._mock_devices.devices.keys()
> - return [dev['name'] for dev in self._mock_devices.devices.values()
> - if dev['device_type'] == _cap]
> + def _mock_storagevolumes_get_list(self, pool):
> + pool_info = self.storagepool_lookup(pool)
> + if pool_info['type'] == 'scsi':
> + return self._mock_storagevolumes.scsi_volumes.keys()
>
> - def device_lookup(self, dev_name):
> - return self._mock_devices.devices[dev_name]
> + return self._model_storagevolumes_get_list(pool)
>
> - def isopool_lookup(self, name):
> - return {'state': 'active',
> - 'type': 'kimchi-iso'}
> -
> - def isovolumes_get_list(self):
> - iso_volumes = []
> - pools = self.storagepools_get_list()
> -
> - for pool in pools:
> - try:
> - volumes = self.storagevolumes_get_list(pool)
> - except InvalidOperation:
> - # Skip inactive pools
> - continue
> - for volume in volumes:
> - res = self.storagevolume_lookup(pool, volume)
> - if res['format'] == 'iso':
> - # prevent iso from different pool having same volume name
> - res['name'] = '%s-%s' % (pool, volume)
> - iso_volumes.append(res)
> - return iso_volumes
> -
> - def storageservers_get_list(self, _target_type=None):
> - # FIXME: This needs to be updted when adding new storage server support
> - target_type = STORAGE_SERVERS \
> - if not _target_type else [_target_type]
> - pools = self.storagepools_get_list()
> - server_list = []
> - for pool in pools:
> - try:
> - pool_info = self.storagepool_lookup(pool)
> - if (pool_info['type'] in target_type and
> - pool_info['source']['addr'] not in server_list):
> - server_list.append(pool_info['source']['addr'])
> - except NotFoundError:
> - pass
> -
> - return server_list
> -
> - def storageserver_lookup(self, server):
> - pools = self.storagepools_get_list()
> - for pool in pools:
> - try:
> - pool_info = self.storagepool_lookup(pool)
> - if pool_info['source'] and \
> - pool_info['source']['addr'] == server:
> - return dict(host=server)
> - except NotFoundError:
> - # Avoid inconsistent pool result because
> - # of lease between list and lookup
> - pass
> -
> - raise NotFoundError("KCHSR0001E", {'server': server})
> -
> - def dummy_interfaces(self):
> - interfaces = {}
> - ifaces = {"eth1": "nic", "bond0": "bonding",
> - "eth1.10": "vlan", "bridge0": "bridge"}
> - for i, name in enumerate(ifaces.iterkeys()):
> - iface = Interface(name)
> - iface.info['type'] = ifaces[name]
> - iface.info['ipaddr'] = '192.168.%s.101' % (i + 1)
> - interfaces[name] = iface
> - interfaces['eth1'].info['ipaddr'] = '192.168.0.101'
> - return interfaces
> -
> - def interfaces_get_list(self):
> - return self._mock_interfaces.keys()
> -
> - def interface_lookup(self, name):
> - return self._mock_interfaces[name].info
> -
> - def networks_create(self, params):
> - name = params['name']
> - if name in self.networks_get_list():
> - raise InvalidOperation("KCHNET0001E", {'name': name})
> -
> - network = MockNetwork(name)
> - connection = params['connection']
> - network.info['connection'] = connection
> - if connection == "bridge":
> - try:
> - interface = params['interface']
> - network.info['interface'] = interface
> - except KeyError:
> - raise MissingParameter("KCHNET0004E",
> - {'name': name})
> -
> - subnet = params.get('subnet', '')
> - if subnet:
> - network.info['subnet'] = subnet
> - try:
> - net = ipaddr.IPNetwork(subnet)
> - except ValueError:
> - msg_args = {'subnet': subnet, 'network': name}
> - raise InvalidParameter("KCHNET0003E", msg_args)
> -
> - network.info['dhcp'] = {
> - 'start': str(net.network + net.numhosts / 2),
> - 'stop': str(net.network + net.numhosts - 2)}
> -
> - self._mock_networks[name] = network
> - return name
> -
> - def _get_network(self, name):
> - try:
> - return self._mock_networks[name]
> - except KeyError:
> - raise NotFoundError("KCHNET0002E", {'name': name})
> -
> - def _get_vms_attach_to_a_network(self, network):
> - vms = []
> - for name, dom in self._mock_vms.iteritems():
> - if network in dom.networks:
> - vms.append(name)
> - return vms
> -
> - def _is_network_used_by_template(self, network):
> - for name, tmpl in self._mock_templates.iteritems():
> - if network in tmpl.info['networks']:
> - return True
> - return False
> -
> - def _is_network_in_use(self, name):
> - # The network "default" is used for Kimchi proposal and should not be
> - # deactivate or deleted. Otherwise, we will allow user create
> - # inconsistent templates from scratch
> - if name == 'default':
> - return True
> -
> - vms = self._get_vms_attach_to_a_network(name)
> - return bool(vms) or self._is_network_used_by_template(name)
> -
> - def network_lookup(self, name):
> - network = self._get_network(name)
> - network.info['vms'] = self._get_vms_attach_to_a_network(name)
> - network.info['in_use'] = self._is_network_in_use(name)
> -
> - return network.info
> -
> - def network_activate(self, name):
> - self._get_network(name).info['state'] = 'active'
> -
> - def network_deactivate(self, name):
> - if self._is_network_in_use(name):
> - raise InvalidOperation("KCHNET0018E", {'name': name})
> -
> - network = self._get_network(name)
> - if not network.info['persistent']:
> - self.network_delete(name)
> -
> - network.info['state'] = 'inactive'
> -
> - def network_delete(self, name):
> - if self._is_network_in_use(name):
> - raise InvalidOperation("KCHNET0017E", {'name': name})
> -
> - # firstly, we should check the network actually exists
> - network = self._get_network(name)
> - del self._mock_networks[network.name]
> -
> - def networks_get_list(self):
> - return sorted(self._mock_networks.keys())
> -
> - def vmstorages_create(self, vm_name, params):
> - path = params.get('path')
> - if path and path.startswith('/') and not os.path.exists(path):
> - raise InvalidParameter("KCHVMSTOR0003E", {'value': path})
> - if path and params.get('pool'):
> - raise InvalidParameter("KCHVMSTOR0017E")
> - elif params.get('pool'):
> - try:
> - self.storagevolume_lookup(params['pool'], params['vol'])
> - except Exception as e:
> - raise InvalidParameter("KCHVMSTOR0015E", {'error': e})
> - dom = self._get_vm(vm_name)
> - index = len(dom.storagedevices.keys()) + 1
> - params['dev'] = "hd" + string.ascii_lowercase[index]
> -
> - vmdev = MockVMStorageDevice(params)
> - dom.storagedevices[params['dev']] = vmdev
> - return params['dev']
> -
> - def vmstorages_get_list(self, vm_name):
> - dom = self._get_vm(vm_name)
> - return dom.storagedevices.keys()
> -
> - def vmstorage_lookup(self, vm_name, dev_name):
> - dom = self._get_vm(vm_name)
> - if dev_name not in self.vmstorages_get_list(vm_name):
> - raise NotFoundError(
> - "KCHVMSTOR0007E",
> - {'dev_name': dev_name, 'vm_name': vm_name})
> - return dom.storagedevices.get(dev_name).info
> -
> - def vmstorage_delete(self, vm_name, dev_name):
> - dom = self._get_vm(vm_name)
> - if dev_name not in self.vmstorages_get_list(vm_name):
> - raise NotFoundError(
> - "KCHVMSTOR0007E",
> - {'dev_name': dev_name, 'vm_name': vm_name})
> - dom.storagedevices.pop(dev_name)
> -
> - def vmstorage_update(self, vm_name, dev_name, params):
> - try:
> - dom = self._get_vm(vm_name)
> - dom.storagedevices[dev_name].info.update(params)
> - except Exception as e:
> - raise OperationFailed("KCHVMSTOR0009E", {'error': e.message})
> - return dev_name
> -
> - def vmifaces_create(self, vm, params):
> - if (params["type"] == "network" and
> - params["network"] not in self.networks_get_list()):
> - msg_args = {'network': params["network"], 'name': vm}
> - raise InvalidParameter("KCHVMIF0002E", msg_args)
> -
> - dom = self._get_vm(vm)
> - iface = MockVMIface(params["network"])
> - ("model" in params.keys() and
> - iface.info.update({"model": params["model"]}))
> -
> - mac = iface.info['mac']
> - dom.ifaces[mac] = iface
> - return mac
> -
> - def vmifaces_get_list(self, vm):
> - dom = self._get_vm(vm)
> - macs = dom.ifaces.keys()
> - return macs
> -
> - def vmiface_lookup(self, vm, mac):
> - dom = self._get_vm(vm)
> - try:
> - info = dom.ifaces[mac].info
> - except KeyError:
> - raise NotFoundError("KCHVMIF0001E", {'iface': mac, 'name': vm})
> - return info
> + def _mock_storagevolume_lookup(self, pool, vol):
> + pool_info = self.storagepool_lookup(pool)
> + if pool_info['type'] == 'scsi':
> + return self._mock_storagevolumes.scsi_volumes[vol]
>
> - def vmiface_delete(self, vm, mac):
> - dom = self._get_vm(vm)
> - try:
> - del dom.ifaces[mac]
> - except KeyError:
> - raise NotFoundError("KCHVMIF0001E", {'iface': mac, 'name': vm})
> + return self._model_storagevolume_lookup(pool, vol)
>
> - def vmiface_update(self, vm, mac, params):
> - dom = self._get_vm(vm)
> - try:
> - info = dom.ifaces[mac].info
> - except KeyError:
> - raise NotFoundError("KCHVMIF0001E", {'iface': mac, 'name': vm})
> - if info['type'] == 'network' and 'network' in params:
> - info['network'] = params['network']
> - if 'model' in params:
> - info['model'] = params['model']
> - return mac
> -
> - def tasks_get_list(self):
> - with self.objstore as session:
> - return session.get_list('task')
> -
> - def task_lookup(self, id):
> - with self.objstore as session:
> - return session.get('task', str(id))
> -
> - def add_task(self, target_uri, fn, opaque=None):
> - id = self.next_taskid
> - self.next_taskid = self.next_taskid + 1
> - AsyncTask(id, target_uri, fn, self.objstore, opaque)
> -
> - return id
> -
> - def _get_storagevolume(self, pool, name):
> - try:
> - return self._get_storagepool(pool)._volumes[name]
> - except KeyError:
> - raise NotFoundError("KCHVOL0002E", {'name': name, 'pool': pool})
> + def _mock_interfaces_get_list(self):
> + return self._mock_interfaces.ifaces.keys()
>
> - def _get_distros(self):
> - distroloader = DistroLoader()
> - return distroloader.get()
> + def _mock_interface_lookup(self, name):
> + return self._mock_interfaces.ifaces[name]
>
> - def distros_get_list(self):
> - return self.distros.keys()
> + def _mock_devices_get_list(self, _cap=None, _passthrough=None,
> + _passthrough_affected_by=None):
> + if _cap is None:
> + return self._mock_devices.devices.keys()
> + return [dev['name'] for dev in self._mock_devices.devices.values()
> + if dev['device_type'] == _cap]
>
> - def distro_lookup(self, name):
> - try:
> - return self.distros[name]
> - except KeyError:
> - raise NotFoundError("KCHDISTRO0001E", {'name': name})
> + def _mock_device_lookup(self, dev_name):
> + return self._mock_devices.devices[dev_name]
>
> - def _gen_debugreport_file(self, ident):
> - return self.add_task('/debugreports/%s' % ident, self._create_log,
> - ident)
> + def _mock_packagesupdate_get_list(self):
> + return self._mock_swupdate.pkgs.keys()
>
> - def _create_log(self, cb, name):
> - path = config.get_debugreports_path()
> - tmpf = os.path.join(path, name + '.tmp')
> - realf = os.path.join(path, name + '.txt')
> - length = random.randint(1000, 10000)
> - with open(tmpf, 'w') as fd:
> - while length:
> - fd.write('I am logged')
> - length = length - 1
> - os.rename(tmpf, realf)
> - cb("OK", True)
> + def _mock_packageupdate_lookup(self, pkg_name):
> + return self._mock_swupdate.pkgs[pkg_name]
>
> - def host_lookup(self, *name):
> - res = {}
> - res['memory'] = 6114058240
> - res['cpu_model'] = 'Intel(R) Core(TM) i5 CPU M 560 @ 2.67GHz'
> - res['cpus'] = 4
> - res['os_distro'] = 'Red Hat Enterprise Linux Server'
> - res['os_version'] = '6.4'
> - res['os_codename'] = 'Santiago'
> -
> - return res
> -
> - def hoststats_lookup(self, *name):
> - virt_mem = psutil.virtual_memory()
> - memory_stats = {'total': virt_mem.total,
> - 'free': virt_mem.free,
> - 'cached': virt_mem.cached,
> - 'buffers': virt_mem.buffers,
> - 'avail': virt_mem.available}
> - return {'cpu_utilization': round(random.uniform(0, 100), 1),
> - 'memory': memory_stats,
> - 'disk_read_rate': round(random.uniform(0, 4000), 1),
> - 'disk_write_rate': round(random.uniform(0, 4000), 1),
> - 'net_recv_rate': round(random.uniform(0, 4000), 1),
> - 'net_sent_rate': round(random.uniform(0, 4000), 1)}
> -
> - def hoststatshistory_lookup(self, *name):
> - return {'cpu_utilization': random.sample(range(100), 30),
> - 'memory': random.sample(range(4000), 30),
> - 'disk_read_rate': random.sample(range(4000), 30),
> - 'disk_write_rate': random.sample(range(4000), 30),
> - 'net_recv_rate': random.sample(range(4000), 30),
> - 'net_sent_rate': random.sample(range(4000), 30)}
> -
> - def users_get_list(self):
> - return ["userA", "userB", "userC", "admin"]
> -
> - def groups_get_list(self):
> - return ["groupA", "groupB", "groupC", "groupD"]
> -
> - def peers_get_list(self):
> - if kconfig.get("server", "federation") == "off":
> - return []
> -
> - return ["https://serverA:8001", "https://serverB:8001"]
> -
> - def vms_get_list_by_state(self, state):
> - ret_list = []
> - for name in self.vms_get_list():
> - if (self._mock_vms[name].info['state']) == state:
> - ret_list.append(name)
> - return ret_list
> -
> - def host_shutdown(self, args=None):
> - # Check for running vms before shutdown
> - running_vms = self.vms_get_list_by_state('running')
> - if len(running_vms) > 0:
> - raise OperationFailed("KCHHOST0001E")
> - cherrypy.engine.exit()
> -
> - def host_reboot(self, args=None):
> - # Find running VMs
> - running_vms = self.vms_get_list_by_state('running')
> - if len(running_vms) > 0:
> - raise OperationFailed("KCHHOST0002E")
> - cherrypy.engine.stop()
> - time.sleep(10)
> - cherrypy.engine.start()
> -
> - def partitions_get_list(self):
> - result = disks.get_partitions_names()
> - return result
> -
> - def partition_lookup(self, name):
> - if name not in disks.get_partitions_names():
> - raise NotFoundError("KCHPART0001E", {'name': name})
> -
> - return disks.get_partition_details(name)
> -
> - def config_lookup(self, name):
> - return {'display_proxy_port': kconfig.get('display',
> - 'display_proxy_port'),
> - 'version': config.get_version()}
> -
> - def packagesupdate_get_list(self):
> - return self._mock_swupdate.getUpdates()
> -
> - def packageupdate_lookup(self, pkg_name):
> - return self._mock_swupdate.getUpdate(pkg_name)
> -
> - def host_swupdate(self, args=None):
> - task_id = self.add_task('/host/swupdate', self._mock_swupdate.doUpdate,
> - None)
> + def _mock_host_swupdate(self, args=None):
> + task_id = add_task('/host/swupdate', self._mock_swupdate.doUpdate,
> + self.objstore)
> return self.task_lookup(task_id)
>
> - def repositories_get_list(self):
> - return self._mock_host_repositories.getRepositories()
> + def _mock_repositories_get_list(self):
> + return self._mock_repositories.repos.keys()
>
> - def repositories_create(self, params):
> + def _mock_repositories_create(self, params):
> # Create a repo_id if not given by user. The repo_id will follow
> # the format kimchi_repo_<integer>, where integer is the number of
> # seconds since the Epoch (January 1st, 1970), in UTC.
> @@ -1121,387 +307,62 @@ class MockModel(object):
> repo_id = "kimchi_repo_%s" % str(int(time.time() * 1000))
> params.update({'repo_id': repo_id})
>
> - if repo_id in self.repositories_get_list():
> - raise InvalidOperation("KCHREPOS0022E", {'repo_id': repo_id})
> -
> - self._mock_host_repositories.addRepository(params)
> - return repo_id
> -
> - def repository_lookup(self, repo_id):
> - return self._mock_host_repositories.getRepository(repo_id)
> -
> - def repository_delete(self, repo_id):
> - return self._mock_host_repositories.removeRepository(repo_id)
> -
> - def repository_enable(self, repo_id):
> - return self._mock_host_repositories.enableRepository(repo_id)
> -
> - def repository_disable(self, repo_id):
> - return self._mock_host_repositories.disableRepository(repo_id)
> -
> - def repository_update(self, repo_id, params):
> - return self._mock_host_repositories.updateRepository(repo_id, params)
> -
> -
> -class MockVMTemplate(VMTemplate):
> - def __init__(self, args, mockmodel_inst=None):
> - VMTemplate.__init__(self, args)
> - self.model = mockmodel_inst
> -
> - def _get_all_networks_name(self):
> - return self.model.networks_get_list()
> -
> - def _get_all_storagepools_name(self):
> - return self.model.storagepools_get_list()
> -
> - def _storage_validate(self):
> - pool_uri = self.info['storagepool']
> - pool_name = pool_name_from_uri(pool_uri)
> - try:
> - pool = self.model._get_storagepool(pool_name)
> - except NotFoundError:
> - msg_args = {'pool': pool_name, 'template': self.name}
> - raise InvalidParameter("KCHTMPL0004E", msg_args)
> -
> - if pool.info['state'] != 'active':
> - msg_args = {'pool': pool_name, 'template': self.name}
> - raise InvalidParameter("KCHTMPL0005E", msg_args)
> -
> - return pool
> -
> - def _get_storage_path(self):
> - pool = self._storage_validate()
> - return pool.info['path']
> -
> - def _get_volume_path(self, pool, vol):
> - return self.model.storagevolume_lookup(pool, vol)['path']
> -
> - def fork_vm_storage(self, vm_name):
> - pool = self._storage_validate()
> - volumes = self.to_volume_list(vm_name)
> - disk_paths = []
> - for vol_info in volumes:
> - vol_info['capacity'] = vol_info['capacity'] << 10
> - vol_info['ref_cnt'] = 1
> - if 'base' in self.info:
> - vol_info['base'] = copy.deepcopy(self.info['base'])
> - self.model.storagevolumes_create(pool.name, vol_info)
> - disk_paths.append({'pool': pool.name, 'volume': vol_info['name']})
> - return disk_paths
> -
> -
> -class MockVMStorageDevice(object):
> - def __init__(self, params):
> - self.info = {'dev': params.get('dev'),
> - 'type': params.get('type'),
> - 'pool': params.get('pool'),
> - 'vol': params.get('vol'),
> - 'path': params.get('path')}
> -
> -
> -class MockVMIface(object):
> - counter = 0
> -
> - def __init__(self, network=None):
> - self.__class__.counter += 1
> - self.info = {'type': 'network',
> - 'model': 'virtio',
> - 'network': network if network
> - else "net-%s" % self.counter,
> - 'mac': self.get_mac()
> - }
> -
> - @classmethod
> - def get_mac(cls):
> - mac = ":".join(["52", "54"] +
> - ["%02x" % (cls.counter / (256 ** i) % 256)
> - for i in range(3, -1, -1)])
> - return mac
> -
> -
> -class MockVM(object):
> - def __init__(self, uuid, name, template_info):
> - self.uuid = uuid
> - self.name = name
> - self.memory = template_info['memory']
> - self.cpus = template_info['cpus']
> - self.disk_paths = []
> - self.networks = template_info['networks']
> - ifaces = [MockVMIface(net) for net in self.networks]
> - self.storagedevices = {}
> - self.ifaces = dict([(iface.info['mac'], iface) for iface in ifaces])
> -
> - stats = {'cpu_utilization': 20,
> - 'net_throughput': 35,
> - 'net_throughput_peak': 100,
> - 'io_throughput': 45,
> - 'io_throughput_peak': 100}
> - self.info = {'name': self.name,
> - 'state': 'shutoff',
> - 'stats': stats,
> - 'uuid': self.uuid,
> - 'memory': self.memory,
> - 'cpus': self.cpus,
> - 'icon': None,
> - 'graphics': {'type': 'vnc', 'listen': '127.0.0.1',
> - 'port': None, 'passwd': '123456',
> - 'passwdValidTo': None},
> - 'users': ['user1', 'user2', 'root'],
> - 'groups': ['group1', 'group2', 'admin'],
> - 'access': 'full'
> - }
> - self.info['graphics'].update(template_info['graphics'])
> -
> -
> -class MockStoragePool(object):
> - def __init__(self, name):
> - self.name = name
> - self.info = {'state': 'inactive',
> - 'capacity': 1024 << 20,
> - 'allocated': 512 << 20,
> - 'available': 512 << 20,
> - 'path': '/var/lib/libvirt/images',
> - 'source': {},
> - 'type': 'dir',
> - 'nr_volumes': 0,
> - 'autostart': 0,
> - 'persistent': True}
> - self._volumes = {}
> -
> - def refresh(self):
> - state = self.info['state']
> - self.info['nr_volumes'] = len(self._volumes) \
> - if state == 'active' else 0
> -
> -
> -class Interface(object):
> - def __init__(self, name):
> - self.name = name
> - self.info = {'type': 'nic',
> - 'ipaddr': '192.168.0.101',
> - 'netmask': '255.255.255.0',
> - 'status': 'active'}
> -
> -
> -class MockNetwork(object):
> - def __init__(self, name):
> - self.name = name
> - self.info = {'state': 'inactive',
> - 'autostart': True,
> - 'connection': 'nat',
> - 'interface': 'virbr0',
> - 'subnet': '192.168.122.0/24',
> - 'dhcp': {'start': '192.168.122.128',
> - 'stop': '192.168.122.254'},
> - 'persistent': True
> - }
> -
> -
> -class MockTask(object):
> - def __init__(self, id):
> - self.id = id
> -
> -
> -class MockStorageVolume(object):
> - def __init__(self, pool, name, params={}):
> - self.name = name
> - self.pool = pool
> - # Check if volume should be scsi lun
> - if params.get('type') == 'lun':
> - params = self._def_lun(name)
> - fmt = params.get('format', 'raw')
> - capacity = params.get('capacity', 1024)
> - self.info = {'type': params.get('type', 'disk'),
> - 'capacity': capacity << 20,
> - 'allocation': params.get('allocation', '512'),
> - 'path': params.get('path'),
> - 'ref_cnt': params.get('ref_cnt'),
> - 'format': fmt}
> - if fmt == 'iso':
> - self.info['allocation'] = self.info['capacity']
> - self.info['os_version'] = '17'
> - self.info['os_distro'] = 'fedora'
> - self.info['bootable'] = True
> -
> - def _def_lun(self, name):
> - capacity = int(random.uniform(100, 300)) << 20
> - path = "/dev/disk/by-path/pci-0000:0e:00.0-fc-0x20999980e52e4492-lun"
> - return {
> - "capacity": capacity,
> - "name": name,
> - "format": random.choice(['dos', 'unknown']),
> - "allocation": capacity,
> - "path": path + name[-1],
> - "type": "block"}
> -
> -
> -class MockVMScreenshot(VMScreenshot):
> - OUTDATED_SECS = 5
> - BACKGROUND_COLOR = ['blue', 'green', 'purple', 'red', 'yellow']
> - BOX_COORD = (50, 115, 206, 141)
> - BAR_COORD = (50, 115, 50, 141)
> -
> - def __init__(self, vm_name):
> - VMScreenshot.__init__(self, vm_name)
> - self.coord = MockVMScreenshot.BAR_COORD
> - self.background = random.choice(MockVMScreenshot.BACKGROUND_COLOR)
> -
> - def _generate_scratch(self, thumbnail):
> - self.coord = (self.coord[0],
> - self.coord[1],
> - min(MockVMScreenshot.BOX_COORD[2],
> - self.coord[2] + random.randrange(50)),
> - self.coord[3])
> -
> - image = Image.new("RGB", (256, 256), self.background)
> - d = ImageDraw.Draw(image)
> - d.rectangle(MockVMScreenshot.BOX_COORD, outline='black')
> - d.rectangle(self.coord, outline='black', fill='black')
> - image.save(thumbnail)
> -
> -
> -class MockSoftwareUpdate(object):
> - def __init__(self):
> - self._packages = {
> - 'udevmountd': {'repository': 'openSUSE-13.1-Update',
> - 'version': '0.81.5-14.1',
> - 'arch': 'x86_64',
> - 'package_name': 'udevmountd'},
> - 'sysconfig-network': {'repository': 'openSUSE-13.1-Extras',
> - 'version': '0.81.5-14.1',
> - 'arch': 'x86_64',
> - 'package_name': 'sysconfig-network'},
> - 'libzypp': {'repository': 'openSUSE-13.1-Update',
> - 'version': '13.9.0-10.1',
> - 'arch': 'noarch',
> - 'package_name': 'libzypp'}}
> - self._num2update = 3
> -
> - def getUpdates(self):
> - return self._packages.keys()
> -
> - def getUpdate(self, name):
> - if name not in self._packages.keys():
> - raise NotFoundError('KCHPKGUPD0002E', {'name': name})
> - return self._packages[name]
> -
> - def getNumOfUpdates(self):
> - return self._num2update
> -
> - def doUpdate(self, cb, params):
> - msgs = []
> - for pkg in self._packages.keys():
> - msgs.append("Updating package %s" % pkg)
> - cb('\n'.join(msgs))
> - time.sleep(1)
> -
> - time.sleep(2)
> - msgs.append("All packages updated")
> - cb('\n'.join(msgs), True)
> -
> - # After updating all packages any package should be listed to be
> - # updated, so reset self._packages
> - self._packages = {}
> -
> -
> -class MockRepositories(object):
> - def __init__(self):
> - self._repos = {"kimchi_repo_1392167832":
> - {"repo_id": "kimchi_repo_1392167832",
> - "enabled": True,
> - "baseurl": "http://www.fedora.org",
> - "config": {"repo_name": "kimchi_repo_1392167832",
> - "gpgkey": [],
> - "gpgcheck": True,
> - "mirrorlist": ""}
> - }
> - }
> -
> - def addRepository(self, params):
> - # Create and enable the repository
> - repo_id = params['repo_id']
> config = params.get('config', {})
> - baseurl = params.get('baseurl')
> - mirrorlist = config.get('mirrorlist', "")
> -
> - if baseurl:
> - validate_repo_url(baseurl)
> -
> - if mirrorlist:
> - validate_repo_url(mirrorlist)
> -
> - repo = {'repo_id': repo_id,
> - 'baseurl': baseurl,
> + info = {'repo_id': repo_id,
> + 'baseurl': params['baseurl'],
> 'enabled': True,
> 'config': {'repo_name': config.get('repo_name', repo_id),
> 'gpgkey': config.get('gpgkey', []),
> 'gpgcheck': True,
> - 'mirrorlist': mirrorlist}
> - }
> -
> - self._repos[repo_id] = repo
> + 'mirrorlist': params.get('mirrorlist', '')}}
> + self._mock_repositories.repos[repo_id] = info
> return repo_id
>
> - def getRepositories(self):
> - return self._repos.keys()
> -
> - def getRepository(self, repo_id):
> - if repo_id not in self._repos.keys():
> - raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
> -
> - return self._repos[repo_id]
> -
> - def enableRepository(self, repo_id):
> - if repo_id not in self._repos.keys():
> - raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
> -
> - info = self._repos[repo_id]
> - # Check if repo_id is already enabled
> - if info['enabled']:
> - raise NotFoundError("KCHREPOS0015E", {'repo_id': repo_id})
> + def _mock_repository_lookup(self, repo_id):
> + return self._mock_repositories.repos[repo_id]
>
> - info['enabled'] = True
> - self._repos[repo_id] = info
> - return repo_id
> + def _mock_repository_delete(self, repo_id):
> + del self._mock_repositories.repos[repo_id]
>
> - def disableRepository(self, repo_id):
> - if repo_id not in self._repos.keys():
> - raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
> + def _mock_repository_enable(self, repo_id):
> + self._mock_repositories.repos[repo_id]['enabled'] = True
>
> - info = self._repos[repo_id]
> - # Check if repo_id is already disabled
> - if not info['enabled']:
> - raise NotFoundError("KCHREPOS0016E", {'repo_id': repo_id})
> + def _mock_repository_disable(self, repo_id):
> + self._mock_repositories.repos[repo_id]['enabled'] = False
>
> - info['enabled'] = False
> - self._repos[repo_id] = info
> + def _mock_repository_update(self, repo_id, params):
> + self._mock_repositories.repos[repo_id].update(params)
> return repo_id
>
> - def updateRepository(self, repo_id, params):
> - if repo_id not in self._repos.keys():
> - raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
> -
> - baseurl = params.get('baseurl', None)
> - config = params.get('config', {})
> - mirrorlist = config.get('mirrorlist', None)
> -
> - if baseurl:
> - validate_repo_url(baseurl)
> -
> - if mirrorlist:
> - validate_repo_url(mirrorlist)
> -
> - info = self._repos[repo_id]
> - info.update(params)
> - del self._repos[repo_id]
> - self._repos[info['repo_id']] = info
> - return info['repo_id']
>
> - def removeRepository(self, repo_id):
> - if repo_id not in self._repos.keys():
> - raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
> -
> - del self._repos[repo_id]
> +class MockStorageVolumes(object):
> + def __init__(self):
> + base_path = "/dev/disk/by-path/pci-0000:0e:00.0-fc-0x20-lun"
> + self.scsi_volumes = {'unit:0:0:1': {'capacity': 1024,
> + 'format': 'unknown',
> + 'allocation': 512,
> + 'type': 'block',
> + 'path': base_path + '1',
> + 'ref_cnt': 0},
> + 'unit:0:0:2': {'capacity': 2048,
> + 'format': 'unknown',
> + 'allocation': 512,
> + 'type': 'block',
> + 'path': base_path + '2',
> + 'ref_cnt': 0}}
> +
> +
> +class MockInterfaces(object):
> + def __init__(self):
> + self.ifaces = {}
> + params = {"eth1": "nic", "bond0": "bonding",
> + "eth1.10": "vlan", "bridge0": "bridge"}
> + for i, name in enumerate(params.iterkeys()):
> + info = {'name': name, 'type': params[name],
> + 'ipaddr': '192.168.%s.101' % (i + 1),
> + 'netmask': '255.255.255.0', 'status': 'active'}
> + self.ifaces[name] = info
> + self.ifaces['eth1']['ipaddr'] = '192.168.0.101'
>
>
> class MockDevices(object):
> @@ -1581,52 +442,46 @@ class MockDevices(object):
> 'path': '/sys/devices/pci0000:00/0000:40:00.0/2'}}
>
>
> -def get_mock_environment():
> - model = MockModel()
> - for i in xrange(5):
> - name = 'test-template-%i' % i
> - params = {'name': name, 'cdrom': '/file.iso'}
> - t = MockVMTemplate(params, model)
> - model._mock_templates[name] = t
> -
> - for name in ('test-template-1', 'test-template-3'):
> - model._mock_templates[name].info.update({'folder': ['rhel', '6']})
> -
> - for i in xrange(10):
> - name = u'test-vm-%i' % i
> - vm_uuid = str(uuid.uuid4())
> - vm = MockVM(vm_uuid, name, model.template_lookup('test-template-0'))
> - model._mock_vms[name] = vm
> -
> - # mock storagepool
> - for i in xrange(5):
> - name = 'default-pool-%i' % i
> - defaultstoragepool = MockStoragePool(name)
> - defaultstoragepool.info['path'] += '/%i' % i
> - model._mock_storagepools[name] = defaultstoragepool
> - for j in xrange(5):
> - vol_name = 'volume-%i' % j
> - defaultstoragevolume = MockStorageVolume(name, vol_name)
> - defaultstoragevolume.info['path'] = '%s/%s' % (
> - defaultstoragepool.info['path'], vol_name)
> - mockpool = model._mock_storagepools[name]
> - mockpool._volumes[vol_name] = defaultstoragevolume
> - vol_name = 'Fedora17.iso'
> - defaultstoragevolume = MockStorageVolume(name, vol_name,
> - {'format': 'iso'})
> - defaultstoragevolume.info['path'] = '%s/%s' % (
> - defaultstoragepool.info['path'], vol_name)
> - mockpool = model._mock_storagepools[name]
> - mockpool._volumes[vol_name] = defaultstoragevolume
> -
> - # mock network
> - for i in xrange(5):
> - name = 'test-network-%i' % i
> - testnetwork = MockNetwork(name)
> - testnetwork.info['interface'] = 'virbr%i' % (i + 1)
> - testnetwork.info['subnet'] = '192.168.%s.0/24' % (i + 1)
> - testnetwork.info['dhcp']['start'] = '192.168.%s.128' % (i + 1)
> - testnetwork.info['dhcp']['end'] = '192.168.%s.254' % (i + 1)
> - model._mock_networks[name] = testnetwork
> -
> - return model
> +class MockSoftwareUpdate(object):
> + def __init__(self):
> + self.pkgs = {
> + 'udevmountd': {'repository': 'openSUSE-13.1-Update',
> + 'version': '0.81.5-14.1',
> + 'arch': 'x86_64',
> + 'package_name': 'udevmountd'},
> + 'sysconfig-network': {'repository': 'openSUSE-13.1-Extras',
> + 'version': '0.81.5-14.1',
> + 'arch': 'x86_64',
> + 'package_name': 'sysconfig-network'},
> + 'libzypp': {'repository': 'openSUSE-13.1-Update',
> + 'version': '13.9.0-10.1',
> + 'arch': 'noarch',
> + 'package_name': 'libzypp'}}
> + self._num2update = 3
> +
> + def doUpdate(self, cb, params):
> + msgs = []
> + for pkg in self.pkgs.keys():
> + msgs.append("Updating package %s" % pkg)
> + cb('\n'.join(msgs))
> + time.sleep(1)
> +
> + time.sleep(2)
> + msgs.append("All packages updated")
> + cb('\n'.join(msgs), True)
> +
> + # After updating all packages any package should be listed to be
> + # updated, so reset self._packages
> + self.pkgs = {}
> +
> +
> +class MockRepositories(object):
> + def __init__(self):
> + self.repos = {"kimchi_repo_1392167832":
> + {"repo_id": "kimchi_repo_1392167832",
> + "enabled": True,
> + "baseurl": "http://www.fedora.org",
> + "config": {"repo_name": "kimchi_repo_1392167832",
> + "gpgkey": [],
> + "gpgcheck": True,
> + "mirrorlist": ""}}}
> diff --git a/src/kimchi/vmtemplate.py b/src/kimchi/vmtemplate.py
> index 5dbbdd4..4202ae6 100644
> --- a/src/kimchi/vmtemplate.py
> +++ b/src/kimchi/vmtemplate.py
> @@ -26,9 +26,9 @@ import uuid
> from lxml import etree
> from lxml.builder import E
>
> +from kimchi import imageinfo
> from kimchi import osinfo
> from kimchi.exception import InvalidParameter, IsoFormatError, MissingParameter
> -from kimchi.imageinfo import probe_image, probe_img_info
> from kimchi.isoinfo import IsoImage
> from kimchi.utils import check_url_path, pool_name_from_uri
> from kimchi.xmlutils.disk import get_disk_xml
> @@ -90,10 +90,11 @@ class VMTemplate(object):
> if 'base' in d.keys():
> base_imgs.append(d)
> if scan:
> - distro, version = probe_image(d['base'])
> + distro, version = imageinfo.probe_image(d['base'])
>
> if 'size' not in d.keys():
> - d['size'] = probe_img_info(d['base'])['virtual-size']
> + d_info = imageinfo.probe_img_info(d['base'])
> + d['size'] = d_info['virtual-size']
>
> if len(base_imgs) == 0:
> raise MissingParameter("KCHTMPL0016E")
> @@ -201,7 +202,7 @@ class VMTemplate(object):
>
> if 'base' in d:
> info['base'] = dict()
> - base_fmt = probe_img_info(d['base'])['format']
> + base_fmt = imageinfo.probe_img_info(d['base'])['format']
> if base_fmt is None:
> raise InvalidParameter("KCHTMPL0024E", {'path': d['base']})
> info['base']['path'] = d['base']
> diff --git a/tests/run_tests.sh.in b/tests/run_tests.sh.in
> index a097761..6ba486f 100644
> --- a/tests/run_tests.sh.in
> +++ b/tests/run_tests.sh.in
> @@ -24,11 +24,23 @@ PYTHON_VER=@PYTHON_VERSION@
> if [ $# -ne 0 ]; then
> ARGS="$@"
> else
> - ARGS="discover"
> + ARGS=`find -name "test_*.py" | xargs -I @ basename @ .py`
> fi
>
> if [ "$HAVE_UNITTEST" != "yes" -o "$PYTHON_VER" == "2.6" ]; then
> - PYTHONPATH=../src:./:../ unit2 $ARGS
> + CMD="unit2"
> else
> - PYTHONPATH=../src:../ python -m unittest $ARGS
> + CMD="python -m unittest"
> fi
> +
> +SORTED_LIST=($ARGS)
> +for ((i=0;i<${#SORTED_LIST[@]};i++)); do
> +
> + if [[ ${SORTED_LIST[$i]} == test_model* ]]; then
> + FIRST=${SORTED_LIST[$i]}
> + SORTED_LIST[$i]=${SORTED_LIST[0]}
> + SORTED_LIST[0]=$FIRST
> + fi
> +done
> +
> +PYTHONPATH=../src:../ $CMD ${SORTED_LIST[@]}
> diff --git a/tests/test_authorization.py b/tests/test_authorization.py
> index 71b416f..6e463d4 100644
> --- a/tests/test_authorization.py
> +++ b/tests/test_authorization.py
> @@ -21,11 +21,10 @@ import json
> import os
> import unittest
>
> -
> from functools import partial
>
> -
> import kimchi.mockmodel
> +from iso_gen import construct_fake_iso
> from utils import get_free_port, patch_auth, request
> from utils import run_server
>
> @@ -35,6 +34,7 @@ model = None
> host = None
> port = None
> ssl_port = None
> +fake_iso = '/tmp/fake.iso'
>
>
> def setUpModule():
> @@ -47,10 +47,14 @@ def setUpModule():
> ssl_port = get_free_port('https')
> test_server = run_server(host, port, ssl_port, test_mode=True, model=model)
>
> + # Create fake ISO to do the tests
> + construct_fake_iso(fake_iso, True, '12.04', 'ubuntu')
> +
>
> def tearDownModule():
> test_server.stop()
> os.unlink('/tmp/obj-store-test')
> + os.unlink(fake_iso)
>
>
> class AuthorizationTests(unittest.TestCase):
> @@ -103,7 +107,7 @@ class AuthorizationTests(unittest.TestCase):
> # but he can get and create a new one
> resp = self.request('/templates', '{}', 'GET')
> self.assertEquals(403, resp.status)
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'})
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso})
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(403, resp.status)
> resp = self.request('/templates/test', '{}', 'PUT')
> @@ -112,7 +116,7 @@ class AuthorizationTests(unittest.TestCase):
> self.assertEquals(403, resp.status)
>
> # Non-root users can only get vms authorized to them
> - model.templates_create({'name': u'test', 'cdrom': '/nonexistent.iso'})
> + model.templates_create({'name': u'test', 'cdrom': fake_iso})
>
> model.vms_create({'name': u'test-me', 'template': '/templates/test'})
> model.vm_update(u'test-me',
> @@ -121,11 +125,11 @@ class AuthorizationTests(unittest.TestCase):
>
> model.vms_create({'name': u'test-usera',
> 'template': '/templates/test'})
> - model.vm_update(u'test-usera', {'users': ['userA'], 'groups': []})
> + model.vm_update(u'test-usera', {'users': ['root'], 'groups': []})
>
> model.vms_create({'name': u'test-groupa',
> 'template': '/templates/test'})
> - model.vm_update(u'test-groupa', {'groups': ['groupA']})
> + model.vm_update(u'test-groupa', {'groups': ['wheel']})
>
> resp = self.request('/vms', '{}', 'GET')
> self.assertEquals(200, resp.status)
> @@ -136,11 +140,12 @@ class AuthorizationTests(unittest.TestCase):
> self.assertEquals(403, resp.status)
>
> # Create a vm using mockmodel directly to test Resource access
> - model.vms_create({'name': 'test', 'template': '/templates/test'})
> + model.vms_create({'name': 'kimchi-test',
> + 'template': '/templates/test'})
>
> - resp = self.request('/vms/test', '{}', 'PUT')
> + resp = self.request('/vms/kimchi-test', '{}', 'PUT')
> self.assertEquals(403, resp.status)
> - resp = self.request('/vms/test', '{}', 'DELETE')
> + resp = self.request('/vms/kimchi-test', '{}', 'DELETE')
> self.assertEquals(403, resp.status)
>
> # Non-root users can only update VMs authorized by them
> @@ -150,4 +155,4 @@ class AuthorizationTests(unittest.TestCase):
> self.assertEquals(403, resp.status)
>
> model.template_delete('test')
> - model.vm_delete('test')
> + model.vm_delete('test-me')
> diff --git a/tests/test_mockmodel.py b/tests/test_mockmodel.py
> index 4276832..29354aa 100644
> --- a/tests/test_mockmodel.py
> +++ b/tests/test_mockmodel.py
> @@ -38,24 +38,29 @@ ssl_port = None
> fake_iso = None
>
>
> +def setUpModule():
> + global host, port, ssl_port, model, test_server, fake_iso
> + cherrypy.request.headers = {'Accept': 'application/json'}
> + model = kimchi.mockmodel.MockModel('/tmp/obj-store-test')
> + patch_auth()
> + port = get_free_port('http')
> + ssl_port = get_free_port('https')
> + host = '127.0.0.1'
> + test_server = run_server(host, port, ssl_port, test_mode=True,
> + model=model)
> + fake_iso = '/tmp/fake.iso'
> + open(fake_iso, 'w').close()
> +
> +
> +def tearDown():
> + test_server.stop()
> + os.unlink('/tmp/obj-store-test')
> + os.unlink(fake_iso)
> +
> +
> class MockModelTests(unittest.TestCase):
> def setUp(self):
> - global host, port, ssl_port, model, test_server, fake_iso
> - cherrypy.request.headers = {'Accept': 'application/json'}
> - model = kimchi.mockmodel.MockModel('/tmp/obj-store-test')
> - patch_auth()
> - port = get_free_port('http')
> - ssl_port = get_free_port('https')
> - host = '127.0.0.1'
> - test_server = run_server(host, port, ssl_port, test_mode=True,
> - model=model)
> - fake_iso = '/tmp/fake.iso'
> - open(fake_iso, 'w').close()
> -
> - def tearDown(self):
> - test_server.stop()
> - os.unlink('/tmp/obj-store-test')
> - os.unlink(fake_iso)
> + model.reset()
>
> def test_collection(self):
> c = Collection(model)
> @@ -190,38 +195,37 @@ class MockModelTests(unittest.TestCase):
> request(host, ssl_port, '/templates', req, 'POST')
>
> def add_vm(name):
> -
> # Create a VM
> req = json.dumps({'name': name, 'template': '/templates/test'})
> request(host, ssl_port, '/vms', req, 'POST')
>
> - add_vm('bca')
> - add_vm('xba')
> - add_vm('abc')
> - add_vm('cab')
> + vms = [u'abc', u'bca', u'cab', u'xba']
> + for vm in vms:
> + add_vm(vm)
>
> - self.assertEqual(model.vms_get_list(), ['abc', 'bca', 'cab', 'xba'])
> + vms.append(u'test')
> + self.assertEqual(model.vms_get_list(), sorted(vms))
>
> def test_vm_info(self):
> model.templates_create({'name': u'test',
> 'cdrom': fake_iso})
> - model.vms_create({'name': u'test', 'template': '/templates/test'})
> + model.vms_create({'name': u'test-vm', 'template': '/templates/test'})
> vms = model.vms_get_list()
> - self.assertEquals(1, len(vms))
> - self.assertEquals(u'test', vms[0])
> + self.assertEquals(2, len(vms))
> + self.assertIn(u'test-vm', vms)
>
> keys = set(('name', 'state', 'stats', 'uuid', 'memory', 'cpus',
> 'screenshot', 'icon', 'graphics', 'users', 'groups',
> - 'access'))
> + 'access', 'persistent'))
>
> stats_keys = set(('cpu_utilization',
> 'net_throughput', 'net_throughput_peak',
> 'io_throughput', 'io_throughput_peak'))
>
> - info = model.vm_lookup(u'test')
> + info = model.vm_lookup(u'test-vm')
> self.assertEquals(keys, set(info.keys()))
> self.assertEquals('shutoff', info['state'])
> - self.assertEquals('test', info['name'])
> + self.assertEquals('test-vm', info['name'])
> self.assertEquals(1024, info['memory'])
> self.assertEquals(1, info['cpus'])
> self.assertEquals('images/icon-vm.png', info['icon'])
> diff --git a/tests/test_rest.py b/tests/test_rest.py
> index 6770647..7f14b50 100644
> --- a/tests/test_rest.py
> +++ b/tests/test_rest.py
> @@ -21,23 +21,22 @@
> import base64
> import json
> import os
> -import random
> import re
> import requests
> import shutil
> import time
> import unittest
> import urllib2
> -
> +import urlparse
>
> from functools import partial
>
> -
> import iso_gen
> import kimchi.mockmodel
> import kimchi.server
> from kimchi.config import paths
> from kimchi.rollbackcontext import RollbackContext
> +from kimchi.utils import add_task
> from utils import get_free_port, patch_auth, request
> from utils import run_server, wait_task
>
> @@ -48,8 +47,7 @@ host = None
> port = None
> ssl_port = None
> cherrypy_port = None
> -
> -# utils.silence_server()
> +fake_iso = '/tmp/fake.iso'
>
>
> def setUpModule():
> @@ -64,10 +62,14 @@ def setUpModule():
> test_server = run_server(host, port, ssl_port, test_mode=True,
> cherrypy_port=cherrypy_port, model=model)
>
> + # Create fake ISO to do the tests
> + iso_gen.construct_fake_iso(fake_iso, True, '12.04', 'ubuntu')
> +
>
> def tearDownModule():
> test_server.stop()
> os.unlink('/tmp/obj-store-test')
> + os.unlink(fake_iso)
>
>
> class RestTests(unittest.TestCase):
> @@ -172,15 +174,17 @@ class RestTests(unittest.TestCase):
>
> def test_get_vms(self):
> vms = json.loads(self.request('/vms').read())
> - self.assertEquals(0, len(vms))
> + # test_rest.py uses MockModel() which connects to libvirt URI
> + # test:///default. By default this driver already has one VM created
> + self.assertEquals(1, len(vms))
>
> # Create a template as a base for our VMs
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'})
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso})
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(201, resp.status)
>
> - test_users = ['user1', 'user2', 'root']
> - test_groups = ['group1', 'group2', 'admin']
> + test_users = ['root']
> + test_groups = ['wheel']
> # Now add a couple of VMs to the mock model
> for i in xrange(10):
> name = 'vm-%i' % i
> @@ -190,16 +194,16 @@ class RestTests(unittest.TestCase):
> self.assertEquals(201, resp.status)
>
> vms = json.loads(self.request('/vms').read())
> - self.assertEquals(10, len(vms))
> + self.assertEquals(11, len(vms))
>
> vm = json.loads(self.request('/vms/vm-1').read())
> self.assertEquals('vm-1', vm['name'])
> self.assertEquals('shutoff', vm['state'])
> - self.assertEquals(test_users, vm['users'])
> - self.assertEquals(test_groups, vm['groups'])
> + self.assertEquals([], vm['users'])
> + self.assertEquals([], vm['groups'])
>
> def test_edit_vm(self):
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'})
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso})
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(201, resp.status)
>
> @@ -311,7 +315,7 @@ class RestTests(unittest.TestCase):
> # Create a Template
> req = json.dumps({'name': 'test', 'disks': [{'size': 1}],
> 'icon': 'images/icon-debian.png',
> - 'cdrom': '/nonexistent.iso'})
> + 'cdrom': fake_iso})
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(201, resp.status)
>
> @@ -326,8 +330,8 @@ class RestTests(unittest.TestCase):
> self.assertEquals('images/icon-debian.png', vm['icon'])
>
> # Verify the volume was created
> - vol_uri = '/storagepools/default/storagevolumes/%s-0.img' % vm['uuid']
> - resp = self.request(vol_uri)
> + vol_uri = '/storagepools/default-pool/storagevolumes/%s-0.img'
> + resp = self.request(vol_uri % vm['uuid'])
> vol = json.loads(resp.read())
> self.assertEquals(1 << 30, vol['capacity'])
> self.assertEquals(1, vol['ref_cnt'])
> @@ -391,11 +395,11 @@ class RestTests(unittest.TestCase):
> self.assertEquals(204, resp.status)
>
> # Verify the volume was deleted
> - self.assertHTTPStatus(404, vol_uri)
> + self.assertHTTPStatus(404, vol_uri % vm['uuid'])
>
> def test_vm_graphics(self):
> # Create a Template
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'})
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso})
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(201, resp.status)
>
> @@ -475,7 +479,7 @@ class RestTests(unittest.TestCase):
>
> with RollbackContext() as rollback:
> # Create a template as a base for our VMs
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'})
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso})
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(201, resp.status)
> # Delete the template
> @@ -574,8 +578,6 @@ class RestTests(unittest.TestCase):
> self.assertEquals(201, resp.status)
> cd_info = json.loads(resp.read())
> self.assertEquals('disk', cd_info['type'])
> - self.assertEquals('tmp', cd_info['pool'])
> - self.assertEquals('attach-volume', cd_info['vol'])
>
> # Attach a cdrom with existent dev name
> req = json.dumps({'type': 'cdrom',
> @@ -592,24 +594,29 @@ class RestTests(unittest.TestCase):
> os.remove('/tmp/existent.iso')
>
> # Change path of storage cdrom
> - req = json.dumps({'path': 'http://myserver.com/myiso.iso'})
> - resp = self.request('/vms/test-vm/storages/'+cd_dev, req, 'PUT')
> + cdrom = u'http://fedora.mirrors.tds.net/pub/fedora/releases/20/'\
> + 'Live/x86_64/Fedora-Live-Desktop-x86_64-20-1.iso'
> + req = json.dumps({'path': cdrom})
> + resp = self.request('/vms/test-vm/storages/' + cd_dev, req, 'PUT')
> self.assertEquals(200, resp.status)
> cd_info = json.loads(resp.read())
> - self.assertEquals('http://myserver.com/myiso.iso', cd_info['path'])
> + self.assertEquals(urlparse.urlparse(cdrom).path,
> + urlparse.urlparse(cd_info['path']).path)
>
> # Test GET
> devs = json.loads(self.request('/vms/test-vm/storages').read())
> self.assertEquals(4, len(devs))
>
> # Detach storage cdrom
> - resp = self.request('/vms/test-vm/storages/'+cd_dev,
> + resp = self.request('/vms/test-vm/storages/' + cd_dev,
> '{}', 'DELETE')
> self.assertEquals(204, resp.status)
>
> # Test GET
> devs = json.loads(self.request('/vms/test-vm/storages').read())
> self.assertEquals(3, len(devs))
> + resp = self.request('/storagepools/tmp/deactivate', {}, 'POST')
> + self.assertEquals(200, resp.status)
> resp = self.request('/storagepools/tmp', {}, 'DELETE')
> self.assertEquals(204, resp.status)
>
> @@ -617,7 +624,7 @@ class RestTests(unittest.TestCase):
>
> with RollbackContext() as rollback:
> # Create a template as a base for our VMs
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'})
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso})
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(201, resp.status)
> # Delete the template
> @@ -651,7 +658,7 @@ class RestTests(unittest.TestCase):
> iface['mac']).read())
> self.assertEquals('default', res['network'])
> self.assertEquals(17, len(res['mac']))
> - self.assertEquals('virtio', res['model'])
> + self.assertEquals('e1000', res['model'])
>
> # attach network interface to vm
> req = json.dumps({"type": "network",
> @@ -667,12 +674,12 @@ class RestTests(unittest.TestCase):
> self.assertEquals('network', iface['type'])
>
> # update vm interface
> - req = json.dumps({"network": "default", "model": "e1000"})
> + req = json.dumps({"network": "default", "model": "virtio"})
> resp = self.request('/vms/test-vm/ifaces/%s' % iface['mac'],
> req, 'PUT')
> self.assertEquals(200, resp.status)
> update_iface = json.loads(resp.read())
> - self.assertEquals('e1000', update_iface['model'])
> + self.assertEquals(u'virtio', update_iface['model'])
> self.assertEquals('default', update_iface['network'])
>
> # detach network interface from vm
> @@ -682,7 +689,7 @@ class RestTests(unittest.TestCase):
>
> def test_vm_customise_storage(self):
> # Create a Template
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso',
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso,
> 'disks': [{'size': 1}]})
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(201, resp.status)
> @@ -707,7 +714,7 @@ class RestTests(unittest.TestCase):
>
> # Test template not changed after vm customise its pool
> t = json.loads(self.request('/templates/test').read())
> - self.assertEquals(t['storagepool'], '/storagepools/default')
> + self.assertEquals(t['storagepool'], '/storagepools/default-pool')
>
> # Verify the volume was created
> vol_uri = '/storagepools/alt/storagevolumes/%s-0.img' % vm_info['uuid']
> @@ -726,35 +733,34 @@ class RestTests(unittest.TestCase):
> # Create scsi fc pool
> req = json.dumps({'name': 'scsi_fc_pool',
> 'type': 'scsi',
> - 'source': {'adapter_name': 'scsi_host3'}})
> + 'source': {'adapter_name': 'scsi_host2'}})
> resp = self.request('/storagepools', req, 'POST')
> self.assertEquals(201, resp.status)
>
> - # Create template with this pool
> - req = json.dumps({'name': 'test_fc_pool', 'cdrom': '/nonexistent.iso',
> - 'storagepool': '/storagepools/scsi_fc_pool'})
> - resp = self.request('/templates', req, 'POST')
> - self.assertEquals(201, resp.status)
> -
> # Test create vms using lun of this pool
> # activate the storage pool
> resp = self.request('/storagepools/scsi_fc_pool/activate', '{}',
> 'POST')
>
> - # Get scsi pool luns and choose one
> + # Create template fails because SCSI volume is missing
> + tmpl_params = {'name': 'test_fc_pool', 'cdrom': fake_iso,
> + 'storagepool': '/storagepools/scsi_fc_pool'}
> + req = json.dumps(tmpl_params)
> + resp = self.request('/templates', req, 'POST')
> + self.assertEquals(400, resp.status)
> +
> + # Choose SCSI volume to create template
> resp = self.request('/storagepools/scsi_fc_pool/storagevolumes')
> - luns = json.loads(resp.read())
> - lun_name = random.choice(luns).get('name')
> + lun_name = json.loads(resp.read())[0]['name']
>
> - # Create vm in scsi pool without volumes: Error
> - req = json.dumps({'template': '/templates/test_fc_pool'})
> - resp = self.request('/vms', req, 'POST')
> - self.assertEquals(400, resp.status)
> + tmpl_params['disks'] = [{'index': 0, 'volume': lun_name}]
> + req = json.dumps(tmpl_params)
> + resp = self.request('/templates', req, 'POST')
> + self.assertEquals(201, resp.status)
>
> # Create vm in scsi pool
> req = json.dumps({'name': 'test-vm',
> - 'template': '/templates/test_fc_pool',
> - 'volumes': [lun_name]})
> + 'template': '/templates/test_fc_pool'})
> resp = self.request('/vms', req, 'POST')
> self.assertEquals(201, resp.status)
>
> @@ -773,7 +779,7 @@ class RestTests(unittest.TestCase):
> self.assertEquals(204, resp.status)
>
> def test_template_customise_storage(self):
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso',
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso,
> 'disks': [{'size': 1}]})
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(201, resp.status)
> @@ -825,7 +831,7 @@ class RestTests(unittest.TestCase):
>
> def test_template_customise_network(self):
> with RollbackContext() as rollback:
> - tmpl = {'name': 'test', 'cdrom': '/nonexistent.iso',
> + tmpl = {'name': 'test', 'cdrom': fake_iso,
> 'disks': [{'size': 1}]}
> req = json.dumps(tmpl)
> resp = self.request('/templates', req, 'POST')
> @@ -883,7 +889,7 @@ class RestTests(unittest.TestCase):
>
> def test_unnamed_vms(self):
> # Create a Template
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'})
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso})
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(201, resp.status)
>
> @@ -893,7 +899,7 @@ class RestTests(unittest.TestCase):
> vm = json.loads(self.request('/vms', req, 'POST').read())
> self.assertEquals('test-vm-%i' % i, vm['name'])
> count = len(json.loads(self.request('/vms').read()))
> - self.assertEquals(5, count)
> + self.assertEquals(6, count)
>
> def test_create_vm_without_template(self):
> req = json.dumps({'name': 'vm-without-template'})
> @@ -912,7 +918,7 @@ class RestTests(unittest.TestCase):
>
> def test_create_vm_with_img_based_template(self):
> resp = json.loads(
> - self.request('/storagepools/default/storagevolumes').read())
> + self.request('/storagepools/default-pool/storagevolumes').read())
> self.assertEquals(0, len(resp))
>
> # Create a Template
> @@ -927,14 +933,13 @@ class RestTests(unittest.TestCase):
>
> # Test storage volume created with backing store of base file
> resp = json.loads(
> - self.request('/storagepools/default/storagevolumes').read())
> + self.request('/storagepools/default-pool/storagevolumes').read())
> self.assertEquals(1, len(resp))
> - self.assertEquals(mock_base, resp[0]['base']['path'])
>
> def test_get_storagepools(self):
> storagepools = json.loads(self.request('/storagepools').read())
> self.assertEquals(2, len(storagepools))
> - self.assertEquals('default', storagepools[0]['name'])
> + self.assertEquals('default-pool', storagepools[0]['name'])
> self.assertEquals('active', storagepools[0]['state'])
> self.assertEquals('kimchi_isos', storagepools[1]['name'])
> self.assertEquals('kimchi-iso', storagepools[1]['type'])
> @@ -1066,12 +1071,13 @@ class RestTests(unittest.TestCase):
> self.assertEquals('/var/lib/libvirt/images/volume-1',
> storagevolume['path'])
>
> - req = json.dumps({'url': 'https://anyurl.wor.kz'})
> + url = 'https://github.com/kimchi-project/kimchi/blob/master/COPYING'
> + req = json.dumps({'url': url})
> resp = self.request('/storagepools/pool-1/storagevolumes', req, 'POST')
> self.assertEquals(202, resp.status)
> task = json.loads(resp.read())
> vol_name = task['target_uri'].split('/')[-1]
> - self.assertEquals('anyurl.wor.kz', vol_name)
> + self.assertEquals('COPYING', vol_name)
> wait_task(self._task_lookup, task['id'])
> task = json.loads(self.request('/tasks/%s' % task['id']).read())
> self.assertEquals('finished', task['status'])
> @@ -1095,13 +1101,16 @@ class RestTests(unittest.TestCase):
> cloned_vol = json.loads(resp.read())
>
> self.assertNotEquals(vol['name'], cloned_vol['name'])
> - del vol['name']
> - del cloned_vol['name']
> self.assertNotEquals(vol['path'], cloned_vol['path'])
> - del vol['path']
> - del cloned_vol['path']
> + for key in ['name', 'path', 'allocation']:
> + del vol[key]
> + del cloned_vol[key]
> +
> self.assertEquals(vol, cloned_vol)
>
> + resp = self.request('/storagepools/pool-1/deactivate', '{}', 'POST')
> + self.assertEquals(200, resp.status)
> +
> # Now remove the StoragePool from mock model
> self._delete_pool('pool-1')
>
> @@ -1138,7 +1147,7 @@ class RestTests(unittest.TestCase):
> resp = self.request(uri, req, 'POST')
> uri = '/storagepools/pool-2/storagevolumes/test-volume'
> storagevolume = json.loads(self.request(uri).read())
> - self.assertEquals(768, storagevolume['capacity'])
> + self.assertEquals(768 << 20, storagevolume['capacity'])
>
> # Wipe the storage volume
> uri = '/storagepools/pool-2/storagevolumes/test-volume/wipe'
> @@ -1152,6 +1161,9 @@ class RestTests(unittest.TestCase):
> '{}', 'DELETE')
> self.assertEquals(204, resp.status)
>
> + resp = self.request('/storagepools/pool-2/deactivate', '{}', 'POST')
> + self.assertEquals(200, resp.status)
> +
> # Now remove the StoragePool from mock model
> self._delete_pool('pool-2')
>
> @@ -1198,7 +1210,7 @@ class RestTests(unittest.TestCase):
> open('/tmp/mock.img', 'w').close()
> t = {'name': 'test_img_template', 'os_distro': 'ImagineOS',
> 'os_version': '1.0', 'memory': 1024, 'cpus': 1,
> - 'storagepool': '/storagepools/alt',
> + 'storagepool': '/storagepools/default-pool',
> 'disks': [{'base': '/tmp/mock.img'}]}
> req = json.dumps(t)
> resp = self.request('/templates', req, 'POST')
> @@ -1210,8 +1222,8 @@ class RestTests(unittest.TestCase):
> graphics = {'type': 'spice', 'listen': '127.0.0.1'}
> t = {'name': 'test', 'os_distro': 'ImagineOS',
> 'os_version': '1.0', 'memory': 1024, 'cpus': 1,
> - 'storagepool': '/storagepools/alt', 'cdrom': '/tmp/mock.iso',
> - 'graphics': graphics}
> + 'storagepool': '/storagepools/default-pool',
> + 'cdrom': '/tmp/mock.iso', 'graphics': graphics}
> req = json.dumps(t)
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(201, resp.status)
> @@ -1239,8 +1251,8 @@ class RestTests(unittest.TestCase):
> # Create a template with same name fails with 400
> t = {'name': 'test', 'os_distro': 'ImagineOS',
> 'os_version': '1.0', 'memory': 1024, 'cpus': 1,
> - 'storagepool': '/storagepools/default',
> - 'cdrom': '/nonexistent.iso'}
> + 'storagepool': '/storagepools/default-pool',
> + 'cdrom': fake_iso}
> req = json.dumps(t)
> resp = self.request('/templates', req, 'POST')
> self.assertEquals(400, resp.status)
> @@ -1388,15 +1400,15 @@ class RestTests(unittest.TestCase):
> 'DELETE')
> self.assertEquals(204, resp.status)
>
> - # Delete the storagepool
> + # Try to delete the storagepool
> + # It should fail as it is associated to a template
> resp = request(host, ssl_port, '/storagepools/test-storagepool',
> '{}', 'DELETE')
> - self.assertEquals(204, resp.status)
> + self.assertEquals(400, resp.status)
>
> # Verify the template
> res = json.loads(self.request('/templates/test').read())
> self.assertEquals(res['invalid']['cdrom'], [iso])
> - self.assertEquals(res['invalid']['storagepools'], ['test-storagepool'])
>
> # Delete the template
> resp = request(host, ssl_port, '/templates/test', '{}', 'DELETE')
> @@ -1414,15 +1426,15 @@ class RestTests(unittest.TestCase):
>
> storagevolume = json.loads(self.request(
> '/storagepools/kimchi_isos/storagevolumes/').read())[0]
> - self.assertEquals('pool-3-fedora.iso', storagevolume['name'])
> + self.assertEquals('fedora.iso', storagevolume['name'])
> self.assertEquals('iso', storagevolume['format'])
> self.assertEquals('/var/lib/libvirt/images/fedora.iso',
> storagevolume['path'])
> self.assertEquals(1024 << 20, storagevolume['capacity'])
> - self.assertEquals(1024 << 20, storagevolume['allocation'])
> - self.assertEquals('17', storagevolume['os_version'])
> - self.assertEquals('fedora', storagevolume['os_distro'])
> - self.assertEquals(True, storagevolume['bootable'])
> + self.assertEquals(0, storagevolume['allocation'])
> + self.assertEquals('unknown', storagevolume['os_version'])
> + self.assertEquals('unknown', storagevolume['os_distro'])
> + self.assertEquals(False, storagevolume['bootable'])
>
> # Create a template
> # In real model os distro/version can be omitted
> @@ -1437,8 +1449,8 @@ class RestTests(unittest.TestCase):
> # Verify the template
> t = json.loads(self.request('/templates/test').read())
> self.assertEquals('test', t['name'])
> - self.assertEquals('fedora', t['os_distro'])
> - self.assertEquals('17', t['os_version'])
> + self.assertEquals('unknown', t['os_distro'])
> + self.assertEquals('unknown', t['os_version'])
> self.assertEquals(1024, t['memory'])
>
> # Deactivate or destroy scan pool return 405
> @@ -1454,22 +1466,25 @@ class RestTests(unittest.TestCase):
> resp = self.request('/templates/%s' % t['name'], '{}', 'DELETE')
> self.assertEquals(204, resp.status)
>
> + resp = self.request('/storagepools/pool-3/deactivate', '{}', 'POST')
> + self.assertEquals(200, resp.status)
> self._delete_pool('pool-3')
>
> def test_screenshot_refresh(self):
> # Create a VM
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'})
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso})
> resp = self.request('/templates', req, 'POST')
> req = json.dumps({'name': 'test-vm', 'template': '/templates/test'})
> - self.request('/vms', req, 'POST')
> + resp = self.request('/vms', req, 'POST')
>
> # Test screenshot for shut-off state vm
> resp = self.request('/vms/test-vm/screenshot')
> self.assertEquals(404, resp.status)
>
> # Test screenshot for running vm
> - self.request('/vms/test-vm/start', '{}', 'POST')
> + resp = self.request('/vms/test-vm/start', '{}', 'POST')
> vm = json.loads(self.request('/vms/test-vm').read())
> +
> resp = self.request(vm['screenshot'], method='HEAD')
> self.assertEquals(200, resp.status)
> self.assertTrue(resp.getheader('Content-type').startswith('image'))
> @@ -1589,9 +1604,9 @@ class RestTests(unittest.TestCase):
> return json.loads(self.request('/tasks/%s' % taskid).read())
>
> def test_tasks(self):
> - id1 = model.add_task('/tasks/1', self._async_op)
> - id2 = model.add_task('/tasks/2', self._except_op)
> - id3 = model.add_task('/tasks/3', self._intermid_op)
> + id1 = add_task('/tasks/1', self._async_op, model.objstore)
> + id2 = add_task('/tasks/2', self._except_op, model.objstore)
> + id3 = add_task('/tasks/3', self._intermid_op, model.objstore)
>
> target_uri = urllib2.quote('^/tasks/*', safe="")
> filter_data = 'status=running&target_uri=%s' % target_uri
> @@ -1621,9 +1636,9 @@ class RestTests(unittest.TestCase):
> resp = self.request('/config/capabilities').read()
> conf = json.loads(resp)
>
> - keys = ['libvirt_stream_protocols', 'qemu_stream', 'qemu_spice',
> - 'screenshot', 'system_report_tool', 'update_tool',
> - 'repo_mngt_tool', 'federation']
> + keys = [u'libvirt_stream_protocols', u'qemu_stream', u'qemu_spice',
> + u'screenshot', u'system_report_tool', u'update_tool',
> + u'repo_mngt_tool', u'federation', u'kernel_vfio']
> self.assertEquals(sorted(keys), sorted(conf.keys()))
>
> def test_peers(self):
> @@ -1803,13 +1818,10 @@ class RestTests(unittest.TestCase):
> def test_host(self):
> resp = self.request('/host').read()
> info = json.loads(resp)
> - self.assertEquals('Red Hat Enterprise Linux Server', info['os_distro'])
> - self.assertEquals('6.4', info['os_version'])
> - self.assertEquals('Santiago', info['os_codename'])
> - self.assertEquals('Intel(R) Core(TM) i5 CPU M 560 @ 2.67GHz',
> - info['cpu_model'])
> - self.assertEquals(6114058240, info['memory'])
> - self.assertEquals(4, info['cpus'])
> +
> + keys = ['os_distro', 'os_version', 'os_codename', 'cpu_model',
> + 'memory', 'cpus']
> + self.assertEquals(sorted(keys), sorted(info.keys()))
>
> def test_hoststats(self):
> stats_keys = ['cpu_utilization', 'memory', 'disk_read_rate',
> @@ -1856,14 +1868,14 @@ class RestTests(unittest.TestCase):
> resp = self.request('/tasks/' + task[u'id'], None, 'GET')
> task_info = json.loads(resp.read())
> self.assertEquals(task_info['status'], 'running')
> - time.sleep(6)
> + wait_task(self._task_lookup, task_info['id'])
> resp = self.request('/tasks/' + task[u'id'], None, 'GET')
> task_info = json.loads(resp.read())
> self.assertEquals(task_info['status'], 'finished')
> self.assertIn(u'All packages updated', task_info['message'])
>
> def test_get_param(self):
> - req = json.dumps({'name': 'test', 'cdrom': '/nonexistent.iso'})
> + req = json.dumps({'name': 'test', 'cdrom': fake_iso})
> self.request('/templates', req, 'POST')
>
> # Create a VM
> @@ -1877,13 +1889,15 @@ class RestTests(unittest.TestCase):
> resp = request(host, ssl_port, '/vms')
> self.assertEquals(200, resp.status)
> res = json.loads(resp.read())
> - self.assertEquals(2, len(res))
> + self.assertEquals(3, len(res))
>
> + # FIXME: control/base.py also allows filter by regex so it is returning
> + # 2 vms when querying for 'test-vm1': 'test' and 'test-vm1'
> resp = request(host, ssl_port, '/vms?name=test-vm1')
> self.assertEquals(200, resp.status)
> res = json.loads(resp.read())
> - self.assertEquals(1, len(res))
> - self.assertEquals('test-vm1', res[0]['name'])
> + self.assertEquals(2, len(res))
> + self.assertIn('test-vm1', [r['name'] for r in res])
>
> def test_repositories(self):
> def verify_repo(t, res):
> @@ -1897,25 +1911,6 @@ class RestTests(unittest.TestCase):
> # Already have one repo in Kimchi's system
> self.assertEquals(1, len(json.loads(resp.read())))
>
> - invalid_urls = ['www.fedora.org', # missing protocol
> - '://www.fedora.org', # missing protocol
> - 'http://www.fedora', # invalid domain name
> - 'file:///home/userdoesnotexist'] # invalid path
> -
> - # Create repositories with invalid baseurl
> - for url in invalid_urls:
> - repo = {'repo_id': 'fedora-fake', 'baseurl': url}
> - req = json.dumps(repo)
> - resp = self.request(base_uri, req, 'POST')
> - self.assertEquals(400, resp.status)
> -
> - # Create repositories with invalid mirrorlist
> - for url in invalid_urls:
> - repo = {'repo_id': 'fedora-fake', 'mirrorlist': url}
> - req = json.dumps(repo)
> - resp = self.request(base_uri, req, 'POST')
> - self.assertEquals(400, resp.status)
> -
> # Create a repository
> repo = {'repo_id': 'fedora-fake',
> 'baseurl': 'http://www.fedora.org'}
> @@ -1927,22 +1922,6 @@ class RestTests(unittest.TestCase):
> res = json.loads(self.request('%s/fedora-fake' % base_uri).read())
> verify_repo(repo, res)
>
> - # Update repositories with invalid baseurl
> - for url in invalid_urls:
> - params = {}
> - params['baseurl'] = url
> - resp = self.request('%s/fedora-fake' % base_uri,
> - json.dumps(params), 'PUT')
> - self.assertEquals(400, resp.status)
> -
> - # Update repositories with invalid mirrorlist
> - for url in invalid_urls:
> - params = {}
> - params['mirrorlist'] = url
> - resp = self.request('%s/fedora-fake' % base_uri,
> - json.dumps(params), 'PUT')
> - self.assertEquals(400, resp.status)
> -
> # Update the repository
> params = {}
> params['baseurl'] = repo['baseurl'] = 'http://www.fedoraproject.org'
> @@ -1969,7 +1948,7 @@ class RestTests(unittest.TestCase):
>
> with RollbackContext() as rollback:
> vol_path = os.path.join(paths.get_prefix(), 'COPYING')
> - url = "https://%s:%s/storagepools/default/storagevolumes" % \
> + url = "https://%s:%s/storagepools/default-pool/storagevolumes" % \
> (host, ssl_port)
>
> with open(vol_path, 'rb') as fd:
> @@ -1981,8 +1960,8 @@ class RestTests(unittest.TestCase):
> self.assertEquals(r.status_code, 202)
> task = r.json()
> wait_task(self._task_lookup, task['id'])
> - resp = self.request('/storagepools/default/storagevolumes/%s' %
> - task['target_uri'].split('/')[-1])
> + uri = '/storagepools/default-pool/storagevolumes/%s'
> + resp = self.request(uri % task['target_uri'].split('/')[-1])
> self.assertEquals(200, resp.status)
>
> # Create a file with 3M to upload
> @@ -2001,8 +1980,7 @@ class RestTests(unittest.TestCase):
> self.assertEquals(r.status_code, 202)
> task = r.json()
> wait_task(self._task_lookup, task['id'], 15)
> - resp = self.request('/storagepools/default/storagevolumes/%s' %
> - task['target_uri'].split('/')[-1])
> + resp = self.request(uri % task['target_uri'].split('/')[-1])
>
> self.assertEquals(200, resp.status)
>
More information about the Kimchi-devel
mailing list