[Kimchi-devel] [PATCH] [Wok 1/3] Issue #158: Move AsyncTask information to memory

Aline Manera alinefm at linux.vnet.ibm.com
Fri Aug 26 17:17:04 UTC 2016



On 08/26/2016 02:13 PM, Aline Manera wrote:
> Also thinking more about the structure, see the comments below:
>
> On 08/26/2016 11:18 AM, pvital at linux.vnet.ibm.com wrote:
>> From: Paulo Vital <pvital at linux.vnet.ibm.com>
>>
>> This patch moves all infrastructure of AsyncTask to store and read
>> the tasks' information (id, status, messages and target_uri) from
>> objectstore to memory, by the implementation of a dictionary of
>> AsyncTasks objects.
>>
>> It also removes the tasks cleanup process from objectstore.
>>
>> Signed-off-by: Paulo Vital <pvital at linux.vnet.ibm.com>
>> ---
>>   src/wok/asynctask.py   | 58 
>> +++++++++++++++++++++++++++++++++-----------------
>>   src/wok/i18n.py        |  2 --
>>   src/wok/objectstore.py |  6 ------
>>   src/wok/utils.py       | 19 -----------------
>>   4 files changed, 39 insertions(+), 46 deletions(-)
>>
>> diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py
>> index fb614a2..3298787 100644
>> --- a/src/wok/asynctask.py
>> +++ b/src/wok/asynctask.py
>> @@ -21,24 +21,55 @@
>>
>>   import cherrypy
>>   import threading
>> +import time
>>   import traceback
>>
>>
>> -from wok.exception import OperationFailed
>> +task_id = 0
>> +tasks_queue = {}
>>
>>
>> -class AsyncTask(object):
>> -    def __init__(self, id, target_uri, fn, objstore, opaque=None):
>> -        if objstore is None:
>> -            raise OperationFailed("WOKASYNC0001E")
>> +def get_next_task_id():
>> +    global task_id
>> +    task_id += 1
>> +    return task_id
>> +
>> +
>
>
>> +def add_task(target_uri, fn, opaque=None):
>> +    # let's prevent memory leak
>> +    clean_old_tasks()
>> +    id = str(get_next_task_id())
>> +    tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque)
>> +    return id
>> +
>> +
>> +def remove_task(id):
>> +    try:
>> +        del tasks_queue[id]
>> +    except KeyError:
>> +        cherrypy.log.error_log.error("There's no task_id %s in 
>> tasks_queue."
>> +                                     " Nothing changed." % id)
>> +
>
> Why not include that in the AsyncTask class as create() and destroy()?
> It would be more object-oriented than having separate functions to do that.
>

A create() function is not even needed. The instance is created in
__init__(), so the uuid is generated there and the instance is returned.

It will require more changes in the plugins.

return AsyncTask(...).id

instead of

return add_task(...)

