[Kimchi-devel] [PATCH] [Wok 1/3] Issue #158: Move AsyncTask information to memory
Aline Manera
alinefm at linux.vnet.ibm.com
Fri Aug 26 11:17:05 UTC 2016
On 08/25/2016 11:23 AM, Lucio Correia wrote:
> One suggestion is to move all task stuff to asynctask.py
>
+1
> One more comment below.
>
> On 24-08-2016 16:36, pvital at linux.vnet.ibm.com wrote:
>> From: Paulo Vital <pvital at linux.vnet.ibm.com>
>>
>> This patch moves all infrastructure of AsyncTask to store and read
>> the tasks' information (id, status, messages and target_uri) from
>> objectstore to memory, by the implementation of a dictionary of
>> AsyncTasks objects.
>>
>> It also removes the tasks cleanup process from objectstore.
>>
>> Signed-off-by: Paulo Vital <pvital at linux.vnet.ibm.com>
>> ---
>> src/wok/asynctask.py | 25 ++++---------------------
>> src/wok/i18n.py | 2 --
>> src/wok/objectstore.py | 6 ------
>> src/wok/utils.py | 31 ++++++++++++++++++++++++-------
>> 4 files changed, 28 insertions(+), 36 deletions(-)
>>
>> diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py
>> index fb614a2..2008c18 100644
>> --- a/src/wok/asynctask.py
>> +++ b/src/wok/asynctask.py
>> @@ -21,24 +21,18 @@
>>
>> import cherrypy
>> import threading
>> +import time
>> import traceback
>>
>>
>> -from wok.exception import OperationFailed
>> -
>> -
>> class AsyncTask(object):
>> - def __init__(self, id, target_uri, fn, objstore, opaque=None):
>> - if objstore is None:
>> - raise OperationFailed("WOKASYNC0001E")
>> -
>> + def __init__(self, id, target_uri, fn, opaque=None):
>> self.id = str(id)
>> self.target_uri = target_uri
>> self.fn = fn
>> - self.objstore = objstore
>> self.status = 'running'
>> - self.message = 'OK'
>> - self._save_helper()
>> + self.message = 'Starting AsyncTask'
>> + self.timestamp = time.time()
>> self._cp_request = cherrypy.serving.request
>> self.thread = threading.Thread(target=self._run_helper,
>> args=(opaque, self._status_cb))
>> @@ -51,17 +45,6 @@ class AsyncTask(object):
>>
>> if message.strip():
>> self.message = message
>> - self._save_helper()
>> -
>> - def _save_helper(self):
>> - obj = {}
>> - for attr in ('id', 'target_uri', 'message', 'status'):
>> - obj[attr] = getattr(self, attr)
>> - try:
>> - with self.objstore as session:
>> - session.store('task', self.id, obj)
>> - except Exception as e:
>> - raise OperationFailed('WOKASYNC0002E', {'err': e.message})
>>
>> def _run_helper(self, opaque, cb):
>> cherrypy.serving.request = self._cp_request
>> diff --git a/src/wok/i18n.py b/src/wok/i18n.py
>> index 33107ee..1a0446b 100644
>> --- a/src/wok/i18n.py
>> +++ b/src/wok/i18n.py
>> @@ -33,8 +33,6 @@ messages = {
>> "WOKAPI0008E": _("Parameters does not match requirement in
>> schema: %(err)s"),
>> "WOKAPI0009E": _("You don't have permission to perform this
>> operation."),
>>
>> - "WOKASYNC0001E": _("Datastore is not initiated in the model
>> object."),
>> - "WOKASYNC0002E": _("Unable to start task due error: %(err)s"),
>> "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while
>> running task '%(task)s."),
>>
>> "WOKAUTH0001E": _("Authentication failed for user
>> '%(username)s'. [Error code: %(code)s]"),
>> diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py
>> index 59354f3..817f60c 100644
>> --- a/src/wok/objectstore.py
>> +++ b/src/wok/objectstore.py
>> @@ -107,8 +107,6 @@ class ObjectStore(object):
>> c.execute('''SELECT * FROM sqlite_master WHERE type='table' AND
>> tbl_name='objects'; ''')
>> res = c.fetchall()
>> - # Because the tasks are regarded as temporary resource, the
>> task states
>> - # are purged every time the daemon startup
>> if len(res) == 0:
>> c.execute('''CREATE TABLE objects
>> (id TEXT, type TEXT, json TEXT, version TEXT,
>> @@ -116,10 +114,6 @@ class ObjectStore(object):
>> conn.commit()
>> return
>>
>> - # Clear out expired objects from a previous session
>> - c.execute('''DELETE FROM objects WHERE type = 'task'; ''')
>> - conn.commit()
>> -
>> def _get_conn(self):
>> ident = threading.currentThread().ident
>> try:
>> diff --git a/src/wok/utils.py b/src/wok/utils.py
>> index 4c132a1..8c13c70 100644
>> --- a/src/wok/utils.py
>> +++ b/src/wok/utils.py
>> @@ -31,6 +31,7 @@ import re
>> import sqlite3
>> import subprocess
>> import sys
>> +import time
>> import traceback
>> import xml.etree.ElementTree as ET
>>
>> @@ -47,6 +48,7 @@ from wok.stringutils import decode_value
>>
>> wok_log = cherrypy.log.error_log
>> task_id = 0
>> +tasks_queue = {}
>>
>>
>> def get_next_task_id():
>> @@ -55,17 +57,32 @@ def get_next_task_id():
>> return task_id
>>
>>
>> -def get_task_id():
>> - global task_id
>> - return task_id
>> -
>> -
>> def add_task(target_uri, fn, objstore, opaque=None):
> objstore is not in use here
>
>> - id = get_next_task_id()
>> - AsyncTask(id, target_uri, fn, objstore, opaque)
>> + # let's prevent memory leak
>> + clean_old_tasks()
>> + id = str(get_next_task_id())
>> + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque)
>> return id
>>
>>
>> +def remove_task(id):
>> + try:
>> + del tasks_queue[id]
>> + except KeyError:
>> + cherrypy.log.error_log.error("There's no task_id %s in
>> tasks_queue."
>> + " Nothing changed." % id)
>> +
>> +
>> +def clean_old_tasks():
>> + """
>> + Check for all tasks in tasks_queue and remove those if timestamp
>> < than
>> + 10 minutes ago.
>> + """
>> + for id, task in tasks_queue.items():
>> + if task.timestamp < (time.time()-600):
>> + remove_task(id)
>> +
>> +
>> def is_digit(value):
>> if isinstance(value, int):
>> return True
>>
>
>
More information about the Kimchi-devel
mailing list