
On 08/25/2016 11:23 AM, Lucio Correia wrote:
One suggestion is to move all the task-related code from utils.py to asynctask.py.
+1
One more comment below.
On 24-08-2016 16:36, pvital@linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital@linux.vnet.ibm.com>
This patch moves the AsyncTask infrastructure that stores and reads each task's information (id, status, message and target_uri) from the objectstore into memory, by implementing a dictionary of AsyncTask objects.
It also removes the task cleanup process from the objectstore.
Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 25 ++++--------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 31 ++++++++++++++++++++++++------- 4 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..2008c18 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,24 +21,18 @@
import cherrypy import threading +import time import traceback
-from wok.exception import OperationFailed - - class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") - + def __init__(self, id, target_uri, fn, opaque=None): self.id = str(id) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'Starting AsyncTask' + self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) @@ -51,17 +45,6 @@ class AsyncTask(object):
if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return
- # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..8c13c70 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -31,6 +31,7 @@ import re import sqlite3 import subprocess import sys +import time import traceback import xml.etree.ElementTree as ET
@@ -47,6 +48,7 @@ from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log task_id = 0 +tasks_queue = {}
def get_next_task_id(): @@ -55,17 +57,32 @@ def get_next_task_id(): return task_id
-def get_task_id(): - global task_id - return task_id - - def add_task(target_uri, fn, objstore, opaque=None):
[Review comment] The objstore parameter is no longer used here.
- id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) + # let's prevent memory leak + clean_old_tasks() + id = str(get_next_task_id()) + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque) return id
+def remove_task(id): + try: + del tasks_queue[id] + except KeyError: + cherrypy.log.error_log.error("There's no task_id %s in tasks_queue." + " Nothing changed." % id) + + +def clean_old_tasks(): + """ + Check for all tasks in tasks_queue and remove those if timestamp < than + 10 minutes ago. + """ + for id, task in tasks_queue.items(): + if task.timestamp < (time.time()-600): + remove_task(id) + + def is_digit(value): if isinstance(value, int): return True