On 08/24/2016 04:36 PM, pvital(a)linux.vnet.ibm.com wrote:
From: Paulo Vital <pvital(a)linux.vnet.ibm.com>
This patch moves all infrastructure of AsyncTask to store and read
the tasks' information (id, status, messages and target_uri) from
objectstore to memory, by the implementation of a dictionary of
AsyncTasks objects.
It also removes the tasks cleanup process from objectstore.
Signed-off-by: Paulo Vital <pvital(a)linux.vnet.ibm.com>
---
src/wok/asynctask.py | 25 ++++---------------------
src/wok/i18n.py | 2 --
src/wok/objectstore.py | 6 ------
src/wok/utils.py | 31 ++++++++++++++++++++++++-------
4 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py
index fb614a2..2008c18 100644
--- a/src/wok/asynctask.py
+++ b/src/wok/asynctask.py
@@ -21,24 +21,18 @@
import cherrypy
import threading
+import time
import traceback
-from wok.exception import OperationFailed
-
-
class AsyncTask(object):
- def __init__(self, id, target_uri, fn, objstore, opaque=None):
- if objstore is None:
- raise OperationFailed("WOKASYNC0001E")
-
+ def __init__(self, id, target_uri, fn, opaque=None):
self.id = str(id)
self.target_uri = target_uri
self.fn = fn
- self.objstore = objstore
self.status = 'running'
- self.message = 'OK'
- self._save_helper()
+ self.message = 'Starting AsyncTask'
The 'message' is used on UI and the user is not aware about AsyncTask.
So my suggestion is to have a more generic message ("Processing
started"/"Processing request") or keep it in blank.
+ self.timestamp = time.time()
self._cp_request = cherrypy.serving.request
self.thread = threading.Thread(target=self._run_helper,
args=(opaque, self._status_cb))
@@ -51,17 +45,6 @@ class AsyncTask(object):
if message.strip():
self.message = message
- self._save_helper()
-
- def _save_helper(self):
- obj = {}
- for attr in ('id', 'target_uri', 'message',
'status'):
- obj[attr] = getattr(self, attr)
- try:
- with self.objstore as session:
- session.store('task', self.id, obj)
- except Exception as e:
- raise OperationFailed('WOKASYNC0002E', {'err': e.message})
def _run_helper(self, opaque, cb):
cherrypy.serving.request = self._cp_request
diff --git a/src/wok/i18n.py b/src/wok/i18n.py
index 33107ee..1a0446b 100644
--- a/src/wok/i18n.py
+++ b/src/wok/i18n.py
@@ -33,8 +33,6 @@ messages = {
"WOKAPI0008E": _("Parameters does not match requirement in schema:
%(err)s"),
"WOKAPI0009E": _("You don't have permission to perform this
operation."),
- "WOKASYNC0001E": _("Datastore is not initiated in the model
object."),
- "WOKASYNC0002E": _("Unable to start task due error: %(err)s"),
"WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while
running task '%(task)s."),
"WOKAUTH0001E": _("Authentication failed for user
'%(username)s'. [Error code: %(code)s]"),
diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py
index 59354f3..817f60c 100644
--- a/src/wok/objectstore.py
+++ b/src/wok/objectstore.py
@@ -107,8 +107,6 @@ class ObjectStore(object):
c.execute('''SELECT * FROM sqlite_master WHERE type='table'
AND
tbl_name='objects'; ''')
res = c.fetchall()
- # Because the tasks are regarded as temporary resource, the task states
- # are purged every time the daemon startup
if len(res) == 0:
c.execute('''CREATE TABLE objects
(id TEXT, type TEXT, json TEXT, version TEXT,
@@ -116,10 +114,6 @@ class ObjectStore(object):
conn.commit()
return
- # Clear out expired objects from a previous session
- c.execute('''DELETE FROM objects WHERE type = 'task';
''')
- conn.commit()
-
def _get_conn(self):
ident = threading.currentThread().ident
try:
diff --git a/src/wok/utils.py b/src/wok/utils.py
index 4c132a1..8c13c70 100644
--- a/src/wok/utils.py
+++ b/src/wok/utils.py
@@ -31,6 +31,7 @@ import re
import sqlite3
import subprocess
import sys
+import time
import traceback
import xml.etree.ElementTree as ET
@@ -47,6 +48,7 @@ from wok.stringutils import decode_value
wok_log = cherrypy.log.error_log
task_id = 0
+tasks_queue = {}
def get_next_task_id():
@@ -55,17 +57,32 @@ def get_next_task_id():
return task_id
-def get_task_id():
- global task_id
- return task_id
-
-
def add_task(target_uri, fn, objstore, opaque=None):
- id = get_next_task_id()
- AsyncTask(id, target_uri, fn, objstore, opaque)
+ # let's prevent memory leak
+ clean_old_tasks()
+ id = str(get_next_task_id())
+ tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque)
return id
+def remove_task(id):
+ try:
+ del tasks_queue[id]
+ except KeyError:
+ cherrypy.log.error_log.error("There's no task_id %s in
tasks_queue."
+ " Nothing changed." % id)
+
+
+def clean_old_tasks():
+ """
+ Check for all tasks in tasks_queue and remove those if timestamp < than
+ 10 minutes ago.
+ """
+ for id, task in tasks_queue.items():
+ if task.timestamp < (time.time()-600):
+ remove_task(id)
+
+
I like that idea to remove old entries but I think 10 minutes is not a
good number.
Some tasks take long to finish, ie, in 10 minutes they may be running.
Even if you check the Task status to be finished you may clean up an
entry that would be used to update the UI.
Said that, I'd say to increase that value to 12h/24h (yeah, a big amount
of time) to be safer.
What do you think about it?
def is_digit(value):
if isinstance(value, int):
return True