
From: Paulo Vital <pvital@linux.vnet.ibm.com> This patch moves the AsyncTask infrastructure that stores and reads task information (id, status, message and target_uri) from the objectstore into memory, by implementing a dictionary of AsyncTask objects. It also removes the task cleanup process from the objectstore. Signed-off-by: Paulo Vital <pvital@linux.vnet.ibm.com> --- src/wok/asynctask.py | 48 ++++++++++++++++++++++++++++-------------------- src/wok/i18n.py | 2 -- src/wok/objectstore.py | 6 ------ src/wok/utils.py | 19 ------------------- 4 files changed, 28 insertions(+), 47 deletions(-) diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py index fb614a2..b98ad9a 100644 --- a/src/wok/asynctask.py +++ b/src/wok/asynctask.py @@ -21,29 +21,41 @@ import cherrypy import threading +import time import traceback +import uuid -from wok.exception import OperationFailed +tasks_queue = {} -class AsyncTask(object): - def __init__(self, id, target_uri, fn, objstore, opaque=None): - if objstore is None: - raise OperationFailed("WOKASYNC0001E") +def clean_old_tasks(): + """ + Remove from tasks_queue any task that started before 12 hours ago and has + current status equal to finished or failed. + """ + for id, task in tasks_queue.items(): + if (task.timestamp < (time.time()-43200)): + if (task.status is 'finished') or (task.status is 'failed'): + task.remove() + - self.id = str(id) +class AsyncTask(object): + def __init__(self, target_uri, fn, opaque=None): + self.id = str(uuid.uuid1()) self.target_uri = target_uri self.fn = fn - self.objstore = objstore self.status = 'running' - self.message = 'OK' - self._save_helper() + self.message = 'The request is being processing.' 
+ self.timestamp = time.time() self._cp_request = cherrypy.serving.request self.thread = threading.Thread(target=self._run_helper, args=(opaque, self._status_cb)) self.thread.setDaemon(True) self.thread.start() + # let's prevent memory leak in tasks_queue + clean_old_tasks() + tasks_queue[self.id] = self def _status_cb(self, message, success=None): if success is not None: @@ -51,17 +63,6 @@ class AsyncTask(object): if message.strip(): self.message = message - self._save_helper() - - def _save_helper(self): - obj = {} - for attr in ('id', 'target_uri', 'message', 'status'): - obj[attr] = getattr(self, attr) - try: - with self.objstore as session: - session.store('task', self.id, obj) - except Exception as e: - raise OperationFailed('WOKASYNC0002E', {'err': e.message}) def _run_helper(self, opaque, cb): cherrypy.serving.request = self._cp_request @@ -71,3 +72,10 @@ class AsyncTask(object): cherrypy.log.error_log.error("Error in async_task %s " % self.id) cherrypy.log.error_log.error(traceback.format_exc()) cb(e.message, False) + + def remove(self): + try: + del tasks_queue[self.id] + except KeyError: + msg = "There's no task_id %s in tasks_queue. Nothing changed." + cherrypy.log.error_log.error(msg % self.id) diff --git a/src/wok/i18n.py b/src/wok/i18n.py index 33107ee..1a0446b 100644 --- a/src/wok/i18n.py +++ b/src/wok/i18n.py @@ -33,8 +33,6 @@ messages = { "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"), "WOKAPI0009E": _("You don't have permission to perform this operation."), - "WOKASYNC0001E": _("Datastore is not initiated in the model object."), - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"), "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."), "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. 
[Error code: %(code)s]"), diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py index 59354f3..817f60c 100644 --- a/src/wok/objectstore.py +++ b/src/wok/objectstore.py @@ -107,8 +107,6 @@ class ObjectStore(object): c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND tbl_name='objects'; ''') res = c.fetchall() - # Because the tasks are regarded as temporary resource, the task states - # are purged every time the daemon startup if len(res) == 0: c.execute('''CREATE TABLE objects (id TEXT, type TEXT, json TEXT, version TEXT, @@ -116,10 +114,6 @@ class ObjectStore(object): conn.commit() return - # Clear out expired objects from a previous session - c.execute('''DELETE FROM objects WHERE type = 'task'; ''') - conn.commit() - def _get_conn(self): ident = threading.currentThread().ident try: diff --git a/src/wok/utils.py b/src/wok/utils.py index 4c132a1..7599e85 100644 --- a/src/wok/utils.py +++ b/src/wok/utils.py @@ -39,31 +39,12 @@ from datetime import datetime, timedelta from multiprocessing import Process, Queue from threading import Timer -from wok.asynctask import AsyncTask from wok.config import paths, PluginPaths from wok.exception import InvalidParameter, TimeoutExpired from wok.stringutils import decode_value wok_log = cherrypy.log.error_log -task_id = 0 - - -def get_next_task_id(): - global task_id - task_id += 1 - return task_id - - -def get_task_id(): - global task_id - return task_id - - -def add_task(target_uri, fn, objstore, opaque=None): - id = get_next_task_id() - AsyncTask(id, target_uri, fn, objstore, opaque) - return id def is_digit(value): -- 2.7.4