[Kimchi-devel] [PATCH v2] AsyncTask: Improve continuous status feedback
Zhou Zheng Sheng
zhshzhou at linux.vnet.ibm.com
Thu Oct 30 04:49:08 UTC 2014
One of the advantages of AsyncTask is that the background task can call
"_status_cb()" multiple times to report interim progress. However,
this is not properly implemented in "_status_cb()". The correct handling
should be as follows.
cb(message, True)
Means task finished successfully.
cb(message, False)
Means task failed.
cb(message)
cb(message, None)
Means task is ongoing and the invocation is to provide progress feedback.
The current implementation fails to distinguish "cb(message, False)" from
"cb(message)". This patch fixes the problem and adds a new test case.
This patch also extracts the "wait_task()" methods from the tests into
tests/utils.py, adding a regular status report to let the developer know that
a test is waiting for some task and is not stuck on something. There are also
two methods in the tests that start an AsyncTask but do not wait for it to
finish; this interferes with other tests, so the patch adds the missing
"wait_task()" invocation for them.
v2:
Extract the common "wait_task()" methods to avoid duplicated code.
Signed-off-by: Zhou Zheng Sheng <zhshzhou at linux.vnet.ibm.com>
---
src/kimchi/asynctask.py | 6 ++----
tests/test_mockmodel.py | 2 ++
tests/test_model.py | 36 +++++++++++++++++++++++-------------
tests/test_rest.py | 25 +++++++++++--------------
tests/utils.py | 15 +++++++++++++++
5 files changed, 53 insertions(+), 31 deletions(-)
diff --git a/src/kimchi/asynctask.py b/src/kimchi/asynctask.py
index 99e7a64..b5673b2 100644
--- a/src/kimchi/asynctask.py
+++ b/src/kimchi/asynctask.py
@@ -49,10 +49,8 @@ class AsyncTask(object):
self._save_helper()
return
- if success:
- self.status = 'finished'
- else:
- self.status = 'failed'
+ if success is not None:
+ self.status = 'finished' if success else 'failed'
self.message = message
self._save_helper()
diff --git a/tests/test_mockmodel.py b/tests/test_mockmodel.py
index 7319531..eeb6715 100644
--- a/tests/test_mockmodel.py
+++ b/tests/test_mockmodel.py
@@ -26,6 +26,7 @@ import unittest
import kimchi.mockmodel
from utils import get_free_port, patch_auth, request, run_server
+from utils import wait_task
from kimchi.control.base import Collection, Resource
@@ -198,3 +199,4 @@ class MockModelTests(unittest.TestCase):
task = model.host_swupdate()
task_params = [u'id', u'message', u'status', u'target_uri']
self.assertEquals(sorted(task_params), sorted(task.keys()))
+ wait_task(model.task_lookup, task['id'])
diff --git a/tests/test_model.py b/tests/test_model.py
index d9bbe9e..896540d 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -43,6 +43,7 @@ from kimchi.iscsi import TargetClient
from kimchi.model import model
from kimchi.rollbackcontext import RollbackContext
from kimchi.utils import add_task
+from utils import wait_task
invalid_repository_urls = ['www.fedora.org', # missing protocol
@@ -127,7 +128,7 @@ class ModelTests(unittest.TestCase):
'format': 'qcow2'}
task_id = inst.storagevolumes_create('default', params)['id']
rollback.prependDefer(inst.storagevolume_delete, 'default', vol)
- self._wait_task(inst, task_id)
+ wait_task(inst.task_lookup, task_id)
self.assertEquals('finished', inst.task_lookup(task_id)['status'])
vol_path = inst.storagevolume_lookup('default', vol)['path']
@@ -285,8 +286,9 @@ class ModelTests(unittest.TestCase):
'capacity': 1024,
'allocation': 512,
'format': 'qcow2'}
- inst.storagevolumes_create(pool, params)
+ task_id = inst.storagevolumes_create(pool, params)['id']
rollback.prependDefer(inst.storagevolume_delete, pool, vol)
+ wait_task(inst.task_lookup, task_id)
vm_name = 'kimchi-cdrom'
params = {'name': 'test', 'disks': [], 'cdrom': self.kimchi_iso}
@@ -558,7 +560,7 @@ class ModelTests(unittest.TestCase):
params['name'] = vol
task_id = inst.storagevolumes_create(pool, params)['id']
rollback.prependDefer(inst.storagevolume_delete, pool, vol)
- self._wait_task(inst, task_id)
+ wait_task(inst.task_lookup, task_id)
self.assertEquals('finished', inst.task_lookup(task_id)['status'])
fd, path = tempfile.mkstemp(dir=path)
@@ -599,7 +601,7 @@ class ModelTests(unittest.TestCase):
taskid = task_response['id']
vol_name = task_response['target_uri'].split('/')[-1]
self.assertEquals('COPYING', vol_name)
- self._wait_task(inst, taskid, timeout=60)
+ wait_task(inst.task_lookup, taskid, timeout=60)
self.assertEquals('finished', inst.task_lookup(taskid)['status'])
vol_path = os.path.join(args['path'], vol_name)
self.assertTrue(os.path.isfile(vol_path))
@@ -1120,10 +1122,17 @@ class ModelTests(unittest.TestCase):
except:
cb("Exception raised", False)
+ def continuous_ops(cb, params):
+ cb("step 1 OK")
+ time.sleep(2)
+ cb("step 2 OK")
+ time.sleep(2)
+ cb("step 3 OK", params.get('result', True))
+
inst = model.Model('test:///default',
objstore_loc=self.tmp_store)
taskid = add_task('', quick_op, inst.objstore, 'Hello')
- self._wait_task(inst, taskid)
+ wait_task(inst.task_lookup, taskid)
self.assertEquals(1, taskid)
self.assertEquals('finished', inst.task_lookup(taskid)['status'])
self.assertEquals('Hello', inst.task_lookup(taskid)['message'])
@@ -1134,16 +1143,22 @@ class ModelTests(unittest.TestCase):
self.assertEquals(2, taskid)
self.assertEquals('running', inst.task_lookup(taskid)['status'])
self.assertEquals('OK', inst.task_lookup(taskid)['message'])
- self._wait_task(inst, taskid)
+ wait_task(inst.task_lookup, taskid)
self.assertEquals('failed', inst.task_lookup(taskid)['status'])
self.assertEquals('It was not meant to be',
inst.task_lookup(taskid)['message'])
taskid = add_task('', abnormal_op, inst.objstore, {})
- self._wait_task(inst, taskid)
+ wait_task(inst.task_lookup, taskid)
self.assertEquals('Exception raised',
inst.task_lookup(taskid)['message'])
self.assertEquals('failed', inst.task_lookup(taskid)['status'])
+ taskid = add_task('', continuous_ops, inst.objstore,
+ {'result': True})
+ self.assertEquals('running', inst.task_lookup(taskid)['status'])
+ wait_task(inst.task_lookup, taskid, timeout=10)
+ self.assertEquals('finished', inst.task_lookup(taskid)['status'])
+
# This wrapper function is needed due to the new backend messaging in
# vm model. vm_poweroff and vm_delete raise exception if vm is not found.
# These functions are called after vm has been deleted if test finishes
@@ -1247,7 +1262,7 @@ class ModelTests(unittest.TestCase):
task = inst.debugreports_create({'name': reportName})
rollback.prependDefer(inst.debugreport_delete, tmp_name)
taskid = task['id']
- self._wait_task(inst, taskid, timeout)
+ wait_task(inst.task_lookup, taskid, timeout)
self.assertEquals('finished',
inst.task_lookup(taskid)['status'],
"It is not necessary an error. "
@@ -1265,11 +1280,6 @@ class ModelTests(unittest.TestCase):
if 'debugreport tool not found' not in e.message:
raise e
- def _wait_task(self, model, taskid, timeout=5):
- for i in range(0, timeout):
- if model.task_lookup(taskid)['status'] == 'running':
- time.sleep(1)
-
def test_get_distros(self):
inst = model.Model('test:///default',
objstore_loc=self.tmp_store)
diff --git a/tests/test_rest.py b/tests/test_rest.py
index 8de0a9c..60dce2f 100644
--- a/tests/test_rest.py
+++ b/tests/test_rest.py
@@ -38,7 +38,7 @@ import kimchi.server
from kimchi.config import paths
from kimchi.rollbackcontext import RollbackContext
from utils import get_free_port, patch_auth, request
-from utils import run_server
+from utils import run_server, wait_task
test_server = None
@@ -1041,7 +1041,7 @@ class RestTests(unittest.TestCase):
task = json.loads(resp.read())
vol_name = task['target_uri'].split('/')[-1]
self.assertEquals('anyurl.wor.kz', vol_name)
- self._wait_task(task['id'])
+ wait_task(self._task_lookup, task['id'])
task = json.loads(self.request('/tasks/%s' % task['id']).read())
self.assertEquals('finished', task['status'])
resp = self.request('/storagepools/pool-1/storagevolumes/%s' %
@@ -1069,7 +1069,7 @@ class RestTests(unittest.TestCase):
req, 'POST')
self.assertEquals(202, resp.status)
task_id = json.loads(resp.read())['id']
- self._wait_task(task_id)
+ wait_task(self._task_lookup, task_id)
status = json.loads(self.request('/tasks/%s' % task_id).read())
self.assertEquals('finished', status['status'])
@@ -1531,11 +1531,8 @@ class RestTests(unittest.TestCase):
'DELETE')
self.assertEquals(204, resp.status)
- def _wait_task(self, taskid, timeout=5):
- for i in range(0, timeout):
- task = json.loads(self.request('/tasks/%s' % taskid).read())
- if task['status'] == 'running':
- time.sleep(1)
+ def _task_lookup(self, taskid):
+ return json.loads(self.request('/tasks/%s' % taskid).read())
def test_tasks(self):
id1 = model.add_task('/tasks/1', self._async_op)
@@ -1550,12 +1547,12 @@ class RestTests(unittest.TestCase):
tasks = json.loads(self.request('/tasks').read())
tasks_ids = [int(t['id']) for t in tasks]
self.assertEquals(set([id1, id2, id3]) - set(tasks_ids), set([]))
- self._wait_task(id2)
+ wait_task(self._task_lookup, id2)
foo2 = json.loads(self.request('/tasks/%s' % id2).read())
keys = ['id', 'status', 'message', 'target_uri']
self.assertEquals(sorted(keys), sorted(foo2.keys()))
self.assertEquals('failed', foo2['status'])
- self._wait_task(id3)
+ wait_task(self._task_lookup, id3)
foo3 = json.loads(self.request('/tasks/%s' % id3).read())
self.assertEquals('in progress', foo3['message'])
self.assertEquals('running', foo3['status'])
@@ -1717,7 +1714,7 @@ class RestTests(unittest.TestCase):
task = json.loads(resp.read())
# make sure the debugreport doesn't exist until the
# the task is finished
- self._wait_task(task['id'])
+ wait_task(self._task_lookup, task['id'])
rollback.prependDefer(self._report_delete, 'report2')
resp = request(host, ssl_port, '/debugreports/report1')
debugreport = json.loads(resp.read())
@@ -1736,7 +1733,7 @@ class RestTests(unittest.TestCase):
task = json.loads(resp.read())
# make sure the debugreport doesn't exist until the
# the task is finished
- self._wait_task(task['id'], 20)
+ wait_task(self._task_lookup, task['id'], 20)
rollback.prependDefer(self._report_delete, 'report1')
resp = request(host, ssl_port, '/debugreports/report1')
debugreport = json.loads(resp.read())
@@ -1928,7 +1925,7 @@ class RestTests(unittest.TestCase):
self.assertEquals(r.status_code, 202)
task = r.json()
- self._wait_task(task['id'])
+ wait_task(self._task_lookup, task['id'])
resp = self.request('/storagepools/default/storagevolumes/%s' %
task['target_uri'].split('/')[-1])
self.assertEquals(200, resp.status)
@@ -1948,7 +1945,7 @@ class RestTests(unittest.TestCase):
self.assertEquals(r.status_code, 202)
task = r.json()
- self._wait_task(task['id'], 15)
+ wait_task(self._task_lookup, task['id'], 15)
resp = self.request('/storagepools/default/storagevolumes/%s' %
task['target_uri'].split('/')[-1])
diff --git a/tests/utils.py b/tests/utils.py
index 140bb1d..9133904 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -25,6 +25,7 @@ import json
import os
import socket
import sys
+import time
import threading
import unittest
@@ -36,6 +37,7 @@ import kimchi.mockmodel
import kimchi.server
from kimchi.config import paths
from kimchi.exception import OperationFailed
+from kimchi.utils import kimchi_log
_ports = {}
@@ -195,3 +197,16 @@ def patch_auth(sudo=True):
def normalize_xml(xml_str):
return etree.tostring(etree.fromstring(xml_str,
etree.XMLParser(remove_blank_text=True)))
+
+
+def wait_task(task_lookup, taskid, timeout=10):
+ for i in range(0, timeout):
+ task_info = task_lookup(taskid)
+ if task_info['status'] == "running":
+ kimchi_log.info("Waiting task %s, message: %s",
+ taskid, task_info['message'])
+ time.sleep(1)
+ else:
+ return
+ kimchi_log.error("Timeout while process long-run task, "
+ "try to increase timeout value.")
--
1.9.3
More information about the Kimchi-devel
mailing list