[PATCH V2] [Wok 0/2] Issue #122 - Make AsyncTask stoppable.

From: Paulo Vital <pvital@linux.vnet.ibm.com> V2: - Modified exception code used - Updated i18n messages V1: This patch set gives the user a way to 'stop' a Task that is still running by setting the Task status to "killed". Since an AsyncTask is basically a thread running in the system, and this thread can execute a pure Python method or a background command (by using run_command() from wok.utils), the developer must pass to the AsyncTask constructor a method to be executed by the DELETE operation, called here 'kill_cb'. If no kill_cb is passed, the task cannot be stopped and an error message will be raised to the user if the DELETE operation is executed. Otherwise, the kill_cb method will be executed by the kill() method (responsible for executing the DELETE operation) of the AsyncTask class and the task's status will be set to 'killed'. The unit tests show how to use it. Paulo Vital (2): Issue #122 - Make AsyncTask stoppable. Issue #122 - Add unit test to stop AsyncTask. docs/API/tasks.md | 2 ++ src/wok/asynctask.py | 18 +++++++++++++++++- src/wok/i18n.py | 2 ++ src/wok/model/tasks.py | 13 +++++++++++++ tests/test_api.py | 26 ++++++++++++++++++++++++-- tests/test_tasks.py | 19 ++++++++++++++++++- 6 files changed, 76 insertions(+), 4 deletions(-) -- 2.7.4

From: Paulo Vital <pvital@linux.vnet.ibm.com> This patch gives the user a way to 'stop' a Task that is still running by setting the Task status to "killed". Since an AsyncTask is basically a thread running in the system, and this thread can execute a pure Python method or a background command (by using run_command() from wok.utils), the developer must pass to the AsyncTask constructor a method to be executed by the DELETE operation, called here 'kill_cb'. If no kill_cb is passed, the task cannot be stopped and an error message will be raised to the user if the DELETE operation is executed. Otherwise, the kill_cb method will be executed by the kill() method (responsible for executing the DELETE operation) of the AsyncTask class and the task's status will be set to 'killed'. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- docs/API/tasks.md | 2 ++ src/wok/asynctask.py | 18 +++++++++++++++++- src/wok/i18n.py | 2 ++ src/wok/model/tasks.py | 13 +++++++++++++ 4 files changed, 34 insertions(+), 1 deletion(-) diff --git a/docs/API/tasks.md b/docs/API/tasks.md index e0c404f..2068f29 100644 --- a/docs/API/tasks.md +++ b/docs/API/tasks.md @@ -27,8 +27,10 @@ server. 
* running: The task is running * finished: The task has finished successfully * failed: The task failed + * killed: The task was killed by user * message: Human-readable details about the Task status * target_uri: Resource URI related to the Task +* **DELETE**: Kill the Task, moving its status to 'killed' * **POST**: *See Task Actions* **Actions (POST):** diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index b98ad9a..ecd9029 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -26,6 +26,9 @@ import traceback import uuid +from wok.exception import InvalidOperation + + tasks_queue = {} @@ -41,10 +44,11 @@ def clean_old_tasks(): class AsyncTask(object): - def __init__(self, target_uri, fn, opaque=None): + def __init__(self, target_uri, fn, opaque=None, kill_cb=None): self.id = str(uuid.uuid1()) self.target_uri = target_uri self.fn = fn + self.kill_cb = kill_cb self.status = 'running' self.message = 'The request is being processing.' self.timestamp = time.time() @@ -79,3 +83,15 @@ class AsyncTask(object): except KeyError: msg = "There's no task_id %s in tasks_queue. Nothing changed." cherrypy.log.error_log.error(msg % self.id) + + def kill(self): + if self.kill_cb is None: + raise InvalidOperation('WOKASYNC0002E') + + try: + self.kill_cb() + self.status = 'killed' + self.message = 'Task killed by user.' 
+ except Exception, e: + self.message = e.message + raise InvalidOperation('WOKASYNC0004E', {'err': e.message}) diff --git a/src/wok/i18n.py b/src/wok/i18n.py index ade2ae9..7ba7c24 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -34,7 +34,9 @@ messages = { "WOKAPI0009E": _("You don't have permission to perform this operation."), "WOKASYNC0001E": _("Unable to find task id: %(id)s"), + "WOKASYNC0002E": _("There is no callback to execute the kill task process."), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."), + "WOKASYNC0004E": _("Unable to kill task due error: %(err)s"), "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), "WOKAUTH0002E": _("You are not authorized to access Kimchi"), diff --git a/src/wok/model/tasks.py b/src/wok/model/tasks.py index 6a2b4bf..8ae1ce4 100644 --- a/src/wok/model/tasks.py +++ b/src/wok/model/tasks.py @@ -68,3 +68,16 @@ class TaskModel(object): raise TimeoutExpired('WOKASYNC0003E', {'seconds': timeout, 'task': task.target_uri}) + + def delete(self, id): + """ + 'Stops' an AsyncTask, by executing the kill callback provided by user + when created the task. Task's status will be changed to 'killed'. + """ + try: + task = tasks_queue[id] + except KeyError: + raise NotFoundError("WOKASYNC0001E", {'id': id}) + + if task.status is 'running': + task.kill() -- 2.7.4

