[Kimchi-devel] [PATCH v2] AsyncTask: Improve continuous status feedback

Zhou Zheng Sheng zhshzhou at linux.vnet.ibm.com
Thu Oct 30 04:44:34 UTC 2014


One of the advantages of AsyncTask is that the background task can call
"_status_cb()" multiple times to report interim progress. However,
this is not properly implemented in "_status_cb()". The correct handling
should be as follows.

cb(message, True)
  Means task finished successfully.

cb(message, False)
  Means task failed.

cb(message)
cb(message, None)
  Means task is ongoing and the invocation is to provide progress feedback.

The current implementation fails to distinguish "cb(message, False)" and
"cb(message)". This patch fixes the problem and adds a new test case.

This patch also extracts the "wait_task()" methods from the tests, adding a
regular status report to let the developer know that it is waiting for some
task and is not stuck on something. There are also two methods in the tests
that start an AsyncTask but do not wait for it to finish; this interferes
with other tests, so the patch adds the missing "wait_task()" invocation
for them.

v2:
  Extract the common "wait_task()" methods to avoid duplicated code.

Signed-off-by: Zhou Zheng Sheng <zhshzhou at linux.vnet.ibm.com>
---
 src/kimchi/asynctask.py |  6 ++----
 tests/test_mockmodel.py |  2 ++
 tests/test_model.py     | 36 +++++++++++++++++++++++-------------
 tests/test_rest.py      | 25 +++++++++++--------------
 tests/utils.py          | 15 +++++++++++++++
 5 files changed, 53 insertions(+), 31 deletions(-)

diff --git a/src/kimchi/asynctask.py b/src/kimchi/asynctask.py
index 99e7a64..b5673b2 100644
--- a/src/kimchi/asynctask.py
+++ b/src/kimchi/asynctask.py
@@ -49,10 +49,8 @@ class AsyncTask(object):
             self._save_helper()
             return
 
-        if success:
-            self.status = 'finished'
-        else:
-            self.status = 'failed'
+        if success is not None:
+            self.status = 'finished' if success else 'failed'
         self.message = message
         self._save_helper()
 
diff --git a/tests/test_mockmodel.py b/tests/test_mockmodel.py
index 7319531..eeb6715 100644
--- a/tests/test_mockmodel.py
+++ b/tests/test_mockmodel.py
@@ -26,6 +26,7 @@ import unittest
 
 import kimchi.mockmodel
 from utils import get_free_port, patch_auth, request, run_server
+from utils import wait_task
 from kimchi.control.base import Collection, Resource
 
 
@@ -198,3 +199,4 @@ class MockModelTests(unittest.TestCase):
         task = model.host_swupdate()
         task_params = [u'id', u'message', u'status', u'target_uri']
         self.assertEquals(sorted(task_params), sorted(task.keys()))
+        wait_task(model.task_lookup, task['id'])
diff --git a/tests/test_model.py b/tests/test_model.py
index d9bbe9e..896540d 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -43,6 +43,7 @@ from kimchi.iscsi import TargetClient
 from kimchi.model import model
 from kimchi.rollbackcontext import RollbackContext
 from kimchi.utils import add_task
+from utils import wait_task
 
 
 invalid_repository_urls = ['www.fedora.org',       # missing protocol
@@ -127,7 +128,7 @@ class ModelTests(unittest.TestCase):
                       'format': 'qcow2'}
             task_id = inst.storagevolumes_create('default', params)['id']
             rollback.prependDefer(inst.storagevolume_delete, 'default', vol)
-            self._wait_task(inst, task_id)
+            wait_task(inst.task_lookup, task_id)
             self.assertEquals('finished', inst.task_lookup(task_id)['status'])
             vol_path = inst.storagevolume_lookup('default', vol)['path']
 
@@ -285,8 +286,9 @@ class ModelTests(unittest.TestCase):
                       'capacity': 1024,
                       'allocation': 512,
                       'format': 'qcow2'}
-            inst.storagevolumes_create(pool, params)
+            task_id = inst.storagevolumes_create(pool, params)['id']
             rollback.prependDefer(inst.storagevolume_delete, pool, vol)
+            wait_task(inst.task_lookup, task_id)
 
             vm_name = 'kimchi-cdrom'
             params = {'name': 'test', 'disks': [], 'cdrom': self.kimchi_iso}
@@ -558,7 +560,7 @@ class ModelTests(unittest.TestCase):
             params['name'] = vol
             task_id = inst.storagevolumes_create(pool, params)['id']
             rollback.prependDefer(inst.storagevolume_delete, pool, vol)
-            self._wait_task(inst, task_id)
+            wait_task(inst.task_lookup, task_id)
             self.assertEquals('finished', inst.task_lookup(task_id)['status'])
 
             fd, path = tempfile.mkstemp(dir=path)
