[PATCH] [Wok 0/3] Issue #158: Move AsyncTask information to memory

From: Paulo Vital <pvital@linux.vnet.ibm.com> This patch set moves AsyncTask information from objectstore to memory and is a dependency for solving Issue #122. Paulo Vital (3): Issue #158: Move AsyncTask information to memory Issue #158: Update model/tasks.py with AsyncTasks in memory. Issue #158: Add AsyncTasks testcases src/wok/asynctask.py | 25 ++------- src/wok/i18n.py | 3 +- src/wok/model/tasks.py | 22 ++++---- src/wok/objectstore.py | 6 --- src/wok/utils.py | 31 ++++++++--- tests/test_tasks.py | 136 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 178 insertions(+), 45 deletions(-) create mode 100644 tests/test_tasks.py -- 2.7.4

From: Paulo Vital <pvital@linux.vnet.ibm.com> This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects. It also removes the tasks cleanup process from objectstore. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 25 ++++--------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 31 ++++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 36 deletions(-) diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..2008c18 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,18 @@ import cherrypy import threading +import time import traceback -from wok.exception import OperationFailed - - class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") - + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'Starting AsyncTask' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +45,6 @@ class AsyncTask(object): if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message}) def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- 
a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."), - "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."), "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return - # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..8c13c70 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -31,6 +31,7 @@ import re import sqlite3 import subprocess import sys +import time import traceback import xml.etree.ElementTree as ET @@ -47,6 +48,7 @@ from wok.stringutils import decode_value wok_log = cherrypy.log.error_log task_id = 0 +tasks_queue = {} def get_next_task_id(): @@ -55,17 +57,32 @@ def get_next_task_id(): return task_id -def get_task_id(): - global task_id - return task_id - - def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) + # let's 
prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) return id +def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + + +def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 10 minutes ago. + """ + for id, task in tasks_queue.items(): + if task.timestamp < (time.time()-600): + remove_task(id) + + def is_digit(value): if isinstance(value, int): return True -- 2.7.4

One suggestion is to move all task stuff to asynctask.py. One more comment below. On 24-08-2016 16:36, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 25 ++++--------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 31 ++++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..2008c18 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,18 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed - - class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") - + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'Starting AsyncTask' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +45,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..8c13c70 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -31,6 +31,7 @@ import re import sqlite3 import subprocess import sys +import time import traceback import xml.etree.ElementTree as ET
@@ -47,6 +48,7 @@ from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log task_id = 0 +tasks_queue = {}
def get_next_task_id(): @@ -55,17 +57,32 @@ def get_next_task_id(): return task_id
-def get_task_id(): - global task_id - return task_id - - def add_task(target_uri, fn, objstore, opaque=None): objstore is not in use here
- id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) return id
+def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + + +def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 10 minutes ago. + """ + for id, task in tasks_queue.items(): + if task.timestamp < (time.time()-600): + remove_task(id) + + def is_digit(value): if isinstance(value, int): return True
-- Lucio Correia Software Engineer IBM LTC Brazil

