[Kimchi-devel] [PATCH] [WoK] Asynchronous UI notification implementation

Aline Manera alinefm at linux.vnet.ibm.com
Mon Feb 27 14:23:59 UTC 2017



On 02/24/2017 10:22 AM, dhbarboza82 at gmail.com wrote:
> From: Daniel Henrique Barboza <danielhb at linux.vnet.ibm.com>
>
> This patch makes backend and UI changes to implement the asynchronous
> UI notification in WoK.
>
> - Backend:
>
> A push server was implemented from scratch to manage the opened websocket
> connections. The push server connects to the /run/woknotifications
> UNIX socket and broadcasts all messages to all connections.
>
> The websocket module is the same module that exists in the Kimchi
> plug-in. The idea is to remove the module from Kimchi and make it
> use the module from WoK. ws_proxy initialization was also added
> in src/wok/server.py.
>
> - Frontend:
>
> In ui/js/wok.main.js two new functions were added to help the
> usage of asynchronous notifications in the frontend. The idea:
> a single websocket is opened per session. This opened websocket
> will broadcast all incoming messages to all listeners registered.
> Listeners can be added by the new wok.addNotificationListener()
> method. This method will clean up any registered listener by
> itself when the user changes tabs/URL.
>
> The single websocket sends heartbeats to the backend side each
> 30 seconds. No reply from the backend is issued or expected. This
> heartbeat is just a way to ensure that the browser does not
> close the connection due to inactivity. This behavior varies from
> browser to browser but this 30 second heartbeat is more than enough
> to ensure that the websocket is kept alive.
>
> - Working example in User Log:
>
> A simple usage is provided in this patch. A change was made in
> src/wok/reqlogger.py to send an asynchronous notification each
> time a new log entry is created. In ui/js/wok.user-log.js a
> websocket listener is added using wok.addNotificationListener()
> and, for each message that indicates a new user log entry, a
> refresh in the listing is issued.
>
> Signed-off-by: Daniel Henrique Barboza <danielhb at linux.vnet.ibm.com>
> ---
>   src/wok/model/notifications.py |  18 +++++-
>   src/wok/pushserver.py          | 132 +++++++++++++++++++++++++++++++++++++++++
>   src/wok/reqlogger.py           |   7 ++-
>   src/wok/server.py              |   8 ++-
>   src/wok/websocket.py           | 123 ++++++++++++++++++++++++++++++++++++++
>   ui/js/src/wok.main.js          |  38 ++++++++++++
>   ui/js/wok.user-log.js          |   6 ++
>   7 files changed, 327 insertions(+), 5 deletions(-)
>   create mode 100644 src/wok/pushserver.py
>   create mode 100644 src/wok/websocket.py
>
> diff --git a/src/wok/model/notifications.py b/src/wok/model/notifications.py
> index bdb7c78..597eac5 100644
> --- a/src/wok/model/notifications.py
> +++ b/src/wok/model/notifications.py
> @@ -1,7 +1,7 @@
>   #
>   # Project Wok
>   #
> -# Copyright IBM Corp, 2016
> +# Copyright IBM Corp, 2016-2017
>   #
>   # This library is free software; you can redistribute it and/or
>   # modify it under the terms of the GNU Lesser General Public
> @@ -19,12 +19,22 @@
>
>   from datetime import datetime
>
> +from wok import config
>   from wok.exception import NotFoundError, OperationFailed
>   from wok.message import WokMessage
> +from wok.pushserver import PushServer
>   from wok.utils import wok_log
>
>
>   notificationsStore = {}
> +push_server = None
> +
> +
> +def send_websocket_notification(message):
> +    global push_server
> +
> +    if push_server:
> +        push_server.send_notification(message)
>
>
>   def add_notification(code, args=None, plugin_name=None):
> @@ -57,7 +67,11 @@ def del_notification(code):
>
>   class NotificationsModel(object):
>       def __init__(self, **kargs):
> -        pass
> +        global push_server
> +

> +        test_mode = config.config.get('server', 'test').lower() == 'true'
> +        if not test_mode:
> +            push_server = PushServer()

Why distinguish test mode here? While running on test mode, aka, using 
MockModel, the server should behave the same as on real model.

>
>       def get_list(self):
>           global notificationsStore
> diff --git a/src/wok/pushserver.py b/src/wok/pushserver.py
> new file mode 100644
> index 0000000..8993f00
> --- /dev/null
> +++ b/src/wok/pushserver.py
> @@ -0,0 +1,132 @@
> +#
> +# Project Wok
> +#
> +# Copyright IBM Corp, 2017
> +#
> +# This library is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU Lesser General Public
> +# License as published by the Free Software Foundation; either
> +# version 2.1 of the License, or (at your option) any later version.
> +#
> +# This library is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> +# Lesser General Public License for more details.
> +#
> +# You should have received a copy of the GNU Lesser General Public
> +# License along with this library; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
> +#
> +
> +import cherrypy
> +import os
> +import select
> +import socket
> +import threading
> +

