[Kimchi-devel] [PATCH v2] AsyncTask: Improve continuous status feedback
Aline Manera
alinefm at linux.vnet.ibm.com
Thu Oct 30 11:21:26 UTC 2014
Reviewed-by: Aline Manera <alinefm at linux.vnet.ibm.com>
On 10/30/2014 02:49 AM, Zhou Zheng Sheng wrote:
> One of the advantages of AsyncTask is that the background task can call
> "_status_cb()" multiple times to report the interim progress. However,
> this is not properly implemented in "_status_cb()". The correct handling
> should be as follows.
>
> cb(message, True)
> Means task finished successfully.
>
> cb(message, False)
> Means task failed.
>
> cb(message)
> cb(message, None)
> Means task is ongoing and the invocation is to provide progress feedback.
>
> The current implementation fails to distinguish "cb(message, False)" and
> "cb(message)". This patch fixes the problem and adds a new test case.
>
> This patch also extracts the "wait_task()" methods from the tests, adding a
> regular status report to let the developer know it is waiting for some task
> and is not stuck on something. There are also two methods in the tests that
> start an AsyncTask but do not wait for it to finish; this interferes with other
> tests, so the patch adds the missing "wait_task()" invocation for them.
>
> v2:
> Extract the common "wait_task()" methods to avoid duplicated code.
>
> Signed-off-by: Zhou Zheng Sheng <zhshzhou at linux.vnet.ibm.com>
> ---
> src/kimchi/asynctask.py | 6 ++----
> tests/test_mockmodel.py | 2 ++
> tests/test_model.py | 36 +++++++++++++++++++++++-------------
> tests/test_rest.py | 25 +++++++++++--------------
> tests/utils.py | 15 +++++++++++++++
> 5 files changed, 53 insertions(+), 31 deletions(-)
>
> diff --git a/src/kimchi/asynctask.py b/src/kimchi/asynctask.py
> index 99e7a64..b5673b2 100644
> --- a/src/kimchi/asynctask.py
> +++ b/src/kimchi/asynctask.py
> @@ -49,10 +49,8 @@ class AsyncTask(object):
> self._save_helper()
> return
>
> - if success:
> - self.status = 'finished'
> - else:
> - self.status = 'failed'
> + if success is not None:
> + self.status = 'finished' if success else 'failed'
> self.message = message
> self._save_helper()
>
> diff --git a/tests/test_mockmodel.py b/tests/test_mockmodel.py
> index 7319531..eeb6715 100644
> --- a/tests/test_mockmodel.py
> +++ b/tests/test_mockmodel.py
> @@ -26,6 +26,7 @@ import unittest
>
> import kimchi.mockmodel
> from utils import get_free_port, patch_auth, request, run_server
> +from utils import wait_task
> from kimchi.control.base import Collection, Resource
>
>
> @@ -198,3 +199,4 @@ class MockModelTests(unittest.TestCase):
> task = model.host_swupdate()
> task_params = [u'id', u'message', u'status', u'target_uri']
> self.assertEquals(sorted(task_params), sorted(task.keys()))
> + wait_task(model.task_lookup, task['id'])
> diff --git a/tests/test_model.py b/tests/test_model.py
> index d9bbe9e..896540d 100644
> --- a/tests/test_model.py
> +++ b/tests/test_model.py
> @@ -43,6 +43,7 @@ from kimchi.iscsi import TargetClient
> from kimchi.model import model
> from kimchi.rollbackcontext import RollbackContext
> from kimchi.utils import add_task
> +from utils import wait_task
>
>
> invalid_repository_urls = ['www.fedora.org', # missing protocol
> @@ -127,7 +128,7 @@ class ModelTests(unittest.TestCase):
> 'format': 'qcow2'}
> task_id = inst.storagevolumes_create('default', params)['id']
> rollback.prependDefer(inst.storagevolume_delete, 'default', vol)
> - self._wait_task(inst, task_id)
> + wait_task(inst.task_lookup, task_id)
> self.assertEquals('finished', inst.task_lookup(task_id)['status'])
> vol_path = inst.storagevolume_lookup('default', vol)['path']
>
> @@ -285,8 +286,9 @@ class ModelTests(unittest.TestCase):
> 'capacity': 1024,
> 'allocation': 512,
> 'format': 'qcow2'}
> - inst.storagevolumes_create(pool, params)
> + task_id = inst.storagevolumes_create(pool, params)['id']
> rollback.prependDefer(inst.storagevolume_delete, pool, vol)
> + wait_task(inst.task_lookup, task_id)
>
> vm_name = 'kimchi-cdrom'
> params = {'name': 'test', 'disks': [], 'cdrom': self.kimchi_iso}
> @@ -558,7 +560,7 @@ class ModelTests(unittest.TestCase):
> params['name'] = vol
> task_id = inst.storagevolumes_create(pool, params)['id']
> rollback.prependDefer(inst.storagevolume_delete, pool, vol)
> - self._wait_task(inst, task_id)
> + wait_task(inst.task_lookup, task_id)
> self.assertEquals('finished', inst.task_lookup(task_id)['status'])
>
> fd, path = tempfile.mkstemp(dir=path)
> @@ -599,7 +601,7 @@ class ModelTests(unittest.TestCase):
> taskid = task_response['id']
> vol_name = task_response['target_uri'].split('/')[-1]
> self.assertEquals('COPYING', vol_name)
> - self._wait_task(inst, taskid, timeout=60)
> + wait_task(inst.task_lookup, taskid, timeout=60)
> self.assertEquals('finished', inst.task_lookup(taskid)['status'])
> vol_path = os.path.join(args['path'], vol_name)
> self.assertTrue(os.path.isfile(vol_path))
> @@ -1120,10 +1122,17 @@ class ModelTests(unittest.TestCase):
> except:
> cb("Exception raised", False)
>
> + def continuous_ops(cb, params):
> + cb("step 1 OK")
> + time.sleep(2)
> + cb("step 2 OK")
> + time.sleep(2)
> + cb("step 3 OK", params.get('result', True))
> +
> inst = model.Model('test:///default',
> objstore_loc=self.tmp_store)
> taskid = add_task('', quick_op, inst.objstore, 'Hello')
> - self._wait_task(inst, taskid)
> + wait_task(inst.task_lookup, taskid)
> self.assertEquals(1, taskid)
> self.assertEquals('finished', inst.task_lookup(taskid)['status'])
> self.assertEquals('Hello', inst.task_lookup(taskid)['message'])
> @@ -1134,16 +1143,22 @@ class ModelTests(unittest.TestCase):
> self.assertEquals(2, taskid)
> self.assertEquals('running', inst.task_lookup(taskid)['status'])
> self.assertEquals('OK', inst.task_lookup(taskid)['message'])
> - self._wait_task(inst, taskid)
> + wait_task(inst.task_lookup, taskid)
> self.assertEquals('failed', inst.task_lookup(taskid)['status'])
> self.assertEquals('It was not meant to be',
> inst.task_lookup(taskid)['message'])
> taskid = add_task('', abnormal_op, inst.objstore, {})
> - self._wait_task(inst, taskid)
> + wait_task(inst.task_lookup, taskid)
> self.assertEquals('Exception raised',
> inst.task_lookup(taskid)['message'])
> self.assertEquals('failed', inst.task_lookup(taskid)['status'])
>
> + taskid = add_task('', continuous_ops, inst.objstore,
> + {'result': True})
> + self.assertEquals('running', inst.task_lookup(taskid)['status'])
> + wait_task(inst.task_lookup, taskid, timeout=10)
> + self.assertEquals('finished', inst.task_lookup(taskid)['status'])
> +
> # This wrapper function is needed due to the new backend messaging in
> # vm model. vm_poweroff and vm_delete raise exception if vm is not found.
> # These functions are called after vm has been deleted if test finishes
> @@ -1247,7 +1262,7 @@ class ModelTests(unittest.TestCase):
> task = inst.debugreports_create({'name': reportName})
> rollback.prependDefer(inst.debugreport_delete, tmp_name)
> taskid = task['id']
> - self._wait_task(inst, taskid, timeout)
> + wait_task(inst.task_lookup, taskid, timeout)
> self.assertEquals('finished',
> inst.task_lookup(taskid)['status'],
> "It is not necessary an error. "
> @@ -1265,11 +1280,6 @@ class ModelTests(unittest.TestCase):
> if 'debugreport tool not found' not in e.message:
> raise e
>
> - def _wait_task(self, model, taskid, timeout=5):
> - for i in range(0, timeout):
> - if model.task_lookup(taskid)['status'] == 'running':
> - time.sleep(1)
> -
> def test_get_distros(self):
> inst = model.Model('test:///default',
> objstore_loc=self.tmp_store)
> diff --git a/tests/test_rest.py b/tests/test_rest.py
> index 8de0a9c..60dce2f 100644
> --- a/tests/test_rest.py
> +++ b/tests/test_rest.py
> @@ -38,7 +38,7 @@ import kimchi.server
> from kimchi.config import paths
> from kimchi.rollbackcontext import RollbackContext
> from utils import get_free_port, patch_auth, request
> -from utils import run_server
> +from utils import run_server, wait_task
>
>
> test_server = None
> @@ -1041,7 +1041,7 @@ class RestTests(unittest.TestCase):
> task = json.loads(resp.read())
> vol_name = task['target_uri'].split('/')[-1]
> self.assertEquals('anyurl.wor.kz', vol_name)
> - self._wait_task(task['id'])
> + wait_task(self._task_lookup, task['id'])
> task = json.loads(self.request('/tasks/%s' % task['id']).read())
> self.assertEquals('finished', task['status'])
> resp = self.request('/storagepools/pool-1/storagevolumes/%s' %
> @@ -1069,7 +1069,7 @@ class RestTests(unittest.TestCase):
> req, 'POST')
> self.assertEquals(202, resp.status)
> task_id = json.loads(resp.read())['id']
> - self._wait_task(task_id)
> + wait_task(self._task_lookup, task_id)
> status = json.loads(self.request('/tasks/%s' % task_id).read())
> self.assertEquals('finished', status['status'])
>
> @@ -1531,11 +1531,8 @@ class RestTests(unittest.TestCase):
> 'DELETE')
> self.assertEquals(204, resp.status)
>
> - def _wait_task(self, taskid, timeout=5):
> - for i in range(0, timeout):
> - task = json.loads(self.request('/tasks/%s' % taskid).read())
> - if task['status'] == 'running':
> - time.sleep(1)
> + def _task_lookup(self, taskid):
> + return json.loads(self.request('/tasks/%s' % taskid).read())
>
> def test_tasks(self):
> id1 = model.add_task('/tasks/1', self._async_op)
> @@ -1550,12 +1547,12 @@ class RestTests(unittest.TestCase):
> tasks = json.loads(self.request('/tasks').read())
> tasks_ids = [int(t['id']) for t in tasks]
> self.assertEquals(set([id1, id2, id3]) - set(tasks_ids), set([]))
> - self._wait_task(id2)
> + wait_task(self._task_lookup, id2)
> foo2 = json.loads(self.request('/tasks/%s' % id2).read())
> keys = ['id', 'status', 'message', 'target_uri']
> self.assertEquals(sorted(keys), sorted(foo2.keys()))
> self.assertEquals('failed', foo2['status'])
> - self._wait_task(id3)
> + wait_task(self._task_lookup, id3)
> foo3 = json.loads(self.request('/tasks/%s' % id3).read())
> self.assertEquals('in progress', foo3['message'])
> self.assertEquals('running', foo3['status'])
> @@ -1717,7 +1714,7 @@ class RestTests(unittest.TestCase):
> task = json.loads(resp.read())
> # make sure the debugreport doesn't exist until the
> # the task is finished
> - self._wait_task(task['id'])
> + wait_task(self._task_lookup, task['id'])
> rollback.prependDefer(self._report_delete, 'report2')
> resp = request(host, ssl_port, '/debugreports/report1')
> debugreport = json.loads(resp.read())
> @@ -1736,7 +1733,7 @@ class RestTests(unittest.TestCase):
> task = json.loads(resp.read())
> # make sure the debugreport doesn't exist until the
> # the task is finished
> - self._wait_task(task['id'], 20)
> + wait_task(self._task_lookup, task['id'], 20)
> rollback.prependDefer(self._report_delete, 'report1')
> resp = request(host, ssl_port, '/debugreports/report1')
> debugreport = json.loads(resp.read())
> @@ -1928,7 +1925,7 @@ class RestTests(unittest.TestCase):
>
> self.assertEquals(r.status_code, 202)
> task = r.json()
> - self._wait_task(task['id'])
> + wait_task(self._task_lookup, task['id'])
> resp = self.request('/storagepools/default/storagevolumes/%s' %
> task['target_uri'].split('/')[-1])
> self.assertEquals(200, resp.status)
> @@ -1948,7 +1945,7 @@ class RestTests(unittest.TestCase):
>
> self.assertEquals(r.status_code, 202)
> task = r.json()
> - self._wait_task(task['id'], 15)
> + wait_task(self._task_lookup, task['id'], 15)
> resp = self.request('/storagepools/default/storagevolumes/%s' %
> task['target_uri'].split('/')[-1])
>
> diff --git a/tests/utils.py b/tests/utils.py
> index 140bb1d..9133904 100644
> --- a/tests/utils.py
> +++ b/tests/utils.py
> @@ -25,6 +25,7 @@ import json
> import os
> import socket
> import sys
> +import time
> import threading
> import unittest
>
> @@ -36,6 +37,7 @@ import kimchi.mockmodel
> import kimchi.server
> from kimchi.config import paths
> from kimchi.exception import OperationFailed
> +from kimchi.utils import kimchi_log
>
> _ports = {}
>
> @@ -195,3 +197,16 @@ def patch_auth(sudo=True):
> def normalize_xml(xml_str):
> return etree.tostring(etree.fromstring(xml_str,
> etree.XMLParser(remove_blank_text=True)))
> +
> +
> +def wait_task(task_lookup, taskid, timeout=10):
> + for i in range(0, timeout):
> + task_info = task_lookup(taskid)
> + if task_info['status'] == "running":
> + kimchi_log.info("Waiting task %s, message: %s",
> + taskid, task_info['message'])
> + time.sleep(1)
> + else:
> + return
> + kimchi_log.error("Timeout while process long-run task, "
> + "try to increase timeout value.")
More information about the Kimchi-devel
mailing list