[Kimchi-devel] [PATCH] [Wok 1/3] Issue #158: Move AsyncTask information to memory

pvital at linux.vnet.ibm.com pvital at linux.vnet.ibm.com
Fri Aug 26 14:18:09 UTC 2016


From: Paulo Vital <pvital at linux.vnet.ibm.com>

This patch moves all infrastructure of AsyncTask to store and read
the tasks' information (id, status, messages and target_uri) from
objectstore to memory, by the implementation of a dictionary of
AsyncTasks objects.

It also removes the tasks cleanup process from objectstore.

Signed-off-by: Paulo Vital <pvital at linux.vnet.ibm.com>
---
 src/wok/asynctask.py   | 58 +++++++++++++++++++++++++++++++++-----------------
 src/wok/i18n.py        |  2 --
 src/wok/objectstore.py |  6 ------
 src/wok/utils.py       | 19 -----------------
 4 files changed, 39 insertions(+), 46 deletions(-)

diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py
index fb614a2..3298787 100644
--- a/src/wok/asynctask.py
+++ b/src/wok/asynctask.py
@@ -21,24 +21,55 @@
 
 import cherrypy
 import threading
+import time
 import traceback
 
 
-from wok.exception import OperationFailed
+task_id = 0
+tasks_queue = {}
 
 
-class AsyncTask(object):
-    def __init__(self, id, target_uri, fn, objstore, opaque=None):
-        if objstore is None:
-            raise OperationFailed("WOKASYNC0001E")
+def get_next_task_id():
+    global task_id
+    task_id += 1
+    return task_id
+
+
+def add_task(target_uri, fn, opaque=None):
+    # let's prevent memory leak
+    clean_old_tasks()
+    id = str(get_next_task_id())
+    tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque)
+    return id
+
+
+def remove_task(id):
+    try:
+        del tasks_queue[id]
+    except KeyError:
+        cherrypy.log.error_log.error("There's no task_id %s in tasks_queue."
+                                     " Nothing changed." % id)
+
 
+def clean_old_tasks():
+    """
+    Check for all tasks in tasks_queue and remove those if timestamp < than
+    12 hours ago and status equal do finished or failed.
+    """
+    for id, task in tasks_queue.items():
+        if (task.timestamp < (time.time()-43200)):
+            if (task.status is 'finished') or (task.status is 'failed'):
+                remove_task(id)
+
+
+class AsyncTask(object):
+    def __init__(self, id, target_uri, fn, opaque=None):
         self.id = str(id)
         self.target_uri = target_uri
         self.fn = fn
-        self.objstore = objstore
         self.status = 'running'
-        self.message = 'OK'
-        self._save_helper()
+        self.message = 'The request is being processing.'
+        self.timestamp = time.time()
         self._cp_request = cherrypy.serving.request
         self.thread = threading.Thread(target=self._run_helper,
                                        args=(opaque, self._status_cb))
@@ -51,17 +82,6 @@ class AsyncTask(object):
 
         if message.strip():
             self.message = message
-            self._save_helper()
-
-    def _save_helper(self):
-        obj = {}
-        for attr in ('id', 'target_uri', 'message', 'status'):
-            obj[attr] = getattr(self, attr)
-        try:
-            with self.objstore as session:
-                session.store('task', self.id, obj)
-        except Exception as e:
-            raise OperationFailed('WOKASYNC0002E', {'err': e.message})
 
     def _run_helper(self, opaque, cb):
         cherrypy.serving.request = self._cp_request
diff --git a/src/wok/i18n.py b/src/wok/i18n.py
index 33107ee..1a0446b 100644
--- a/src/wok/i18n.py
+++ b/src/wok/i18n.py
@@ -33,8 +33,6 @@ messages = {
     "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"),
     "WOKAPI0009E": _("You don't have permission to perform this operation."),
 
-    "WOKASYNC0001E": _("Datastore is not initiated in the model object."),
-    "WOKASYNC0002E": _("Unable to start task due error: %(err)s"),
     "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
 
     "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"),
diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py
index 59354f3..817f60c 100644
--- a/src/wok/objectstore.py
+++ b/src/wok/objectstore.py
@@ -107,8 +107,6 @@ class ObjectStore(object):
         c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND
                      tbl_name='objects'; ''')
         res = c.fetchall()
-        # Because the tasks are regarded as temporary resource, the task states
-        # are purged every time the daemon startup
         if len(res) == 0:
             c.execute('''CREATE TABLE objects
                       (id TEXT, type TEXT, json TEXT, version TEXT,
@@ -116,10 +114,6 @@ class ObjectStore(object):
             conn.commit()
             return
 
-        # Clear out expired objects from a previous session
-        c.execute('''DELETE FROM objects WHERE type = 'task'; ''')
-        conn.commit()
-
     def _get_conn(self):
         ident = threading.currentThread().ident
         try:
diff --git a/src/wok/utils.py b/src/wok/utils.py
index 4c132a1..7599e85 100644
--- a/src/wok/utils.py
+++ b/src/wok/utils.py
@@ -39,31 +39,12 @@ from datetime import datetime, timedelta
 from multiprocessing import Process, Queue
 from threading import Timer
 
-from wok.asynctask import AsyncTask
 from wok.config import paths, PluginPaths
 from wok.exception import InvalidParameter, TimeoutExpired
 from wok.stringutils import decode_value
 
 
 wok_log = cherrypy.log.error_log
-task_id = 0
-
-
-def get_next_task_id():
-    global task_id
-    task_id += 1
-    return task_id
-
-
-def get_task_id():
-    global task_id
-    return task_id
-
-
-def add_task(target_uri, fn, objstore, opaque=None):
-    id = get_next_task_id()
-    AsyncTask(id, target_uri, fn, objstore, opaque)
-    return id
 
 
 def is_digit(value):
-- 
2.7.4




More information about the Kimchi-devel mailing list