> +import websocket
> +from utils import wok_log
> +

Use full import path.

import wok.websocket
from wok.utils import wok_log

> +
> +BASE_DIRECTORY = '/run'

I think this directory depends on distro settings. Some ones uses /var/run
You will need to get it on build settings. Check config.py.in for details.

> +TOKEN_NAME = 'woknotifications'
> +

Maybe only 'notifications' under a wok directory is better.
Just to place all the wok files related to the same directory. I am not 
sure if others files is places on /run but i think there are.

> +
> +class PushServer(object):
> +
> +    def set_socket_file(self):
> +        if not os.path.isdir(BASE_DIRECTORY):
> +            try:
> +                os.mkdir(BASE_DIRECTORY)
> +            except OSError:
> +                raise RuntimeError('PushServer base UNIX socket dir %s '
> +                                   'not found.' % BASE_DIRECTORY)
> +
> +        self.server_addr = os.path.join(BASE_DIRECTORY, TOKEN_NAME)
> +
> +        if os.path.exists(self.server_addr):
> +            try:
> +                os.remove(self.server_addr)
> +            except:
> +                raise RuntimeError('There is an existing connection in %s' %
> +                                   self.server_addr)
> +
> +    def __init__(self):
> +        self.set_socket_file()
> +
> +        websocket.add_proxy_token(TOKEN_NAME, self.server_addr, True)
> +
> +        self.connections = []
> +
> +        self.server_running = True
> +        self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
> +        self.server_socket.setsockopt(socket.SOL_SOCKET,
> +                                      socket.SO_REUSEADDR, 1)
> +        self.server_socket.bind(self.server_addr)
> +        self.server_socket.listen(10)
> +        wok_log.info('Push server created on address %s' % self.server_addr)
> +
> +        self.connections.append(self.server_socket)
> +        cherrypy.engine.subscribe('stop', self.close_server, 1)
> +
> +        server_loop = threading.Thread(target=self.listen)
> +        server_loop.start()
> +
> +    def listen(self):
> +        try:
> +            while self.server_running:
> +                read_ready, _, _ = select.select(self.connections,
> +                                                 [], [], 1)
> +                for sock in read_ready:
> +                    if not self.server_running:
> +                        break
> +
> +                    if sock == self.server_socket:
> +
> +                        new_socket, addr = self.server_socket.accept()
> +                        self.connections.append(new_socket)
> +                    else:
> +                        try:
> +                            data = sock.recv(4096)
> +                        except:
> +                            try:
> +                                self.connections.remove(sock)
> +                            except ValueError:
> +                                pass
> +
> +                            continue
> +                        if data and data == 'CLOSE':
> +                            sock.send('ACK')
> +                            try:
> +                                self.connections.remove(sock)
> +                            except ValueError:
> +                                pass
> +                            sock.close()
> +
> +        except Exception as e:
> +            raise RuntimeError('Exception ocurred in listen() of pushserver '
> +                               'module: %s' % e.message)
> +
> +    def send_notification(self, message):
> +        for sock in self.connections:
> +            if sock != self.server_socket:
> +                try:
> +                    sock.send(message)
> +                except IOError as e:
> +                    if 'Broken pipe' in str(e):
> +                        sock.close()
> +                        try:
> +                            self.connections.remove(sock)
> +                        except ValueError:
> +                            pass
> +
> +    def close_server(self):
> +        try:
> +            self.server_running = False
> +            self.server_socket.shutdown(socket.SHUT_RDWR)
> +            self.server_socket.close()
> +            os.remove(self.server_addr)
> +        except:
> +            pass
> +        finally:
> +            cherrypy.engine.unsubscribe('stop', self.close_server)
> diff --git a/src/wok/reqlogger.py b/src/wok/reqlogger.py
> index 92e155d..1b774e2 100644
> --- a/src/wok/reqlogger.py
> +++ b/src/wok/reqlogger.py
> @@ -1,7 +1,7 @@
>   #
>   # Project Wok
>   #
> -# Copyright IBM Corp, 2016
> +# Copyright IBM Corp, 2016-2017
>   #
>   # This library is free software; you can redistribute it and/or
>   # modify it under the terms of the GNU Lesser General Public
> @@ -34,6 +34,7 @@ from wok.auth import USER_NAME
>   from wok.config import get_log_download_path, paths
>   from wok.exception import InvalidParameter, OperationFailed
>   from wok.message import WokMessage
> +from wok.model.notifications import send_websocket_notification
>   from wok.stringutils import ascii_dict
>   from wok.utils import remove_old_files
>
> @@ -68,6 +69,8 @@ WOK_REQUEST_LOGGER = "wok_request_logger"
>   # AsyncTask handling
>   ASYNCTASK_REQUEST_METHOD = 'TASK'
>
> +NEW_LOG_ENTRY_MESSAGE = 'new_log_entry'
> +
>
>   def log_request(code, params, exception, method, status, app=None, user=None,
>                   ip=None):
> @@ -114,6 +117,8 @@ def log_request(code, params, exception, method, status, app=None, user=None,
>           ip=ip
>       ).log()
>
> +    send_websocket_notification(NEW_LOG_ENTRY_MESSAGE)
> +
>       return log_id
>
>
> diff --git a/src/wok/server.py b/src/wok/server.py
> index fc2e167..2d823c9 100644
> --- a/src/wok/server.py
> +++ b/src/wok/server.py
> @@ -25,8 +25,7 @@ import logging
>   import logging.handlers
>   import os
>
> -from wok import auth
> -from wok import config
> +from wok import auth, config, websocket
>   from wok.config import config as configParser
>   from wok.config import WokConfig
>   from wok.control import sub_nodes
> @@ -159,6 +158,11 @@ class Server(object):
>           cherrypy.tree.mount(WokRoot(model.Model(), dev_env),
>                               options.server_root, self.configObj)

