[PATCH V2] [Wok 0/3] Issue #158: Move AsyncTask information to memory

From: Paulo Vital <pvital@linux.vnet.ibm.com> V2: - moved all tasks_queue infrastructure to wok.asynctask - modified some user messages. V1: this patch-set moves AsyncTask information from objectstore to memory and it is dependency to solve Issue #122. Paulo Vital (3): Issue #158: Move AsyncTask information to memory Issue #158: Update model/tasks.py with AsyncTasks in memory. Issue #158: Add AsyncTasks testcases src/wok/asynctask.py | 58 +++++++++++++++-------- src/wok/i18n.py | 3 +- src/wok/model/tasks.py | 22 +++++---- src/wok/objectstore.py | 6 --- src/wok/utils.py | 19 -------- tests/test_tasks.py | 124 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 177 insertions(+), 55 deletions(-) create mode 100644 tests/test_tasks.py -- 2.7.4

From: Paulo Vital <pvital@linux.vnet.ibm.com> This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects. It also removes the tasks cleanup process from objectstore. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 58 +++++++++++++++++++++++++++++++++----------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 19 ----------------- 4 files changed, 39 insertions(+), 46 deletions(-) diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..3298787 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,55 @@ import cherrypy import threading +import time import traceback -from wok.exception import OperationFailed +task_id = 0 +tasks_queue = {} -class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") +def get_next_task_id(): + global task_id + task_id += 1 + return task_id + + +def add_task(target_uri, fn, opaque=None): + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) + return id + + +def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + +def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 12 hours ago and status equal do finished or failed. + """ + for id, task in tasks_queue.items(): + if (task.timestamp < (time.time()-43200)): + if (task.status is 'finished') or (task.status is 'failed'): + remove_task(id) + + +class AsyncTask(object): + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'The request is being processing.' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +82,6 @@ class AsyncTask(object): if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message}) def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."), - "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."), "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return - # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..7599e85 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -39,31 +39,12 @@ from datetime import datetime, timedelta from multiprocessing import Process, Queue from threading import Timer -from wok.asynctask import AsyncTask from wok.config import paths, PluginPaths from wok.exception import InvalidParameter, TimeoutExpired from wok.stringutils import decode_value wok_log = cherrypy.log.error_log -task_id = 0 - - -def get_next_task_id(): - global task_id - task_id += 1 - return task_id - - -def get_task_id(): - global task_id - return task_id - - -def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) - return id def is_digit(value): -- 2.7.4

On 08/26/2016 11:18 AM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 58 +++++++++++++++++++++++++++++++++----------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 19 ----------------- 4 files changed, 39 insertions(+), 46 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..3298787 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,55 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed +task_id = 0 +tasks_queue = {}
-class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E")
+def get_next_task_id(): + global task_id + task_id += 1 + return task_id + +
I don't think we need it anymore. IMO it would be better to auto generate the task ID by using uuid or other tool. If you agree on that, you can send a separated patch to do that.
+def add_task(target_uri, fn, opaque=None): + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) + return id + + +def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) +
+def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 12 hours ago and status equal do finished or failed. + """ + for id, task in tasks_queue.items(): + if (task.timestamp < (time.time()-43200)): + if (task.status is 'finished') or (task.status is 'failed'): + remove_task(id) + + +class AsyncTask(object): + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'The request is being processing.' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +82,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..7599e85 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -39,31 +39,12 @@ from datetime import datetime, timedelta from multiprocessing import Process, Queue from threading import Timer
-from wok.asynctask import AsyncTask from wok.config import paths, PluginPaths from wok.exception import InvalidParameter, TimeoutExpired from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log -task_id = 0 - - -def get_next_task_id(): - global task_id - task_id += 1 - return task_id - - -def get_task_id(): - global task_id - return task_id - - -def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) - return id
def is_digit(value):

On Aug 26 02:09PM, Aline Manera wrote:
On 08/26/2016 11:18 AM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 58 +++++++++++++++++++++++++++++++++----------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 19 ----------------- 4 files changed, 39 insertions(+), 46 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..3298787 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,55 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed +task_id = 0 +tasks_queue = {}
-class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E")
+def get_next_task_id(): + global task_id + task_id += 1 + return task_id + +
I don't think we need it anymore. IMO it would be better to auto generate the task ID by using uuid or other tool. If you agree on that, you can send a separated patch to do that.
That's ok for me. I will check what we can use and send a new patch.
+def add_task(target_uri, fn, opaque=None): + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) + return id + + +def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) +
+def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 12 hours ago and status equal do finished or failed. + """ + for id, task in tasks_queue.items(): + if (task.timestamp < (time.time()-43200)): + if (task.status is 'finished') or (task.status is 'failed'): + remove_task(id) + + +class AsyncTask(object): + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'The request is being processing.' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +82,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..7599e85 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -39,31 +39,12 @@ from datetime import datetime, timedelta from multiprocessing import Process, Queue from threading import Timer
-from wok.asynctask import AsyncTask from wok.config import paths, PluginPaths from wok.exception import InvalidParameter, TimeoutExpired from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log -task_id = 0 - - -def get_next_task_id(): - global task_id - task_id += 1 - return task_id - - -def get_task_id(): - global task_id - return task_id - - -def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) - return id
def is_digit(value):
-- Paulo Ricardo Paz Vital Linux Technology Center, IBM Systems http://www.ibm.com/linux/ltc/