Hi Lucio. On Aug 25 11:23AM, Lucio Correia wrote:
One suggestion is to move all task stuff to asynctask.py
Yeah, I thought about that, but then I went through the Kimchi, Gingerbase and Ginger code to see how big a change it would be to modify all calls of the add_task() method from wok.utils to wok.asynctask. IMO, this is a huge change, especially now that we are close to the code freeze for the 2.3 release. It's totally feasible, but I'm afraid that code submitted to Ginger* might be committed before this one, and then things would start to break and the number of issues would increase at this moment.
One more comment below.
On 24-08-2016 16:36, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 25 ++++--------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 31 ++++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..2008c18 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,18 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed - - class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") - + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'Starting AsyncTask' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +45,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..8c13c70 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -31,6 +31,7 @@ import re import sqlite3 import subprocess import sys +import time import traceback import xml.etree.ElementTree as ET
@@ -47,6 +48,7 @@ from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log task_id = 0 +tasks_queue = {}
def get_next_task_id(): @@ -55,17 +57,32 @@ def get_next_task_id(): return task_id
-def get_task_id(): - global task_id - return task_id - - def add_task(target_uri, fn, objstore, opaque=None): objstore is not in use here
I know that, and the reason to keep it here in the method signature is the same one I explained above. Especially in this case, if I remove objstore from here, I'll be forced to modify all add_task() calls in the Kimchi and Ginger* plugins. PEP8 did not flag it as an error/failure because (I suppose) it's in the signature of the method and not in the body of the method.
- id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) return id
+def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + + +def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 10 minutes ago. + """ + for id, task in tasks_queue.items(): + if task.timestamp < (time.time()-600): + remove_task(id) + + def is_digit(value): if isinstance(value, int): return True
-- Lucio Correia Software Engineer IBM LTC Brazil
_______________________________________________ Kimchi-devel mailing list Kimchi-devel@ovirt.org http://lists.ovirt.org/mailman/listinfo/kimchi-devel
-- Paulo Ricardo Paz Vital Linux Technology Center, IBM Systems http://www.ibm.com/linux/ltc/

On 08/25/2016 11:45 AM, Paulo Ricardo Paz Vital wrote:
Hi Lucio.
On Aug 25 11:23AM, Lucio Correia wrote:
One suggestion is to move all task stuff to asynctask.py Yeah, I think about that, but then I went to Kimchi, Gingerbase and Ginger code to see how huge would be modify all call of add_task() method from wok.utils to wok.asynctask.
IMO, this is a huge change, specially now near to code freeze for 2.3 release. It's totally feasible, but I'm afraid that code submitted to Ginger* is committed first than this one, and then things start to break and the number of issues increase in this moment.
We will have 1 month for bug fixes, so I think we will have enough time to fix any issue related to that. About the patch dependency, you just need to state the dependency in the patch subject. That way the Ginger community will be aware of it and will not merge the patch before the Wok patch.
One more comment below.
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 25 ++++--------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 31 ++++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..2008c18 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,18 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed - - class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") - + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'Starting AsyncTask' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +45,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..8c13c70 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -31,6 +31,7 @@ import re import sqlite3 import subprocess import sys +import time import traceback import xml.etree.ElementTree as ET
@@ -47,6 +48,7 @@ from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log task_id = 0 +tasks_queue = {}
def get_next_task_id(): @@ -55,17 +57,32 @@ def get_next_task_id(): return task_id
-def get_task_id(): - global task_id - return task_id - - def add_task(target_uri, fn, objstore, opaque=None): objstore is not in use here I know that, and the reason to keep it here in the method signature is
On 24-08-2016 16:36, pvital@linux.vnet.ibm.com wrote: the same I explained above. Specially in this case, if I remove objstore from here, I'll be forced to modify all add_task() calls from Kimchi and Ginger* plugins.
PEP8 did not pointed as an error/failure, because (I suposse) it's in the signature of the method and not in the core of the method.
- id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) return id
+def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + + +def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 10 minutes ago. + """ + for id, task in tasks_queue.items(): + if task.timestamp < (time.time()-600): + remove_task(id) + + def is_digit(value): if isinstance(value, int): return True
-- Lucio Correia Software Engineer IBM LTC Brazil
_______________________________________________ Kimchi-devel mailing list Kimchi-devel@ovirt.org http://lists.ovirt.org/mailman/listinfo/kimchi-devel