> +        test_mode = config.config.get('server', 'test').lower() == 'true'
> +        if not test_mode:
> +            ws_proxy = websocket.new_ws_proxy()
> +            cherrypy.engine.subscribe('exit', ws_proxy.terminate)
> +

The same question I did before.

>           self._load_plugins()
>           cherrypy.lib.sessions.init()
>
> diff --git a/src/wok/websocket.py b/src/wok/websocket.py
> new file mode 100644
> index 0000000..5d7fb91
> --- /dev/null
> +++ b/src/wok/websocket.py
> @@ -0,0 +1,123 @@
> +#!/usr/bin/env python2
> +#
> +# Project Wok
> +#
> +# Copyright IBM Corp, 2017
> +#
> +# Code derived from Project Kimchi
> +#
> +# This library is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU Lesser General Public
> +# License as published by the Free Software Foundation; either
> +# version 2.1 of the License, or (at your option) any later version.
> +#
> +# This library is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> +# Lesser General Public License for more details.
> +#
> +# You should have received a copy of the GNU Lesser General Public
> +# License along with this library; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
> +
> +import base64
> +import errno
> +import os
> +
> +from multiprocessing import Process
> +from websockify import WebSocketProxy
> +
> +from config import config, PluginPaths
> +
> +
> +try:
> +    from websockify.token_plugins import TokenFile
> +    tokenFile = True
> +except ImportError:
> +    tokenFile = False
> +
> +try:
> +    from websockify import ProxyRequestHandler as request_proxy
> +except:
> +    from websockify import WebSocketProxy as request_proxy
> +
> +
> +WS_TOKENS_DIR = os.path.join(PluginPaths('kimchi').state_dir, 'ws-tokens')
> +
> +
> +class CustomHandler(request_proxy):
> +
> +    def get_target(self, target_plugin, path):
> +        if issubclass(CustomHandler, object):
> +            target = super(CustomHandler, self).get_target(target_plugin,
> +                                                           path)
> +        else:
> +            target = request_proxy.get_target(self, target_plugin, path)
> +
> +        if target[0] == 'unix_socket':
> +            try:
> +                self.server.unix_target = target[1]
> +            except:
> +                self.unix_target = target[1]
> +        else:
> +            try:
> +                self.server.unix_target = None
> +            except:
> +                self.unix_target = None
> +        return target
> +
> +
> +def new_ws_proxy():
> +    try:
> +        os.makedirs(WS_TOKENS_DIR, mode=0755)
> +    except OSError as e:
> +        if e.errno == errno.EEXIST:
> +            pass
> +
> +    params = {'listen_host': '127.0.0.1',
> +              'listen_port': config.get('server', 'websockets_port'),
> +              'ssl_only': False}
> +
> +    # old websockify: do not use TokenFile
> +    if not tokenFile:
> +        params['target_cfg'] = WS_TOKENS_DIR
> +
> +    # websockify 0.7 and higher: use TokenFile
> +    else:
> +        params['token_plugin'] = TokenFile(src=WS_TOKENS_DIR)
> +
> +    def start_proxy():
> +        try:
> +            server = WebSocketProxy(RequestHandlerClass=CustomHandler,
> +                                    **params)
> +        except TypeError:
> +            server = CustomHandler(**params)
> +
> +        server.start_server()
> +
> +    proc = Process(target=start_proxy)
> +    proc.start()
> +    return proc
> +
> +
> +def add_proxy_token(name, port, is_unix_socket=False):
> +    with open(os.path.join(WS_TOKENS_DIR, name), 'w') as f:
> +        """
> +        From python documentation base64.urlsafe_b64encode(s)
> +        substitutes - instead of + and _ instead of / in the
> +        standard Base64 alphabet, BUT the result can still
> +        contain = which is not safe in a URL query component.
> +        So remove it when needed as base64 can work well without it.
> +        """
> +        name = base64.urlsafe_b64encode(name).rstrip('=')
> +        if is_unix_socket:
> +            f.write('%s: unix_socket:%s' % (name.encode('utf-8'), port))
> +        else:
> +            f.write('%s: localhost:%s' % (name.encode('utf-8'), port))
> +
> +
> +def remove_proxy_token(name):
> +    try:
> +        os.unlink(os.path.join(WS_TOKENS_DIR, name))
> +    except OSError:
> +        pass
> diff --git a/ui/js/src/wok.main.js b/ui/js/src/wok.main.js
> index 20c017e..f5031ce 100644
> --- a/ui/js/src/wok.main.js
> +++ b/ui/js/src/wok.main.js
> @@ -29,6 +29,41 @@ wok.getConfig(function(result) {
>       wok.config = {};
>   });
>
> +
> +wok.notificationListeners = {};
> +wok.addNotificationListener = function(name, func) {
> +    wok.notificationListeners[name] = func;
> +    $(window).one("hashchange", function() {
> +        delete wok.notificationListeners[name];
> +    });
> +};
> +
> +wok.notificationsWebSocket = undefined;
> +wok.startNotificationWebSocket = function () {
> +    var addr = window.location.hostname + ':' + window.location.port;
> +    var token = wok.urlSafeB64Encode('woknotifications').replace(/=*$/g, "");
> +    var url = 'wss://' + addr + '/websockify?token=' + token;
> +    wok.notificationsWebSocket = new WebSocket(url, ['base64']);
> +
> +    wok.notificationsWebSocket.onmessage = function(event) {
> +        var message = window.atob(event.data);
> +        for (name in wok.notificationListeners) {
> +            func = wok.notificationListeners[name];
> +            func(message);
> +        }
> +    };
> +
> +    sessionStorage.setItem('wokNotificationWebSocket', 'true');
> +    var heartbeat = setInterval(function() {
> +        wok.notificationsWebSocket.send(window.btoa('heartbeat'));
> +    }, 30000);
> +
> +    wok.notificationsWebSocket.onclose = function() {
> +        clearInterval(heartbeat);
> +    };
> +};
> +
> +
>   wok.main = function() {
>       wok.isLoggingOut = false;
>       wok.popable();
> @@ -395,6 +430,9 @@ wok.main = function() {
>
>           // Set handler for help button
>           $('#btn-help').on('click', wok.openHelp);
> +
> +        // start WebSocket
> +        wok.startNotificationWebSocket();
>       };
>
>       var initUI = function() {
> diff --git a/ui/js/wok.user-log.js b/ui/js/wok.user-log.js
> index 0e8fb09..083b6c3 100644
> --- a/ui/js/wok.user-log.js
> +++ b/ui/js/wok.user-log.js
> @@ -153,6 +153,12 @@ wok.initUserLogContent = function() {
>           $("#user-log-grid").bootgrid("search");
>           wok.initUserLogConfigGridData();
>       });
> +

> +    wok.addNotificationListener('userlog', function(message) {
> +        if (message === 'new_log_entry') {
> +            $("#refresh-button").click();
> +        }
> +    });

Backend is sending a notification with 'new_log_entry' and you are 
registering a notification for 'userlog'.
How is it working?

Shouldn't the 'name' required by addNotificationListener be 
'new_log_entry'? So wok would be redirect to the function automatically?
Why do you need to distinguish the 'new_log_entry' later?

Also, we will need to create a pattern message for notification to avoid 
having a plugin overwriting a notification from another plugin.

That said, my suggestion is: <plugin>_<resource>_<action> which will 
result in something like:

wok_log_create
kimchi_vms_create
kimchi_vm_update
kimchi_vm_start
ginger_backups_create
ginger_backup_restore

and so on.

That way, we can automatically add the notification on control/base.py 
which will work for everyone. And then, the plugin just need to register 
the listener on UI.
Something like:

@model/base.py
#Collection.index and Resource.index (need to cover both)
def index(...):
(...)
     # on sucsess
     notification = "%s_%s_%s" % (plugin_name, resource/collection, action)
     send_websocket_notification(notification)

def _generate_action_handler_base(...):
     (...)
     # on success
     notification = "%s_%s_%s" % (plugin_name, resource/collection, action)
     send_websocket_notification(notification)

What do you think?

>   };
>
>   wok.initUserLogWindow = function() {



More information about the Kimchi-devel mailing list