From: Paulo Vital <pvital(a)linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read
the tasks' information (id, status, messages and target_uri) from
objectstore to memory, by the implementation of a dictionary of
AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital(a)linux.vnet.ibm.com>
---
src/wok/asynctask.py | 48 ++++++++++++++++++++++++++++--------------------
src/wok/i18n.py | 2 --
src/wok/objectstore.py | 6 ------
src/wok/utils.py | 19 -------------------
4 files changed, 28 insertions(+), 47 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py
index fb614a2..1a114a8 100644
--- a/src/wok/asynctask.py
+++ b/src/wok/asynctask.py
@@ -21,29 +21,41 @@
import cherrypy
import threading
+import time
import traceback
+import uuid
-from wok.exception import OperationFailed
+tasks_queue = {}
-class AsyncTask(object):
- def __init__(self, id, target_uri, fn, objstore, opaque=None):
- if objstore is None:
- raise OperationFailed("WOKASYNC0001E")
+def clean_old_tasks():
+ """
+ Remove from tasks_queue any task that started before 12 hours ago and has
+ current status equal do finished or failed.
+ """
+ for id, task in tasks_queue.items():
+ if (task.timestamp < (time.time()-43200)):
+ if (task.status is 'finished') or (task.status is 'failed'):
+ task.remove(id)
+
- self.id = str(id)
+class AsyncTask(object):
+ def __init__(self, target_uri, fn, opaque=None):
+ self.id = str(uuid.uuid1())
self.target_uri = target_uri
self.fn = fn
- self.objstore = objstore
self.status = 'running'
- self.message = 'OK'
- self._save_helper()
+ self.message = 'The request is being processing.'
+ self.timestamp = time.time()
self._cp_request = cherrypy.serving.request
self.thread = threading.Thread(target=self._run_helper,
args=(opaque, self._status_cb))
self.thread.setDaemon(True)
self.thread.start()
+ # let's prevent memory leak in tasks_queue
+ clean_old_tasks()
+ tasks_queue[self.id] = self
def _status_cb(self, message, success=None):
if success is not None:
@@ -51,17 +63,6 @@ class AsyncTask(object):
if message.strip():
self.message = message
- self._save_helper()
-
- def _save_helper(self):
- obj = {}
- for attr in ('id', 'target_uri', 'message',
'status'):
- obj[attr] = getattr(self, attr)
- try:
- with self.objstore as session:
- session.store('task', self.id, obj)
- except Exception as e:
- raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb):
cherrypy.serving.request = self._cp_request
@@ -71,3 +72,10 @@ class AsyncTask(object):
cherrypy.log.error_log.error("Error in async_task %s " % self.id)
cherrypy.log.error_log.error(traceback.format_exc())
cb(e.message, False)
+
+ def remove(self, id):
+ try:
+ del tasks_queue[id]
+ except KeyError:
+ msg = "There's no task_id %s in tasks_queue. Nothing changed."
+ cherrypy.log.error_log.error(msg % id)
diff --git a/src/wok/i18n.py b/src/wok/i18n.py
index 33107ee..1a0446b 100644
--- a/src/wok/i18n.py
+++ b/src/wok/i18n.py
@@ -33,8 +33,6 @@ messages = {
"WOKAPI0008E": _("Parameters does not match requirement in schema:
%(err)s"),
"WOKAPI0009E": _("You don't have permission to perform this
operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model
object."),
- "WOKASYNC0002E": _("Unable to start task due error: %(err)s"),
"WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while
running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user
'%(username)s'. [Error code: %(code)s]"),
diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py
index 59354f3..817f60c 100644
--- a/src/wok/objectstore.py
+++ b/src/wok/objectstore.py
@@ -107,8 +107,6 @@ class ObjectStore(object):
c.execute('''SELECT * FROM sqlite_master WHERE type='table'
AND
tbl_name='objects'; ''')
res = c.fetchall()
- # Because the tasks are regarded as temporary resource, the task states
- # are purged every time the daemon startup
if len(res) == 0:
c.execute('''CREATE TABLE objects
(id TEXT, type TEXT, json TEXT, version TEXT,
@@ -116,10 +114,6 @@ class ObjectStore(object):
conn.commit()
return
- # Clear out expired objects from a previous session
- c.execute('''DELETE FROM objects WHERE type = 'task';
''')
- conn.commit()
-
def _get_conn(self):
ident = threading.currentThread().ident
try:
diff --git a/src/wok/utils.py b/src/wok/utils.py
index 4c132a1..7599e85 100644
--- a/src/wok/utils.py
+++ b/src/wok/utils.py
@@ -39,31 +39,12 @@ from datetime import datetime, timedelta
from multiprocessing import Process, Queue
from threading import Timer
-from wok.asynctask import AsyncTask
from wok.config import paths, PluginPaths
from wok.exception import InvalidParameter, TimeoutExpired
from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log
-task_id = 0
-
-
-def get_next_task_id():
- global task_id
- task_id += 1
- return task_id
-
-
-def get_task_id():
- global task_id
- return task_id
-
-
-def add_task(target_uri, fn, objstore, opaque=None):
- id = get_next_task_id()
- AsyncTask(id, target_uri, fn, objstore, opaque)
- return id
def is_digit(value):
--
2.7.4