[Kimchi-devel] [PATCH] [Wok 1/3] Issue #158: Move AsyncTask information to memory

Lucio Correia luciojhc at linux.vnet.ibm.com
Mon Aug 29 19:55:09 UTC 2016


On 29-08-2016 16:18, pvital at linux.vnet.ibm.com wrote:
> From: Paulo Vital <pvital at linux.vnet.ibm.com>
>
> This patch moves all infrastructure of AsyncTask to store and read
> the tasks' information (id, status, messages and target_uri) from
> objectstore to memory, by the implementation of a dictionary of
> AsyncTasks objects.
>
> It also removes the tasks cleanup process from objectstore.
>
> Signed-off-by: Paulo Vital <pvital at linux.vnet.ibm.com>
> ---
>  src/wok/asynctask.py   | 48 ++++++++++++++++++++++++++++--------------------
>  src/wok/i18n.py        |  2 --
>  src/wok/objectstore.py |  6 ------
>  src/wok/utils.py       | 19 -------------------
>  4 files changed, 28 insertions(+), 47 deletions(-)
>
> diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py
> index fb614a2..1a114a8 100644
> --- a/src/wok/asynctask.py
> +++ b/src/wok/asynctask.py
> @@ -21,29 +21,41 @@
>
>  import cherrypy
>  import threading
> +import time
>  import traceback
> +import uuid
>
>
> -from wok.exception import OperationFailed
> +tasks_queue = {}
>
>
> -class AsyncTask(object):
> -    def __init__(self, id, target_uri, fn, objstore, opaque=None):
> -        if objstore is None:
> -            raise OperationFailed("WOKASYNC0001E")
> +def clean_old_tasks():
> +    """
> +    Remove from tasks_queue any task that started before 12 hours ago and has
> +    current status equal do finished or failed.
> +    """
> +    for id, task in tasks_queue.items():
> +        if (task.timestamp < (time.time()-43200)):
> +            if (task.status is 'finished') or (task.status is 'failed'):
> +                task.remove(id)
> +
>
> -        self.id = str(id)
> +class AsyncTask(object):
> +    def __init__(self, target_uri, fn, opaque=None):
> +        self.id = str(uuid.uuid1())
>          self.target_uri = target_uri
>          self.fn = fn
> -        self.objstore = objstore
>          self.status = 'running'
> -        self.message = 'OK'
> -        self._save_helper()
> +        self.message = 'The request is being processing.'
> +        self.timestamp = time.time()
>          self._cp_request = cherrypy.serving.request
>          self.thread = threading.Thread(target=self._run_helper,
>                                         args=(opaque, self._status_cb))
>          self.thread.setDaemon(True)
>          self.thread.start()
> +        # let's prevent memory leak in tasks_queue
> +        clean_old_tasks()
> +        tasks_queue[self.id] = self
>
>      def _status_cb(self, message, success=None):
>          if success is not None:
> @@ -51,17 +63,6 @@ class AsyncTask(object):
>
>          if message.strip():
>              self.message = message
> -            self._save_helper()
> -
> -    def _save_helper(self):
> -        obj = {}
> -        for attr in ('id', 'target_uri', 'message', 'status'):
> -            obj[attr] = getattr(self, attr)
> -        try:
> -            with self.objstore as session:
> -                session.store('task', self.id, obj)
> -        except Exception as e:
> -            raise OperationFailed('WOKASYNC0002E', {'err': e.message})
>
>      def _run_helper(self, opaque, cb):
>          cherrypy.serving.request = self._cp_request
> @@ -71,3 +72,10 @@ class AsyncTask(object):
>              cherrypy.log.error_log.error("Error in async_task %s " % self.id)
>              cherrypy.log.error_log.error(traceback.format_exc())
>              cb(e.message, False)
> +
> +    def remove(self, id):
A task should not kill another one. I believe we don't need a parameter 
id here, just use self.id.

> +        try:
> +            del tasks_queue[id]
> +        except KeyError:
> +            msg = "There's no task_id %s in tasks_queue. Nothing changed."
> +            cherrypy.log.error_log.error(msg % id)
> diff --git a/src/wok/i18n.py b/src/wok/i18n.py
> index 33107ee..1a0446b 100644
> --- a/src/wok/i18n.py
> +++ b/src/wok/i18n.py
> @@ -33,8 +33,6 @@ messages = {
>      "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"),
>      "WOKAPI0009E": _("You don't have permission to perform this operation."),
>
> -    "WOKASYNC0001E": _("Datastore is not initiated in the model object."),
> -    "WOKASYNC0002E": _("Unable to start task due error: %(err)s"),
>      "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
>
>      "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"),
> diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py
> index 59354f3..817f60c 100644
> --- a/src/wok/objectstore.py
> +++ b/src/wok/objectstore.py
> @@ -107,8 +107,6 @@ class ObjectStore(object):
>          c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND
>                       tbl_name='objects'; ''')
>          res = c.fetchall()
> -        # Because the tasks are regarded as temporary resource, the task states
> -        # are purged every time the daemon startup
>          if len(res) == 0:
>              c.execute('''CREATE TABLE objects
>                        (id TEXT, type TEXT, json TEXT, version TEXT,
> @@ -116,10 +114,6 @@ class ObjectStore(object):
>              conn.commit()
>              return
>
> -        # Clear out expired objects from a previous session
> -        c.execute('''DELETE FROM objects WHERE type = 'task'; ''')
> -        conn.commit()
> -
>      def _get_conn(self):
>          ident = threading.currentThread().ident
>          try:
> diff --git a/src/wok/utils.py b/src/wok/utils.py
> index 4c132a1..7599e85 100644
> --- a/src/wok/utils.py
> +++ b/src/wok/utils.py
> @@ -39,31 +39,12 @@ from datetime import datetime, timedelta
>  from multiprocessing import Process, Queue
>  from threading import Timer
>
> -from wok.asynctask import AsyncTask
>  from wok.config import paths, PluginPaths
>  from wok.exception import InvalidParameter, TimeoutExpired
>  from wok.stringutils import decode_value
>
>
>  wok_log = cherrypy.log.error_log
> -task_id = 0
> -
> -
> -def get_next_task_id():
> -    global task_id
> -    task_id += 1
> -    return task_id
> -
> -
> -def get_task_id():
> -    global task_id
> -    return task_id
> -
> -
> -def add_task(target_uri, fn, objstore, opaque=None):
> -    id = get_next_task_id()
> -    AsyncTask(id, target_uri, fn, objstore, opaque)
> -    return id
>
>
>  def is_digit(value):
>


-- 
Lucio Correia
Software Engineer
IBM LTC Brazil




More information about the Kimchi-devel mailing list