On Aug 26 08:18AM, Aline Manera wrote:
On 08/25/2016 11:45 AM, Paulo Ricardo Paz Vital wrote:
Hi Lucio.
On Aug 25 11:23AM, Lucio Correia wrote:
One suggestion is to move all task stuff to asynctask.py Yeah, I think about that, but then I went to Kimchi, Gingerbase and Ginger code to see how huge would be modify all call of add_task() method from wok.utils to wok.asynctask.
IMO, this is a huge change, specially now near to code freeze for 2.3 release. It's totally feasible, but I'm afraid that code submitted to Ginger* is committed first than this one, and then things start to break and the number of issues increase in this moment.
We will have 1 month for bug fixes so I think we will have enough time to fix any issue related to that.
About the patch dependency, you just need to add to the patch subject the dependency. So the Ginger community will be aware about it to do not merge the patch prior to the Wok patch.
Ok! I'll change that and submit a V2 for this patch, and new patches to all other plugins with the necessary modifications.
One more comment below.
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 25 ++++--------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 31 ++++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..2008c18 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,18 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed - - class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") - + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'Starting AsyncTask' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +45,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..8c13c70 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -31,6 +31,7 @@ import re import sqlite3 import subprocess import sys +import time import traceback import xml.etree.ElementTree as ET
@@ -47,6 +48,7 @@ from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log task_id = 0 +tasks_queue = {}
def get_next_task_id(): @@ -55,17 +57,32 @@ def get_next_task_id(): return task_id
-def get_task_id(): - global task_id - return task_id - - def add_task(target_uri, fn, objstore, opaque=None): objstore is not in use here I know that, and the reason to keep it here in the method signature is
On 24-08-2016 16:36, pvital@linux.vnet.ibm.com wrote: the same I explained above. Specially in this case, if I remove objstore from here, I'll be forced to modify all add_task() calls from Kimchi and Ginger* plugins.
PEP8 did not pointed as an error/failure, because (I suposse) it's in the signature of the method and not in the core of the method.
- id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) return id
+def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + + +def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 10 minutes ago. + """ + for id, task in tasks_queue.items(): + if task.timestamp < (time.time()-600): + remove_task(id) + + def is_digit(value): if isinstance(value, int): return True
-- Lucio Correia Software Engineer IBM LTC Brazil
_______________________________________________ Kimchi-devel mailing list Kimchi-devel@ovirt.org http://lists.ovirt.org/mailman/listinfo/kimchi-devel
_______________________________________________ Kimchi-devel mailing list Kimchi-devel@ovirt.org http://lists.ovirt.org/mailman/listinfo/kimchi-devel
-- Paulo Ricardo Paz Vital Linux Technology Center, IBM Systems http://www.ibm.com/linux/ltc/

On 08/25/2016 11:23 AM, Lucio Correia wrote:
One suggestion is to move all task stuff to asynctask.py
+1
One more comment below.
On 24-08-2016 16:36, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 25 ++++--------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 31 ++++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..2008c18 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,18 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed - - class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") - + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'Starting AsyncTask' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +45,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..8c13c70 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -31,6 +31,7 @@ import re import sqlite3 import subprocess import sys +import time import traceback import xml.etree.ElementTree as ET
@@ -47,6 +48,7 @@ from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log task_id = 0 +tasks_queue = {}
def get_next_task_id(): @@ -55,17 +57,32 @@ def get_next_task_id(): return task_id
-def get_task_id(): - global task_id - return task_id - - def add_task(target_uri, fn, objstore, opaque=None): objstore is not in use here
- id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) return id
+def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + + +def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 10 minutes ago. + """ + for id, task in tasks_queue.items(): + if task.timestamp < (time.time()-600): + remove_task(id) + + def is_digit(value): if isinstance(value, int): return True

