On 08/25/2016 11:23 AM, Lucio Correia wrote:
One suggestion is to move all the task-related code to asynctask.py.
+1
One more comment below.
On 24-08-2016 16:36, pvital(a)linux.vnet.ibm.com wrote:
> From: Paulo Vital <pvital(a)linux.vnet.ibm.com>
>
> This patch moves the AsyncTask infrastructure for storing and reading
> task information (id, status, message and target_uri) from the
> objectstore to memory, by implementing a dictionary of AsyncTask
> objects.
>
> It also removes the tasks cleanup process from objectstore.
>
> Signed-off-by: Paulo Vital <pvital(a)linux.vnet.ibm.com>
> ---
> src/wok/asynctask.py | 25 ++++---------------------
> src/wok/i18n.py | 2 --
> src/wok/objectstore.py | 6 ------
> src/wok/utils.py | 31 ++++++++++++++++++++++++-------
> 4 files changed, 28 insertions(+), 36 deletions(-)
>
> diff --git a/src/wok/asynctask.py b/src/wok/asynctask.py
> index fb614a2..2008c18 100644
> --- a/src/wok/asynctask.py
> +++ b/src/wok/asynctask.py
> @@ -21,24 +21,18 @@
>
> import cherrypy
> import threading
> +import time
> import traceback
>
>
> -from wok.exception import OperationFailed
> -
> -
> class AsyncTask(object):
> - def __init__(self, id, target_uri, fn, objstore, opaque=None):
> - if objstore is None:
> - raise OperationFailed("WOKASYNC0001E")
> -
> + def __init__(self, id, target_uri, fn, opaque=None):
> self.id = str(id)
> self.target_uri = target_uri
> self.fn = fn
> - self.objstore = objstore
> self.status = 'running'
> - self.message = 'OK'
> - self._save_helper()
> + self.message = 'Starting AsyncTask'
> + self.timestamp = time.time()
> self._cp_request = cherrypy.serving.request
> self.thread = threading.Thread(target=self._run_helper,
> args=(opaque, self._status_cb))
> @@ -51,17 +45,6 @@ class AsyncTask(object):
>
> if message.strip():
> self.message = message
> - self._save_helper()
> -
> - def _save_helper(self):
> - obj = {}
> - for attr in ('id', 'target_uri', 'message',
'status'):
> - obj[attr] = getattr(self, attr)
> - try:
> - with self.objstore as session:
> - session.store('task', self.id, obj)
> - except Exception as e:
> - raise OperationFailed('WOKASYNC0002E', {'err':
e.message})
>
> def _run_helper(self, opaque, cb):
> cherrypy.serving.request = self._cp_request
> diff --git a/src/wok/i18n.py b/src/wok/i18n.py
> index 33107ee..1a0446b 100644
> --- a/src/wok/i18n.py
> +++ b/src/wok/i18n.py
> @@ -33,8 +33,6 @@ messages = {
> "WOKAPI0008E": _("Parameters does not match requirement in
> schema: %(err)s"),
> "WOKAPI0009E": _("You don't have permission to perform this
> operation."),
>
> - "WOKASYNC0001E": _("Datastore is not initiated in the model
> object."),
> - "WOKASYNC0002E": _("Unable to start task due error:
%(err)s"),
> "WOKASYNC0003E": _("Timeout of %(seconds)s seconds expired while
> running task '%(task)s."),
>
> "WOKAUTH0001E": _("Authentication failed for user
> '%(username)s'. [Error code: %(code)s]"),
> diff --git a/src/wok/objectstore.py b/src/wok/objectstore.py
> index 59354f3..817f60c 100644
> --- a/src/wok/objectstore.py
> +++ b/src/wok/objectstore.py
> @@ -107,8 +107,6 @@ class ObjectStore(object):
> c.execute('''SELECT * FROM sqlite_master WHERE
type='table' AND
> tbl_name='objects'; ''')
> res = c.fetchall()
> - # Because the tasks are regarded as temporary resource, the
> task states
> - # are purged every time the daemon startup
> if len(res) == 0:
> c.execute('''CREATE TABLE objects
> (id TEXT, type TEXT, json TEXT, version TEXT,
> @@ -116,10 +114,6 @@ class ObjectStore(object):
> conn.commit()
> return
>
> - # Clear out expired objects from a previous session
> - c.execute('''DELETE FROM objects WHERE type = 'task';
''')
> - conn.commit()
> -
> def _get_conn(self):
> ident = threading.currentThread().ident
> try:
> diff --git a/src/wok/utils.py b/src/wok/utils.py
> index 4c132a1..8c13c70 100644
> --- a/src/wok/utils.py
> +++ b/src/wok/utils.py
> @@ -31,6 +31,7 @@ import re
> import sqlite3
> import subprocess
> import sys
> +import time
> import traceback
> import xml.etree.ElementTree as ET
>
> @@ -47,6 +48,7 @@ from wok.stringutils import decode_value
>
> wok_log = cherrypy.log.error_log
> task_id = 0
> +tasks_queue = {}
>
>
> def get_next_task_id():
> @@ -55,17 +57,32 @@ def get_next_task_id():
> return task_id
>
>
> -def get_task_id():
> - global task_id
> - return task_id
> -
> -
> def add_task(target_uri, fn, objstore, opaque=None):
The objstore parameter is no longer used in add_task().
> - id = get_next_task_id()
> - AsyncTask(id, target_uri, fn, objstore, opaque)
> + # let's prevent memory leak
> + clean_old_tasks()
> + id = str(get_next_task_id())
> + tasks_queue[id] = AsyncTask(id, target_uri, fn, opaque)
> return id
>
>
> +def remove_task(id):
> + try:
> + del tasks_queue[id]
> + except KeyError:
> + cherrypy.log.error_log.error("There's no task_id %s in
> tasks_queue."
> + " Nothing changed." % id)
> +
> +
> +def clean_old_tasks():
> + """
> + Check for all tasks in tasks_queue and remove those if timestamp
> < than
> + 10 minutes ago.
> + """
> + for id, task in tasks_queue.items():
> + if task.timestamp < (time.time()-600):
> + remove_task(id)
> +
> +
> def is_digit(value):
> if isinstance(value, int):
> return True
>