@@ -599,7 +601,7 @@ class ModelTests(unittest.TestCase):
             taskid = task_response['id']
             vol_name = task_response['target_uri'].split('/')[-1]
             self.assertEquals('COPYING', vol_name)
-            self._wait_task(inst, taskid, timeout=60)
+            wait_task(inst.task_lookup, taskid, timeout=60)
             self.assertEquals('finished', inst.task_lookup(taskid)['status'])
             vol_path = os.path.join(args['path'], vol_name)
             self.assertTrue(os.path.isfile(vol_path))
@@ -1120,10 +1122,17 @@ class ModelTests(unittest.TestCase):
             except:
                 cb("Exception raised", False)
 
+        def continuous_ops(cb, params):
+            cb("step 1 OK")
+            time.sleep(2)
+            cb("step 2 OK")
+            time.sleep(2)
+            cb("step 3 OK", params.get('result', True))
+
         inst = model.Model('test:///default',
                            objstore_loc=self.tmp_store)
         taskid = add_task('', quick_op, inst.objstore, 'Hello')
-        self._wait_task(inst, taskid)
+        wait_task(inst.task_lookup, taskid)
         self.assertEquals(1, taskid)
         self.assertEquals('finished', inst.task_lookup(taskid)['status'])
         self.assertEquals('Hello', inst.task_lookup(taskid)['message'])
@@ -1134,16 +1143,22 @@ class ModelTests(unittest.TestCase):
         self.assertEquals(2, taskid)
         self.assertEquals('running', inst.task_lookup(taskid)['status'])
         self.assertEquals('OK', inst.task_lookup(taskid)['message'])
-        self._wait_task(inst, taskid)
+        wait_task(inst.task_lookup, taskid)
         self.assertEquals('failed', inst.task_lookup(taskid)['status'])
         self.assertEquals('It was not meant to be',
                           inst.task_lookup(taskid)['message'])
         taskid = add_task('', abnormal_op, inst.objstore, {})
-        self._wait_task(inst, taskid)
+        wait_task(inst.task_lookup, taskid)
         self.assertEquals('Exception raised',
                           inst.task_lookup(taskid)['message'])
         self.assertEquals('failed', inst.task_lookup(taskid)['status'])
 
+        taskid = add_task('', continuous_ops, inst.objstore,
+                          {'result': True})
+        self.assertEquals('running', inst.task_lookup(taskid)['status'])
+        wait_task(inst.task_lookup, taskid, timeout=10)
+        self.assertEquals('finished', inst.task_lookup(taskid)['status'])
+
     # This wrapper function is needed due to the new backend messaging in
     # vm model. vm_poweroff and vm_delete raise exception if vm is not found.
     # These functions are called after vm has been deleted if test finishes
@@ -1247,7 +1262,7 @@ class ModelTests(unittest.TestCase):
                 task = inst.debugreports_create({'name': reportName})
                 rollback.prependDefer(inst.debugreport_delete, tmp_name)
                 taskid = task['id']
-                self._wait_task(inst, taskid, timeout)
+                wait_task(inst.task_lookup, taskid, timeout)
                 self.assertEquals('finished',
                                   inst.task_lookup(taskid)['status'],
                                   "It is not necessary an error.  "
@@ -1265,11 +1280,6 @@ class ModelTests(unittest.TestCase):
                 if 'debugreport tool not found' not in e.message:
                     raise e
 
-    def _wait_task(self, model, taskid, timeout=5):
-            for i in range(0, timeout):
-                if model.task_lookup(taskid)['status'] == 'running':
-                    time.sleep(1)
-
     def test_get_distros(self):
         inst = model.Model('test:///default',
                            objstore_loc=self.tmp_store)
diff --git a/tests/test_rest.py b/tests/test_rest.py
index 8de0a9c..60dce2f 100644
--- a/tests/test_rest.py
+++ b/tests/test_rest.py
@@ -38,7 +38,7 @@ import kimchi.server
 from kimchi.config import paths
 from kimchi.rollbackcontext import RollbackContext
 from utils import get_free_port, patch_auth, request
-from utils import run_server
+from utils import run_server, wait_task
 
 
 test_server = None
@@ -1041,7 +1041,7 @@ class RestTests(unittest.TestCase):
         task = json.loads(resp.read())
         vol_name = task['target_uri'].split('/')[-1]
         self.assertEquals('anyurl.wor.kz', vol_name)
-        self._wait_task(task['id'])
+        wait_task(self._task_lookup, task['id'])
         task = json.loads(self.request('/tasks/%s' % task['id']).read())
         self.assertEquals('finished', task['status'])
         resp = self.request('/storagepools/pool-1/storagevolumes/%s' %
@@ -1069,7 +1069,7 @@ class RestTests(unittest.TestCase):
                             req, 'POST')
         self.assertEquals(202, resp.status)
         task_id = json.loads(resp.read())['id']
-        self._wait_task(task_id)
+        wait_task(self._task_lookup, task_id)
         status = json.loads(self.request('/tasks/%s' % task_id).read())
         self.assertEquals('finished', status['status'])
 
@@ -1531,11 +1531,8 @@ class RestTests(unittest.TestCase):
                        'DELETE')
         self.assertEquals(204, resp.status)
 