From: Paulo Vital <pvital@linux.vnet.ibm.com> --- tests/test_api.py | 26 ++++++++++++++++++++++++-- tests/test_tasks.py | 19 ++++++++++++++++++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index f2e252a..0ddf627 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -18,11 +18,13 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA import json +import time import unittest -from functools import partial - import utils +from functools import partial + +from wok.asynctask import AsyncTask test_server = None model = None @@ -85,3 +87,23 @@ class APITests(unittest.TestCase): for record in records: # Test search by app self.assertEquals(record['app'], 'wok') + + def test_kill_async_task(self): + def continuous_ops(cb, params): + for i in range(30): + cb("...step %s OK" % i) + time.sleep(2) + cb("FINAL step OK", params.get('result', True)) + + def kill_function(): + print "... killing task...... BUUUUUUM" + + taskid = AsyncTask('', continuous_ops, {'result': True}, + kill_function).id + tasks = json.loads(self.request('/tasks').read()) + self.assertLessEqual(1, len(tasks)) + time.sleep(10) + resp = self.request('/tasks/%s' % taskid, '{}', 'DELETE') + self.assertEquals(204, resp.status) + task = json.loads(self.request('/tasks/%s' % taskid).read()) + self.assertEquals('killed', task['status']) diff --git a/tests/test_tasks.py b/tests/test_tasks.py index 67f228b..7c5deaa 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -85,7 +85,7 @@ class AsyncTaskTests(unittest.TestCase): wait_task(self._task_lookup, taskid, timeout=10) self.assertEquals('finished', self._task_lookup(taskid)['status']) - def test_tasks_model(self): + def test_async_tasks_model(self): class task_except(Exception): pass @@ -122,3 +122,20 @@ class AsyncTaskTests(unittest.TestCase): self.assertEquals('running', inst.task_lookup(taskid)['status']) inst.task_wait(taskid, timeout=10) 
self.assertEquals('finished', inst.task_lookup(taskid)['status']) + + def test_kill_async_task(self): + def continuous_ops(cb, params): + for i in range(30): + cb("...step %s OK" % i) + time.sleep(2) + cb("FINAL step OK", params.get('result', True)) + + def kill_function(): + print "... killing task...... BUUUUUUM" + + taskid = AsyncTask('', continuous_ops, {'result': True}, + kill_function).id + self.assertEquals('running', self._task_lookup(taskid)['status']) + time.sleep(10) + tasks_queue[taskid].kill() + self.assertEquals('killed', self._task_lookup(taskid)['status']) -- 2.7.4
participants (1)
-
pvital@linux.vnet.ibm.com