Also thinking more about the structure, see the comments below: On 08/26/2016 11:18 AM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 58 +++++++++++++++++++++++++++++++++----------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 19 ----------------- 4 files changed, 39 insertions(+), 46 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..3298787 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,55 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed +task_id = 0 +tasks_queue = {}
-class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") +def get_next_task_id(): + global task_id + task_id += 1 + return task_id + +
+def add_task(target_uri, fn, opaque=None): + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) + return id + + +def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) +
Why do include that to the AsyncTask class as create() and destroy() ? It will be more OOO than having separated functions to do that.
+def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 12 hours ago and status equal do finished or failed. + """ + for id, task in tasks_queue.items(): + if (task.timestamp < (time.time()-43200)): + if (task.status is 'finished') or (task.status is 'failed'): + remove_task(id) + + +class AsyncTask(object): + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'The request is being processing.' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +82,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..7599e85 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -39,31 +39,12 @@ from datetime import datetime, timedelta from multiprocessing import Process, Queue from threading import Timer
-from wok.asynctask import AsyncTask from wok.config import paths, PluginPaths from wok.exception import InvalidParameter, TimeoutExpired from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log -task_id = 0 - - -def get_next_task_id(): - global task_id - task_id += 1 - return task_id - - -def get_task_id(): - global task_id - return task_id - - -def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) - return id
def is_digit(value):

On 08/26/2016 02:13 PM, Aline Manera wrote:
Also thinking more about the structure, see the comments below:
On 08/26/2016 11:18 AM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 58 +++++++++++++++++++++++++++++++++----------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 19 ----------------- 4 files changed, 39 insertions(+), 46 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..3298787 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,55 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed +task_id = 0 +tasks_queue = {}
-class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") +def get_next_task_id(): + global task_id + task_id += 1 + return task_id + +
+def add_task(target_uri, fn, opaque=None): + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) + return id + + +def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) +
Why do include that to the AsyncTask class as create() and destroy() ? It will be more OOO than having separated functions to do that.
Nether a create() function is needed. The instance create is done on __init__() So there the uuid is generated in the instance is returned. It will require more changes in the plugins. return AsynTask(...).id instead of return add_task(...)
+def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 12 hours ago and status equal do finished or failed. + """ + for id, task in tasks_queue.items(): + if (task.timestamp < (time.time()-43200)): + if (task.status is 'finished') or (task.status is 'failed'): + remove_task(id) + + +class AsyncTask(object): + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'The request is being processing.' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +82,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..7599e85 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -39,31 +39,12 @@ from datetime import datetime, timedelta from multiprocessing import Process, Queue from threading import Timer
-from wok.asynctask import AsyncTask from wok.config import paths, PluginPaths from wok.exception import InvalidParameter, TimeoutExpired from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log -task_id = 0 - - -def get_next_task_id(): - global task_id - task_id += 1 - return task_id - - -def get_task_id(): - global task_id - return task_id - - -def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) - return id
def is_digit(value):
_______________________________________________ Kimchi-devel mailing list Kimchi-devel@ovirt.org http://lists.ovirt.org/mailman/listinfo/kimchi-devel

From: Paulo Vital <pvital@linux.vnet.ibm.com> TasksModel and TaskModel classes need to be updated after move AsyncTasks to memory. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/i18n.py | 1 + src/wok/model/tasks.py | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 1a0446b..ade2ae9 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,6 +33,7 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."), + "WOKASYNC0001E": _("Unable to find task id: %(id)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."), "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/model/tasks.py b/src/wok/model/tasks.py index 9c32554..d1111a6 100644 --- a/src/wok/model/tasks.py +++ b/src/wok/model/tasks.py @@ -22,7 +22,8 @@ import time -from wok.exception import TimeoutExpired +from wok.exception import NotFoundError, TimeoutExpired +from wok.asynctask import tasks_queue class TasksModel(object): @@ -30,8 +31,7 @@ class TasksModel(object): self.objstore = kargs['objstore'] def get_list(self): - with self.objstore as session: - return session.get_list('task') + return tasks_queue.keys() class TaskModel(object): @@ -39,8 +39,13 @@ class TaskModel(object): self.objstore = kargs['objstore'] def lookup(self, id): - with self.objstore as session: - return session.get('task', str(id)) + if id not in tasks_queue.keys(): + raise NotFoundError('WOKASYNC0001E', {'id': id}) + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri} def wait(self, id, timeout=10): """Wait for a task until it stops running (successfully or due to @@ -54,13 +59,12 @@ class TaskModel(object): "TimeoutExpired" is raised. """ for i in range(0, timeout): - with self.objstore as session: - task = session.get('task', str(id)) + task = tasks_queue[id] - if task['status'] != 'running': + if task.status != 'running': return time.sleep(1) raise TimeoutExpired('WOKASYNC0003E', {'seconds': timeout, - 'task': task['target_uri']}) + 'task': task.target_uri}) -- 2.7.4

