[Kimchi-devel] [PATCH] [Wok 1/3] Issue #158: Move AsyncTask information to memory

Aline Manera alinefm at linux.vnet.ibm.com
Fri Aug 26 12:35:22 UTC 2016



On 08/26/2016 08:40 AM, Paulo Ricardo Paz Vital wrote:
> On Aug 26 08:23AM, Aline Manera wrote:
>>
>> On 08/24/2016 04:36 PM, pvital at linux.vnet.ibm.com wrote:
>>> From: Paulo Vital <pvital at linux.vnet.ibm.com>
>>>
>>> This patch moves all infrastructure of AsyncTask to store and read
>>> the tasks' information (id, status, messages and target_uri) from
>>> objectstore to memory, by the implementation of a dictionary of
>>> AsyncTasks objects.
>>>
>>> It also removes the tasks cleanup process from objectstore.
>>>
>>> Signed-off-by: Paulo Vital <pvital at linux.vnet.ibm.com>
>>> ---
>>>    src/wok/asynctask.py   | 25 ++++---------------------
>>>    src/wok/i18n.py        |  2 --
>>>    src/wok/objectstore.py |  6 ------
>>>    src/wok/utils.py       | 31 ++++++++++++++++++++++++-------
>>>    4 files changed, 28 insertions(+), 36 deletions(-)
>>>
>>> diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py
>>> index fb614a2..2008c18 100644
>>> --- a/src/wok/asynctask.py
>>> +++ b/src/wok/asynctask.py
>>> @@ -21,24 +21,18 @@
>>>
>>>    import cherrypy
>>>    import threading
>>> +import time
>>>    import traceback
>>>
>>>
>>> -from wok.exception import OperationFailed
>>> -
>>> -
>>>    class AsyncTask(object):
>>> -    def __init__(self, id, target_uri, fn, objstore, opaque=None):
>>> -        if objstore is None:
>>> -            raise OperationFailed("WOKASYNC0001E")
>>> -
>>> +    def __init__(self, id, target_uri, fn, opaque=None):
>>>            self.id = str(id)
>>>            self.target_uri = target_uri
>>>            self.fn = fn
>>> -        self.objstore = objstore
>>>            self.status = 'running'
>>> -        self.message = 'OK'
>>> -        self._save_helper()
>>> +        self.message = 'Starting AsyncTask'
>> The 'message' is shown in the UI, and the user is not aware of AsyncTask.
>> So my suggestion is to use a more generic message ("Processing
>> started"/"Processing request") or leave it blank.
> Actually, I changed the message from 'OK' to this one. Maybe a simple
> "Starting..." would be better, since the user doesn't know exactly what is
> being processed. Or keep the 'OK'.

Or maybe "The request is being processed"

>>> +        self.timestamp = time.time()
>>>            self._cp_request = cherrypy.serving.request
>>>            self.thread = threading.Thread(target=self._run_helper,
>>>                                           args=(opaque, self._status_cb))
>>> @@ -51,17 +45,6 @@ class AsyncTask(object):
>>>
>>>            if message.strip():
>>>                self.message = message
>>> -            self._save_helper()
>>> -
>>> -    def _save_helper(self):
>>> -        obj = {}
>>> -        for attr in ('id', 'target_uri', 'message', 'status'):
>>> -            obj[attr] = getattr(self, attr)
>>> -        try:
>>> -            with self.objstore as session:
>>> -                session.store('task', self.id, obj)
>>> -        except Exception as e:
>>> -            raise OperationFailed('WOKASYNC0002E', {'err': e.message})
>>>
>>>        def _run_helper(self, opaque, cb):
>>>            cherrypy.serving.request = self._cp_request
>>> diff --git a/src/wok/i18n.py b/src/wok/i18n.py
>>> index 33107ee..1a0446b 100644
>>> --- a/src/wok/i18n.py
>>> +++ b/src/wok/i18n.py
>>> @@ -33,8 +33,6 @@ messages = {
>>>        "WOKAPI0008E": _("Parameters does not match requirement in schema: %(err)s"),
>>>        "WOKAPI0009E": _("You don't have permission to perform this operation."),
>>>
>>> -    "WOKASYNC0001E": _("Datastore is not initiated in the model object."),
>>> -    "WOKASYNC0002E": _("Unable to start task due error: %(err)s"),
>>>        "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while running task '%(task)s."),
>>>
>>>        "WOKAUTH0001E": _("Authentication failed for user '%(username)s'. [Error code: %(code)s]"),
>>> diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py
>>> index 59354f3..817f60c 100644
>>> --- a/src/wok/objectstore.py
>>> +++ b/src/wok/objectstore.py
>>> @@ -107,8 +107,6 @@ class ObjectStore(object):
>>>            c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND
>>>                         tbl_name='objects'; ''')
>>>            res = c.fetchall()
>>> -        # Because the tasks are regarded as temporary resource, the task states
>>> -        # are purged every time the daemon startup
>>>            if len(res) == 0:
>>>                c.execute('''CREATE TABLE objects
>>>                          (id TEXT, type TEXT, json TEXT, version TEXT,
>>> @@ -116,10 +114,6 @@ class ObjectStore(object):
>>>                conn.commit()
>>>                return
>>>
>>> -        # Clear out expired objects from a previous session
>>> -        c.execute('''DELETE FROM objects WHERE type = 'task'; ''')
>>> -        conn.commit()
>>> -
>>>        def _get_conn(self):
>>>            ident = threading.currentThread().ident
>>>            try:
>>> diff --git a/src/wok/utils.py b/src/wok/utils.py
>>> index 4c132a1..8c13c70 100644
>>> --- a/src/wok/utils.py
>>> +++ b/src/wok/utils.py
>>> @@ -31,6 +31,7 @@ import re
>>>    import sqlite3
>>>    import subprocess
>>>    import sys
>>> +import time
>>>    import traceback
>>>    import xml.etree.ElementTree as ET
>>>
>>> @@ -47,6 +48,7 @@ from wok.stringutils import decode_value
>>>
>>>    wok_log = cherrypy.log.error_log
>>>    task_id = 0
>>> +tasks_queue = {}
>>>
>>>
>>>    def get_next_task_id():
>>> @@ -55,17 +57,32 @@ def get_next_task_id():
>>>        return task_id
>>>
>>>
>>> -def get_task_id():
>>> -    global task_id
>>> -    return task_id
>>> -
>>> -
>>>    def add_task(target_uri, fn, objstore, opaque=None):
>>> -    id = get_next_task_id()
>>> -    AsyncTask(id, target_uri, fn, objstore, opaque)
>>> +    # let's prevent memory leak
>>> +    clean_old_tasks()
>>> +    id = str(get_next_task_id())
>>> +    tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque)
>>>        return id
>>>
>>>
>>> +def remove_task(id):
>>> +    try:
>>> +        del tasks_queue[id]
>>> +    except KeyError:
>>> +        cherrypy.log.error_log.error("There's no task_id %s in tasks_queue."
>>> +                                     " Nothing changed." % id)
>>> +
>>> +
>>> +def clean_old_tasks():
>>> +    """
>>> +    Check for all tasks in tasks_queue and remove those if timestamp < than
>>> +    10 minutes ago.
>>> +    """
>>> +    for id, task in tasks_queue.items():
>>> +        if task.timestamp < (time.time()-600):
>>> +            remove_task(id)
>>> +
>>> +
>> I like the idea of removing old entries, but I think 10 minutes is not a
>> good value.
>> Some tasks take a long time to finish, i.e., after 10 minutes they may still
>> be running. Even if you check that the Task status is finished, you may
>> clean up an entry that would still be used to update the UI.
>>
>> That said, I'd suggest increasing that value to 12h/24h (yeah, a large
>> amount of time) to be safer.
>>
>> What do you think about it?
> Yeah, I completely forgot that we have loooooooooooooooong tasks :-P
> I think 6h (or 12h) is sufficient, and I can add a check to see whether the
> task is finished or not.
>

I'd say 12h to be more conservative. And I am OK with checking whether the
Task status is finished or not.
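
Just to illustrate the two points above, here is a minimal sketch and not the
actual implementation. It assumes a 12h limit and treats any status other than
'running' as finished. FakeTask and TASK_MAX_AGE are hypothetical names used
only for this example, and unlike the patch, the dictionary and the current
time are passed in as arguments so the function can be exercised on its own.

```python
import time

# Assumption: 12 hours, the value suggested in this thread.
TASK_MAX_AGE = 12 * 60 * 60


class FakeTask(object):
    """Hypothetical stand-in for AsyncTask, holding only the two
    attributes the cleanup needs."""
    def __init__(self, status, timestamp):
        self.status = status
        self.timestamp = timestamp


def clean_old_tasks(tasks_queue, now=None, max_age=TASK_MAX_AGE):
    """Remove tasks that are no longer running and are older than max_age."""
    if now is None:
        now = time.time()
    # Iterate over a snapshot so entries can be deleted during the loop.
    for task_id, task in list(tasks_queue.items()):
        if task.status != 'running' and task.timestamp < now - max_age:
            del tasks_queue[task_id]
```

With this check, a task that is still running is kept no matter how old it is,
and a finished task stays available to the UI until it passes the age limit.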

>>>    def is_digit(value):
>>>        if isinstance(value, int):
>>>            return True



