[PATCH V6] [Wok 0/3] Issue #158: Move AsyncTask information to memory

From: Paulo Vital <pvital@linux.vnet.ibm.com> V6: - Fixed import order - Improved AsyncTask.remove() - Baked banana and cinnamon cupcakes V5: - Moved remove_tasks() to AsyncTask class. V4: - Fix usage of UUID to prevent "UUID Is not JSON Serializable" error V3: - Changes in how create AsyncTask V2: - moved all tasks_queue infrastructure to wok.asynctask - modified some user messages. V1: this patch-set moves AsyncTask information from objectstore to memory and it is dependency to solve Issue #122. Paulo Vital (3): Issue #158: Move AsyncTask information to memory Issue #158: Update model/tasks.py with AsyncTasks in memory. Issue #158: Add AsyncTasks testcases src/wok/asynctask.py | 48 +++++++++++-------- src/wok/i18n.py | 3 +- src/wok/model/tasks.py | 22 +++++---- src/wok/objectstore.py | 6 --- src/wok/utils.py | 19 -------- tests/test_tasks.py | 124 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 166 insertions(+), 56 deletions(-) create mode 100644 tests/test_tasks.py -- 2.7.4

From: Paulo Vital <pvital@linux.vnet.ibm.com> This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects. It also removes the tasks cleanup process from objectstore. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 48 ++++++++++++++++++++++++++++-------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 19 ------------------- 4 files changed, 28 insertions(+), 47 deletions(-) diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..b98ad9a 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,29 +21,41 @@ import cherrypy import threading +import time import traceback +import uuid -from wok.exception import OperationFailed +tasks_queue = {} -class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") +def clean_old_tasks(): + """ + Remove from tasks_queue any task that started before 12 hours ago and has + current status equal do finished or failed. + """ + for id, task in tasks_queue.items(): + if (task.timestamp < (time.time()-43200)): + if (task.status is 'finished') or (task.status is 'failed'): + task.remove() + - self.id = str(id) +class AsyncTask(object): + def __init__(self, target_uri, fn, opaque=None): + self.id = str(uuid.uuid1()) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'The request is being processing.' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) self.thread.setDaemon(True) self.thread.start() + # let's prevent memory leak in tasks_queue + clean_old_tasks() + tasks_queue[self.id] = self def _status_cb(self, message, success=None): if success is not None: @@ -51,17 +63,6 @@ class AsyncTask(object): if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message}) def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request @@ -71,3 +72,10 @@ class AsyncTask(object): cherrypy.log.error_log.error("Error in async_task %s " % self.id) cherrypy.log.error_log.error(traceback.format_exc()) cb(e.message, False) + + def remove(self): + try: + del tasks_queue[self.id] + except KeyError: + msg = "There's no task_id %s in tasks_queue. Nothing changed." + cherrypy.log.error_log.error(msg % self.id) diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."), - "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."), "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return - # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..7599e85 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -39,31 +39,12 @@ from datetime import datetime, timedelta from multiprocessing import Process, Queue from threading import Timer -from wok.asynctask import AsyncTask from wok.config import paths, PluginPaths from wok.exception import InvalidParameter, TimeoutExpired from wok.stringutils import decode_value wok_log = cherrypy.log.error_log -task_id = 0 - - -def get_next_task_id(): - global task_id - task_id += 1 - return task_id - - -def get_task_id(): - global task_id - return task_id - - -def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) - return id def is_digit(value): -- 2.7.4

From: Paulo Vital <pvital@linux.vnet.ibm.com> TasksModel and TaskModel classes need to be updated after move AsyncTasks to memory. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/i18n.py | 1 + src/wok/model/tasks.py | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 1a0446b..ade2ae9 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,6 +33,7 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."), + "WOKASYNC0001E": _("Unable to find task id: %(id)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."), "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/model/tasks.py b/src/wok/model/tasks.py index 9c32554..6a2b4bf 100644 --- a/src/wok/model/tasks.py +++ b/src/wok/model/tasks.py @@ -22,7 +22,8 @@ import time -from wok.exception import TimeoutExpired +from wok.asynctask import tasks_queue +from wok.exception import NotFoundError, TimeoutExpired class TasksModel(object): @@ -30,8 +31,7 @@ class TasksModel(object): self.objstore = kargs['objstore'] def get_list(self): - with self.objstore as session: - return session.get_list('task') + return tasks_queue.keys() class TaskModel(object): @@ -39,8 +39,13 @@ class TaskModel(object): self.objstore = kargs['objstore'] def lookup(self, id): - with self.objstore as session: - return session.get('task', str(id)) + if id not in tasks_queue.keys(): + raise NotFoundError('WOKASYNC0001E', {'id': id}) + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri} def wait(self, id, timeout=10): """Wait for a task until it stops running (successfully or due to @@ -54,13 +59,12 @@ class TaskModel(object): "TimeoutExpired" is raised. """ for i in range(0, timeout): - with self.objstore as session: - task = session.get('task', str(id)) + task = tasks_queue[id] - if task['status'] != 'running': + if task.status != 'running': return time.sleep(1) raise TimeoutExpired('WOKASYNC0003E', {'seconds': timeout, - 'task': task['target_uri']}) + 'task': task.target_uri}) -- 2.7.4

From: Paulo Vital <pvital@linux.vnet.ibm.com> Create new testcases to test AsyncTasks in memory. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- tests/test_tasks.py | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 tests/test_tasks.py diff --git a/tests/test_tasks.py b/tests/test_tasks.py new file mode 100644 index 0000000..67f228b --- /dev/null +++ b/tests/test_tasks.py @@ -0,0 +1,124 @@ +# +# Project Wok +# +# Copyright IBM Corp, 2016 +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import time +import unittest + +from utils import wait_task + +from wok.asynctask import AsyncTask, tasks_queue +from wok.model import model + + +class AsyncTaskTests(unittest.TestCase): + def _quick_op(self, cb, message): + cb(message, True) + + def _long_op(self, cb, params): + time.sleep(params.get('delay', 3)) + cb(params.get('message', ''), params.get('result', False)) + + def _continuous_ops(self, cb, params): + cb("step 1 OK") + time.sleep(2) + cb("step 2 OK") + time.sleep(2) + cb("step 3 OK", params.get('result', True)) + + def _task_lookup(self, id): + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri} + + def test_async_tasks(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + taskid = AsyncTask('', self._quick_op, 'Hello').id + wait_task(self._task_lookup, taskid) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + self.assertEquals('Hello', self._task_lookup(taskid)['message']) + + params = {'delay': 3, 'result': False, + 'message': 'It was not meant to be'} + taskid = AsyncTask('', self._long_op, params).id + self.assertEquals('running', self._task_lookup(taskid)['status']) + self.assertEquals('The request is being processing.', + self._task_lookup(taskid)['message']) + wait_task(self._task_lookup, taskid) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + self._task_lookup(taskid)['message']) + + taskid = AsyncTask('', abnormal_op, {}).id + wait_task(self._task_lookup, taskid) + self.assertEquals('Exception raised', + self._task_lookup(taskid)['message']) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + + taskid = AsyncTask('', self._continuous_ops, {'result': True}).id + self.assertEquals('running', self._task_lookup(taskid)['status']) + wait_task(self._task_lookup, taskid, timeout=10) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + + def test_tasks_model(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + inst = model.Model() + taskid = AsyncTask('', self._quick_op, 'Hello').id + inst.task_wait(taskid) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) + self.assertEquals('Hello', inst.task_lookup(taskid)['message']) + + params = {'delay': 3, 'result': False, + 'message': 'It was not meant to be'} + taskid = AsyncTask('', self._long_op, params).id + self.assertEquals('running', inst.task_lookup(taskid)['status']) + self.assertEquals('The request is being processing.', + inst.task_lookup(taskid)['message']) + inst.task_wait(taskid) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + inst.task_lookup(taskid)['message']) + + taskid = AsyncTask('', abnormal_op, {}).id + inst.task_wait(taskid) + self.assertEquals('Exception raised', + inst.task_lookup(taskid)['message']) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + + taskid = AsyncTask('', self._continuous_ops, {'result': True}).id + self.assertEquals('running', inst.task_lookup(taskid)['status']) + inst.task_wait(taskid, timeout=10) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) -- 2.7.4

Reviewed-by: Daniel Barboza <danielhb@linux.vnet.ibm.com> On 08/29/2016 05:20 PM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
V6: - Fixed import order - Improved AsyncTask.remove() - Baked banana and cinnamon cupcakes
V5: - Moved remove_tasks() to AsyncTask class.
V4: - Fix usage of UUID to prevent "UUID Is not JSON Serializable" error
V3: - Changes in how create AsyncTask
V2: - moved all tasks_queue infrastructure to wok.asynctask - modified some user messages.
V1:
this patch-set moves AsyncTask information from objectstore to memory and it is dependency to solve Issue #122.
Paulo Vital (3): Issue #158: Move AsyncTask information to memory Issue #158: Update model/tasks.py with AsyncTasks in memory. Issue #158: Add AsyncTasks testcases
src/wok/asynctask.py | 48 +++++++++++-------- src/wok/i18n.py | 3 +- src/wok/model/tasks.py | 22 +++++---- src/wok/objectstore.py | 6 --- src/wok/utils.py | 19 -------- tests/test_tasks.py | 124 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 166 insertions(+), 56 deletions(-) create mode 100644 tests/test_tasks.py

Reviewed-By: Lucio Correia <luciojhc@linux.vnet.ibm.com> On 29-08-2016 17:20, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
V6: - Fixed import order - Improved AsyncTask.remove() - Baked banana and cinnamon cupcakes
V5: - Moved remove_tasks() to AsyncTask class.
V4: - Fix usage of UUID to prevent "UUID Is not JSON Serializable" error
V3: - Changes in how create AsyncTask
V2: - moved all tasks_queue infrastructure to wok.asynctask - modified some user messages.
V1:
this patch-set moves AsyncTask information from objectstore to memory and it is dependency to solve Issue #122.
Paulo Vital (3): Issue #158: Move AsyncTask information to memory Issue #158: Update model/tasks.py with AsyncTasks in memory. Issue #158: Add AsyncTasks testcases
src/wok/asynctask.py | 48 +++++++++++-------- src/wok/i18n.py | 3 +- src/wok/model/tasks.py | 22 +++++---- src/wok/objectstore.py | 6 --- src/wok/utils.py | 19 -------- tests/test_tasks.py | 124 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 166 insertions(+), 56 deletions(-) create mode 100644 tests/test_tasks.py
-- Lucio Correia Software Engineer IBM LTC Brazil
participants (4)
-
Aline Manera
-
Daniel Henrique Barboza
-
Lucio Correia
-
pvital@linux.vnet.ibm.com