-    def _wait_task(self, taskid, timeout=5):
-        for i in range(0, timeout):
-            task = json.loads(self.request('/tasks/%s' % taskid).read())
-            if task['status'] == 'running':
-                time.sleep(1)
+    def _task_lookup(self, taskid):
+        return json.loads(self.request('/tasks/%s' % taskid).read())
 
     def test_tasks(self):
         id1 = model.add_task('/tasks/1', self._async_op)
@@ -1550,12 +1547,12 @@ class RestTests(unittest.TestCase):
         tasks = json.loads(self.request('/tasks').read())
         tasks_ids = [int(t['id']) for t in tasks]
         self.assertEquals(set([id1, id2, id3]) - set(tasks_ids), set([]))
-        self._wait_task(id2)
+        wait_task(self._task_lookup, id2)
         foo2 = json.loads(self.request('/tasks/%s' % id2).read())
         keys = ['id', 'status', 'message', 'target_uri']
         self.assertEquals(sorted(keys), sorted(foo2.keys()))
         self.assertEquals('failed', foo2['status'])
-        self._wait_task(id3)
+        wait_task(self._task_lookup, id3)
         foo3 = json.loads(self.request('/tasks/%s' % id3).read())
         self.assertEquals('in progress', foo3['message'])
         self.assertEquals('running', foo3['status'])
@@ -1717,7 +1714,7 @@ class RestTests(unittest.TestCase):
             task = json.loads(resp.read())
             # make sure the debugreport doesn't exist until the
             # the task is finished
-            self._wait_task(task['id'])
+            wait_task(self._task_lookup, task['id'])
             rollback.prependDefer(self._report_delete, 'report2')
             resp = request(host, ssl_port, '/debugreports/report1')
             debugreport = json.loads(resp.read())
@@ -1736,7 +1733,7 @@ class RestTests(unittest.TestCase):
             task = json.loads(resp.read())
             # make sure the debugreport doesn't exist until the
             # the task is finished
-            self._wait_task(task['id'], 20)
+            wait_task(self._task_lookup, task['id'], 20)
             rollback.prependDefer(self._report_delete, 'report1')
             resp = request(host, ssl_port, '/debugreports/report1')
             debugreport = json.loads(resp.read())
@@ -1928,7 +1925,7 @@ class RestTests(unittest.TestCase):
 
             self.assertEquals(r.status_code, 202)
             task = r.json()
-            self._wait_task(task['id'])
+            wait_task(self._task_lookup, task['id'])
             resp = self.request('/storagepools/default/storagevolumes/%s' %
                                 task['target_uri'].split('/')[-1])
             self.assertEquals(200, resp.status)
@@ -1948,7 +1945,7 @@ class RestTests(unittest.TestCase):
 
             self.assertEquals(r.status_code, 202)
             task = r.json()
-            self._wait_task(task['id'], 15)
+            wait_task(self._task_lookup, task['id'], 15)
             resp = self.request('/storagepools/default/storagevolumes/%s' %
                                 task['target_uri'].split('/')[-1])
 
diff --git a/tests/utils.py b/tests/utils.py
index 140bb1d..9133904 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -25,6 +25,7 @@ import json
 import os
 import socket
 import sys
+import time
 import threading
 import unittest
 
@@ -36,6 +37,7 @@ import kimchi.mockmodel
 import kimchi.server
 from kimchi.config import paths
 from kimchi.exception import OperationFailed
+from kimchi.utils import kimchi_log
 
 _ports = {}
 
@@ -195,3 +197,16 @@ def patch_auth(sudo=True):
 def normalize_xml(xml_str):
     return etree.tostring(etree.fromstring(xml_str,
                           etree.XMLParser(remove_blank_text=True)))
+
+
+def wait_task(task_lookup, taskid, timeout=10):
+    for i in range(0, timeout):
+        task_info = task_lookup(taskid)
+        if task_info['status'] == "running":
+            kimchi_log.info("Waiting task %s, message: %s",
+                            taskid, task_info['message'])
+            time.sleep(1)
+        else:
+            return
+    kimchi_log.error("Timeout while process long-run task, "
+                     "try to increase timeout value.")
-- 
1.9.3




More information about the Kimchi-devel mailing list