On Aug 29 04:55PM, Lucio Correia wrote:
On 29-08-2016 16:18, pvital(a)linux.vnet.ibm.com wrote:
> From: Paulo Vital <pvital(a)linux.vnet.ibm.com>
>
> This patch moves all infrastructure of AsyncTask to store and read
> the tasks' information (id, status, messages and target_uri) from
> objectstore to memory, by the implementation of a dictionary of
> AsyncTasks objects.
>
> It also removes the tasks cleanup process from objectstore.
>
> Signed-off-by: Paulo Vital <pvital(a)linux.vnet.ibm.com>
> ---
> src/wok/asynctask.py | 48 ++++++++++++++++++++++++++++--------------------
> src/wok/i18n.py | 2 --
> src/wok/objectstore.py | 6 ------
> src/wok/utils.py | 19 -------------------
> 4 files changed, 28 insertions(+), 47 deletions(-)
>
> diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py
> index fb614a2..1a114a8 100644
> --- a/src/wok/asynctask.py
> +++ b/src/wok/asynctask.py
> @@ -21,29 +21,41 @@
>
> import cherrypy
> import threading
> +import time
> import traceback
> +import uuid
>
>
> -from wok.exception import OperationFailed
> +tasks_queue = {}
>
>
> -class AsyncTask(object):
> - def __init__(self, id, target_uri, fn, objstore, opaque=None):
> - if objstore is None:
> - raise OperationFailed("WOKASYNC0001E")
> +def clean_old_tasks():
> + """
> + Remove from tasks_queue any task that started more than 12 hours ago and
> + whose current status is equal to finished or failed.
> + """
> + for id, task in tasks_queue.items():
> + if (task.timestamp < (time.time()-43200)):
> + if (task.status is 'finished') or (task.status is
'failed'):
> + task.remove(id)
> +
>
> - self.id = str(id)
> +class AsyncTask(object):
> + def __init__(self, target_uri, fn, opaque=None):
> + self.id = str(uuid.uuid1())
> self.target_uri = target_uri
> self.fn = fn
> - self.objstore = objstore
> self.status = 'running'
> - self.message = 'OK'
> - self._save_helper()
> + self.message = 'The request is being processing.'
> + self.timestamp = time.time()
> self._cp_request = cherrypy.serving.request
> self.thread = threading.Thread(target=self._run_helper,
> args=(opaque, self._status_cb))
> self.thread.setDaemon(True)
> self.thread.start()
> + # let's prevent memory leak in tasks_queue
> + clean_old_tasks()
> + tasks_queue[self.id] = self
>
> def _status_cb(self, message, success=None):
> if success is not None:
> @@ -51,17 +63,6 @@ class AsyncTask(object):
>
> if message.strip():
> self.message = message
> - self._save_helper()
> -
> - def _save_helper(self):
> - obj = {}
> - for attr in ('id', 'target_uri', 'message',
'status'):
> - obj[attr] = getattr(self, attr)
> - try:
> - with self.objstore as session:
> - session.store('task', self.id, obj)
> - except Exception as e:
> - raise OperationFailed('WOKASYNC0002E', {'err':
e.message})
>
> def _run_helper(self, opaque, cb):
> cherrypy.serving.request = self._cp_request
> @@ -71,3 +72,10 @@ class AsyncTask(object):
> cherrypy.log.error_log.error("Error in async_task %s " %
self.id)
> cherrypy.log.error_log.error(traceback.format_exc())
> cb(e.message, False)
> +
> + def remove(self, id):
A task should not kill another one. I believe we don't need a parameter id
here, just use self.id.
I'm not killing any task here, only removing the requested task from the
dictionary that tracks the AsyncTask objects, and only if the task has a
status equal to 'finished' or 'failed'.
But I got your point and it will be addressed in V6.
> + try:
> + del tasks_queue[id]
> + except KeyError:
> + msg = "There's no task_id %s in tasks_queue. Nothing
changed."
> + cherrypy.log.error_log.error(msg % id)
> diff --git a/src/wok/i18n.py b/src/wok/i18n.py
> index 33107ee..1a0446b 100644
> --- a/src/wok/i18n.py
> +++ b/src/wok/i18n.py
> @@ -33,8 +33,6 @@ messages = {
> "WOKAPI0008E": _("Parameters does not match requirement in
schema: %(err)s"),
> "WOKAPI0009E": _("You don't have permission to perform this
operation."),
>
> - "WOKASYNC0001E": _("Datastore is not initiated in the model
object."),
> - "WOKASYNC0002E": _("Unable to start task due error:
%(err)s"),
> "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while
running task '%(task)s."),
>
> "WOKAUTH0001E": _("Authentication failed for user
'%(username)s'. [Error code: %(code)s]"),
> diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py
> index 59354f3..817f60c 100644
> --- a/src/wok/objectstore.py
> +++ b/src/wok/objectstore.py
> @@ -107,8 +107,6 @@ class ObjectStore(object):
> c.execute('''SELECT * FROM sqlite_master WHERE
type='table' AND
> tbl_name='objects'; ''')
> res = c.fetchall()
> - # Because the tasks are regarded as temporary resource, the task states
> - # are purged every time the daemon startup
> if len(res) == 0:
> c.execute('''CREATE TABLE objects
> (id TEXT, type TEXT, json TEXT, version TEXT,
> @@ -116,10 +114,6 @@ class ObjectStore(object):
> conn.commit()
> return
>
> - # Clear out expired objects from a previous session
> - c.execute('''DELETE FROM objects WHERE type = 'task';
''')
> - conn.commit()
> -
> def _get_conn(self):
> ident = threading.currentThread().ident
> try:
> diff --git a/src/wok/utils.py b/src/wok/utils.py
> index 4c132a1..7599e85 100644
> --- a/src/wok/utils.py
> +++ b/src/wok/utils.py
> @@ -39,31 +39,12 @@ from datetime import datetime, timedelta
> from multiprocessing import Process, Queue
> from threading import Timer
>
> -from wok.asynctask import AsyncTask
> from wok.config import paths, PluginPaths
> from wok.exception import InvalidParameter, TimeoutExpired
> from wok.stringutils import decode_value
>
>
> wok_log = cherrypy.log.error_log
> -task_id = 0
> -
> -
> -def get_next_task_id():
> - global task_id
> - task_id += 1
> - return task_id
> -
> -
> -def get_task_id():
> - global task_id
> - return task_id
> -
> -
> -def add_task(target_uri, fn, objstore, opaque=None):
> - id = get_next_task_id()
> - AsyncTask(id, target_uri, fn, objstore, opaque)
> - return id
>
>
> def is_digit(value):
>
--
Lucio Correia
Software Engineer
IBM LTC Brazil
_______________________________________________
Kimchi-devel mailing list
Kimchi-devel(a)ovirt.org
http://lists.ovirt.org/mailman/listinfo/kimchi-devel
--
Paulo Ricardo Paz Vital
Linux Technology Center, IBM Systems
http://www.ibm.com/linux/ltc/