[Kimchi-devel] [PATCH 5/5] Storage volume tests
Aline Manera
alinefm at linux.vnet.ibm.com
Thu Jan 22 14:36:53 UTC 2015
Create 2 new files: tests/test_mock_storagevolume.py and tests/test_model_storagevolume.py.
The first one has all the MockModel tests and the latter the Model tests.
As most storage pool types cannot be tested automatically with Model, all
storage pool types are covered by the MockModel tests, which use the libvirt
Test driver, so that the storage volumes can be tested for each pool type.
Signed-off-by: Aline Manera <alinefm at linux.vnet.ibm.com>
---
tests/test_mock_storagevolume.py | 94 +++++++++++++++++
tests/test_model.py | 125 ++--------------------
tests/test_model_storagevolume.py | 211 ++++++++++++++++++++++++++++++++++++++
tests/test_rest.py | 138 -------------------------
4 files changed, 316 insertions(+), 252 deletions(-)
create mode 100644 tests/test_mock_storagevolume.py
create mode 100644 tests/test_model_storagevolume.py
diff --git a/tests/test_mock_storagevolume.py b/tests/test_mock_storagevolume.py
new file mode 100644
index 0000000..f59aa13
--- /dev/null
+++ b/tests/test_mock_storagevolume.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2015
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import json
+import os
+import unittest
+
+from functools import partial
+
+from kimchi.mockmodel import MockModel
+from test_model_storagevolume import _do_volume_test
+from utils import get_free_port, patch_auth, request, run_server
+
+
+model = None
+test_server = None
+host = None
+port = None
+ssl_port = None
+cherrypy_port = None
+
+
+def setUpModule():
+ global test_server, model, host, port, ssl_port, cherrypy_port
+
+ patch_auth()
+ model = MockModel('/tmp/obj-store-test')
+ host = '127.0.0.1'
+ port = get_free_port('http')
+ ssl_port = get_free_port('https')
+ cherrypy_port = get_free_port('cherrypy_port')
+ test_server = run_server(host, port, ssl_port, test_mode=True,
+ cherrypy_port=cherrypy_port, model=model)
+
+
+def tearDownModule():
+ test_server.stop()
+ os.unlink('/tmp/obj-store-test')
+
+
+class MockStorageVolumeTests(unittest.TestCase):
+ def setUp(self):
+ self.request = partial(request, host, ssl_port)
+
+ def test_storagevolume(self):
+ # MockModel always returns 2 partitions (vdx, vdz)
+ partitions = json.loads(self.request('/host/partitions').read())
+ devs = [dev['path'] for dev in partitions]
+
+ # MockModel always returns 3 FC devices
+ fc_devs = json.loads(self.request('/host/devices?_cap=fc_host').read())
+ fc_devs = [dev['name'] for dev in fc_devs]
+
+ poolDefs = [
+ {'type': 'dir', 'name': u'kīмсhīUnitTestDirPool',
+ 'path': '/tmp/kimchi-images'},
+ {'type': 'netfs', 'name': u'kīмсhīUnitTestNSFPool',
+ 'source': {'host': 'localhost',
+ 'path': '/var/lib/kimchi/nfs-pool'}},
+ {'type': 'scsi', 'name': u'kīмсhīUnitTestSCSIFCPool',
+ 'source': {'adapter_name': fc_devs[0]}},
+ {'type': 'iscsi', 'name': u'kīмсhīUnitTestISCSIPool',
+ 'source': {'host': '127.0.0.1',
+ 'target': 'iqn.2015-01.localhost.kimchiUnitTest'}},
+ {'type': 'logical', 'name': u'kīмсhīUnitTestLogicalPool',
+ 'source': {'devices': [devs[0]]}}]
+
+ for pool in poolDefs:
+ pool_name = pool['name']
+ uri = '/storagepools/%s' % pool_name.encode('utf-8')
+ req = json.dumps(pool)
+ resp = self.request('/storagepools', req, 'POST')
+ self.assertEquals(201, resp.status)
+ # activate the storage pool
+ resp = self.request(uri + '/activate', '{}', 'POST')
+ self.assertEquals(200, resp.status)
+ _do_volume_test(self, model, host, ssl_port, pool_name)
diff --git a/tests/test_model.py b/tests/test_model.py
index 24a11d0..db6abc4 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -26,7 +26,6 @@ import pwd
import re
import shutil
import socket
-import tempfile
import threading
import time
import urlparse
@@ -38,9 +37,10 @@ import iso_gen
import kimchi.objectstore
import utils
from kimchi import netinfo
-from kimchi.config import config, paths
+from kimchi.config import config
from kimchi.exception import InvalidOperation
from kimchi.exception import InvalidParameter, NotFoundError, OperationFailed
+from kimchi.model.featuretests import FeatureTests
from kimchi.model import model
from kimchi.model.libvirtconnection import LibvirtConnection
from kimchi.rollbackcontext import RollbackContext
@@ -520,13 +520,15 @@ class ModelTests(unittest.TestCase):
cdrom_info = inst.vmstorage_lookup(vm_name, cdrom_dev)
cur_cdrom_path = re.sub(":80/", '/', cdrom_info['path'])
- # As Kimchi server is not running during this test case
- # CapabilitiesModel.qemu_stream_dns will be always False
- # so we need to convert the hostname to IP
- output = urlparse.urlparse(valid_remote_iso_path)
- hostname = socket.gethostbyname(output.hostname)
- url = valid_remote_iso_path.replace(output.hostname, hostname)
- self.assertEquals(url, cur_cdrom_path)
+ # Check QEMU stream DNS to determine the cdrom path
+ qemu_stream_dns = FeatureTests.qemu_iso_stream_dns()
+ if not qemu_stream_dns:
+ output = urlparse.urlparse(valid_remote_iso_path)
+ hostname = socket.gethostbyname(output.hostname)
+ url = valid_remote_iso_path.replace(output.hostname, hostname)
+ self.assertEquals(url, cur_cdrom_path)
+ else:
+ self.assertEquals(valid_remote_iso_path, cur_cdrom_path)
@unittest.skipUnless(utils.running_as_root(), 'Must be run as root')
def test_vm_storage_provisioning(self):
@@ -549,111 +551,6 @@ class ModelTests(unittest.TestCase):
self.assertFalse(os.access(disk_path, os.F_OK))
@unittest.skipUnless(utils.running_as_root(), 'Must be run as root')
- def test_storagevolume(self):
- inst = model.Model(None, self.tmp_store)
-
- with RollbackContext() as rollback:
- path = '/tmp/kimchi-images'
- pool = 'test-pool'
- vol = 'test-volume.img'
- if not os.path.exists(path):
- os.mkdir(path)
-
- args = {'name': pool,
- 'path': path,
- 'type': 'dir'}
- inst.storagepools_create(args)
- rollback.prependDefer(shutil.rmtree, args['path'])
- rollback.prependDefer(inst.storagepool_delete, pool)
-
- self.assertRaises(InvalidOperation, inst.storagevolumes_get_list,
- pool)
- poolinfo = inst.storagepool_lookup(pool)
- self.assertEquals(0, poolinfo['nr_volumes'])
- # Activate the pool before adding any volume
- inst.storagepool_activate(pool)
- rollback.prependDefer(inst.storagepool_deactivate, pool)
-
- vols = inst.storagevolumes_get_list(pool)
- num = len(vols) + 2
- params = {'capacity': 1073741824, # 1 GiB
- 'allocation': 536870912, # 512 MiB
- 'format': 'raw'}
- # 'name' is required for this type of volume
- self.assertRaises(InvalidParameter, inst.storagevolumes_create,
- pool, params)
- params['name'] = vol
- task_id = inst.storagevolumes_create(pool, params)['id']
- rollback.prependDefer(inst.storagevolume_delete, pool, vol)
- inst.task_wait(task_id)
- self.assertEquals('finished', inst.task_lookup(task_id)['status'])
-
- fd, path = tempfile.mkstemp(dir=path)
- name = os.path.basename(path)
- rollback.prependDefer(inst.storagevolume_delete, pool, name)
- vols = inst.storagevolumes_get_list(pool)
- self.assertIn(name, vols)
- self.assertEquals(num, len(vols))
-
- inst.storagevolume_wipe(pool, vol)
- volinfo = inst.storagevolume_lookup(pool, vol)
- self.assertEquals(0, volinfo['allocation'])
- self.assertEquals(0, volinfo['ref_cnt'])
-
- volinfo = inst.storagevolume_lookup(pool, vol)
- # Define the size = capacity + 16 MiB
- size = volinfo['capacity'] + 16777216
- inst.storagevolume_resize(pool, vol, size)
-
- volinfo = inst.storagevolume_lookup(pool, vol)
- self.assertEquals(size, volinfo['capacity'])
- poolinfo = inst.storagepool_lookup(pool)
- self.assertEquals(len(vols), poolinfo['nr_volumes'])
-
- # download remote volume
- # 1) try an invalid URL
- params = {'url': 'http://www.invalid.url'}
- self.assertRaises(InvalidParameter, inst.storagevolumes_create,
- pool, params)
- # 2) download Kimchi's "COPYING" from Github and compare its
- # content to the corresponding local file's
- url = 'https://github.com/kimchi-project/kimchi/raw/master/COPYING'
- params = {'url': url}
- task_response = inst.storagevolumes_create(pool, params)
- rollback.prependDefer(inst.storagevolume_delete, pool,
- params['name'])
- taskid = task_response['id']
- vol_name = task_response['target_uri'].split('/')[-1]
- self.assertEquals('COPYING', vol_name)
- inst.task_wait(taskid, timeout=60)
- self.assertEquals('finished', inst.task_lookup(taskid)['status'])
- vol_path = os.path.join(args['path'], vol_name)
- self.assertTrue(os.path.isfile(vol_path))
- with open(vol_path) as vol_file:
- vol_content = vol_file.read()
- with open(os.path.join(paths.get_prefix(), 'COPYING')) as cp_file:
- cp_content = cp_file.read()
- self.assertEquals(vol_content, cp_content)
-
- # clone the volume created above
- task = inst.storagevolume_clone(pool, vol_name)
- taskid = task['id']
- cloned_vol_name = task['target_uri'].split('/')[-1]
- inst.task_wait(taskid)
- self.assertEquals('finished', inst.task_lookup(taskid)['status'])
- rollback.prependDefer(inst.storagevolume_delete, pool,
- cloned_vol_name)
-
- orig_vol = inst.storagevolume_lookup(pool, vol_name)
- cloned_vol = inst.storagevolume_lookup(pool, cloned_vol_name)
-
- self.assertNotEquals(orig_vol['path'], cloned_vol['path'])
- del orig_vol['path']
- del cloned_vol['path']
-
- self.assertEquals(orig_vol, cloned_vol)
-
- @unittest.skipUnless(utils.running_as_root(), 'Must be run as root')
def test_template_storage_customise(self):
inst = model.Model(objstore_loc=self.tmp_store)
diff --git a/tests/test_model_storagevolume.py b/tests/test_model_storagevolume.py
new file mode 100644
index 0000000..a3c3ce3
--- /dev/null
+++ b/tests/test_model_storagevolume.py
@@ -0,0 +1,211 @@
+# -*- coding: utf-8 -*-
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2015
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import json
+import os
+import requests
+import unittest
+
+from functools import partial
+
+from kimchi.config import paths, READONLY_POOL_TYPE
+from kimchi.model.model import Model
+from kimchi.mockmodel import MockModel
+from kimchi.rollbackcontext import RollbackContext
+from utils import fake_auth_header, get_free_port, patch_auth, request
+from utils import rollback_wrapper, run_server, wait_task
+
+
+model = None
+test_server = None
+host = None
+port = None
+ssl_port = None
+cherrypy_port = None
+
+
+def setUpModule():
+ global test_server, model, host, port, ssl_port, cherrypy_port
+
+ patch_auth()
+ model = Model(None, '/tmp/obj-store-test')
+ host = '127.0.0.1'
+ port = get_free_port('http')
+ ssl_port = get_free_port('https')
+ cherrypy_port = get_free_port('cherrypy_port')
+ test_server = run_server(host, port, ssl_port, test_mode=True,
+ cherrypy_port=cherrypy_port, model=model)
+
+
+def tearDownModule():
+ test_server.stop()
+ os.unlink('/tmp/obj-store-test')
+
+
+def _do_volume_test(self, model, host, ssl_port, pool_name):
+ def _task_lookup(taskid):
+ return json.loads(self.request('/tasks/%s' % taskid).read())
+
+ uri = '/storagepools/%s/storagevolumes' % pool_name.encode('utf-8')
+ resp = self.request(uri)
+ self.assertEquals(200, resp.status)
+
+ resp = self.request('/storagepools/%s' % pool_name.encode('utf-8'))
+ pool_info = json.loads(resp.read())
+ with RollbackContext() as rollback:
+ # Create storage volume with 'capacity'
+ vol = 'test-volume'
+ vol_uri = uri + '/' + vol
+ req = json.dumps({'name': vol, 'format': 'raw',
+ 'capacity': 1073741824}) # 1 GiB
+ resp = self.request(uri, req, 'POST')
+ if pool_info['type'] in READONLY_POOL_TYPE:
+ self.assertEquals(400, resp.status)
+ else:
+ rollback.prependDefer(rollback_wrapper, model.storagevolume_delete,
+ pool_name, vol)
+ self.assertEquals(202, resp.status)
+ task_id = json.loads(resp.read())['id']
+ wait_task(_task_lookup, task_id)
+ status = json.loads(self.request('/tasks/%s' % task_id).read())
+ self.assertEquals('finished', status['status'])
+ vol_info = json.loads(self.request(vol_uri).read())
+ vol_info['name'] = vol
+ vol_info['format'] = 'raw'
+ vol_info['capacity'] = 1073741824
+
+ # Resize the storage volume: increase its capacity to 2 GiB
+ req = json.dumps({'size': 2147483648}) # 2 GiB
+ resp = self.request(vol_uri + '/resize', req, 'POST')
+ self.assertEquals(200, resp.status)
+ storagevolume = json.loads(self.request(vol_uri).read())
+ self.assertEquals(2147483648, storagevolume['capacity'])
+
+ # Resize the storage volume: decrease its capacity to 512 MiB
+ # FIXME: Due to a libvirt bug it is not possible to decrease the
+ # volume capacity
+ # For reference:
+ # - https://bugzilla.redhat.com/show_bug.cgi?id=1021802
+ req = json.dumps({'size': 536870912}) # 512 MiB
+ resp = self.request(vol_uri + '/resize', req, 'POST')
+ # It is only possible when using MockModel
+ if isinstance(model, MockModel):
+ self.assertEquals(200, resp.status)
+ storagevolume = json.loads(self.request(vol_uri).read())
+ self.assertEquals(536870912, storagevolume['capacity'])
+ else:
+ self.assertEquals(500, resp.status)
+
+ # Wipe the storage volume
+ resp = self.request(vol_uri + '/wipe', '{}', 'POST')
+ self.assertEquals(200, resp.status)
+ storagevolume = json.loads(self.request(vol_uri).read())
+ self.assertEquals(0, storagevolume['allocation'])
+
+ # Clone the storage volume
+ vol_info = json.loads(self.request(vol_uri).read())
+ resp = self.request(vol_uri + '/clone', '{}', 'POST')
+ self.assertEquals(202, resp.status)
+ task = json.loads(resp.read())
+ cloned_vol_name = task['target_uri'].split('/')[-1]
+ rollback.prependDefer(model.storagevolume_delete, pool_name,
+ cloned_vol_name)
+ wait_task(_task_lookup, task['id'])
+ task = json.loads(self.request('/tasks/%s' % task['id']).read())
+ self.assertEquals('finished', task['status'])
+ resp = self.request(uri + '/' + cloned_vol_name.encode('utf-8'))
+
+ self.assertEquals(200, resp.status)
+ cloned_vol = json.loads(resp.read())
+
+ self.assertNotEquals(vol_info['name'], cloned_vol['name'])
+ self.assertNotEquals(vol_info['path'], cloned_vol['path'])
+ for key in ['name', 'path', 'allocation']:
+ del vol_info[key]
+ del cloned_vol[key]
+
+ self.assertEquals(vol_info, cloned_vol)
+
+ # Delete the storage volume
+ resp = self.request(vol_uri, '{}', 'DELETE')
+ self.assertEquals(204, resp.status)
+ resp = self.request(vol_uri)
+ self.assertEquals(404, resp.status)
+
+ # Create storage volume with 'file'
+ filepath = os.path.join(paths.get_prefix(), 'COPYING.LGPL')
+ url = 'https://%s:%s' % (host, ssl_port) + uri
+ with open(filepath, 'rb') as fd:
+ r = requests.post(url, files={'file': fd},
+ verify=False,
+ headers=fake_auth_header())
+
+ if pool_info['type'] in READONLY_POOL_TYPE:
+ self.assertEquals(r.status_code, 400)
+ else:
+ rollback.prependDefer(model.storagevolume_delete, pool_name,
+ 'COPYING.LGPL')
+ self.assertEquals(r.status_code, 202)
+ task = r.json()
+ wait_task(_task_lookup, task['id'])
+ resp = self.request(uri + '/COPYING.LGPL')
+ self.assertEquals(200, resp.status)
+
+ # Create storage volume with 'url'
+ url = 'https://github.com/kimchi-project/kimchi/raw/master/COPYING'
+ req = json.dumps({'url': url})
+ resp = self.request(uri, req, 'POST')
+
+ if pool_info['type'] in READONLY_POOL_TYPE:
+ self.assertEquals(400, resp.status)
+ else:
+ rollback.prependDefer(model.storagevolume_delete, pool_name,
+ 'COPYING')
+ self.assertEquals(202, resp.status)
+ task = json.loads(resp.read())
+ wait_task(_task_lookup, task['id'])
+ resp = self.request(uri + '/COPYING')
+ self.assertEquals(200, resp.status)
+
+
+class StorageVolumeTests(unittest.TestCase):
+ def setUp(self):
+ self.request = partial(request, host, ssl_port)
+
+ def test_get_storagevolume(self):
+ uri = '/storagepools/default/storagevolumes'
+ resp = self.request(uri)
+ self.assertEquals(200, resp.status)
+
+ keys = [u'name', u'type', u'capacity', u'allocation', u'path',
+ u'ref_cnt', u'format']
+ for vol in json.loads(resp.read()):
+ resp = self.request(uri + '/' + vol['name'])
+ self.assertEquals(200, resp.status)
+
+ all_keys = keys[:]
+ vol_info = json.loads(resp.read())
+ if vol_info['format'] == 'iso':
+ all_keys.extend([u'os_distro', u'os_version', u'bootable'])
+
+ self.assertEquals(sorted(all_keys), sorted(vol_info.keys()))
+
+ def test_storagevolume_action(self):
+ _do_volume_test(self, model, host, ssl_port, 'default')
diff --git a/tests/test_rest.py b/tests/test_rest.py
index fe6020f..812afb7 100644
--- a/tests/test_rest.py
+++ b/tests/test_rest.py
@@ -1009,144 +1009,6 @@ class RestTests(unittest.TestCase):
self.request('/storagepools/default-pool/storagevolumes').read())
self.assertEquals(1, len(resp))
- def test_get_storagevolumes(self):
- # Now add a StoragePool to the mock model
- self._create_pool('pool-1')
-
- # Test storagevolumes can't be listed with inactive pool
- resp = self.request('/storagepools/pool-1/storagevolumes')
- self.assertEquals(400, resp.status)
-
- resp = self.request('/storagepools/pool-1/activate', '{}', 'POST')
- self.assertEquals(200, resp.status)
- nr_vols = json.loads(
- self.request('/storagepools/pool-1').read())['nr_volumes']
- self.assertEquals(0, nr_vols)
-
- # Now add a couple of storage volumes to the mock model
- for i in xrange(5):
- name = 'volume-%i' % i
- req = json.dumps({'name': name,
- 'capacity': 1024,
- 'allocation': 512,
- 'type': 'file',
- 'format': 'raw'})
- resp = self.request('/storagepools/pool-1/storagevolumes',
- req, 'POST')
- self.assertEquals(202, resp.status)
-
- time.sleep(5)
- nr_vols = json.loads(
- self.request('/storagepools/pool-1').read())['nr_volumes']
- self.assertEquals(5, nr_vols)
- resp = self.request('/storagepools/pool-1/storagevolumes')
- storagevolumes = json.loads(resp.read())
- self.assertEquals(5, len(storagevolumes))
-
- resp = self.request('/storagepools/pool-1/storagevolumes/volume-1')
- storagevolume = json.loads(resp.read())
- self.assertEquals('volume-1', storagevolume['name'])
- self.assertEquals('raw', storagevolume['format'])
- self.assertEquals(0, storagevolume['ref_cnt'])
- self.assertEquals('/var/lib/libvirt/images/volume-1',
- storagevolume['path'])
-
- url = 'https://github.com/kimchi-project/kimchi/blob/master/COPYING'
- req = json.dumps({'url': url})
- resp = self.request('/storagepools/pool-1/storagevolumes', req, 'POST')
- self.assertEquals(202, resp.status)
- task = json.loads(resp.read())
- vol_name = task['target_uri'].split('/')[-1]
- self.assertEquals('COPYING', vol_name)
- wait_task(self._task_lookup, task['id'])
- task = json.loads(self.request('/tasks/%s' % task['id']).read())
- self.assertEquals('finished', task['status'])
- resp = self.request('/storagepools/pool-1/storagevolumes/%s' %
- vol_name, '{}', 'GET')
- self.assertEquals(200, resp.status)
- vol = json.loads(resp.read())
-
- # clone the volume created above
- resp = self.request('/storagepools/pool-1/storagevolumes/%s/clone' %
- vol_name, {}, 'POST')
- self.assertEquals(202, resp.status)
- task = json.loads(resp.read())
- cloned_vol_name = task['target_uri'].split('/')[-1]
- wait_task(self._task_lookup, task['id'])
- task = json.loads(self.request('/tasks/%s' % task['id']).read())
- self.assertEquals('finished', task['status'])
- resp = self.request('/storagepools/pool-1/storagevolumes/%s' %
- cloned_vol_name, '{}', 'GET')
- self.assertEquals(200, resp.status)
- cloned_vol = json.loads(resp.read())
-
- self.assertNotEquals(vol['name'], cloned_vol['name'])
- self.assertNotEquals(vol['path'], cloned_vol['path'])
- for key in ['name', 'path', 'allocation']:
- del vol[key]
- del cloned_vol[key]
-
- self.assertEquals(vol, cloned_vol)
-
- resp = self.request('/storagepools/pool-1/deactivate', '{}', 'POST')
- self.assertEquals(200, resp.status)
-
- # Now remove the StoragePool from mock model
- self._delete_pool('pool-1')
-
- def test_storagevolume_action(self):
- self._create_pool('pool-2')
-
- # Create a storage volume can only be successful for active pool
- req = json.dumps({'name': 'test-volume',
- 'capacity': 1073741824, # 1 GiB
- 'allocation': 536870912, # 512 MiB
- 'type': 'disk',
- 'format': 'raw'})
- resp = self.request('/storagepools/pool-2/storagevolumes/',
- req, 'POST')
- self.assertEquals(400, resp.status)
- resp = self.request('/storagepools/pool-2/activate', '{}', 'POST')
- self.assertEquals(200, resp.status)
- resp = self.request('/storagepools/pool-2/storagevolumes/',
- req, 'POST')
- self.assertEquals(202, resp.status)
- task_id = json.loads(resp.read())['id']
- wait_task(self._task_lookup, task_id)
- status = json.loads(self.request('/tasks/%s' % task_id).read())
- self.assertEquals('finished', status['status'])
-
- # Verify the storage volume
- resp = self.request('/storagepools/pool-2/storagevolumes/test-volume')
- storagevolume = json.loads(resp.read())
- self.assertEquals('raw', storagevolume['format'])
-
- # Resize the storage volume
- req = json.dumps({'size': 805306368}) # 768 MiB
- uri = '/storagepools/pool-2/storagevolumes/test-volume/resize'
- resp = self.request(uri, req, 'POST')
- uri = '/storagepools/pool-2/storagevolumes/test-volume'
- storagevolume = json.loads(self.request(uri).read())
- self.assertEquals(805306368, storagevolume['capacity']) # 768 MiB
-
- # Wipe the storage volume
- uri = '/storagepools/pool-2/storagevolumes/test-volume/wipe'
- resp = self.request(uri, '{}', 'POST')
- uri = '/storagepools/pool-2/storagevolumes/test-volume'
- storagevolume = json.loads(self.request(uri).read())
- self.assertEquals(0, storagevolume['allocation'])
-
- # Delete the storage volume
- resp = self.request('/storagepools/pool-2/storagevolumes/test-volume',
- '{}', 'DELETE')
- self.assertEquals(204, resp.status)
-
- resp = self.request('/storagepools/pool-2/deactivate', '{}', 'POST')
- self.assertEquals(200, resp.status)
-
- # Now remove the StoragePool from mock model
- self._delete_pool('pool-2')
-
def _create_pool(self, name):
req = json.dumps({'name': name,
'capacity': 10240,
--
2.1.0
More information about the Kimchi-devel
mailing list