[PATCH V5] [Wok 0/3] Issue #158: Move AsyncTask information to memory

From: Paulo Vital <pvital@linux.vnet.ibm.com> V5: - Moved remove_tasks() to AsyncTask class. V4: - Fix usage of UUID to prevent "UUID Is not JSON Serializable" error V3: - Changes in how create AsyncTask V2: - moved all tasks_queue infrastructure to wok.asynctask - modified some user messages. V1: this patch-set moves AsyncTask information from objectstore to memory and it is dependency to solve Issue #122. Paulo Vital (3): Issue #158: Move AsyncTask information to memory Issue #158: Update model/tasks.py with AsyncTasks in memory. Issue #158: Add AsyncTasks testcases src/wok/asynctask.py | 48 +++++++++++-------- src/wok/i18n.py | 3 +- src/wok/model/tasks.py | 22 +++++---- src/wok/objectstore.py | 6 --- src/wok/utils.py | 19 -------- tests/test_tasks.py | 124 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 166 insertions(+), 56 deletions(-) create mode 100644 tests/test_tasks.py -- 2.7.4

From: Paulo Vital <pvital@linux.vnet.ibm.com> This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects. It also removes the tasks cleanup process from objectstore. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 48 ++++++++++++++++++++++++++++-------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 19 ------------------- 4 files changed, 28 insertions(+), 47 deletions(-) diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..1a114a8 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,29 +21,41 @@ import cherrypy import threading +import time import traceback +import uuid -from wok.exception import OperationFailed +tasks_queue = {} -class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") +def clean_old_tasks(): + """ + Remove from tasks_queue any task that started before 12 hours ago and has + current status equal do finished or failed. + """ + for id, task in tasks_queue.items(): + if (task.timestamp < (time.time()-43200)): + if (task.status is 'finished') or (task.status is 'failed'): + task.remove(id) + - self.id = str(id) +class AsyncTask(object): + def __init__(self, target_uri, fn, opaque=None): + self.id = str(uuid.uuid1()) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'The request is being processing.' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) self.thread.setDaemon(True) self.thread.start() + # let's prevent memory leak in tasks_queue + clean_old_tasks() + tasks_queue[self.id] = self def _status_cb(self, message, success=None): if success is not None: @@ -51,17 +63,6 @@ class AsyncTask(object): if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message}) def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request @@ -71,3 +72,10 @@ class AsyncTask(object): cherrypy.log.error_log.error("Error in async_task %s " % self.id) cherrypy.log.error_log.error(traceback.format_exc()) cb(e.message, False) + + def remove(self, id): + try: + del tasks_queue[id] + except KeyError: + msg = "There's no task_id %s in tasks_queue. Nothing changed." + cherrypy.log.error_log.error(msg % id) diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."), - "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."), "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return - # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..7599e85 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -39,31 +39,12 @@ from datetime import datetime, timedelta from multiprocessing import Process, Queue from threading import Timer -from wok.asynctask import AsyncTask from wok.config import paths, PluginPaths from wok.exception import InvalidParameter, TimeoutExpired from wok.stringutils import decode_value wok_log = cherrypy.log.error_log -task_id = 0 - - -def get_next_task_id(): - global task_id - task_id += 1 - return task_id - - -def get_task_id(): - global task_id - return task_id - - -def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) - return id def is_digit(value): -- 2.7.4

On 29-08-2016 16:18, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 48 ++++++++++++++++++++++++++++-------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 19 ------------------- 4 files changed, 28 insertions(+), 47 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..1a114a8 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,29 +21,41 @@
import cherrypy import threading +import time import traceback +import uuid
-from wok.exception import OperationFailed +tasks_queue = {}
-class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") +def clean_old_tasks(): + """ + Remove from tasks_queue any task that started before 12 hours ago and has + current status equal do finished or failed. + """ + for id, task in tasks_queue.items(): + if (task.timestamp < (time.time()-43200)): + if (task.status is 'finished') or (task.status is 'failed'): + task.remove(id) +
- self.id = str(id) +class AsyncTask(object): + def __init__(self, target_uri, fn, opaque=None): + self.id = str(uuid.uuid1()) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'The request is being processing.' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) self.thread.setDaemon(True) self.thread.start() + # let's prevent memory leak in tasks_queue + clean_old_tasks() + tasks_queue[self.id] = self
def _status_cb(self, message, success=None): if success is not None: @@ -51,17 +63,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request @@ -71,3 +72,10 @@ class AsyncTask(object): cherrypy.log.error_log.error("Error in async_task %s " % self.id) cherrypy.log.error_log.error(traceback.format_exc()) cb(e.message, False) + + def remove(self, id): A task should not kill another one. I believe we don't need a parameter id here, just use self.id.
+ try: + del tasks_queue[id] + except KeyError: + msg = "There's no task_id %s in tasks_queue. Nothing changed." + cherrypy.log.error_log.error(msg % id) diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..7599e85 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -39,31 +39,12 @@ from datetime import datetime, timedelta from multiprocessing import Process, Queue from threading import Timer
-from wok.asynctask import AsyncTask from wok.config import paths, PluginPaths from wok.exception import InvalidParameter, TimeoutExpired from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log -task_id = 0 - - -def get_next_task_id(): - global task_id - task_id += 1 - return task_id - - -def get_task_id(): - global task_id - return task_id - - -def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) - return id
def is_digit(value):
-- Lucio Correia Software Engineer IBM LTC Brazil

On Aug 29 04:55PM, Lucio Correia wrote:
On 29-08-2016 16:18, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 48 ++++++++++++++++++++++++++++-------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 19 ------------------- 4 files changed, 28 insertions(+), 47 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..1a114a8 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,29 +21,41 @@
import cherrypy import threading +import time import traceback +import uuid
-from wok.exception import OperationFailed +tasks_queue = {}
-class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") +def clean_old_tasks(): + """ + Remove from tasks_queue any task that started before 12 hours ago and has + current status equal do finished or failed. + """ + for id, task in tasks_queue.items(): + if (task.timestamp < (time.time()-43200)): + if (task.status is 'finished') or (task.status is 'failed'): + task.remove(id) +
- self.id = str(id) +class AsyncTask(object): + def __init__(self, target_uri, fn, opaque=None): + self.id = str(uuid.uuid1()) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'The request is being processing.' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) self.thread.setDaemon(True) self.thread.start() + # let's prevent memory leak in tasks_queue + clean_old_tasks() + tasks_queue[self.id] = self
def _status_cb(self, message, success=None): if success is not None: @@ -51,17 +63,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request @@ -71,3 +72,10 @@ class AsyncTask(object): cherrypy.log.error_log.error("Error in async_task %s " % self.id) cherrypy.log.error_log.error(traceback.format_exc()) cb(e.message, False) + + def remove(self, id): A task should not kill another one. I believe we don't need a parameter id here, just use self.id.
I'm not killing any task here, only removing the requested task from the dictionary that controls the AysncTasks objects and only if the task has status equal to 'finished' or 'failed'. But I got your point and it will be addressed in the V6
+ try: + del tasks_queue[id] + except KeyError: + msg = "There's no task_id %s in tasks_queue. Nothing changed." + cherrypy.log.error_log.error(msg % id) diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..7599e85 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -39,31 +39,12 @@ from datetime import datetime, timedelta from multiprocessing import Process, Queue from threading import Timer
-from wok.asynctask import AsyncTask from wok.config import paths, PluginPaths from wok.exception import InvalidParameter, TimeoutExpired from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log -task_id = 0 - - -def get_next_task_id(): - global task_id - task_id += 1 - return task_id - - -def get_task_id(): - global task_id - return task_id - - -def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) - return id
def is_digit(value):
-- Lucio Correia Software Engineer IBM LTC Brazil
_______________________________________________ Kimchi-devel mailing list Kimchi-devel@ovirt.org http://lists.ovirt.org/mailman/listinfo/kimchi-devel
-- Paulo Ricardo Paz Vital Linux Technology Center, IBM Systems http://www.ibm.com/linux/ltc/

From: Paulo Vital <pvital@linux.vnet.ibm.com> TasksModel and TaskModel classes need to be updated after move AsyncTasks to memory. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/i18n.py | 1 + src/wok/model/tasks.py | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 1a0446b..ade2ae9 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,6 +33,7 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."), + "WOKASYNC0001E": _("Unable to find task id: %(id)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."), "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/model/tasks.py b/src/wok/model/tasks.py index 9c32554..d1111a6 100644 --- a/src/wok/model/tasks.py +++ b/src/wok/model/tasks.py @@ -22,7 +22,8 @@ import time -from wok.exception import TimeoutExpired +from wok.exception import NotFoundError, TimeoutExpired +from wok.asynctask import tasks_queue class TasksModel(object): @@ -30,8 +31,7 @@ class TasksModel(object): self.objstore = kargs['objstore'] def get_list(self): - with self.objstore as session: - return session.get_list('task') + return tasks_queue.keys() class TaskModel(object): @@ -39,8 +39,13 @@ class TaskModel(object): self.objstore = kargs['objstore'] def lookup(self, id): - with self.objstore as session: - return session.get('task', str(id)) + if id not in tasks_queue.keys(): + raise NotFoundError('WOKASYNC0001E', {'id': id}) + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri} def wait(self, id, timeout=10): """Wait for a task until it stops running (successfully or due to @@ -54,13 +59,12 @@ class TaskModel(object): "TimeoutExpired" is raised. """ for i in range(0, timeout): - with self.objstore as session: - task = session.get('task', str(id)) + task = tasks_queue[id] - if task['status'] != 'running': + if task.status != 'running': return time.sleep(1) raise TimeoutExpired('WOKASYNC0003E', {'seconds': timeout, - 'task': task['target_uri']}) + 'task': task.target_uri}) -- 2.7.4

On 08/29/2016 04:18 PM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
TasksModel and TaskModel classes need to be updated after move AsyncTasks to memory.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/i18n.py | 1 + src/wok/model/tasks.py | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-)
diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 1a0446b..ade2ae9 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,6 +33,7 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
+ "WOKASYNC0001E": _("Unable to find task id: %(id)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/model/tasks.py b/src/wok/model/tasks.py index 9c32554..d1111a6 100644 --- a/src/wok/model/tasks.py +++ b/src/wok/model/tasks.py @@ -22,7 +22,8 @@
import time
-from wok.exception import TimeoutExpired +from wok.exception import NotFoundError, TimeoutExpired +from wok.asynctask import tasks_queue Wrong import order
class TasksModel(object): @@ -30,8 +31,7 @@ class TasksModel(object): self.objstore = kargs['objstore']
def get_list(self): - with self.objstore as session: - return session.get_list('task') + return tasks_queue.keys()
class TaskModel(object): @@ -39,8 +39,13 @@ class TaskModel(object): self.objstore = kargs['objstore']
def lookup(self, id): - with self.objstore as session: - return session.get('task', str(id)) + if id not in tasks_queue.keys(): + raise NotFoundError('WOKASYNC0001E', {'id': id}) + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri}
def wait(self, id, timeout=10): """Wait for a task until it stops running (successfully or due to @@ -54,13 +59,12 @@ class TaskModel(object): "TimeoutExpired" is raised. """ for i in range(0, timeout): - with self.objstore as session: - task = session.get('task', str(id)) + task = tasks_queue[id]
- if task['status'] != 'running': + if task.status != 'running': return
time.sleep(1)
raise TimeoutExpired('WOKASYNC0003E', {'seconds': timeout, - 'task': task['target_uri']}) + 'task': task.target_uri})

From: Paulo Vital <pvital@linux.vnet.ibm.com> Create new testcases to test AsyncTasks in memory. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- tests/test_tasks.py | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 tests/test_tasks.py diff --git a/tests/test_tasks.py b/tests/test_tasks.py new file mode 100644 index 0000000..09bdf9d --- /dev/null +++ b/tests/test_tasks.py @@ -0,0 +1,124 @@ +# +# Project Wok +# +# Copyright IBM Corp, 2016 +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import unittest +import time + +from utils import wait_task + +from wok.model import model +from wok.asynctask import AsyncTask, tasks_queue + + +class AsyncTaskTests(unittest.TestCase): + def _quick_op(self, cb, message): + cb(message, True) + + def _long_op(self, cb, params): + time.sleep(params.get('delay', 3)) + cb(params.get('message', ''), params.get('result', False)) + + def _continuous_ops(self, cb, params): + cb("step 1 OK") + time.sleep(2) + cb("step 2 OK") + time.sleep(2) + cb("step 3 OK", params.get('result', True)) + + def _task_lookup(self, id): + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri} + + def test_async_tasks(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + taskid = AsyncTask('', self._quick_op, 'Hello').id + wait_task(self._task_lookup, taskid) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + self.assertEquals('Hello', self._task_lookup(taskid)['message']) + + params = {'delay': 3, 'result': False, + 'message': 'It was not meant to be'} + taskid = AsyncTask('', self._long_op, params).id + self.assertEquals('running', self._task_lookup(taskid)['status']) + self.assertEquals('The request is being processing.', + self._task_lookup(taskid)['message']) + wait_task(self._task_lookup, taskid) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + self._task_lookup(taskid)['message']) + + taskid = AsyncTask('', abnormal_op, {}).id + wait_task(self._task_lookup, taskid) + self.assertEquals('Exception raised', + self._task_lookup(taskid)['message']) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + + taskid = AsyncTask('', self._continuous_ops, {'result': True}).id + self.assertEquals('running', self._task_lookup(taskid)['status']) + wait_task(self._task_lookup, taskid, timeout=10) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + + def test_tasks_model(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + inst = model.Model() + taskid = AsyncTask('', self._quick_op, 'Hello').id + inst.task_wait(taskid) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) + self.assertEquals('Hello', inst.task_lookup(taskid)['message']) + + params = {'delay': 3, 'result': False, + 'message': 'It was not meant to be'} + taskid = AsyncTask('', self._long_op, params).id + self.assertEquals('running', inst.task_lookup(taskid)['status']) + self.assertEquals('The request is being processing.', + inst.task_lookup(taskid)['message']) + inst.task_wait(taskid) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + inst.task_lookup(taskid)['message']) + + taskid = AsyncTask('', abnormal_op, {}).id + inst.task_wait(taskid) + self.assertEquals('Exception raised', + inst.task_lookup(taskid)['message']) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + + taskid = AsyncTask('', self._continuous_ops, {'result': True}).id + self.assertEquals('running', inst.task_lookup(taskid)['status']) + inst.task_wait(taskid, timeout=10) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) -- 2.7.4

On 08/29/2016 04:18 PM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
Create new testcases to test AsyncTasks in memory.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- tests/test_tasks.py | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 tests/test_tasks.py
diff --git a/tests/test_tasks.py b/tests/test_tasks.py new file mode 100644 index 0000000..09bdf9d --- /dev/null +++ b/tests/test_tasks.py @@ -0,0 +1,124 @@ +# +# Project Wok +# +# Copyright IBM Corp, 2016 +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import unittest +import time Wrong import order
+ +from utils import wait_task + +from wok.model import model +from wok.asynctask import AsyncTask, tasks_queue
Wrong import order
+ + +class AsyncTaskTests(unittest.TestCase): + def _quick_op(self, cb, message): + cb(message, True) + + def _long_op(self, cb, params): + time.sleep(params.get('delay', 3)) + cb(params.get('message', ''), params.get('result', False)) + + def _continuous_ops(self, cb, params): + cb("step 1 OK") + time.sleep(2) + cb("step 2 OK") + time.sleep(2) + cb("step 3 OK", params.get('result', True)) + + def _task_lookup(self, id): + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri} + + def test_async_tasks(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + taskid = AsyncTask('', self._quick_op, 'Hello').id + wait_task(self._task_lookup, taskid) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + self.assertEquals('Hello', self._task_lookup(taskid)['message']) + + params = {'delay': 3, 'result': False, + 'message': 'It was not meant to be'} + taskid = AsyncTask('', self._long_op, params).id + self.assertEquals('running', self._task_lookup(taskid)['status']) + self.assertEquals('The request is being processing.', + self._task_lookup(taskid)['message']) + wait_task(self._task_lookup, taskid) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + self._task_lookup(taskid)['message']) + + taskid = AsyncTask('', abnormal_op, {}).id + wait_task(self._task_lookup, taskid) + self.assertEquals('Exception raised', + self._task_lookup(taskid)['message']) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + + taskid = AsyncTask('', self._continuous_ops, {'result': True}).id + self.assertEquals('running', self._task_lookup(taskid)['status']) + wait_task(self._task_lookup, taskid, timeout=10) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + + def test_tasks_model(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + inst = model.Model() + taskid = AsyncTask('', self._quick_op, 'Hello').id + inst.task_wait(taskid) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) + self.assertEquals('Hello', inst.task_lookup(taskid)['message']) + + params = {'delay': 3, 'result': False, + 'message': 'It was not meant to be'} + taskid = AsyncTask('', self._long_op, params).id + self.assertEquals('running', inst.task_lookup(taskid)['status']) + self.assertEquals('The request is being processing.', + inst.task_lookup(taskid)['message']) + inst.task_wait(taskid) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + inst.task_lookup(taskid)['message']) + + taskid = AsyncTask('', abnormal_op, {}).id + inst.task_wait(taskid) + self.assertEquals('Exception raised', + inst.task_lookup(taskid)['message']) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + + taskid = AsyncTask('', self._continuous_ops, {'result': True}).id + self.assertEquals('running', inst.task_lookup(taskid)['status']) + inst.task_wait(taskid, timeout=10) + self.assertEquals('finished', inst.task_lookup(taskid)['status'])
participants (4)
-
Daniel Henrique Barboza
-
Lucio Correia
-
Paulo Ricardo Paz Vital
-
pvital@linux.vnet.ibm.com