[Kimchi-devel] [PATCH] [WoK] Asynchronous UI notification implementation

Daniel Henrique Barboza dhbarboza82 at gmail.com
Mon Feb 27 21:12:14 UTC 2017



On 02/27/2017 11:41 AM, Aline Manera wrote:
>
>
> On 02/27/2017 11:35 AM, Lucio Correia wrote:
>> Hi Daniel, that is great feature, see my comments below.
>>
>> On 24/02/2017 10:22, dhbarboza82 at gmail.com wrote:
>>> From: Daniel Henrique Barboza <danielhb at linux.vnet.ibm.com>
>>>
>>> This patch makes backend and UI changes to implement the asynchronous
>>> UI notification in WoK.
>>>
>>> - Backend:
>>>
>>> A push server was implemented from scratch to manage the opened 
>>> websocket
>>> connections. The push server connects to the /run/woknotifications
>>> UNIX socket and broadcasts all messages to all connections.
>>>
>>> The websocket module is the same module that exists in the Kimchi
>>> plug-in. The idea is to remove the module from Kimchi and make it
>>> use the module from WoK. ws_proxy initialization was also added
>>> in src/wok/server.py.
>>>
>>> - Frontend:
>>>
>>> In ui/js/wok.main.js two new functions were added to help the
>>> usage of asynchronous notifications in the frontend. The idea:
>>> a single websocket is opened per session. This opened websocket
>>> will broadcast all incoming messages to all listeners registered.
>>> Listeners can be added by the new wok.addNotificationListener()
>>> method. This method will clean up any registered listener by
>>> itself when the user changes tabs/URL.
>>>
>>> The single websocket sends heartbeats to the backend side each
>>> 30 seconds. No reply from the backend is issued or expected. This
>>> heartbeat is just a way to ensure that the browser does not
>>> close the connection due to inactivity. This behavior varies from
>>> browser to browser but this 30 second heartbeat is more than enough
>>> to ensure that the websocket is kept alive.
>>>
>>> - Working example in User Log:
>>>
>>> A simple usage is provided in this patch. A change was made in
>>> src/wok/reqlogger.py to send an asynchronous notification each
>>> time a new log entry is created. In ui/js/wok.user-log.js a
>>> websocket listener is added using wok.addNotificationListener()
>>> and, for each message that indicates a new user log entry, a
>>> refresh in the listing is issued.
>>>
>>> Signed-off-by: Daniel Henrique Barboza <danielhb at linux.vnet.ibm.com>
>>> ---
>>>  src/wok/model/notifications.py |  18 +++++-
>>>  src/wok/pushserver.py          | 132 
>>> +++++++++++++++++++++++++++++++++++++++++
>>>  src/wok/reqlogger.py           |   7 ++-
>>>  src/wok/server.py              |   8 ++-
>>>  src/wok/websocket.py           | 123 
>>> ++++++++++++++++++++++++++++++++++++++
>>>  ui/js/src/wok.main.js          |  38 ++++++++++++
>>>  ui/js/wok.user-log.js          |   6 ++
>>>  7 files changed, 327 insertions(+), 5 deletions(-)
>>>  create mode 100644 src/wok/pushserver.py
>>>  create mode 100644 src/wok/websocket.py
>>>
>>> diff --git a/src/wok/model/notifications.py 
>>> b/src/wok/model/notifications.py
>>> index bdb7c78..597eac5 100644
>>> --- a/src/wok/model/notifications.py
>>> +++ b/src/wok/model/notifications.py
>>> @@ -1,7 +1,7 @@
>>>  #
>>>  # Project Wok
>>>  #
>>> -# Copyright IBM Corp, 2016
>>> +# Copyright IBM Corp, 2016-2017
>>>  #
>>>  # This library is free software; you can redistribute it and/or
>>>  # modify it under the terms of the GNU Lesser General Public
>>> @@ -19,12 +19,22 @@
>>>
>>>  from datetime import datetime
>>>
>>> +from wok import config
>>>  from wok.exception import NotFoundError, OperationFailed
>>>  from wok.message import WokMessage
>>> +from wok.pushserver import PushServer
>>>  from wok.utils import wok_log
>>>
>>>
>>>  notificationsStore = {}
>>> +push_server = None
>>> +
>>> +
>>> +def send_websocket_notification(message):
>>> +    global push_server
>>> +
>>> +    if push_server:
>>> +        push_server.send_notification(message)
>>>
>>>
>>>  def add_notification(code, args=None, plugin_name=None):
>>> @@ -57,7 +67,11 @@ def del_notification(code):
>>>
>>>  class NotificationsModel(object):
>>>      def __init__(self, **kargs):
>>> -        pass
>>> +        global push_server
>>> +
>>> +        test_mode = config.config.get('server', 'test').lower() == 
>>> 'true'
>>
>> This will cause some tests to initialize pushserver, since 'test' 
>> option is now tied to which model runs (mockmodel or model). Problem 
>> is that now wok tests run without sudo (patch in ML), and 
>> pushserver's BASE_DIRECTORY is only writable with sudo permission. 
>> Gave a suggestion there.

The current design is that the websocket proxy isn't started in test_mode.


>>
>>
>>> +        if not test_mode:
>>> +            push_server = PushServer()
>> All 'users' of functionality will have their own instance of 
>> pushserver (each with a "while True" running)? Why not a single 
>> instance used by everybody?
>>

Each WoK instance has its own push server. The constructor of
NotificationsModel is called only once per WoK init.

Also, I haven't prepared this feature to run in a scenario with multiple
WoK instances running at the same time: since the same UNIX socket is
always used, multiple instances can't be launched at once.

To allow multiple push servers to be started simultaneously I would need
to make the UNIX socket path randomly generated. This means that the
websocket URL would change. The UI would then need a way to discover the
current websocket URL, probably using the /config API. It is feasible,
but more changes would need to be made.
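
To make the trade-off concrete, here is a minimal sketch (Python 3,
standard library only) of what per-instance socket paths could look like.
The helper name make_instance_socket and the 'wokpush-' prefix are made up
for illustration and are not part of the patch. The sketch also shows why
a fixed path limits us to one instance: a second bind on a path that is in
use fails.

```python
import os
import socket
import tempfile


def make_instance_socket(token_name='woknotifications'):
    """Bind a UNIX socket under a freshly created private directory.

    Hypothetical helper, not part of the patch: every call gets its own
    directory, so two push servers never compete for the same path.
    """
    base_dir = tempfile.mkdtemp(prefix='wokpush-')
    addr = os.path.join(base_dir, token_name)
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(addr)
    server.listen(10)
    return server, addr


# Two "instances" can coexist because their paths differ.
first, first_addr = make_instance_socket()
second, second_addr = make_instance_socket()

# Binding another socket to a path that is already in use fails,
# which is the limitation of a single fixed socket path.
clash = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
    clash.bind(first_addr)
    fixed_path_clashes = False
except OSError:
    fixed_path_clashes = True
finally:
    clash.close()

# Clean up the sockets and the temporary directories.
for srv, addr in ((first, first_addr), (second, second_addr)):
    srv.close()
    os.remove(addr)
    os.rmdir(os.path.dirname(addr))
```

Note that this only covers the backend side. The UI would still need a way
to learn which path (and therefore which token/URL) belongs to the running
instance, which is the extra work mentioned above.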

>
> Good point! Maybe it is better to move it to Server() to initiate the 
> websockets connections to everyone. Also there is no need to 
> distinguish test mode.

Websocket connections are available to everyone as is.

As I said in my previous reply, test_mode is already being distinguished
in the websocket initialization of Kimchi:

root.py line 48:

         # When running on test mode, specify the objectstore location to
         # remove the file on server shutting down. That way, the system
         # will not suffer any change while running on test mode
         if wok_options.test and (wok_options.test is True or
                                  wok_options.test.lower() == 'true'):
             self.objectstore_loc = tempfile.mktemp()
             self.model = mockmodel.MockModel(self.objectstore_loc)

             def remove_objectstore():
                 if os.path.exists(self.objectstore_loc):
                     os.unlink(self.objectstore_loc)
             cherrypy.engine.subscribe('exit', remove_objectstore)
         else:
             self.model = kimchiModel.Model()
             ws_proxy = websocket.new_ws_proxy()
             cherrypy.engine.subscribe('exit', ws_proxy.terminate)


I assumed that this design was intended and I haven't thought of any
good reason to change it, so this design was considered in the
push_server and also in the websocket initiation in WoK.
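
For reference, the test-mode gate boils down to something like the sketch
below. It is written for Python 3 with the standard configparser module
rather than WoK's own config object, and the names is_test_mode and
start_services are hypothetical; the patch inlines the check instead.

```python
from configparser import ConfigParser


def is_test_mode(parser):
    """Return True when the [server] 'test' option is set to 'true'."""
    value = parser.get('server', 'test', fallback='false')
    return value.strip().lower() == 'true'


def start_services(parser, start_proxy):
    """Call start_proxy() only outside test mode; return its result."""
    if is_test_mode(parser):
        return None
    return start_proxy()


# Test mode: nothing is started.
test_cfg = ConfigParser()
test_cfg.read_string('[server]\ntest = True\n')
started_in_test = start_services(test_cfg, lambda: 'proxy')

# Normal mode: the (stand-in) proxy is started.
prod_cfg = ConfigParser()
prod_cfg.read_string('[server]\ntest = false\n')
started_in_prod = start_services(prod_cfg, lambda: 'proxy')
```

Keeping the check in one helper would at least make sure the push server
and the websocket proxy agree on what "test mode" means.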


>
>>
>>>
>>>      def get_list(self):
>>>          global notificationsStore
>>> diff --git a/src/wok/pushserver.py b/src/wok/pushserver.py
>>> new file mode 100644
>>> index 0000000..8993f00
>>> --- /dev/null
>>> +++ b/src/wok/pushserver.py
>>> @@ -0,0 +1,132 @@
>>> +#
>>> +# Project Wok
>>> +#
>>> +# Copyright IBM Corp, 2017
>>> +#
>>> +# This library is free software; you can redistribute it and/or
>>> +# modify it under the terms of the GNU Lesser General Public
>>> +# License as published by the Free Software Foundation; either
>>> +# version 2.1 of the License, or (at your option) any later version.
>>> +#
>>> +# This library is distributed in the hope that it will be useful,
>>> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
>>> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
>>> +# Lesser General Public License for more details.
>>> +#
>>> +# You should have received a copy of the GNU Lesser General Public
>>> +# License along with this library; if not, write to the Free Software
>>> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  
>>> 02110-1301 USA
>>> +#
>>> +
>>> +import cherrypy
>>> +import os
>>> +import select
>>> +import socket
>>> +import threading
>>> +
>>> +import websocket
>>> +from utils import wok_log
>>> +
>>> +
>>> +BASE_DIRECTORY = '/run'
>> Suggestion to use:
>> os.path.join('/run/user', str(os.getuid()))
>> in order tests may be run with root.

I would prefer to choose a path that any user can write to, so that the
push_server can be started without root.
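
One way this could work is to try a list of candidate directories and take
the first one the current user can write to. The sketch below is only an
illustration: pick_socket_dir and the candidate order are my assumptions,
not something in the patch, and a shared directory such as the system temp
dir would need more care than shown here, since other local users can
create files there too.

```python
import os
import tempfile


def pick_socket_dir(candidates):
    """Return the first existing directory the current user can write to."""
    for path in candidates:
        if path and os.path.isdir(path) and \
                os.access(path, os.W_OK | os.X_OK):
            return path
    raise RuntimeError('no writable directory for the push server socket')


# Candidate order is an assumption made for this example.
candidates = [
    '/run',                             # value used in the patch
    os.environ.get('XDG_RUNTIME_DIR'),  # per-user runtime dir, when set
    tempfile.gettempdir(),              # assumed last-resort fallback
]
socket_dir = pick_socket_dir(candidates)
```

When running as root this still resolves to /run, so the behavior of the
current patch would not change for a regular WoK install.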


>>
>>> +TOKEN_NAME = 'woknotifications'
>>> +
>>> +
>>> +class PushServer(object):
>>> +
>>> +    def set_socket_file(self):
>>> +        if not os.path.isdir(BASE_DIRECTORY):
>>> +            try:
>>> +                os.mkdir(BASE_DIRECTORY)
>>> +            except OSError:
>>> +                raise RuntimeError('PushServer base UNIX socket dir 
>>> %s '
>>> +                                   'not found.' % BASE_DIRECTORY)
>>> +
>>> +        self.server_addr = os.path.join(BASE_DIRECTORY, TOKEN_NAME)
>>> +
>>> +        if os.path.exists(self.server_addr):
>>> +            try:
>>> +                os.remove(self.server_addr)
>>> +            except:
>>> +                raise RuntimeError('There is an existing connection 
>>> in %s' %
>>> +                                   self.server_addr)
>>> +
>>> +    def __init__(self):
>>> +        self.set_socket_file()
>>> +
>>> +        websocket.add_proxy_token(TOKEN_NAME, self.server_addr, True)
>>> +
>>> +        self.connections = []
>>> +
>>> +        self.server_running = True
>>> +        self.server_socket = socket.socket(socket.AF_UNIX, 
>>> socket.SOCK_STREAM)
>>> +        self.server_socket.setsockopt(socket.SOL_SOCKET,
>>> +                                      socket.SO_REUSEADDR, 1)
>>> +        self.server_socket.bind(self.server_addr)
>>> +        self.server_socket.listen(10)
>>> +        wok_log.info('Push server created on address %s' % 
>>> self.server_addr)
>>> +
>>> +        self.connections.append(self.server_socket)
>>> +        cherrypy.engine.subscribe('stop', self.close_server, 1)
>>> +
>>> +        server_loop = threading.Thread(target=self.listen)
>>> +        server_loop.start()
>>> +
>>> +    def listen(self):
>>> +        try:
>>> +            while self.server_running:
>>> +                read_ready, _, _ = select.select(self.connections,
>>> +                                                 [], [], 1)
>>> +                for sock in read_ready:
>>> +                    if not self.server_running:
>>> +                        break
>>> +
>>> +                    if sock == self.server_socket:
>>> +
>>> +                        new_socket, addr = self.server_socket.accept()
>>> +                        self.connections.append(new_socket)
>>> +                    else:
>>> +                        try:
>>> +                            data = sock.recv(4096)
>>> +                        except:
>>> +                            try:
>>> +                                self.connections.remove(sock)
>>> +                            except ValueError:
>>> +                                pass
>>> +
>>> +                            continue
>>> +                        if data and data == 'CLOSE':
>>> +                            sock.send('ACK')
>>> +                            try:
>>> +                                self.connections.remove(sock)
>>> +                            except ValueError:
>>> +                                pass
>>> +                            sock.close()
>>> +
>>> +        except Exception as e:
>>> +            raise RuntimeError('Exception ocurred in listen() of 
>>> pushserver '
>>> +                               'module: %s' % e.message)
>>> +
>>> +    def send_notification(self, message):
>>> +        for sock in self.connections:
>>> +            if sock != self.server_socket:
>>> +                try:
>>> +                    sock.send(message)
>>> +                except IOError as e:
>>> +                    if 'Broken pipe' in str(e):
>>> +                        sock.close()
>>> +                        try:
>>> +                            self.connections.remove(sock)
>>> +                        except ValueError:
>>> +                            pass
>>> +
>>> +    def close_server(self):
>>> +        try:
>>> +            self.server_running = False
>>> +            self.server_socket.shutdown(socket.SHUT_RDWR)
>>> +            self.server_socket.close()
>>> +            os.remove(self.server_addr)
>>> +        except:
>>> +            pass
>>> +        finally:
>>> +            cherrypy.engine.unsubscribe('stop', self.close_server)
>>> diff --git a/src/wok/reqlogger.py b/src/wok/reqlogger.py
>>> index 92e155d..1b774e2 100644
>>> --- a/src/wok/reqlogger.py
>>> +++ b/src/wok/reqlogger.py
>>> @@ -1,7 +1,7 @@
>>>  #
>>>  # Project Wok
>>>  #
>>> -# Copyright IBM Corp, 2016
>>> +# Copyright IBM Corp, 2016-2017
>>>  #
>>>  # This library is free software; you can redistribute it and/or
>>>  # modify it under the terms of the GNU Lesser General Public
>>> @@ -34,6 +34,7 @@ from wok.auth import USER_NAME
>>>  from wok.config import get_log_download_path, paths
>>>  from wok.exception import InvalidParameter, OperationFailed
>>>  from wok.message import WokMessage
>>> +from wok.model.notifications import send_websocket_notification
>>>  from wok.stringutils import ascii_dict
>>>  from wok.utils import remove_old_files
>>>
>>> @@ -68,6 +69,8 @@ WOK_REQUEST_LOGGER = "wok_request_logger"
>>>  # AsyncTask handling
>>>  ASYNCTASK_REQUEST_METHOD = 'TASK'
>>>
>>> +NEW_LOG_ENTRY_MESSAGE = 'new_log_entry'
>>> +
>>>
>>>  def log_request(code, params, exception, method, status, app=None, 
>>> user=None,
>>>                  ip=None):
>>> @@ -114,6 +117,8 @@ def log_request(code, params, exception, method, 
>>> status, app=None, user=None,
>>>          ip=ip
>>>      ).log()
>>>
>>> +    send_websocket_notification(NEW_LOG_ENTRY_MESSAGE)
>>> +
>>>      return log_id
>>>
>>>
>>> diff --git a/src/wok/server.py b/src/wok/server.py
>>> index fc2e167..2d823c9 100644
>>> --- a/src/wok/server.py
>>> +++ b/src/wok/server.py
>>> @@ -25,8 +25,7 @@ import logging
>>>  import logging.handlers
>>>  import os
>>>
>>> -from wok import auth
>>> -from wok import config
>>> +from wok import auth, config, websocket
>>>  from wok.config import config as configParser
>>>  from wok.config import WokConfig
>>>  from wok.control import sub_nodes
>>> @@ -159,6 +158,11 @@ class Server(object):
>>>          cherrypy.tree.mount(WokRoot(model.Model(), dev_env),
>>>                              options.server_root, self.configObj)
>>>
>>> +        test_mode = config.config.get('server', 'test').lower() == 
>>> 'true'
>>> +        if not test_mode:
>>> +            ws_proxy = websocket.new_ws_proxy()
>>> +            cherrypy.engine.subscribe('exit', ws_proxy.terminate)
>>> +
>>>          self._load_plugins()
>>>          cherrypy.lib.sessions.init()
>>>
>>> diff --git a/src/wok/websocket.py b/src/wok/websocket.py
>>> new file mode 100644
>>> index 0000000..5d7fb91
>>> --- /dev/null
>>> +++ b/src/wok/websocket.py
>>> @@ -0,0 +1,123 @@
>>> +#!/usr/bin/env python2
>>> +#
>>> +# Project Wok
>>> +#
>>> +# Copyright IBM Corp, 2017
>>> +#
>>> +# Code derived from Project Kimchi
>>> +#
>>> +# This library is free software; you can redistribute it and/or
>>> +# modify it under the terms of the GNU Lesser General Public
>>> +# License as published by the Free Software Foundation; either
>>> +# version 2.1 of the License, or (at your option) any later version.
>>> +#
>>> +# This library is distributed in the hope that it will be useful,
>>> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
>>> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
>>> +# Lesser General Public License for more details.
>>> +#
>>> +# You should have received a copy of the GNU Lesser General Public
>>> +# License along with this library; if not, write to the Free Software
>>> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  
>>> 02110-1301 USA
>>> +
>>> +import base64
>>> +import errno
>>> +import os
>>> +
>>> +from multiprocessing import Process
>>> +from websockify import WebSocketProxy
>> Add dep on websockify to Wok

Good catch. I'll add it in v2.

>>
>>
>>> +
>>> +from config import config, PluginPaths
>>> +
>>> +
>>> +try:
>>> +    from websockify.token_plugins import TokenFile
>>> +    tokenFile = True
>>> +except ImportError:
>>> +    tokenFile = False
>>> +
>>> +try:
>>> +    from websockify import ProxyRequestHandler as request_proxy
>>> +except:
>>> +    from websockify import WebSocketProxy as request_proxy
>>> +
>>> +
>>> +WS_TOKENS_DIR = os.path.join(PluginPaths('kimchi').state_dir, 
>>> 'ws-tokens')
>> This should be wok path now, and it would be nice to be inside 
>> /run/user as commented above.
>>
>>
Good catch again. I'll change the Kimchi-specific code in v2.

>>> +
>>> +
>>> +class CustomHandler(request_proxy):
>>> +
>>> +    def get_target(self, target_plugin, path):
>>> +        if issubclass(CustomHandler, object):
>>> +            target = super(CustomHandler, 
>>> self).get_target(target_plugin,
>>> + path)
>>> +        else:
>>> +            target = request_proxy.get_target(self, target_plugin, 
>>> path)
>>> +
>>> +        if target[0] == 'unix_socket':
>>> +            try:
>>> +                self.server.unix_target = target[1]
>>> +            except:
>>> +                self.unix_target = target[1]
>>> +        else:
>>> +            try:
>>> +                self.server.unix_target = None
>>> +            except:
>>> +                self.unix_target = None
>>> +        return target
>>> +
>>> +
>>> +def new_ws_proxy():
>>> +    try:
>>> +        os.makedirs(WS_TOKENS_DIR, mode=0755)
>>> +    except OSError as e:
>>> +        if e.errno == errno.EEXIST:
>>> +            pass
>>> +
>>> +    params = {'listen_host': '127.0.0.1',
>>> +              'listen_port': config.get('server', 'websockets_port'),
>>> +              'ssl_only': False}
>>> +
>>> +    # old websockify: do not use TokenFile
>>> +    if not tokenFile:
>>> +        params['target_cfg'] = WS_TOKENS_DIR
>>> +
>>> +    # websockify 0.7 and higher: use TokenFile
>>> +    else:
>>> +        params['token_plugin'] = TokenFile(src=WS_TOKENS_DIR)
>>> +
>>> +    def start_proxy():
>>> +        try:
>>> +            server = WebSocketProxy(RequestHandlerClass=CustomHandler,
>>> +                                    **params)
>>> +        except TypeError:
>>> +            server = CustomHandler(**params)
>>> +
>>> +        server.start_server()
>>> +
>>> +    proc = Process(target=start_proxy)
>>> +    proc.start()
>>> +    return proc
>>> +
>>> +
>>> +def add_proxy_token(name, port, is_unix_socket=False):
>>> +    with open(os.path.join(WS_TOKENS_DIR, name), 'w') as f:
>>> +        """
>>> +        From python documentation base64.urlsafe_b64encode(s)
>>> +        substitutes - instead of + and _ instead of / in the
>>> +        standard Base64 alphabet, BUT the result can still
>>> +        contain = which is not safe in a URL query component.
>>> +        So remove it when needed as base64 can work well without it.
>>> +        """
>>> +        name = base64.urlsafe_b64encode(name).rstrip('=')
>>> +        if is_unix_socket:
>>> +            f.write('%s: unix_socket:%s' % (name.encode('utf-8'), 
>>> port))
>>> +        else:
>>> +            f.write('%s: localhost:%s' % (name.encode('utf-8'), port))
>>> +
>>> +
>>> +def remove_proxy_token(name):
>>> +    try:
>>> +        os.unlink(os.path.join(WS_TOKENS_DIR, name))
>>> +    except OSError:
>>> +        pass
>>> diff --git a/ui/js/src/wok.main.js b/ui/js/src/wok.main.js
>>> index 20c017e..f5031ce 100644
>>> --- a/ui/js/src/wok.main.js
>>> +++ b/ui/js/src/wok.main.js
>>> @@ -29,6 +29,41 @@ wok.getConfig(function(result) {
>>>      wok.config = {};
>>>  });
>>>
>>> +
>>> +wok.notificationListeners = {};
>>> +wok.addNotificationListener = function(name, func) {
>>> +    wok.notificationListeners[name] = func;
>>> +    $(window).one("hashchange", function() {
>>> +        delete wok.notificationListeners[name];
>>> +    });
>>> +};
>>> +
>>> +wok.notificationsWebSocket = undefined;
>>> +wok.startNotificationWebSocket = function () {
>>> +    var addr = window.location.hostname + ':' + window.location.port;
>>> +    var token = 
>>> wok.urlSafeB64Encode('woknotifications').replace(/=*$/g, "");
>>> +    var url = 'wss://' + addr + '/websockify?token=' + token;
>>> +    wok.notificationsWebSocket = new WebSocket(url, ['base64']);
>>> +
>>> +    wok.notificationsWebSocket.onmessage = function(event) {
>>> +        var message = window.atob(event.data);
>>> +        for (name in wok.notificationListeners) {
>>> +            func = wok.notificationListeners[name];
>>> +            func(message);
>>> +        }
>>> +    };
>>> +
>>> +    sessionStorage.setItem('wokNotificationWebSocket', 'true');
>>> +    var heartbeat = setInterval(function() {
>>> + wok.notificationsWebSocket.send(window.btoa('heartbeat'));
>>> +    }, 30000);
>>> +
>>> +    wok.notificationsWebSocket.onclose = function() {
>>> +        clearInterval(heartbeat);
>>> +    };
>>> +};
>>> +
>>> +
>>>  wok.main = function() {
>>>      wok.isLoggingOut = false;
>>>      wok.popable();
>>> @@ -395,6 +430,9 @@ wok.main = function() {
>>>
>>>          // Set handler for help button
>>>          $('#btn-help').on('click', wok.openHelp);
>>> +
>>> +        // start WebSocket
>>> +        wok.startNotificationWebSocket();
>>>      };
>>>
>>>      var initUI = function() {
>>> diff --git a/ui/js/wok.user-log.js b/ui/js/wok.user-log.js
>>> index 0e8fb09..083b6c3 100644
>>> --- a/ui/js/wok.user-log.js
>>> +++ b/ui/js/wok.user-log.js
>>> @@ -153,6 +153,12 @@ wok.initUserLogContent = function() {
>>>          $("#user-log-grid").bootgrid("search");
>>>          wok.initUserLogConfigGridData();
>>>      });
>>> +
>>> +    wok.addNotificationListener('userlog', function(message) {
>>> +        if (message === 'new_log_entry') {
>>> +            $("#refresh-button").click();
>>> +        }
>>> +    });
>>>  };
>>>
>>>  wok.initUserLogWindow = function() {
>>>
>>
>>
>
> _______________________________________________
> Kimchi-devel mailing list
> Kimchi-devel at ovirt.org
> http://lists.ovirt.org/mailman/listinfo/kimchi-devel