From: Paulo Vital <pvital@linux.vnet.ibm.com> Create new testcases to test AsyncTasks in memory. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- tests/test_tasks.py | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 tests/test_tasks.py diff --git a/tests/test_tasks.py b/tests/test_tasks.py new file mode 100644 index 0000000..85ddc02 --- /dev/null +++ b/tests/test_tasks.py @@ -0,0 +1,124 @@ +# +# Project Wok +# +# Copyright IBM Corp, 2016 +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import unittest +import time + +from utils import wait_task + +from wok.model import model +from wok.asynctask import add_task, tasks_queue + + +class AsyncTaskTests(unittest.TestCase): + def _quick_op(self, cb, message): + cb(message, True) + + def _long_op(self, cb, params): + time.sleep(params.get('delay', 3)) + cb(params.get('message', ''), params.get('result', False)) + + def _continuous_ops(self, cb, params): + cb("step 1 OK") + time.sleep(2) + cb("step 2 OK") + time.sleep(2) + cb("step 3 OK", params.get('result', True)) + + def _task_lookup(self, id): + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri} + + def test_async_tasks(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + taskid = add_task('', self._quick_op, 'Hello') + wait_task(self._task_lookup, taskid) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + self.assertEquals('Hello', self._task_lookup(taskid)['message']) + + params = {'delay': 3, 'result': False, + 'message': 'It was not meant to be'} + taskid = add_task('', self._long_op, params) + self.assertEquals('running', self._task_lookup(taskid)['status']) + self.assertEquals('The request is being processing.', + self._task_lookup(taskid)['message']) + wait_task(self._task_lookup, taskid) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + self._task_lookup(taskid)['message']) + + taskid = add_task('', abnormal_op, {}) + wait_task(self._task_lookup, taskid) + self.assertEquals('Exception raised', + self._task_lookup(taskid)['message']) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + + taskid = add_task('', self._continuous_ops, {'result': True}) + self.assertEquals('running', self._task_lookup(taskid)['status']) + wait_task(self._task_lookup, taskid, timeout=10) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + + def test_tasks_model(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + inst = model.Model() + taskid = add_task('', self._quick_op, 'Hello') + inst.task_wait(taskid) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) + self.assertEquals('Hello', inst.task_lookup(taskid)['message']) + + params = {'delay': 3, 'result': False, + 'message': 'It was not meant to be'} + taskid = add_task('', self._long_op, params) + self.assertEquals('running', inst.task_lookup(taskid)['status']) + self.assertEquals('The request is being processing.', + inst.task_lookup(taskid)['message']) + inst.task_wait(taskid) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + inst.task_lookup(taskid)['message']) + + taskid = add_task('', abnormal_op, {}) + inst.task_wait(taskid) + self.assertEquals('Exception raised', + inst.task_lookup(taskid)['message']) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + + taskid = add_task('', self._continuous_ops, {'result': True}) + self.assertEquals('running', inst.task_lookup(taskid)['status']) + inst.task_wait(taskid, timeout=10) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) -- 2.7.4

Reviewed-by: Aline Manera <alinefm@linux.vnet.ibm.com> On 08/26/2016 11:18 AM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
V2: - moved all tasks_queue infrastructure to wok.asynctask - modified some user messages.
V1:
this patch-set moves AsyncTask information from objectstore to memory and it is dependency to solve Issue #122.
Paulo Vital (3): Issue #158: Move AsyncTask information to memory Issue #158: Update model/tasks.py with AsyncTasks in memory. Issue #158: Add AsyncTasks testcases
src/wok/asynctask.py | 58 +++++++++++++++-------- src/wok/i18n.py | 3 +- src/wok/model/tasks.py | 22 +++++---- src/wok/objectstore.py | 6 --- src/wok/utils.py | 19 -------- tests/test_tasks.py | 124 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 177 insertions(+), 55 deletions(-) create mode 100644 tests/test_tasks.py
participants (3)
-
Aline Manera
-
Paulo Ricardo Paz Vital
-
pvital@linux.vnet.ibm.com