>> +def clean_old_tasks():
>> +    """
>> +    Check for all tasks in tasks_queue and remove those if timestamp 
>> < than
>> +    12 hours ago and status equal do finished or failed.
>> +    """
>> +    for id, task in tasks_queue.items():
>> +        if (task.timestamp < (time.time()-43200)):
>> +            if (task.status is 'finished') or (task.status is 
>> 'failed'):
>> +                remove_task(id)
>> +
>> +
>> +class AsyncTask(object):
>> +    def __init__(self, id, target_uri, fn, opaque=None):
>>           self.id = str(id)
>>           self.target_uri = target_uri
>>           self.fn = fn
>> -        self.objstore = objstore
>>           self.status = 'running'
>> -        self.message = 'OK'
>> -        self._save_helper()
>> +        self.message = 'The request is being processing.'
>> +        self.timestamp = time.time()
>>           self._cp_request = cherrypy.serving.request
>>           self.thread = threading.Thread(target=self._run_helper,
>>                                          args=(opaque, self._status_cb))
>> @@ -51,17 +82,6 @@ class AsyncTask(object):
>>
>>           if message.strip():
>>               self.message = message
>> -            self._save_helper()
>> -
>> -    def _save_helper(self):
>> -        obj = {}
>> -        for attr in ('id', 'target_uri', 'message', 'status'):
>> -            obj[attr] = getattr(self, attr)
>> -        try:
>> -            with self.objstore as session:
>> -                session.store('task', self.id, obj)
>> -        except Exception as e:
>> -            raise OperationFailed('WOKASYNC0002E', {'err': e.message})
>>
>>       def _run_helper(self, opaque, cb):
>>           cherrypy.serving.request = self._cp_request
>> diff --git a/src/wok/i18n.py b/src/wok/i18n.py
>> index 33107ee..1a0446b 100644
>> --- a/src/wok/i18n.py
>> +++ b/src/wok/i18n.py
>> @@ -33,8 +33,6 @@ messages = {
>>       "WOKAPI0008E": _("Parameters does not match requirement in 
>> schema: %(err)s"),
>>       "WOKAPI0009E": _("You don't have permission to perform this 
>> operation."),
>>
>> -    "WOKASYNC0001E": _("Datastore is not initiated in the model 
>> object."),
>> -    "WOKASYNC0002E": _("Unable to start task due error: %(err)s"),
>>       "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired 
>> while running task '%(task)s."),
>>
>>       "WOKAUTH0001E": _("Authentication failed for user 
>> '%(username)s'. [Error code: %(code)s]"),
>> diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py
>> index 59354f3..817f60c 100644
>> --- a/src/wok/objectstore.py
>> +++ b/src/wok/objectstore.py
>> @@ -107,8 +107,6 @@ class ObjectStore(object):
>>           c.execute('''SELECT * FROM sqlite_master WHERE type='table' 
>> AND
>>                        tbl_name='objects'; ''')
>>           res = c.fetchall()
>> -        # Because the tasks are regarded as temporary resource, the 
>> task states
>> -        # are purged every time the daemon startup
>>           if len(res) == 0:
>>               c.execute('''CREATE TABLE objects
>>                         (id TEXT, type TEXT, json TEXT, version TEXT,
>> @@ -116,10 +114,6 @@ class ObjectStore(object):
>>               conn.commit()
>>               return
>>
>> -        # Clear out expired objects from a previous session
>> -        c.execute('''DELETE FROM objects WHERE type = 'task'; ''')
>> -        conn.commit()
>> -
>>       def _get_conn(self):
>>           ident = threading.currentThread().ident
>>           try:
>> diff --git a/src/wok/utils.py b/src/wok/utils.py
>> index 4c132a1..7599e85 100644
>> --- a/src/wok/utils.py
>> +++ b/src/wok/utils.py
>> @@ -39,31 +39,12 @@ from datetime import datetime, timedelta
>>   from multiprocessing import Process, Queue
>>   from threading import Timer
>>
>> -from wok.asynctask import AsyncTask
>>   from wok.config import paths, PluginPaths
>>   from wok.exception import InvalidParameter, TimeoutExpired
>>   from wok.stringutils import decode_value
>>
>>
>>   wok_log = cherrypy.log.error_log
>> -task_id = 0
>> -
>> -
>> -def get_next_task_id():
>> -    global task_id
>> -    task_id += 1
>> -    return task_id
>> -
>> -
>> -def get_task_id():
>> -    global task_id
>> -    return task_id
>> -
>> -
>> -def add_task(target_uri, fn, objstore, opaque=None):
>> -    id = get_next_task_id()
>> -    AsyncTask(id, target_uri, fn, objstore, opaque)
>> -    return id
>>
>>
>>   def is_digit(value):
>
> _______________________________________________
> Kimchi-devel mailing list
> Kimchi-devel at ovirt.org
> http://lists.ovirt.org/mailman/listinfo/kimchi-devel
>




More information about the Kimchi-devel mailing list