On 08/24/2016 04:36 PM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 25 ++++--------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 31 ++++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..2008c18 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,18 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed - - class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") - + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper()
+ self.message = 'Starting AsyncTask'
The 'message' is shown in the UI, and the user is not aware of AsyncTask. So my suggestion is to use a more generic message ("Processing started"/"Processing request") or leave it blank.
+ self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +45,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..8c13c70 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -31,6 +31,7 @@ import re import sqlite3 import subprocess import sys +import time import traceback import xml.etree.ElementTree as ET
@@ -47,6 +48,7 @@ from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log task_id = 0 +tasks_queue = {}
def get_next_task_id(): @@ -55,17 +57,32 @@ def get_next_task_id(): return task_id
-def get_task_id(): - global task_id - return task_id - - def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) return id
+def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + +
+def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 10 minutes ago. + """ + for id, task in tasks_queue.items(): + if task.timestamp < (time.time()-600): + remove_task(id) + +
I like the idea of removing old entries, but I think 10 minutes is not a good number. Some tasks take a long time to finish, i.e., they may still be running after 10 minutes. Even if you check that the Task status is finished, you may clean up an entry that would still be used to update the UI. That said, I'd suggest increasing that value to 12h/24h (yeah, a big amount of time) to be safer. What do you think about it?
def is_digit(value): if isinstance(value, int): return True

On Aug 26 08:23AM, Aline Manera wrote:
On 08/24/2016 04:36 PM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 25 ++++--------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 31 ++++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..2008c18 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,18 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed - - class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") - + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper()
+ self.message = 'Starting AsyncTask'
The 'message' is used on UI and the user is not aware about AsyncTask. So my suggestion is to have a more generic message ("Processing started"/"Processing request") or keep it in blank.
Actually, I changed the message from 'OK' to this one. Maybe a simple "Starting..." would be better, since the user doesn't know exactly what is being processed. Or we could keep the 'OK'.
+ self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +45,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..8c13c70 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -31,6 +31,7 @@ import re import sqlite3 import subprocess import sys +import time import traceback import xml.etree.ElementTree as ET
@@ -47,6 +48,7 @@ from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log task_id = 0 +tasks_queue = {}
def get_next_task_id(): @@ -55,17 +57,32 @@ def get_next_task_id(): return task_id
-def get_task_id(): - global task_id - return task_id - - def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) return id
+def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + +
+def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 10 minutes ago. + """ + for id, task in tasks_queue.items(): + if task.timestamp < (time.time()-600): + remove_task(id) + +
I like that idea to remove old entries but I think 10 minutes is not a good number. Some tasks take long to finish, ie, in 10 minutes they may be running. Even if you check the Task status to be finished you may clean up an entry that would be used to update the UI.
Said that, I'd say to increase that value to 12h/24h (yeah, a big amount of time) to be safer.
What do you think about it?
Yeah, I completely forgot that we have loooooooooooooooong tasks :-P I think 6h (or 12h) is sufficient and I can add a check to see if the task is finished or not.
def is_digit(value): if isinstance(value, int): return True
-- Paulo Ricardo Paz Vital Linux Technology Center, IBM Systems http://www.ibm.com/linux/ltc/

