[Kimchi-devel] [PATCH] [Wok 2/2] Issue #122 - Add unit test to stop AsyncTask.
pvital at linux.vnet.ibm.com
pvital at linux.vnet.ibm.com
Wed Aug 31 19:18:12 UTC 2016
From: Paulo Vital <pvital at linux.vnet.ibm.com>
---
tests/test_api.py | 26 ++++++++++++++++++++++++--
tests/test_tasks.py | 19 ++++++++++++++++++-
2 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/tests/test_api.py b/tests/test_api.py
index f2e252a..0ddf627 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -18,11 +18,13 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import json
+import time
import unittest
-from functools import partial
-
import utils
+from functools import partial
+
+from wok.asynctask import AsyncTask
test_server = None
model = None
@@ -85,3 +87,23 @@ class APITests(unittest.TestCase):
for record in records:
# Test search by app
self.assertEquals(record['app'], 'wok')
+
+ def test_kill_async_task(self):
+ def continuous_ops(cb, params):
+ for i in range(30):
+ cb("...step %s OK" % i)
+ time.sleep(2)
+ cb("FINAL step OK", params.get('result', True))
+
+ def kill_function():
+ print "... killing task...... BUUUUUUM"
+
+ taskid = AsyncTask('', continuous_ops, {'result': True},
+ kill_function).id
+ tasks = json.loads(self.request('/tasks').read())
+ self.assertLessEqual(1, len(tasks))
+ time.sleep(10)
+ resp = self.request('/tasks/%s' % taskid, '{}', 'DELETE')
+ self.assertEquals(204, resp.status)
+ task = json.loads(self.request('/tasks/%s' % taskid).read())
+ self.assertEquals('killed', task['status'])
diff --git a/tests/test_tasks.py b/tests/test_tasks.py
index 67f228b..7c5deaa 100644
--- a/tests/test_tasks.py
+++ b/tests/test_tasks.py
@@ -85,7 +85,7 @@ class AsyncTaskTests(unittest.TestCase):
wait_task(self._task_lookup, taskid, timeout=10)
self.assertEquals('finished', self._task_lookup(taskid)['status'])
- def test_tasks_model(self):
+ def test_async_tasks_model(self):
class task_except(Exception):
pass
@@ -122,3 +122,20 @@ class AsyncTaskTests(unittest.TestCase):
self.assertEquals('running', inst.task_lookup(taskid)['status'])
inst.task_wait(taskid, timeout=10)
self.assertEquals('finished', inst.task_lookup(taskid)['status'])
+
+ def test_kill_async_task(self):
+ def continuous_ops(cb, params):
+ for i in range(30):
+ cb("...step %s OK" % i)
+ time.sleep(2)
+ cb("FINAL step OK", params.get('result', True))
+
+ def kill_function():
+ print "... killing task...... BUUUUUUM"
+
+ taskid = AsyncTask('', continuous_ops, {'result': True},
+ kill_function).id
+ self.assertEquals('running', self._task_lookup(taskid)['status'])
+ time.sleep(10)
+ tasks_queue[taskid].kill()
+ self.assertEquals('killed', self._task_lookup(taskid)['status'])
--
2.7.4
More information about the Kimchi-devel
mailing list