[Kimchi-devel] [PATCH] [Wok 3/3] Issue #158: Add AsyncTasks testcases

pvital at linux.vnet.ibm.com pvital at linux.vnet.ibm.com
Wed Aug 24 19:36:29 UTC 2016


From: Paulo Vital <pvital at linux.vnet.ibm.com>

Create new testcases to test AsyncTasks in memory.

Signed-off-by: Paulo Vital <pvital at linux.vnet.ibm.com>
---
 tests/test_tasks.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 136 insertions(+)
 create mode 100644 tests/test_tasks.py

diff --git a/tests/test_tasks.py b/tests/test_tasks.py
new file mode 100644
index 0000000..aa2b6f0
--- /dev/null
+++ b/tests/test_tasks.py
@@ -0,0 +1,136 @@
+#
+# Project Wok
+#
+# Copyright IBM Corp, 2016
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+
+import os
+import unittest
+import time
+
+from utils import wait_task
+
+from wok.model import model
+from wok.objectstore import ObjectStore
+from wok.utils import add_task, tasks_queue
+
+
+class AsyncTaskTests(unittest.TestCase):  # tasks_queue and Model paths
+    def setUp(self):  # build an ObjectStore from a fixed /tmp path
+        self.tmp_store = '/tmp/wok-tasks-objstore'
+        self.objstore = ObjectStore(self.tmp_store)
+
+    def tearDown(self):  # delete the store file when it exists on disk
+        if os.path.isfile(self.tmp_store):
+            os.unlink(self.tmp_store)
+
+    def _quick_op(self, cb, message):  # task body: report at once
+        cb(message, True)
+
+    def _long_op(self, cb, params):  # task body: sleep, then report
+        time.sleep(params.get('delay', 3))
+        cb(params.get('message', ''), params.get('result', False))
+
+    def _continuous_ops(self, cb, params):  # task body: three reports
+        cb("step 1 OK")
+        time.sleep(2)
+        cb("step 2 OK")
+        time.sleep(2)
+        cb("step 3 OK", params.get('result', True))
+
+    def _task_lookup(self, id):  # dict view of the task at tasks_queue[id]
+        task = tasks_queue[id]
+        return {'id': id,
+                'status': task.status,
+                'message': task.message,
+                'target_uri': task.target_uri}
+
+    def test_async_tasks(self):  # tasks are read back through tasks_queue
+        class task_except(Exception):
+            """Raised on purpose inside abnormal_op."""
+
+        def abnormal_op(cb, params):  # raises, then reports the failure
+            try:
+                raise task_except
+            except task_except:  # catch only what the try body raises
+                cb("Exception raised", False)
+
+        taskid = add_task('', self._quick_op, self.objstore, 'Hello')
+        wait_task(self._task_lookup, taskid)
+        self.assertEqual('finished', self._task_lookup(taskid)['status'])
+        self.assertEqual('Hello', self._task_lookup(taskid)['message'])
+        # Slow task: 'running' right after add_task, 'failed' once done.
+        taskid = add_task('', self._long_op, self.objstore,
+                          {'delay': 3, 'result': False,
+                           'message': 'It was not meant to be'})
+        self.assertEqual('running', self._task_lookup(taskid)['status'])
+        self.assertEqual('Starting AsyncTask',
+                         self._task_lookup(taskid)['message'])
+        wait_task(self._task_lookup, taskid)
+        self.assertEqual('failed', self._task_lookup(taskid)['status'])
+        self.assertEqual('It was not meant to be',
+                         self._task_lookup(taskid)['message'])
+        # Task body that raises internally and reports failure through cb.
+        taskid = add_task('', abnormal_op, self.objstore, {})
+        wait_task(self._task_lookup, taskid)
+        self.assertEqual('Exception raised',
+                         self._task_lookup(taskid)['message'])
+        self.assertEqual('failed', self._task_lookup(taskid)['status'])
+        # Multi-step task: wait up to 10 s to cover its two 2 s sleeps.
+        taskid = add_task('', self._continuous_ops, self.objstore,
+                          {'result': True})
+        self.assertEqual('running', self._task_lookup(taskid)['status'])
+        wait_task(self._task_lookup, taskid, timeout=10)
+        self.assertEqual('finished', self._task_lookup(taskid)['status'])
+
+    def test_tasks_model(self):  # same scenarios, read back through Model
+        class task_except(Exception):
+            """Raised on purpose inside abnormal_op."""
+
+        def abnormal_op(cb, params):  # raises, then reports the failure
+            try:
+                raise task_except
+            except task_except:  # catch only what the try body raises
+                cb("Exception raised", False)
+
+        inst = model.Model(objstore_loc=self.tmp_store)
+        taskid = add_task('', self._quick_op, inst.objstore, 'Hello')
+        inst.task_wait(taskid)
+        self.assertEqual('finished', inst.task_lookup(taskid)['status'])
+        self.assertEqual('Hello', inst.task_lookup(taskid)['message'])
+        # Slow task: 'running' right after add_task, 'failed' once done.
+        taskid = add_task('', self._long_op, inst.objstore,
+                          {'delay': 3, 'result': False,
+                           'message': 'It was not meant to be'})
+        self.assertEqual('running', inst.task_lookup(taskid)['status'])
+        self.assertEqual('Starting AsyncTask',
+                         inst.task_lookup(taskid)['message'])
+        inst.task_wait(taskid)
+        self.assertEqual('failed', inst.task_lookup(taskid)['status'])
+        self.assertEqual('It was not meant to be',
+                         inst.task_lookup(taskid)['message'])
+        # Task body that raises internally and reports failure through cb.
+        taskid = add_task('', abnormal_op, inst.objstore, {})
+        inst.task_wait(taskid)
+        self.assertEqual('Exception raised',
+                         inst.task_lookup(taskid)['message'])
+        self.assertEqual('failed', inst.task_lookup(taskid)['status'])
+        # Multi-step task: wait up to 10 s to cover its two 2 s sleeps.
+        taskid = add_task('', self._continuous_ops, inst.objstore,
+                          {'result': True})
+        self.assertEqual('running', inst.task_lookup(taskid)['status'])
+        inst.task_wait(taskid, timeout=10)
+        self.assertEqual('finished', inst.task_lookup(taskid)['status'])
-- 
2.7.4




More information about the Kimchi-devel mailing list