On 08/26/2016 08:40 AM, Paulo Ricardo Paz Vital wrote:
On Aug 26 08:23AM, Aline Manera wrote:
On 08/24/2016 04:36 PM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read the tasks' information (id, status, messages and target_uri) from objectstore to memory, by the implementation of a dictionary of AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 25 ++++--------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 31 ++++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..2008c18 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,18 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed - - class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") - + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'Starting AsyncTask' The 'message' is used on UI and the user is not aware about AsyncTask. So my suggestion is to have a more generic message ("Processing started"/"Processing request") or keep it in blank.
Actually I modified the message from 'OK' to this one. May be, a single "Starting..." would be better due to user doesn't know exactly what is being processed. Or keep the OK.
Or maybe "The request is being processed"
+ self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +45,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..8c13c70 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -31,6 +31,7 @@ import re import sqlite3 import subprocess import sys +import time import traceback import xml.etree.ElementTree as ET
@@ -47,6 +48,7 @@ from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log task_id = 0 +tasks_queue = {}
def get_next_task_id(): @@ -55,17 +57,32 @@ def get_next_task_id(): return task_id
-def get_task_id(): - global task_id - return task_id - - def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) return id
+def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + + +def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 10 minutes ago. + """ + for id, task in tasks_queue.items(): + if task.timestamp < (time.time()-600): + remove_task(id) + + I like that idea to remove old entries but I think 10 minutes is not a good number. Some tasks take long to finish, ie, in 10 minutes they may be running. Even if you check the Task status to be finished you may clean up an entry that would be used to update the UI.
Said that, I'd say to increase that value to 12h/24h (yeah, a big amount of time) to be safer.
What do you think about it? Yeah, I completely forgot that we have loooooooooooooooong tasks :-P I think 6h (or 12h) is sufficient and I can add a check to see if the task is finished or not.
I'd say 12h to be more conservative. And I am OK with checking whether the Task status is finished or not.
def is_digit(value): if isinstance(value, int): return True

From: Paulo Vital <pvital@linux.vnet.ibm.com> TasksModel and TaskModel classes need to be updated after move AsyncTasks to memory. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/i18n.py | 1 + src/wok/model/tasks.py | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 1a0446b..ade2ae9 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,6 +33,7 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."), + "WOKASYNC0001E": _("Unable to find task id: %(id)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."), "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/model/tasks.py b/src/wok/model/tasks.py index 9c32554..15fe6f9 100644 --- a/src/wok/model/tasks.py +++ b/src/wok/model/tasks.py @@ -22,7 +22,8 @@ import time -from wok.exception import TimeoutExpired +from wok.exception import NotFoundError, TimeoutExpired +from wok.utils import tasks_queue class TasksModel(object): @@ -30,8 +31,7 @@ class TasksModel(object): self.objstore = kargs['objstore'] def get_list(self): - with self.objstore as session: - return session.get_list('task') + return tasks_queue.keys() class TaskModel(object): @@ -39,8 +39,13 @@ class TaskModel(object): self.objstore = kargs['objstore'] def lookup(self, id): - with self.objstore as session: - return session.get('task', str(id)) + if id not in tasks_queue.keys(): + raise NotFoundError('WOKASYNC0001E', {'id': id}) + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri} def wait(self, id, timeout=10): """Wait for a task until it stops running (successfully or due to @@ -54,13 +59,12 @@ class TaskModel(object): "TimeoutExpired" is raised. 
""" for i in range(0, timeout): - with self.objstore as session: - task = session.get('task', str(id)) + task = tasks_queue[id] - if task['status'] != 'running': + if task.status != 'running': return time.sleep(1) raise TimeoutExpired('WOKASYNC0003E', {'seconds': timeout, - 'task': task['target_uri']}) + 'task': task.target_uri}) -- 2.7.4

Reviewed-by: Aline Manera <alinefm@linux.vnet.ibm.com> On 08/24/2016 04:36 PM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
TasksModel and TaskModel classes need to be updated after move AsyncTasks to memory.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/i18n.py | 1 + src/wok/model/tasks.py | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-)
diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 1a0446b..ade2ae9 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,6 +33,7 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
+ "WOKASYNC0001E": _("Unable to find task id: %(id)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/model/tasks.py b/src/wok/model/tasks.py index 9c32554..15fe6f9 100644 --- a/src/wok/model/tasks.py +++ b/src/wok/model/tasks.py @@ -22,7 +22,8 @@
import time
-from wok.exception import TimeoutExpired +from wok.exception import NotFoundError, TimeoutExpired +from wok.utils import tasks_queue
class TasksModel(object): @@ -30,8 +31,7 @@ class TasksModel(object): self.objstore = kargs['objstore']
def get_list(self): - with self.objstore as session: - return session.get_list('task') + return tasks_queue.keys()
class TaskModel(object): @@ -39,8 +39,13 @@ class TaskModel(object): self.objstore = kargs['objstore']
def lookup(self, id): - with self.objstore as session: - return session.get('task', str(id)) + if id not in tasks_queue.keys(): + raise NotFoundError('WOKASYNC0001E', {'id': id}) + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri}
def wait(self, id, timeout=10): """Wait for a task until it stops running (successfully or due to @@ -54,13 +59,12 @@ class TaskModel(object): "TimeoutExpired" is raised. """ for i in range(0, timeout): - with self.objstore as session: - task = session.get('task', str(id)) + task = tasks_queue[id]
- if task['status'] != 'running': + if task.status != 'running': return
time.sleep(1)
raise TimeoutExpired('WOKASYNC0003E', {'seconds': timeout, - 'task': task['target_uri']}) + 'task': task.target_uri})

From: Paulo Vital <pvital@linux.vnet.ibm.com> Create new testcases to test AsyncTasks in memory. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- tests/test_tasks.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 tests/test_tasks.py diff --git a/tests/test_tasks.py b/tests/test_tasks.py new file mode 100644 index 0000000..aa2b6f0 --- /dev/null +++ b/tests/test_tasks.py @@ -0,0 +1,136 @@ +# +# Project Wok +# +# Copyright IBM Corp, 2016 +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. 
+# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import unittest +import time + +from utils import wait_task + +from wok.model import model +from wok.objectstore import ObjectStore +from wok.utils import add_task, tasks_queue + + +class AsyncTaskTests(unittest.TestCase): + def setUp(self): + self.tmp_store = '/tmp/wok-tasks-objstore' + self.objstore = ObjectStore(self.tmp_store) + + def tearDown(self): + if os.path.isfile(self.tmp_store): + os.unlink(self.tmp_store) + + def _quick_op(self, cb, message): + cb(message, True) + + def _long_op(self, cb, params): + time.sleep(params.get('delay', 3)) + cb(params.get('message', ''), params.get('result', False)) + + def _continuous_ops(self, cb, params): + cb("step 1 OK") + time.sleep(2) + cb("step 2 OK") + time.sleep(2) + cb("step 3 OK", params.get('result', True)) + + def _task_lookup(self, id): + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri} + + def test_async_tasks(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + taskid = add_task('', self._quick_op, self.objstore, 'Hello') + wait_task(self._task_lookup, taskid) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + self.assertEquals('Hello', self._task_lookup(taskid)['message']) + + taskid = add_task('', self._long_op, self.objstore, + {'delay': 3, 'result': False, + 'message': 'It was not meant to be'}) + self.assertEquals('running', self._task_lookup(taskid)['status']) + self.assertEquals('Starting AsyncTask', + self._task_lookup(taskid)['message']) + wait_task(self._task_lookup, taskid) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + self.assertEquals('It was not 
meant to be', + self._task_lookup(taskid)['message']) + + taskid = add_task('', abnormal_op, self.objstore, {}) + wait_task(self._task_lookup, taskid) + self.assertEquals('Exception raised', + self._task_lookup(taskid)['message']) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + + taskid = add_task('', self._continuous_ops, self.objstore, + {'result': True}) + self.assertEquals('running', self._task_lookup(taskid)['status']) + wait_task(self._task_lookup, taskid, timeout=10) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + + def test_tasks_model(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + inst = model.Model(objstore_loc=self.tmp_store) + taskid = add_task('', self._quick_op, inst.objstore, 'Hello') + inst.task_wait(taskid) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) + self.assertEquals('Hello', inst.task_lookup(taskid)['message']) + + taskid = add_task('', self._long_op, inst.objstore, + {'delay': 3, 'result': False, + 'message': 'It was not meant to be'}) + self.assertEquals('running', inst.task_lookup(taskid)['status']) + self.assertEquals('Starting AsyncTask', + inst.task_lookup(taskid)['message']) + inst.task_wait(taskid) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + inst.task_lookup(taskid)['message']) + + taskid = add_task('', abnormal_op, inst.objstore, {}) + inst.task_wait(taskid) + self.assertEquals('Exception raised', + inst.task_lookup(taskid)['message']) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + + taskid = add_task('', self._continuous_ops, inst.objstore, + {'result': True}) + self.assertEquals('running', inst.task_lookup(taskid)['status']) + inst.task_wait(taskid, timeout=10) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) -- 2.7.4

It seems you are still passing the objectstore to add_task. Maybe this is related to what Lucio commented on in a previous patch. On 08/24/2016 04:36 PM, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
Create new testcases to test AsyncTasks in memory.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- tests/test_tasks.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 tests/test_tasks.py
diff --git a/tests/test_tasks.py b/tests/test_tasks.py new file mode 100644 index 0000000..aa2b6f0 --- /dev/null +++ b/tests/test_tasks.py @@ -0,0 +1,136 @@ +# +# Project Wok +# +# Copyright IBM Corp, 2016 +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import unittest +import time + +from utils import wait_task + +from wok.model import model +from wok.objectstore import ObjectStore +from wok.utils import add_task, tasks_queue + + +class AsyncTaskTests(unittest.TestCase): + def setUp(self):
+ self.tmp_store = '/tmp/wok-tasks-objstore' + self.objstore = ObjectStore(self.tmp_store) +
You can probably remove those lines.
+ def tearDown(self): + if os.path.isfile(self.tmp_store): + os.unlink(self.tmp_store) + + def _quick_op(self, cb, message): + cb(message, True) + + def _long_op(self, cb, params): + time.sleep(params.get('delay', 3)) + cb(params.get('message', ''), params.get('result', False)) + + def _continuous_ops(self, cb, params): + cb("step 1 OK") + time.sleep(2) + cb("step 2 OK") + time.sleep(2) + cb("step 3 OK", params.get('result', True)) + + def _task_lookup(self, id): + task = tasks_queue[id] + return {'id': id, + 'status': task.status, + 'message': task.message, + 'target_uri': task.target_uri} + + def test_async_tasks(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + taskid = add_task('', self._quick_op, self.objstore, 'Hello') + wait_task(self._task_lookup, taskid) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + self.assertEquals('Hello', self._task_lookup(taskid)['message']) + + taskid = add_task('', self._long_op, self.objstore, + {'delay': 3, 'result': False, + 'message': 'It was not meant to be'}) + self.assertEquals('running', self._task_lookup(taskid)['status']) + self.assertEquals('Starting AsyncTask', + self._task_lookup(taskid)['message']) + wait_task(self._task_lookup, taskid) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + self._task_lookup(taskid)['message']) + + taskid = add_task('', abnormal_op, self.objstore, {}) + wait_task(self._task_lookup, taskid) + self.assertEquals('Exception raised', + self._task_lookup(taskid)['message']) + self.assertEquals('failed', self._task_lookup(taskid)['status']) + + taskid = add_task('', self._continuous_ops, self.objstore, + {'result': True}) + self.assertEquals('running', self._task_lookup(taskid)['status']) + wait_task(self._task_lookup, taskid, timeout=10) + self.assertEquals('finished', self._task_lookup(taskid)['status']) + 
+ def test_tasks_model(self): + class task_except(Exception): + pass + + def abnormal_op(cb, params): + try: + raise task_except + except: + cb("Exception raised", False) + + inst = model.Model(objstore_loc=self.tmp_store) + taskid = add_task('', self._quick_op, inst.objstore, 'Hello') + inst.task_wait(taskid) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) + self.assertEquals('Hello', inst.task_lookup(taskid)['message']) + + taskid = add_task('', self._long_op, inst.objstore, + {'delay': 3, 'result': False, + 'message': 'It was not meant to be'}) + self.assertEquals('running', inst.task_lookup(taskid)['status']) + self.assertEquals('Starting AsyncTask', + inst.task_lookup(taskid)['message']) + inst.task_wait(taskid) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + self.assertEquals('It was not meant to be', + inst.task_lookup(taskid)['message']) + + taskid = add_task('', abnormal_op, inst.objstore, {}) + inst.task_wait(taskid) + self.assertEquals('Exception raised', + inst.task_lookup(taskid)['message']) + self.assertEquals('failed', inst.task_lookup(taskid)['status']) + + taskid = add_task('', self._continuous_ops, inst.objstore, + {'result': True}) + self.assertEquals('running', inst.task_lookup(taskid)['status']) + inst.task_wait(taskid, timeout=10) + self.assertEquals('finished', inst.task_lookup(taskid)['status'])
participants (4)
-
Aline Manera
-
Lucio Correia
-
Paulo Ricardo Paz Vital
-
pvital@linux.vnet.ibm.com