[Kimchi-devel] [PATCH v2] AsyncTask: Improve continuous status feedback

Aline Manera alinefm at linux.vnet.ibm.com
Thu Oct 30 11:21:26 UTC 2014


Reviewed-by: Aline Manera <alinefm at linux.vnet.ibm.com>

On 10/30/2014 02:49 AM, Zhou Zheng Sheng wrote:
> One of the advantage of AsyncTask is that the background task can call
> "_status_cb()" multiple times to report the interim progress. However
> this is not properly implemented in "_status_cb()". The correct handling
> should be as follow.
>
> cb(message, True)
>    Means task finished succesfully.
>
> cb(message, False)
>    Means task failed.
>
> cb(message)
> cb(message, None)
>    Means task is ongoing and the invocation is to provide progress feedback.
>
> The current implementation fails to distinguish "cb(message, False)" and
> "cb(message)". This patch fixes the problem and adds a new test case.
>
> This patch also extracts "wait_task()" methods in from the tests, adding a
> regular status report to allow developer know it's waiting for some task
> but not stuck on something. There is also two methods in tests that start a
> AsyncTask but do not wait it till finish, this will interfere with other
> tests, so the patch adds the missing "wait_task()" invocation for them.
>
> v2:
>    Extract the common "wait_task()" methods to avoid duplicated code.
>
> Signed-off-by: Zhou Zheng Sheng <zhshzhou at linux.vnet.ibm.com>
> ---
>   src/kimchi/asynctask.py |  6 ++----
>   tests/test_mockmodel.py |  2 ++
>   tests/test_model.py     | 36 +++++++++++++++++++++++-------------
>   tests/test_rest.py      | 25 +++++++++++--------------
>   tests/utils.py          | 15 +++++++++++++++
>   5 files changed, 53 insertions(+), 31 deletions(-)
>
> diff --git a/src/kimchi/asynctask.py b/src/kimchi/asynctask.py
> index 99e7a64..b5673b2 100644
> --- a/src/kimchi/asynctask.py
> +++ b/src/kimchi/asynctask.py
> @@ -49,10 +49,8 @@ class AsyncTask(object):
>               self._save_helper()
>               return
>
> -        if success:
> -            self.status = 'finished'
> -        else:
> -            self.status = 'failed'
> +        if success is not None:
> +            self.status = 'finished' if success else 'failed'
>           self.message = message
>           self._save_helper()
>
> diff --git a/tests/test_mockmodel.py b/tests/test_mockmodel.py
> index 7319531..eeb6715 100644
> --- a/tests/test_mockmodel.py
> +++ b/tests/test_mockmodel.py
> @@ -26,6 +26,7 @@ import unittest
>
>   import kimchi.mockmodel
>   from utils import get_free_port, patch_auth, request, run_server
> +from utils import wait_task
>   from kimchi.control.base import Collection, Resource
>
>
> @@ -198,3 +199,4 @@ class MockModelTests(unittest.TestCase):
>           task = model.host_swupdate()
>           task_params = [u'id', u'message', u'status', u'target_uri']
>           self.assertEquals(sorted(task_params), sorted(task.keys()))
> +        wait_task(model.task_lookup, task['id'])
> diff --git a/tests/test_model.py b/tests/test_model.py
> index d9bbe9e..896540d 100644
> --- a/tests/test_model.py
> +++ b/tests/test_model.py
> @@ -43,6 +43,7 @@ from kimchi.iscsi import TargetClient
>   from kimchi.model import model
>   from kimchi.rollbackcontext import RollbackContext
>   from kimchi.utils import add_task
> +from utils import wait_task
>
>
>   invalid_repository_urls = ['www.fedora.org',       # missing protocol
> @@ -127,7 +128,7 @@ class ModelTests(unittest.TestCase):
>                         'format': 'qcow2'}
>               task_id = inst.storagevolumes_create('default', params)['id']
>               rollback.prependDefer(inst.storagevolume_delete, 'default', vol)
> -            self._wait_task(inst, task_id)
> +            wait_task(inst.task_lookup, task_id)
>               self.assertEquals('finished', inst.task_lookup(task_id)['status'])
>               vol_path = inst.storagevolume_lookup('default', vol)['path']
>
> @@ -285,8 +286,9 @@ class ModelTests(unittest.TestCase):
>                         'capacity': 1024,
>                         'allocation': 512,
>                         'format': 'qcow2'}
> -            inst.storagevolumes_create(pool, params)
> +            task_id = inst.storagevolumes_create(pool, params)['id']
>               rollback.prependDefer(inst.storagevolume_delete, pool, vol)
> +            wait_task(inst.task_lookup, task_id)
>
>               vm_name = 'kimchi-cdrom'
>               params = {'name': 'test', 'disks': [], 'cdrom': self.kimchi_iso}
> @@ -558,7 +560,7 @@ class ModelTests(unittest.TestCase):
>               params['name'] = vol
>               task_id = inst.storagevolumes_create(pool, params)['id']
>               rollback.prependDefer(inst.storagevolume_delete, pool, vol)
> -            self._wait_task(inst, task_id)
> +            wait_task(inst.task_lookup, task_id)
>               self.assertEquals('finished', inst.task_lookup(task_id)['status'])
>
>               fd, path = tempfile.mkstemp(dir=path)
> @@ -599,7 +601,7 @@ class ModelTests(unittest.TestCase):
>               taskid = task_response['id']
>               vol_name = task_response['target_uri'].split('/')[-1]
>               self.assertEquals('COPYING', vol_name)
> -            self._wait_task(inst, taskid, timeout=60)
> +            wait_task(inst.task_lookup, taskid, timeout=60)
>               self.assertEquals('finished', inst.task_lookup(taskid)['status'])
>               vol_path = os.path.join(args['path'], vol_name)
>               self.assertTrue(os.path.isfile(vol_path))
> @@ -1120,10 +1122,17 @@ class ModelTests(unittest.TestCase):
>               except:
>                   cb("Exception raised", False)
>
> +        def continuous_ops(cb, params):
> +            cb("step 1 OK")
> +            time.sleep(2)
> +            cb("step 2 OK")
> +            time.sleep(2)
> +            cb("step 3 OK", params.get('result', True))
> +
>           inst = model.Model('test:///default',
>                              objstore_loc=self.tmp_store)
>           taskid = add_task('', quick_op, inst.objstore, 'Hello')
> -        self._wait_task(inst, taskid)
> +        wait_task(inst.task_lookup, taskid)
>           self.assertEquals(1, taskid)
>           self.assertEquals('finished', inst.task_lookup(taskid)['status'])
>           self.assertEquals('Hello', inst.task_lookup(taskid)['message'])
> @@ -1134,16 +1143,22 @@ class ModelTests(unittest.TestCase):
>           self.assertEquals(2, taskid)
>           self.assertEquals('running', inst.task_lookup(taskid)['status'])
>           self.assertEquals('OK', inst.task_lookup(taskid)['message'])
> -        self._wait_task(inst, taskid)
> +        wait_task(inst.task_lookup, taskid)
>           self.assertEquals('failed', inst.task_lookup(taskid)['status'])
>           self.assertEquals('It was not meant to be',
>                             inst.task_lookup(taskid)['message'])
>           taskid = add_task('', abnormal_op, inst.objstore, {})
> -        self._wait_task(inst, taskid)
> +        wait_task(inst.task_lookup, taskid)
>           self.assertEquals('Exception raised',
>                             inst.task_lookup(taskid)['message'])
>           self.assertEquals('failed', inst.task_lookup(taskid)['status'])
>
> +        taskid = add_task('', continuous_ops, inst.objstore,
> +                          {'result': True})
> +        self.assertEquals('running', inst.task_lookup(taskid)['status'])
> +        wait_task(inst.task_lookup, taskid, timeout=10)
> +        self.assertEquals('finished', inst.task_lookup(taskid)['status'])
> +
>       # This wrapper function is needed due to the new backend messaging in
>       # vm model. vm_poweroff and vm_delete raise exception if vm is not found.
>       # These functions are called after vm has been deleted if test finishes
> @@ -1247,7 +1262,7 @@ class ModelTests(unittest.TestCase):
>                   task = inst.debugreports_create({'name': reportName})
>                   rollback.prependDefer(inst.debugreport_delete, tmp_name)
>                   taskid = task['id']
> -                self._wait_task(inst, taskid, timeout)
> +                wait_task(inst.task_lookup, taskid, timeout)
>                   self.assertEquals('finished',
>                                     inst.task_lookup(taskid)['status'],
>                                     "It is not necessary an error.  "
> @@ -1265,11 +1280,6 @@ class ModelTests(unittest.TestCase):
>                   if 'debugreport tool not found' not in e.message:
>                       raise e
>
> -    def _wait_task(self, model, taskid, timeout=5):
> -            for i in range(0, timeout):
> -                if model.task_lookup(taskid)['status'] == 'running':
> -                    time.sleep(1)
> -
>       def test_get_distros(self):
>           inst = model.Model('test:///default',
>                              objstore_loc=self.tmp_store)
> diff --git a/tests/test_rest.py b/tests/test_rest.py
> index 8de0a9c..60dce2f 100644
> --- a/tests/test_rest.py
> +++ b/tests/test_rest.py
> @@ -38,7 +38,7 @@ import kimchi.server
>   from kimchi.config import paths
>   from kimchi.rollbackcontext import RollbackContext
>   from utils import get_free_port, patch_auth, request
> -from utils import run_server
> +from utils import run_server, wait_task
>
>
>   test_server = None
> @@ -1041,7 +1041,7 @@ class RestTests(unittest.TestCase):
>           task = json.loads(resp.read())
>           vol_name = task['target_uri'].split('/')[-1]
>           self.assertEquals('anyurl.wor.kz', vol_name)
> -        self._wait_task(task['id'])
> +        wait_task(self._task_lookup, task['id'])
>           task = json.loads(self.request('/tasks/%s' % task['id']).read())
>           self.assertEquals('finished', task['status'])
>           resp = self.request('/storagepools/pool-1/storagevolumes/%s' %
> @@ -1069,7 +1069,7 @@ class RestTests(unittest.TestCase):
>                               req, 'POST')
>           self.assertEquals(202, resp.status)
>           task_id = json.loads(resp.read())['id']
> -        self._wait_task(task_id)
> +        wait_task(self._task_lookup, task_id)
>           status = json.loads(self.request('/tasks/%s' % task_id).read())
>           self.assertEquals('finished', status['status'])
>
> @@ -1531,11 +1531,8 @@ class RestTests(unittest.TestCase):
>                          'DELETE')
>           self.assertEquals(204, resp.status)
>
> -    def _wait_task(self, taskid, timeout=5):
> -        for i in range(0, timeout):
> -            task = json.loads(self.request('/tasks/%s' % taskid).read())
> -            if task['status'] == 'running':
> -                time.sleep(1)
> +    def _task_lookup(self, taskid):
> +        return json.loads(self.request('/tasks/%s' % taskid).read())
>
>       def test_tasks(self):
>           id1 = model.add_task('/tasks/1', self._async_op)
> @@ -1550,12 +1547,12 @@ class RestTests(unittest.TestCase):
>           tasks = json.loads(self.request('/tasks').read())
>           tasks_ids = [int(t['id']) for t in tasks]
>           self.assertEquals(set([id1, id2, id3]) - set(tasks_ids), set([]))
> -        self._wait_task(id2)
> +        wait_task(self._task_lookup, id2)
>           foo2 = json.loads(self.request('/tasks/%s' % id2).read())
>           keys = ['id', 'status', 'message', 'target_uri']
>           self.assertEquals(sorted(keys), sorted(foo2.keys()))
>           self.assertEquals('failed', foo2['status'])
> -        self._wait_task(id3)
> +        wait_task(self._task_lookup, id3)
>           foo3 = json.loads(self.request('/tasks/%s' % id3).read())
>           self.assertEquals('in progress', foo3['message'])
>           self.assertEquals('running', foo3['status'])
> @@ -1717,7 +1714,7 @@ class RestTests(unittest.TestCase):
>               task = json.loads(resp.read())
>               # make sure the debugreport doesn't exist until the
>               # the task is finished
> -            self._wait_task(task['id'])
> +            wait_task(self._task_lookup, task['id'])
>               rollback.prependDefer(self._report_delete, 'report2')
>               resp = request(host, ssl_port, '/debugreports/report1')
>               debugreport = json.loads(resp.read())
> @@ -1736,7 +1733,7 @@ class RestTests(unittest.TestCase):
>               task = json.loads(resp.read())
>               # make sure the debugreport doesn't exist until the
>               # the task is finished
> -            self._wait_task(task['id'], 20)
> +            wait_task(self._task_lookup, task['id'], 20)
>               rollback.prependDefer(self._report_delete, 'report1')
>               resp = request(host, ssl_port, '/debugreports/report1')
>               debugreport = json.loads(resp.read())
> @@ -1928,7 +1925,7 @@ class RestTests(unittest.TestCase):
>
>               self.assertEquals(r.status_code, 202)
>               task = r.json()
> -            self._wait_task(task['id'])
> +            wait_task(self._task_lookup, task['id'])
>               resp = self.request('/storagepools/default/storagevolumes/%s' %
>                                   task['target_uri'].split('/')[-1])
>               self.assertEquals(200, resp.status)
> @@ -1948,7 +1945,7 @@ class RestTests(unittest.TestCase):
>
>               self.assertEquals(r.status_code, 202)
>               task = r.json()
> -            self._wait_task(task['id'], 15)
> +            wait_task(self._task_lookup, task['id'], 15)
>               resp = self.request('/storagepools/default/storagevolumes/%s' %
>                                   task['target_uri'].split('/')[-1])
>
> diff --git a/tests/utils.py b/tests/utils.py
> index 140bb1d..9133904 100644
> --- a/tests/utils.py
> +++ b/tests/utils.py
> @@ -25,6 +25,7 @@ import json
>   import os
>   import socket
>   import sys
> +import time
>   import threading
>   import unittest
>
> @@ -36,6 +37,7 @@ import kimchi.mockmodel
>   import kimchi.server
>   from kimchi.config import paths
>   from kimchi.exception import OperationFailed
> +from kimchi.utils import kimchi_log
>
>   _ports = {}
>
> @@ -195,3 +197,16 @@ def patch_auth(sudo=True):
>   def normalize_xml(xml_str):
>       return etree.tostring(etree.fromstring(xml_str,
>                             etree.XMLParser(remove_blank_text=True)))
> +
> +
> +def wait_task(task_lookup, taskid, timeout=10):
> +    for i in range(0, timeout):
> +        task_info = task_lookup(taskid)
> +        if task_info['status'] == "running":
> +            kimchi_log.info("Waiting task %s, message: %s",
> +                            taskid, task_info['message'])
> +            time.sleep(1)
> +        else:
> +            return
> +    kimchi_log.error("Timeout while process long-run task, "
> +                     "try to increase timeout value.")




More information about the Kimchi-devel mailing list