[Kimchi-devel] [PATCH] [WOK] Wok tests
pvital at linux.vnet.ibm.com
pvital at linux.vnet.ibm.com
Wed Sep 23 19:28:05 UTC 2015
From: Paulo Vital <pvital at linux.vnet.ibm.com>
Moved the tests related to the Wok framework from plugins/kimchi into the Wok tests structure.
Updated the build files and tests to make the whole test structure usable.
Signed-off-by: Paulo Vital <pvital at linux.vnet.ibm.com>
---
Makefile.am | 5 +-
configure.ac | 1 +
plugins/kimchi/tests/test_exception.py | 123 ------------
plugins/kimchi/tests/test_objectstore.py | 97 ---------
plugins/kimchi/tests/test_plugin.py | 126 ------------
plugins/kimchi/tests/test_rollbackcontext.py | 99 ---------
plugins/kimchi/tests/test_server.py | 289 ---------------------------
plugins/kimchi/tests/test_utils.py | 69 -------
tests/Makefile.am | 50 +++++
tests/run_tests.sh.in | 55 +++++
tests/test_config.py.in | 132 ++++++++++++
tests/test_exception.py | 130 ++++++++++++
tests/test_objectstore.py | 97 +++++++++
tests/test_plugin.py | 120 +++++++++++
tests/test_rollbackcontext.py | 99 +++++++++
tests/test_server.py | 282 ++++++++++++++++++++++++++
tests/test_utils.py | 69 +++++++
tests/utils.py | 259 ++++++++++++++++++++++++
18 files changed, 1297 insertions(+), 805 deletions(-)
delete mode 100644 plugins/kimchi/tests/test_exception.py
delete mode 100644 plugins/kimchi/tests/test_objectstore.py
delete mode 100644 plugins/kimchi/tests/test_plugin.py
delete mode 100644 plugins/kimchi/tests/test_rollbackcontext.py
delete mode 100644 plugins/kimchi/tests/test_server.py
delete mode 100644 plugins/kimchi/tests/test_utils.py
create mode 100644 tests/Makefile.am
create mode 100644 tests/run_tests.sh.in
create mode 100644 tests/test_config.py.in
create mode 100644 tests/test_exception.py
create mode 100644 tests/test_objectstore.py
create mode 100644 tests/test_plugin.py
create mode 100644 tests/test_rollbackcontext.py
create mode 100644 tests/test_server.py
create mode 100644 tests/test_utils.py
create mode 100644 tests/utils.py
diff --git a/Makefile.am b/Makefile.am
index c7914d0..1d76260 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -16,7 +16,8 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-SUBDIRS = src ui docs contrib po plugins
+
+SUBDIRS = src ui docs contrib po plugins tests
man_MANS = docs/wokd.8
@@ -35,7 +36,7 @@ EXTRA_DIST = \
$(NULL)
-PEP8_BLACKLIST = *src/wok/config.py,*src/wok/i18n.py,*plugins/kimchi
+PEP8_BLACKLIST = *src/wok/config.py,*src/wok/i18n.py,*plugins/kimchi,*tests/test_config.py
SKIP_PYFLAKES_ERR = "\./src/wok/websocket\.py"
diff --git a/configure.ac b/configure.ac
index e11a17d..ee139e6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -126,6 +126,7 @@ AC_CONFIG_FILES([
contrib/DEBIAN/control
contrib/wok.spec.fedora
contrib/wok.spec.suse
+ tests/Makefile
],[
chmod +x po/gen-pot
])
diff --git a/plugins/kimchi/tests/test_exception.py b/plugins/kimchi/tests/test_exception.py
deleted file mode 100644
index 4459aa6..0000000
--- a/plugins/kimchi/tests/test_exception.py
+++ /dev/null
@@ -1,123 +0,0 @@
-#
-# Kimchi
-#
-# Copyright IBM, Corp. 2013-2014
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-import json
-import os
-import unittest
-
-from wok.plugins.kimchi import mockmodel
-
-from utils import get_free_port, patch_auth, request, run_server
-
-
-test_server = None
-model = None
-host = None
-port = None
-ssl_port = None
-
-
-def setup_server(environment='development'):
- global test_server, model, host, port, ssl_port
-
- patch_auth()
- model = mockmodel.MockModel('/tmp/obj-store-test')
- host = '127.0.0.1'
- port = get_free_port('http')
- ssl_port = get_free_port('https')
- test_server = run_server(host, port, ssl_port, test_mode=True, model=model,
- environment=environment)
-
-
-class ExceptionTests(unittest.TestCase):
- def tearDown(self):
- test_server.stop()
- os.unlink('/tmp/obj-store-test')
-
- def test_production_env(self):
- """
- Test reasons sanitized in production env
- """
- setup_server('production')
- # test 404
- resp = json.loads(
- request(host, ssl_port, '/plugins/kimchi/vms/blah').read()
- )
- self.assertEquals('404 Not Found', resp.get('code'))
-
- # test 405 wrong method
- resp = json.loads(request(host, ssl_port, '/', None, 'DELETE').read())
- msg = u'WOKAPI0002E: Delete is not allowed for wokroot'
- self.assertEquals('405 Method Not Allowed', resp.get('code'))
- self.assertEquals(msg, resp.get('reason'))
-
- # test 400 parse error
- resp = json.loads(
- request(host, ssl_port, '/plugins/kimchi/vms', '{', 'POST').read()
- )
- msg = u'WOKAPI0006E: Unable to parse JSON request'
- self.assertEquals('400 Bad Request', resp.get('code'))
- self.assertEquals(msg, resp.get('reason'))
- self.assertNotIn('call_stack', resp)
-
- # test 400 missing required parameter
- req = json.dumps({})
- resp = json.loads(
- request(host, ssl_port, '/plugins/kimchi/vms', req, 'POST').read()
- )
- self.assertEquals('400 Bad Request', resp.get('code'))
- m = u"KCHVM0016E: Specify a template to create a virtual machine from"
- self.assertEquals(m, resp.get('reason'))
- self.assertNotIn('call_stack', resp)
-
- def test_development_env(self):
- """
- Test traceback thrown in development env
- """
- setup_server()
- # test 404
- resp = json.loads(
- request(host, ssl_port, '/plugins/kimchi/vms/blah').read()
- )
- self.assertEquals('404 Not Found', resp.get('code'))
-
- # test 405 wrong method
- resp = json.loads(request(host, ssl_port, '/', None, 'DELETE').read())
- msg = u'WOKAPI0002E: Delete is not allowed for wokroot'
- self.assertEquals('405 Method Not Allowed', resp.get('code'))
- self.assertEquals(msg, resp.get('reason'))
-
- # test 400 parse error
- resp = json.loads(
- request(host, ssl_port, '/plugins/kimchi/vms', '{', 'POST').read()
- )
- msg = u'WOKAPI0006E: Unable to parse JSON request'
- self.assertEquals('400 Bad Request', resp.get('code'))
- self.assertEquals(msg, resp.get('reason'))
- self.assertIn('call_stack', resp)
-
- # test 400 missing required parameter
- req = json.dumps({})
- resp = json.loads(
- request(host, ssl_port, '/plugins/kimchi/vms', req, 'POST').read()
- )
- m = u"KCHVM0016E: Specify a template to create a virtual machine from"
- self.assertEquals('400 Bad Request', resp.get('code'))
- self.assertEquals(m, resp.get('reason'))
- self.assertIn('call_stack', resp)
diff --git a/plugins/kimchi/tests/test_objectstore.py b/plugins/kimchi/tests/test_objectstore.py
deleted file mode 100644
index 632786f..0000000
--- a/plugins/kimchi/tests/test_objectstore.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Project Kimchi
-#
-# Copyright IBM, Corp. 2015
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-import os
-import tempfile
-import threading
-import unittest
-
-from wok import objectstore
-from wok.exception import NotFoundError
-
-
-tmpfile = None
-
-
-def setUpModule():
- global tmpfile
- tmpfile = tempfile.mktemp()
-
-
-def tearDownModule():
- os.unlink(tmpfile)
-
-
-class ObjectStoreTests(unittest.TestCase):
- def test_objectstore(self):
- store = objectstore.ObjectStore(tmpfile)
-
- with store as session:
- # Test create
- session.store('fÇÇ', 'tÄst1', {'α': 1})
- session.store('fÇÇ', 'tÄst2', {'β': 2})
-
- # Test list
- items = session.get_list('fÇÇ')
- self.assertTrue(u'tÄst1' in items)
- self.assertTrue(u'tÄst2' in items)
-
- # Test get
- item = session.get('fÇÇ', 'tÄst1')
- self.assertEquals(1, item[u'α'])
-
- # Test delete
- session.delete('fÇÇ', 'tÄst2')
- self.assertEquals(1, len(session.get_list('fÇÇ')))
-
- # Test get non-existent item
-
- self.assertRaises(NotFoundError, session.get,
- 'α', 'β')
-
- # Test delete non-existent item
- self.assertRaises(NotFoundError, session.delete,
- 'fÇÇ', 'tÄst2')
-
- # Test refresh existing item
- session.store('fÇÇ', 'tÄst1', {'α': 2})
- item = session.get('fÇÇ', 'tÄst1')
- self.assertEquals(2, item[u'α'])
-
- def test_object_store_threaded(self):
- def worker(ident):
- with store as session:
- session.store('foo', ident, {})
-
- store = objectstore.ObjectStore(tmpfile)
-
- threads = []
- for i in xrange(50):
- t = threading.Thread(target=worker, args=(i,))
- t.setDaemon(True)
- t.start()
- threads.append(t)
-
- for t in threads:
- t.join()
-
- with store as session:
- self.assertEquals(50, len(session.get_list('foo')))
- self.assertEquals(10, len(store._connections.keys()))
diff --git a/plugins/kimchi/tests/test_plugin.py b/plugins/kimchi/tests/test_plugin.py
deleted file mode 100644
index fc8e277..0000000
--- a/plugins/kimchi/tests/test_plugin.py
+++ /dev/null
@@ -1,126 +0,0 @@
-#
-# Project Kimchi
-#
-# Copyright IBM, Corp. 2013-2014
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-import json
-import os
-import unittest
-from functools import partial
-
-from wok.utils import get_enabled_plugins
-
-from wok.plugins.kimchi import mockmodel
-
-import utils
-
-
-test_server = None
-model = None
-host = None
-port = None
-ssl_port = None
-
-
-def setUpModule():
- global test_server, model, host, port, ssl_port
-
- utils.patch_auth()
- model = mockmodel.MockModel('/tmp/obj-store-test')
- host = '127.0.0.1'
- port = utils.get_free_port('http')
- ssl_port = utils.get_free_port('https')
- test_server = utils.run_server(host, port, ssl_port, test_mode=True,
- model=model)
-
-
-def tearDownModule():
- test_server.stop()
- os.unlink('/tmp/obj-store-test')
-
-
-@unittest.skipUnless(
- 'sample' in [plugin for plugin, _config in get_enabled_plugins()],
- 'sample plugin is not enabled, skip this test!')
-class PluginTests(unittest.TestCase):
-
- def setUp(self):
- self.request = partial(utils.request, host, ssl_port)
-
- def _create_rectangle(self, name, length, width):
- req = json.dumps({'name': name, 'length': length, 'width': width})
- resp = self.request('/plugins/sample/rectangles', req, 'POST')
- return resp
-
- def _get_rectangle(self, name):
- resp = self.request('/plugins/sample/rectangles/%s' % name)
- return json.loads(resp.read())
-
- def _create_rectangle_and_assert(self, name, length, width):
- resp = self._create_rectangle(name, length, width)
- self.assertEquals(201, resp.status)
-
- rectangle = self._get_rectangle(name)
- self.assertEquals(rectangle['name'], name)
- self.assertEquals(rectangle['length'], length)
- self.assertEquals(rectangle['width'], width)
-
- def _get_rectangles_list(self):
- resp = self.request('/plugins/sample/rectangles')
- rectangles = json.loads(resp.read())
- name_list = [rectangle['name'] for rectangle in rectangles]
- return name_list
-
- def test_rectangles(self):
- # Create two new rectangles
- self._create_rectangle_and_assert('small', 10, 8)
- self._create_rectangle_and_assert('big', 20, 16)
-
- # Verify they're in the list
- name_list = self._get_rectangles_list()
- self.assertIn('small', name_list)
- self.assertIn('big', name_list)
-
- # Update the big rectangle.
- req = json.dumps({'length': 40, 'width': 30})
- resp = self.request('/plugins/sample/rectangles/big', req, 'PUT')
- self.assertEquals(200, resp.status)
- big = self._get_rectangle('big')
- self.assertEquals(big['length'], 40)
- self.assertEquals(big['width'], 30)
-
- # Delete two rectangles
- resp = self.request('/plugins/sample/rectangles/big', '{}', 'DELETE')
- self.assertEquals(204, resp.status)
- resp = self.request('/plugins/sample/rectangles/small', '{}', 'DELETE')
- self.assertEquals(204, resp.status)
- name_list = self._get_rectangles_list()
- self.assertEquals([], name_list)
-
- def test_bad_params(self):
- # Bad name
- resp = self._create_rectangle(1.0, 30, 40)
- self.assertEquals(400, resp.status)
-
- # Bad length value
- resp = self._create_rectangle('test', -10.0, 40)
- self.assertEquals(400, resp.status)
-
- # Missing param for width
- req = json.dumps({'name': 'nowidth', 'length': 40})
- resp = self.request('/plugins/sample/rectangles', req, 'POST')
- self.assertEquals(400, resp.status)
diff --git a/plugins/kimchi/tests/test_rollbackcontext.py b/plugins/kimchi/tests/test_rollbackcontext.py
deleted file mode 100644
index 6eac6d0..0000000
--- a/plugins/kimchi/tests/test_rollbackcontext.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#
-# Project Kimchi
-#
-# Copyright IBM, Corp. 2014
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-import unittest
-
-from wok.rollbackcontext import RollbackContext
-
-
-class FirstError(Exception):
- '''A hypothetical exception to be raise in the test firstly.'''
- pass
-
-
-class SecondError(Exception):
- '''A hypothetical exception to be raise in the test secondly.'''
- pass
-
-
-class RollbackContextTests(unittest.TestCase):
-
- def setUp(self):
- self._counter = 0
-
- def _inc_counter(self):
- self._counter += 1
-
- def _raise(self, exception=FirstError):
- raise exception()
-
- def test_rollback(self):
- with RollbackContext() as rollback:
- rollback.prependDefer(self._inc_counter)
- rollback.prependDefer(self._inc_counter)
- self.assertEquals(self._counter, 2)
-
- def test_raise(self):
- try:
- with RollbackContext() as rollback:
- rollback.prependDefer(self._inc_counter)
- rollback.prependDefer(self._inc_counter)
- raise FirstError()
- rollback.prependDefer(self._inc_counter)
- except FirstError:
- # All undo before the FirstError should be run
- self.assertEquals(self._counter, 2)
- else:
- self.fail('Should have raised FirstError')
-
- def test_raise_undo(self):
- try:
- with RollbackContext() as rollback:
- rollback.prependDefer(self._inc_counter)
- rollback.prependDefer(self._raise)
- rollback.prependDefer(self._inc_counter)
- except FirstError:
- # All undo should be run
- self.assertEquals(self._counter, 2)
- else:
- self.fail('Should have raised FirstError')
-
- def test_raise_prefer_original(self):
- try:
- with RollbackContext() as rollback:
- rollback.prependDefer(self._raise, SecondError)
- raise FirstError()
- except FirstError:
- pass
- except SecondError:
- self.fail('Should have preferred FirstError to SecondError')
- else:
- self.fail('Should have raised FirstError')
-
- def test_raise_prefer_first_undo(self):
- try:
- with RollbackContext() as rollback:
- rollback.prependDefer(self._raise, SecondError)
- rollback.prependDefer(self._raise, FirstError)
- except FirstError:
- pass
- except SecondError:
- self.fail('Should have preferred FirstError to SecondError')
- else:
- self.fail('Should have raised FirstError')
diff --git a/plugins/kimchi/tests/test_server.py b/plugins/kimchi/tests/test_server.py
deleted file mode 100644
index d5ef565..0000000
--- a/plugins/kimchi/tests/test_server.py
+++ /dev/null
@@ -1,289 +0,0 @@
-#
-# Project Kimchi
-#
-# Copyright IBM, Corp. 2013-2015
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-import base64
-import cherrypy
-import json
-import os
-import tempfile
-import threading
-import unittest
-from functools import partial
-
-from wok.control.base import Collection, Resource
-
-from wok.plugins.kimchi import mockmodel
-
-import utils
-
-
-test_server = None
-model = None
-host = None
-port = None
-ssl_port = None
-cherrypy_port = None
-tmpfile = None
-
-
-def setUpModule():
- global test_server, model, host, port, ssl_port, cherrypy_port, tmpfile
-
- utils.patch_auth()
- tmpfile = tempfile.mktemp()
- model = mockmodel.MockModel(tmpfile)
- host = '127.0.0.1'
- port = utils.get_free_port('http')
- ssl_port = utils.get_free_port('https')
- cherrypy_port = utils.get_free_port('cherrypy_port')
- test_server = utils.run_server(host, port, ssl_port, test_mode=True,
- cherrypy_port=cherrypy_port, model=model)
-
-
-def tearDownModule():
- test_server.stop()
- os.unlink(tmpfile)
-
-
-class ServerTests(unittest.TestCase):
- def setUp(self):
- self.request = partial(utils.request, host, ssl_port)
- model.reset()
-
- def assertValidJSON(self, txt):
- try:
- json.loads(txt)
- except ValueError:
- self.fail("Invalid JSON: %s" % txt)
-
- def test_server_start(self):
- """
- Test that we can start a server and receive HTTP:200.
- """
- resp = self.request('/')
- self.assertEquals(200, resp.status)
-
- def test_multithreaded_connection(self):
- def worker():
- for i in xrange(100):
- ret = model.vms_get_list()
- self.assertEquals('test', ret[0])
-
- threads = []
- for i in xrange(100):
- t = threading.Thread(target=worker)
- t.setDaemon(True)
- t.start()
- threads.append(t)
- for t in threads:
- t.join()
-
- def test_collection(self):
- c = Collection(model)
-
- # The base Collection is always empty
- cherrypy.request.method = 'GET'
- cherrypy.request.headers['Accept'] = 'application/json'
- self.assertEquals('[]', c.index())
-
- # POST and DELETE raise HTTP:405 by default
- for method in ('POST', 'DELETE'):
- cherrypy.request.method = method
- try:
- c.index()
- except cherrypy.HTTPError, e:
- self.assertEquals(405, e.code)
- else:
- self.fail("Expected exception not raised")
-
- def test_resource(self):
- r = Resource(model)
-
- # Test the base Resource representation
- cherrypy.request.method = 'GET'
- cherrypy.request.headers['Accept'] = 'application/json'
- self.assertEquals('{}', r.index())
-
- # POST and DELETE raise HTTP:405 by default
- for method in ('POST', 'DELETE'):
- cherrypy.request.method = method
- try:
- r.index()
- except cherrypy.HTTPError, e:
- self.assertEquals(405, e.code)
- else:
- self.fail("Expected exception not raised")
-
- def test_404(self):
- """
- A non-existent path should return HTTP:404
- """
- url_list = ['/plugins/kimchi/doesnotexist', '/plugins/kimchi/vms/blah']
- for url in url_list:
- resp = self.request(url)
- self.assertEquals(404, resp.status)
-
- # Verify it works for DELETE too
- resp = self.request('/plugins/kimchi/templates/blah', '', 'DELETE')
- self.assertEquals(404, resp.status)
-
- def test_accepts(self):
- """
- Verify the following expectations regarding the client Accept header:
- If omitted, default to html
- If 'application/json', serve the rest api
- If 'text/html', serve the UI
- If both of the above (in any order), serve the rest api
- If neither of the above, HTTP:406
- """
- resp = self.request("/", headers={})
- location = resp.getheader('location')
- self.assertTrue(location.endswith("login.html"))
- resp = self.request("/login.html", headers={})
- self.assertTrue('<!doctype html>' in resp.read().lower())
-
- resp = self.request("/", headers={'Accept': 'application/json'})
- self.assertValidJSON(resp.read())
-
- resp = self.request("/", headers={'Accept': 'text/html'})
- location = resp.getheader('location')
- self.assertTrue(location.endswith("login.html"))
-
- resp = self.request("/", headers={'Accept':
- 'application/json, text/html'})
- self.assertValidJSON(resp.read())
-
- resp = self.request("/", headers={'Accept':
- 'text/html, application/json'})
- self.assertValidJSON(resp.read())
-
- h = {'Accept': 'text/plain'}
- resp = self.request('/', None, 'GET', h)
- self.assertEquals(406, resp.status)
-
- def test_auth_unprotected(self):
- hdrs = {'AUTHORIZATION': ''}
- uris = ['/plugins/kimchi/js/kimchi.min.js',
- '/plugins/kimchi/css/theme-default.min.css',
- '/plugins/kimchi/images/icon-vm.png',
- '/libs/jquery-1.10.0.min.js',
- '/login.html',
- '/logout']
-
- for uri in uris:
- resp = self.request(uri, None, 'HEAD', hdrs)
- self.assertEquals(200, resp.status)
-
- def test_auth_protected(self):
- hdrs = {'AUTHORIZATION': ''}
- uris = ['/plugins/kimchi/vms',
- '/plugins/kimchi/vms/doesnotexist',
- '/tasks']
-
- for uri in uris:
- resp = self.request(uri, None, 'GET', hdrs)
- self.assertEquals(401, resp.status)
-
- def test_auth_bad_creds(self):
- # Test HTTPBA
- hdrs = {'AUTHORIZATION': "Basic " + base64.b64encode("nouser:badpass")}
- resp = self.request('/plugins/kimchi/vms', None, 'GET', hdrs)
- self.assertEquals(401, resp.status)
-
- # Test REST API
- hdrs = {'AUTHORIZATION': ''}
- req = json.dumps({'username': 'nouser', 'password': 'badpass'})
- resp = self.request('/login', req, 'POST', hdrs)
- self.assertEquals(401, resp.status)
-
- def test_auth_browser_no_httpba(self):
- # Kimchi detects REST requests from the browser by looking for a
- # specific header
- hdrs = {"X-Requested-With": "XMLHttpRequest"}
-
- # Try our request (Note that request() will add a valid HTTPBA header)
- resp = self.request('/plugins/kimchi/vms', None, 'GET', hdrs)
- self.assertEquals(401, resp.status)
- self.assertEquals(None, resp.getheader('WWW-Authenticate'))
-
- def test_auth_session(self):
- hdrs = {'AUTHORIZATION': '',
- 'Content-Type': 'application/json',
- 'Accept': 'application/json'}
-
- # Test we are logged out
- resp = self.request('/tasks', None, 'GET', hdrs)
- self.assertEquals(401, resp.status)
-
- # Execute a login call
- user, pw = mockmodel.fake_user.items()[0]
- req = json.dumps({'username': user, 'password': pw})
- resp = self.request('/login', req, 'POST', hdrs)
- self.assertEquals(200, resp.status)
-
- user_info = json.loads(resp.read())
- self.assertEquals(sorted(user_info.keys()),
- ['groups', 'roles', 'username'])
- roles = user_info['roles']
- for tab, role in roles.iteritems():
- self.assertEquals(role, u'admin')
-
- cookie = resp.getheader('set-cookie')
- hdrs['Cookie'] = cookie
-
- # Test we are logged in with the cookie
- resp = self.request('/tasks', None, 'GET', hdrs)
- self.assertEquals(200, resp.status)
-
- # Execute a logout call
- resp = self.request('/logout', '{}', 'POST', hdrs)
- self.assertEquals(200, resp.status)
- del hdrs['Cookie']
-
- # Test we are logged out
- resp = self.request('/tasks', None, 'GET', hdrs)
- self.assertEquals(401, resp.status)
-
- def test_get_param(self):
- # Create a mock ISO file
- mockiso = '/tmp/mock.iso'
- open('/tmp/mock.iso', 'w').close()
-
- # Create 2 different templates
- req = json.dumps({'name': 'test-tmpl1', 'cdrom': mockiso})
- self.request('/plugins/kimchi/templates', req, 'POST')
-
- req = json.dumps({'name': 'test-tmpl2', 'cdrom': mockiso})
- self.request('/plugins/kimchi/templates', req, 'POST')
-
- # Remove mock iso
- os.unlink(mockiso)
-
- # Get the templates
- resp = self.request('/plugins/kimchi/templates')
- self.assertEquals(200, resp.status)
- res = json.loads(resp.read())
- self.assertEquals(2, len(res))
-
- # Get a specific template
- resp = self.request('/plugins/kimchi/templates?name=test-tmpl1')
- self.assertEquals(200, resp.status)
- res = json.loads(resp.read())
- self.assertEquals(1, len(res))
- self.assertEquals('test-tmpl1', res[0]['name'])
diff --git a/plugins/kimchi/tests/test_utils.py b/plugins/kimchi/tests/test_utils.py
deleted file mode 100644
index bcb14e2..0000000
--- a/plugins/kimchi/tests/test_utils.py
+++ /dev/null
@@ -1,69 +0,0 @@
-#
-# Project Kimchi
-#
-# Copyright IBM, Corp. 2015
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
-import unittest
-
-from wok.exception import InvalidParameter
-from wok.utils import convert_data_size
-
-
-class UtilsTests(unittest.TestCase):
- def test_convert_data_size(self):
- failure_data = [{'val': None, 'from': 'MiB'},
- {'val': self, 'from': 'MiB'},
- {'val': 1, 'from': None},
- {'val': 1, 'from': ''},
- {'val': 1, 'from': 'foo'},
- {'val': 1, 'from': 'kib'},
- {'val': 1, 'from': 'MiB', 'to': None},
- {'val': 1, 'from': 'MiB', 'to': ''},
- {'val': 1, 'from': 'MiB', 'to': 'foo'},
- {'val': 1, 'from': 'MiB', 'to': 'kib'}]
-
- for d in failure_data:
- if 'to' in d:
- self.assertRaises(InvalidParameter, convert_data_size,
- d['val'], d['from'], d['to'])
- else:
- self.assertRaises(InvalidParameter, convert_data_size,
- d['val'], d['from'])
-
- success_data = [{'got': convert_data_size(5, 'MiB', 'MiB'),
- 'want': 5},
- {'got': convert_data_size(5, 'MiB', 'KiB'),
- 'want': 5120},
- {'got': convert_data_size(5, 'MiB', 'M'),
- 'want': 5.24288},
- {'got': convert_data_size(5, 'MiB', 'GiB'),
- 'want': 0.0048828125},
- {'got': convert_data_size(5, 'MiB', 'Tb'),
- 'want': 4.194304e-05},
- {'got': convert_data_size(5, 'KiB', 'MiB'),
- 'want': 0.0048828125},
- {'got': convert_data_size(5, 'M', 'MiB'),
- 'want': 4.76837158203125},
- {'got': convert_data_size(5, 'GiB', 'MiB'),
- 'want': 5120},
- {'got': convert_data_size(5, 'Tb', 'MiB'),
- 'want': 596046.4477539062},
- {'got': convert_data_size(5, 'MiB'),
- 'want': convert_data_size(5, 'MiB', 'B')}]
-
- for d in success_data:
- self.assertEquals(d['got'], d['want'])
diff --git a/tests/Makefile.am b/tests/Makefile.am
new file mode 100644
index 0000000..c1f6784
--- /dev/null
+++ b/tests/Makefile.am
@@ -0,0 +1,50 @@
+#
+# Kimchi
+#
+# Copyright IBM Corp, 2013
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+EXTRA_DIST = \
+ Makefile.am \
+ run_tests.sh.in \
+ test_config.py.in \
+ $(filter-out test_config.py, $(wildcard *.py)) \
+ $(NULL)
+
+noinst_SCRIPTS = run_tests.sh
+
+do_substitution = \
+ sed -e 's,[@]HAVE_PYMOD_UNITTEST[@],$(HAVE_PYMOD_UNITTEST),g' \
+ -e 's,[@]prefix[@],$(prefix),g' \
+ -e 's,[@]datadir[@],$(datadir),g' \
+ -e 's,[@]PYTHON_VERSION[@],$(PYTHON_VERSION),g' \
+ -e 's,[@]wokdir[@],$(pythondir)/wok,g' \
+ -e 's,[@]pkgdatadir[@],$(pkgdatadir),g'
+
+
+run_tests.sh: run_tests.sh.in Makefile
+ $(do_substitution) < $(srcdir)/run_tests.sh.in > run_tests.sh
+ chmod +x run_tests.sh
+
+test_config.py: test_config.py.in Makefile
+ $(do_substitution) < $(srcdir)/test_config.py.in > test_config.py
+
+check-local:
+ $(MKDIR_P) $(top_srcdir)/data/screenshots
+ ./run_tests.sh
+
+BUILT_SOURCES = test_config.py
+CLEANFILES = run_tests.sh test_config.py
diff --git a/tests/run_tests.sh.in b/tests/run_tests.sh.in
new file mode 100644
index 0000000..d1f4b38
--- /dev/null
+++ b/tests/run_tests.sh.in
@@ -0,0 +1,55 @@
+#!/bin/bash
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2013-2015
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+HAVE_UNITTEST=@HAVE_PYMOD_UNITTEST@
+PYTHON_VER=@PYTHON_VERSION@
+
+if [ "$1" = "-v" ]; then
+ OPTS="-v"
+ shift
+else
+ OPTS=""
+fi
+
+if [ $# -ne 0 ]; then
+ ARGS="$@"
+else
+ ARGS=`find -name "test_*.py" | xargs -I @ basename @ .py`
+fi
+
+if [ "$HAVE_UNITTEST" != "yes" -o "$PYTHON_VER" == "2.6" ]; then
+ CMD="unit2"
+else
+ CMD="python -m unittest"
+fi
+
+LIST=($ARGS)
+MODEL_LIST=()
+MOCK_LIST=()
+for ((i=0;i<${#LIST[@]};i++)); do
+
+ if [[ ${LIST[$i]} == test_model* ]]; then
+ MODEL_LIST+=(${LIST[$i]})
+ else
+ MOCK_LIST+=(${LIST[$i]})
+ fi
+done
+
+PYTHONPATH=../src:../ $CMD $OPTS ${MODEL_LIST[@]} ${MOCK_LIST[@]}
diff --git a/tests/test_config.py.in b/tests/test_config.py.in
new file mode 100644
index 0000000..3ef431f
--- /dev/null
+++ b/tests/test_config.py.in
@@ -0,0 +1,132 @@
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2014-2015
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import unittest
+
+from wok.config import Paths, WokConfig
+
+
+get_prefix = None
+
+
+def setUpModule():
+ global get_prefix
+ get_prefix = Paths.get_prefix
+
+
+def tearDownModule():
+ Paths.get_prefix = get_prefix
+
+
+class ConfigTests(unittest.TestCase):
+ def assertInstalledPath(self, actual, expected):
+ if '@pkgdatadir@' != '/usr/share/wok':
+ usr_local = '/usr/local'
+ if not expected.startswith('/usr'):
+ expected = usr_local + expected
+ self.assertEquals(actual, expected)
+
+ def test_installed_paths(self):
+ Paths.get_prefix = lambda self: '@datadir@/wok'
+ paths = Paths()
+ self.assertInstalledPath(paths.state_dir, '/var/lib/wok')
+ self.assertInstalledPath(paths.log_dir, '/var/log/wok')
+ self.assertInstalledPath(paths.conf_dir, '/etc/wok')
+ self.assertInstalledPath(paths.src_dir, '@wokdir@')
+ self.assertInstalledPath(paths.plugins_dir, '@wokdir@/plugins')
+ self.assertInstalledPath(paths.ui_dir, '@datadir@/wok/ui')
+ self.assertInstalledPath(paths.mo_dir, '@prefix@/share/locale')
+
+ def test_uninstalled_paths(self):
+ Paths.get_prefix = lambda self: '/home/user/wok'
+ paths = Paths()
+ self.assertEquals(paths.state_dir, '/home/user/wok/data')
+ self.assertEquals(paths.log_dir, '/home/user/wok/log')
+ self.assertEquals(paths.conf_dir, '/home/user/wok/src')
+ self.assertEquals(paths.src_dir, '/home/user/wok/src/wok')
+ self.assertEquals(paths.plugins_dir, '/home/user/wok/plugins')
+ self.assertEquals(paths.ui_dir, '/home/user/wok/ui')
+ self.assertEquals(paths.mo_dir, '/home/user/wok/mo')
+
+ def test_wok_config(self):
+ Paths.get_prefix = get_prefix
+ paths = Paths()
+ CACHEEXPIRES = 31536000
+ SESSIONSTIMEOUT = 10
+ configObj = {
+ '/': {
+ 'tools.trailing_slash.on': False,
+ 'request.methods_with_bodies': ('POST', 'PUT'),
+ 'tools.nocache.on': True,
+ 'tools.proxy.on': True,
+ 'tools.sessions.on': True,
+ 'tools.sessions.name': 'wok',
+ 'tools.sessions.secure': True,
+ 'tools.sessions.httponly': True,
+ 'tools.sessions.locking': 'explicit',
+ 'tools.sessions.storage_type': 'ram',
+ 'tools.sessions.timeout': SESSIONSTIMEOUT,
+ 'tools.wokauth.on': False
+ },
+ '/wok-ui.html': {
+ 'tools.wokauth.on': True
+ },
+ '/css': {
+ 'tools.staticdir.on': True,
+ 'tools.staticdir.dir': '%s/ui/css' % paths.prefix,
+ 'tools.expires.on': True,
+ 'tools.expires.secs': CACHEEXPIRES,
+ 'tools.nocache.on': False,
+ 'tools.wokauth.on': False
+ },
+ '/js': {
+ 'tools.staticdir.on': True,
+ 'tools.staticdir.dir': '%s/ui/js' % paths.prefix,
+ 'tools.expires.on': True,
+ 'tools.expires.secs': CACHEEXPIRES,
+ 'tools.nocache.on': False,
+ 'tools.wokauth.on': False
+ },
+ '/libs': {
+ 'tools.staticdir.on': True,
+ 'tools.staticdir.dir': '%s/ui/libs' % paths.prefix,
+ 'tools.expires.on': True,
+ 'tools.expires.secs': CACHEEXPIRES,
+ 'tools.nocache.on': False,
+ 'tools.wokauth.on': False
+ },
+ '/images': {
+ 'tools.staticdir.on': True,
+ 'tools.staticdir.dir': '%s/ui/images' % paths.prefix,
+ 'tools.nocache.on': False,
+ 'tools.wokauth.on': False
+ },
+ '/favicon.ico': {
+ 'tools.staticfile.on': True,
+ 'tools.staticfile.filename':
+ '%s/images/logo.ico' % paths.ui_dir
+ },
+ '/robots.txt': {
+ 'tools.staticfile.on': True,
+ 'tools.staticfile.filename': '%s/robots.txt' % paths.ui_dir
+ },
+ }
+
+ wok_config = WokConfig()
+ self.assertEquals(wok_config, configObj)
diff --git a/tests/test_exception.py b/tests/test_exception.py
new file mode 100644
index 0000000..c0fc11e
--- /dev/null
+++ b/tests/test_exception.py
@@ -0,0 +1,130 @@
+#
+# Kimchi
+#
+# Copyright IBM, Corp. 2013-2014
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import json
+import unittest
+
+from utils import get_free_port, patch_auth, request, run_server
+
+
+test_server = None
+model = None
+host = None
+port = None
+ssl_port = None
+
+
+def setup_server(environment='development'):
+ global test_server, model, host, port, ssl_port
+
+ patch_auth()
+ host = '127.0.0.1'
+ port = get_free_port('http')
+ ssl_port = get_free_port('https')
+ test_server = run_server(host, port, ssl_port, test_mode=True,
+ environment=environment)
+
+
+class ExceptionTests(unittest.TestCase):
+ def tearDown(self):
+ test_server.stop()
+
+ def test_production_env(self):
+ """
+ Test reasons sanitized in production env
+ """
+ setup_server('production')
+
+ # test 404
+ resp = json.loads(request(host, ssl_port, '/tasks/blah').read())
+ self.assertEquals('404 Not Found', resp.get('code'))
+
+ # test 405 wrong method
+ resp = json.loads(request(host, ssl_port, '/', None, 'DELETE').read())
+ msg = u'WOKAPI0002E: Delete is not allowed for wokroot'
+ self.assertEquals('405 Method Not Allowed', resp.get('code'))
+ self.assertEquals(msg, resp.get('reason'))
+
+ # test 400 parse error
+ resp = json.loads(request(host, ssl_port, '/tasks', '{',
+ 'POST').read())
+ msg = u'WOKAPI0006E: Unable to parse JSON request'
+ self.assertEquals('400 Bad Request', resp.get('code'))
+ self.assertEquals(msg, resp.get('reason'))
+ self.assertNotIn('call_stack', resp)
+
+ # test 400 missing required parameter
+ # TODO: need to add this test when some REST API from wok accepts POST
+# req = json.dumps({})
+# resp = json.loads(request(host, ssl_port, '/tasks', req,
+# 'POST').read())
+# self.assertEquals('400 Bad Request', resp.get('code'))
+# m = u"KCHVM0016E: Specify a template to create a virtual machine from"
+# self.assertEquals(m, resp.get('reason'))
+# self.assertNotIn('call_stack', resp)
+
+ # test 405 method not allowed
+ req = json.dumps({})
+ resp = json.loads(request(host, ssl_port, '/tasks', req,
+ 'POST').read())
+ m = u"WOKAPI0005E: Create is not allowed for tasks"
+ self.assertEquals('405 Method Not Allowed', resp.get('code'))
+ self.assertEquals(m, resp.get('reason'))
+
+ def test_development_env(self):
+ """
+ Test traceback thrown in development env
+ """
+ setup_server()
+ # test 404
+ resp = json.loads(request(host, ssl_port, '/tasks/blah').read())
+ self.assertEquals('404 Not Found', resp.get('code'))
+
+ # test 405 wrong method
+ resp = json.loads(request(host, ssl_port, '/', None, 'DELETE').read())
+ msg = u'WOKAPI0002E: Delete is not allowed for wokroot'
+ self.assertEquals('405 Method Not Allowed', resp.get('code'))
+ self.assertEquals(msg, resp.get('reason'))
+
+ # test 400 parse error
+ resp = json.loads(request(host, ssl_port, '/tasks', '{',
+ 'POST').read())
+ msg = u'WOKAPI0006E: Unable to parse JSON request'
+ self.assertEquals('400 Bad Request', resp.get('code'))
+ self.assertEquals(msg, resp.get('reason'))
+ self.assertIn('call_stack', resp)
+
+ # test 400 missing required parameter
+ # TODO: need to add this test when some REST API from wok accepts POST
+# req = json.dumps({})
+# resp = json.loads(request(host, ssl_port, '/tasks', req,
+# 'POST').read())
+# m = u"KCHVM0016E: Specify a template to create a virtual machine from"
+# self.assertEquals('400 Bad Request', resp.get('code'))
+# self.assertEquals(m, resp.get('reason'))
+# self.assertIn('call_stack', resp)
+
+ # test 405 method not allowed
+ req = json.dumps({})
+ resp = json.loads(request(host, ssl_port, '/tasks', req,
+ 'POST').read())
+ m = u"WOKAPI0005E: Create is not allowed for tasks"
+ self.assertEquals('405 Method Not Allowed', resp.get('code'))
+ self.assertEquals(m, resp.get('reason'))
+ self.assertIn('call_stack', resp)
diff --git a/tests/test_objectstore.py b/tests/test_objectstore.py
new file mode 100644
index 0000000..632786f
--- /dev/null
+++ b/tests/test_objectstore.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2015
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os
+import tempfile
+import threading
+import unittest
+
+from wok import objectstore
+from wok.exception import NotFoundError
+
+
+tmpfile = None
+
+
+def setUpModule():
+ global tmpfile
+ tmpfile = tempfile.mktemp()
+
+
+def tearDownModule():
+ os.unlink(tmpfile)
+
+
+class ObjectStoreTests(unittest.TestCase):
+ def test_objectstore(self):
+ store = objectstore.ObjectStore(tmpfile)
+
+ with store as session:
+ # Test create
+ session.store('fÇÇ', 'tÄst1', {'α': 1})
+ session.store('fÇÇ', 'tÄst2', {'β': 2})
+
+ # Test list
+ items = session.get_list('fÇÇ')
+ self.assertTrue(u'tÄst1' in items)
+ self.assertTrue(u'tÄst2' in items)
+
+ # Test get
+ item = session.get('fÇÇ', 'tÄst1')
+ self.assertEquals(1, item[u'α'])
+
+ # Test delete
+ session.delete('fÇÇ', 'tÄst2')
+ self.assertEquals(1, len(session.get_list('fÇÇ')))
+
+ # Test get non-existent item
+
+ self.assertRaises(NotFoundError, session.get,
+ 'α', 'β')
+
+ # Test delete non-existent item
+ self.assertRaises(NotFoundError, session.delete,
+ 'fÇÇ', 'tÄst2')
+
+ # Test refresh existing item
+ session.store('fÇÇ', 'tÄst1', {'α': 2})
+ item = session.get('fÇÇ', 'tÄst1')
+ self.assertEquals(2, item[u'α'])
+
+ def test_object_store_threaded(self):
+ def worker(ident):
+ with store as session:
+ session.store('foo', ident, {})
+
+ store = objectstore.ObjectStore(tmpfile)
+
+ threads = []
+ for i in xrange(50):
+ t = threading.Thread(target=worker, args=(i,))
+ t.setDaemon(True)
+ t.start()
+ threads.append(t)
+
+ for t in threads:
+ t.join()
+
+ with store as session:
+ self.assertEquals(50, len(session.get_list('foo')))
+ self.assertEquals(10, len(store._connections.keys()))
diff --git a/tests/test_plugin.py b/tests/test_plugin.py
new file mode 100644
index 0000000..5d371e5
--- /dev/null
+++ b/tests/test_plugin.py
@@ -0,0 +1,120 @@
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2013-2014
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import json
+import unittest
+from functools import partial
+
+from wok.utils import get_enabled_plugins
+
+import utils
+
+
+test_server = None
+model = None
+host = None
+port = None
+ssl_port = None
+
+
+def setUpModule():
+ global test_server, model, host, port, ssl_port
+
+ utils.patch_auth()
+ host = '127.0.0.1'
+ port = utils.get_free_port('http')
+ ssl_port = utils.get_free_port('https')
+ test_server = utils.run_server(host, port, ssl_port, test_mode=True)
+
+
+def tearDownModule():
+ test_server.stop()
+
+
+@unittest.skipUnless(
+ 'sample' in [plugin for plugin, _config in get_enabled_plugins()],
+ 'sample plugin is not enabled, skip this test!')
+class PluginTests(unittest.TestCase):
+
+ def setUp(self):
+ self.request = partial(utils.request, host, ssl_port)
+
+ def _create_rectangle(self, name, length, width):
+ req = json.dumps({'name': name, 'length': length, 'width': width})
+ resp = self.request('/plugins/sample/rectangles', req, 'POST')
+ return resp
+
+ def _get_rectangle(self, name):
+ resp = self.request('/plugins/sample/rectangles/%s' % name)
+ return json.loads(resp.read())
+
+ def _create_rectangle_and_assert(self, name, length, width):
+ resp = self._create_rectangle(name, length, width)
+ self.assertEquals(201, resp.status)
+
+ rectangle = self._get_rectangle(name)
+ self.assertEquals(rectangle['name'], name)
+ self.assertEquals(rectangle['length'], length)
+ self.assertEquals(rectangle['width'], width)
+
+ def _get_rectangles_list(self):
+ resp = self.request('/plugins/sample/rectangles')
+ rectangles = json.loads(resp.read())
+ name_list = [rectangle['name'] for rectangle in rectangles]
+ return name_list
+
+ def test_rectangles(self):
+ # Create two new rectangles
+ self._create_rectangle_and_assert('small', 10, 8)
+ self._create_rectangle_and_assert('big', 20, 16)
+
+ # Verify they're in the list
+ name_list = self._get_rectangles_list()
+ self.assertIn('small', name_list)
+ self.assertIn('big', name_list)
+
+ # Update the big rectangle.
+ req = json.dumps({'length': 40, 'width': 30})
+ resp = self.request('/plugins/sample/rectangles/big', req, 'PUT')
+ self.assertEquals(200, resp.status)
+ big = self._get_rectangle('big')
+ self.assertEquals(big['length'], 40)
+ self.assertEquals(big['width'], 30)
+
+ # Delete two rectangles
+ resp = self.request('/plugins/sample/rectangles/big', '{}', 'DELETE')
+ self.assertEquals(204, resp.status)
+ resp = self.request('/plugins/sample/rectangles/small', '{}', 'DELETE')
+ self.assertEquals(204, resp.status)
+ name_list = self._get_rectangles_list()
+ self.assertEquals([], name_list)
+
+ def test_bad_params(self):
+ # Bad name
+ resp = self._create_rectangle(1.0, 30, 40)
+ self.assertEquals(400, resp.status)
+
+ # Bad length value
+ resp = self._create_rectangle('test', -10.0, 40)
+ self.assertEquals(400, resp.status)
+
+ # Missing param for width
+ req = json.dumps({'name': 'nowidth', 'length': 40})
+ resp = self.request('/plugins/sample/rectangles', req, 'POST')
+ self.assertEquals(400, resp.status)
diff --git a/tests/test_rollbackcontext.py b/tests/test_rollbackcontext.py
new file mode 100644
index 0000000..6eac6d0
--- /dev/null
+++ b/tests/test_rollbackcontext.py
@@ -0,0 +1,99 @@
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2014
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import unittest
+
+from wok.rollbackcontext import RollbackContext
+
+
+class FirstError(Exception):
+ '''A hypothetical exception to be raised first in the test.'''
+ pass
+
+
+class SecondError(Exception):
+ '''A hypothetical exception to be raised second in the test.'''
+ pass
+
+
+class RollbackContextTests(unittest.TestCase):
+
+ def setUp(self):
+ self._counter = 0
+
+ def _inc_counter(self):
+ self._counter += 1
+
+ def _raise(self, exception=FirstError):
+ raise exception()
+
+ def test_rollback(self):
+ with RollbackContext() as rollback:
+ rollback.prependDefer(self._inc_counter)
+ rollback.prependDefer(self._inc_counter)
+ self.assertEquals(self._counter, 2)
+
+ def test_raise(self):
+ try:
+ with RollbackContext() as rollback:
+ rollback.prependDefer(self._inc_counter)
+ rollback.prependDefer(self._inc_counter)
+ raise FirstError()
+ rollback.prependDefer(self._inc_counter)
+ except FirstError:
+ # All undo before the FirstError should be run
+ self.assertEquals(self._counter, 2)
+ else:
+ self.fail('Should have raised FirstError')
+
+ def test_raise_undo(self):
+ try:
+ with RollbackContext() as rollback:
+ rollback.prependDefer(self._inc_counter)
+ rollback.prependDefer(self._raise)
+ rollback.prependDefer(self._inc_counter)
+ except FirstError:
+ # All undo should be run
+ self.assertEquals(self._counter, 2)
+ else:
+ self.fail('Should have raised FirstError')
+
+ def test_raise_prefer_original(self):
+ try:
+ with RollbackContext() as rollback:
+ rollback.prependDefer(self._raise, SecondError)
+ raise FirstError()
+ except FirstError:
+ pass
+ except SecondError:
+ self.fail('Should have preferred FirstError to SecondError')
+ else:
+ self.fail('Should have raised FirstError')
+
+ def test_raise_prefer_first_undo(self):
+ try:
+ with RollbackContext() as rollback:
+ rollback.prependDefer(self._raise, SecondError)
+ rollback.prependDefer(self._raise, FirstError)
+ except FirstError:
+ pass
+ except SecondError:
+ self.fail('Should have preferred FirstError to SecondError')
+ else:
+ self.fail('Should have raised FirstError')
diff --git a/tests/test_server.py b/tests/test_server.py
new file mode 100644
index 0000000..636591d
--- /dev/null
+++ b/tests/test_server.py
@@ -0,0 +1,282 @@
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2013-2015
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import base64
+import cherrypy
+import json
+import tempfile
+import threading
+import unittest
+from functools import partial
+
+from wok.control.base import Collection, Resource
+
+import utils
+
+
+test_server = None
+model = None
+host = None
+port = None
+ssl_port = None
+cherrypy_port = None
+tmpfile = None
+
+
+def setUpModule():
+ global test_server, model, host, port, ssl_port, cherrypy_port, tmpfile
+
+ utils.patch_auth()
+ tmpfile = tempfile.mktemp()
+ host = '127.0.0.1'
+ port = utils.get_free_port('http')
+ ssl_port = utils.get_free_port('https')
+ cherrypy_port = utils.get_free_port('cherrypy_port')
+ test_server = utils.run_server(host, port, ssl_port, test_mode=True,
+ cherrypy_port=cherrypy_port)
+
+
+def tearDownModule():
+ test_server.stop()
+
+
+class ServerTests(unittest.TestCase):
+ def setUp(self):
+ self.request = partial(utils.request, host, ssl_port)
+
+ def assertValidJSON(self, txt):
+ try:
+ json.loads(txt)
+ except ValueError:
+ self.fail("Invalid JSON: %s" % txt)
+
+ def test_server_start(self):
+ """
+ Test that we can start a server and receive HTTP:200.
+ """
+ resp = self.request('/')
+ self.assertEquals(200, resp.status)
+
+ def test_multithreaded_connection(self):
+ def worker():
+ for i in xrange(100):
+ ret = ['test']
+ self.assertEquals('test', ret[0])
+
+ threads = []
+ for i in xrange(100):
+ t = threading.Thread(target=worker)
+ t.setDaemon(True)
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
+
+ def test_collection(self):
+ c = Collection(model)
+
+ # The base Collection is always empty
+ cherrypy.request.method = 'GET'
+ cherrypy.request.headers['Accept'] = 'application/json'
+ self.assertEquals('[]', c.index())
+
+ # POST and DELETE raise HTTP:405 by default
+ for method in ('POST', 'DELETE'):
+ cherrypy.request.method = method
+ try:
+ c.index()
+ except cherrypy.HTTPError, e:
+ self.assertEquals(405, e.code)
+ else:
+ self.fail("Expected exception not raised")
+
+ def test_resource(self):
+ r = Resource(model)
+
+ # Test the base Resource representation
+ cherrypy.request.method = 'GET'
+ cherrypy.request.headers['Accept'] = 'application/json'
+ self.assertEquals('{}', r.index())
+
+ # POST and DELETE raise HTTP:405 by default
+ for method in ('POST', 'DELETE'):
+ cherrypy.request.method = method
+ try:
+ r.index()
+ except cherrypy.HTTPError, e:
+ self.assertEquals(405, e.code)
+ else:
+ self.fail("Expected exception not raised")
+
+ def test_404(self):
+ """
+ A non-existent path should return HTTP:404
+ """
+ url_list = ['/doesnotexist', '/tasks/blah']
+ for url in url_list:
+ resp = self.request(url)
+ self.assertEquals(404, resp.status)
+
+ # Verify it works for DELETE too
+ resp = self.request('/tasks/blah', '', 'DELETE')
+ self.assertEquals(404, resp.status)
+
+ def test_accepts(self):
+ """
+ Verify the following expectations regarding the client Accept header:
+ If omitted, default to html
+ If 'application/json', serve the rest api
+ If 'text/html', serve the UI
+ If both of the above (in any order), serve the rest api
+ If neither of the above, HTTP:406
+ """
+ resp = self.request("/", headers={})
+ location = resp.getheader('location')
+ self.assertTrue(location.endswith("login.html"))
+ resp = self.request("/login.html", headers={})
+ self.assertTrue('<!doctype html>' in resp.read().lower())
+
+ resp = self.request("/", headers={'Accept': 'application/json'})
+ self.assertValidJSON(resp.read())
+
+ resp = self.request("/", headers={'Accept': 'text/html'})
+ location = resp.getheader('location')
+ self.assertTrue(location.endswith("login.html"))
+
+ resp = self.request("/", headers={'Accept':
+ 'application/json, text/html'})
+ self.assertValidJSON(resp.read())
+
+ resp = self.request("/", headers={'Accept':
+ 'text/html, application/json'})
+ self.assertValidJSON(resp.read())
+
+ h = {'Accept': 'text/plain'}
+ resp = self.request('/', None, 'GET', h)
+ self.assertEquals(406, resp.status)
+
+ def test_auth_unprotected(self):
+ hdrs = {'AUTHORIZATION': ''}
+ uris = ['/js/wok.min.js',
+ '/css/theme-default.min.css',
+ '/images/favicon.png',
+ '/libs/jquery/jquery.min.js',
+ '/login.html',
+ '/logout']
+
+ for uri in uris:
+ resp = self.request(uri, None, 'HEAD', hdrs)
+ self.assertEquals(200, resp.status)
+
+ def test_auth_protected(self):
+ hdrs = {'AUTHORIZATION': ''}
+ uris = ['/tasks']
+
+ for uri in uris:
+ resp = self.request(uri, None, 'GET', hdrs)
+ self.assertEquals(401, resp.status)
+
+ def test_auth_bad_creds(self):
+ # Test HTTPBA
+ hdrs = {'AUTHORIZATION': "Basic " + base64.b64encode("nouser:badpass")}
+ resp = self.request('/tasks', None, 'GET', hdrs)
+ self.assertEquals(401, resp.status)
+
+ # Test REST API
+ hdrs = {'AUTHORIZATION': ''}
+ req = json.dumps({'username': 'nouser', 'password': 'badpass'})
+ resp = self.request('/login', req, 'POST', hdrs)
+ self.assertEquals(401, resp.status)
+
+ def test_auth_browser_no_httpba(self):
+ # Kimchi detects REST requests from the browser by looking for a
+ # specific header
+ hdrs = {"X-Requested-With": "XMLHttpRequest"}
+
+ # Try our request (Note that request() will add a valid HTTPBA header)
+ resp = self.request('/tasks', None, 'GET', hdrs)
+ self.assertEquals(401, resp.status)
+ self.assertEquals(None, resp.getheader('WWW-Authenticate'))
+
+ def test_auth_session(self):
+ hdrs = {'AUTHORIZATION': '',
+ 'Content-Type': 'application/json',
+ 'Accept': 'application/json'}
+
+ # Test we are logged out
+ resp = self.request('/tasks', None, 'GET', hdrs)
+ self.assertEquals(401, resp.status)
+
+ # Execute a login call
+ user, pw = utils.fake_user.items()[0]
+ req = json.dumps({'username': user, 'password': pw})
+ resp = self.request('/login', req, 'POST', hdrs)
+ self.assertEquals(200, resp.status)
+
+ user_info = json.loads(resp.read())
+ self.assertEquals(sorted(user_info.keys()),
+ ['groups', 'roles', 'username'])
+ roles = user_info['roles']
+ for tab, role in roles.iteritems():
+ self.assertEquals(role, u'admin')
+
+ cookie = resp.getheader('set-cookie')
+ hdrs['Cookie'] = cookie
+
+ # Test we are logged in with the cookie
+ resp = self.request('/tasks', None, 'GET', hdrs)
+ self.assertEquals(200, resp.status)
+
+ # Execute a logout call
+ resp = self.request('/logout', '{}', 'POST', hdrs)
+ self.assertEquals(200, resp.status)
+ del hdrs['Cookie']
+
+ # Test we are logged out
+ resp = self.request('/tasks', None, 'GET', hdrs)
+ self.assertEquals(401, resp.status)
+
+ # TODO: uncomment and adapt when some wok API accepts parameters to test
+# def test_get_param(self):
+# # Create a mock ISO file
+# mockiso = '/tmp/mock.iso'
+# open('/tmp/mock.iso', 'w').close()
+#
+# # Create 2 different templates
+# req = json.dumps({'name': 'test-tmpl1', 'cdrom': mockiso})
+# self.request('/plugins/kimchi/templates', req, 'POST')
+#
+# req = json.dumps({'name': 'test-tmpl2', 'cdrom': mockiso})
+# self.request('/plugins/kimchi/templates', req, 'POST')
+#
+# # Remove mock iso
+# os.unlink(mockiso)
+#
+# # Get the templates
+# resp = self.request('/plugins/kimchi/templates')
+# self.assertEquals(200, resp.status)
+# res = json.loads(resp.read())
+# self.assertEquals(2, len(res))
+#
+# # Get a specific template
+# resp = self.request('/plugins/kimchi/templates?name=test-tmpl1')
+# self.assertEquals(200, resp.status)
+# res = json.loads(resp.read())
+# self.assertEquals(1, len(res))
+# self.assertEquals('test-tmpl1', res[0]['name'])
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..bcb14e2
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,69 @@
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2015
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import unittest
+
+from wok.exception import InvalidParameter
+from wok.utils import convert_data_size
+
+
+class UtilsTests(unittest.TestCase):
+ def test_convert_data_size(self):
+ failure_data = [{'val': None, 'from': 'MiB'},
+ {'val': self, 'from': 'MiB'},
+ {'val': 1, 'from': None},
+ {'val': 1, 'from': ''},
+ {'val': 1, 'from': 'foo'},
+ {'val': 1, 'from': 'kib'},
+ {'val': 1, 'from': 'MiB', 'to': None},
+ {'val': 1, 'from': 'MiB', 'to': ''},
+ {'val': 1, 'from': 'MiB', 'to': 'foo'},
+ {'val': 1, 'from': 'MiB', 'to': 'kib'}]
+
+ for d in failure_data:
+ if 'to' in d:
+ self.assertRaises(InvalidParameter, convert_data_size,
+ d['val'], d['from'], d['to'])
+ else:
+ self.assertRaises(InvalidParameter, convert_data_size,
+ d['val'], d['from'])
+
+ success_data = [{'got': convert_data_size(5, 'MiB', 'MiB'),
+ 'want': 5},
+ {'got': convert_data_size(5, 'MiB', 'KiB'),
+ 'want': 5120},
+ {'got': convert_data_size(5, 'MiB', 'M'),
+ 'want': 5.24288},
+ {'got': convert_data_size(5, 'MiB', 'GiB'),
+ 'want': 0.0048828125},
+ {'got': convert_data_size(5, 'MiB', 'Tb'),
+ 'want': 4.194304e-05},
+ {'got': convert_data_size(5, 'KiB', 'MiB'),
+ 'want': 0.0048828125},
+ {'got': convert_data_size(5, 'M', 'MiB'),
+ 'want': 4.76837158203125},
+ {'got': convert_data_size(5, 'GiB', 'MiB'),
+ 'want': 5120},
+ {'got': convert_data_size(5, 'Tb', 'MiB'),
+ 'want': 596046.4477539062},
+ {'got': convert_data_size(5, 'MiB'),
+ 'want': convert_data_size(5, 'MiB', 'B')}]
+
+ for d in success_data:
+ self.assertEquals(d['got'], d['want'])
diff --git a/tests/utils.py b/tests/utils.py
new file mode 100644
index 0000000..1c6ee12
--- /dev/null
+++ b/tests/utils.py
@@ -0,0 +1,259 @@
+#
+# Project Kimchi
+#
+# Copyright IBM, Corp. 2013-2015
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+
+import base64
+import cherrypy
+import grp
+import httplib
+import inspect
+import json
+import os
+import socket
+import ssl
+import sys
+import threading
+import time
+import unittest
+from contextlib import closing
+from lxml import etree
+
+import wok.server
+from wok.config import config, PluginPaths
+from wok.auth import User, USER_NAME, USER_GROUPS, USER_ROLES, tabs
+from wok.exception import NotFoundError, OperationFailed
+from wok.utils import wok_log
+
+
+_ports = {}
+fake_user = {'test': 'passw0rd'}
+
+# provide missing unittest decorators and API for python 2.6; these decorators
+# do not actually work, just avoid the syntax failure
+if sys.version_info[:2] == (2, 6):
+    # The names below are installed on the unittest module and on
+    # unittest.TestCase so that tests written for newer Python versions can
+    # still be loaded on 2.6.
+
+    def skipUnless(condition, reason):
+        """
+        Stand-in for unittest.skipUnless.
+
+        It cannot really skip anything: when 'condition' is false it raises
+        Exception(reason) while the decorator is evaluated; otherwise it
+        returns a decorator which hands the object back untouched.
+        """
+        if not condition:
+            sys.stderr.write('[expected failure] ')
+            raise Exception(reason)
+        return lambda obj: obj
+
+    unittest.skipUnless = skipUnless
+    # No-op decorator: the decorated test runs like any other test.
+    unittest.expectedFailure = lambda obj: obj
+
+    def assertGreater(self, a, b, msg=None):
+        """Fail unless a > b; 'msg', when given, replaces the default text."""
+        if not a > b:
+            self.fail(msg or '%s not greater than %s' % (repr(a), repr(b)))
+
+    def assertGreaterEqual(self, a, b, msg=None):
+        """Fail unless a >= b; 'msg', when given, replaces the default text."""
+        if not a >= b:
+            self.fail(msg or '%s not greater than or equal to %s'
+                      % (repr(a), repr(b)))
+
+    def assertIsInstance(self, obj, cls, msg=None):
+        """Fail unless isinstance(obj, cls); 'msg' replaces the default."""
+        if not isinstance(obj, cls):
+            self.fail(msg or '%s is not an instance of %r' % (repr(obj), cls))
+
+    def assertIn(self, a, b, msg=None):
+        """Fail unless a is in b; 'msg', when given, replaces the default."""
+        if a not in b:
+            # '%s' is required here: '%b' is not a valid conversion and made
+            # the failure path raise ValueError instead of failing the test.
+            self.fail(msg or "%s is not in %s" % (repr(a), repr(b)))
+
+    def assertNotIn(self, a, b, msg=None):
+        """Fail if a is in b; 'msg', when given, replaces the default."""
+        if a in b:
+            # Same '%b' -> '%s' correction as in assertIn.
+            self.fail(msg or "%s is in %s" % (repr(a), repr(b)))
+
+    unittest.TestCase.assertGreaterEqual = assertGreaterEqual
+    unittest.TestCase.assertGreater = assertGreater
+    unittest.TestCase.assertIsInstance = assertIsInstance
+    unittest.TestCase.assertIn = assertIn
+    unittest.TestCase.assertNotIn = assertNotIn
+
+
+def get_free_port(name='http'):
+    """
+    Return a TCP port number which was free when probed, cached per 'name'.
+
+    The first call for a given 'name' binds a temporary socket to port 0,
+    records the port number which was assigned and closes the socket; later
+    calls with the same 'name' return the recorded number. The port is not
+    kept reserved once the probing socket is closed.
+
+    Raises Exception when the probing socket cannot be bound.
+    """
+    port = _ports.get(name)
+    if port is not None:
+        return port
+
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    with closing(sock):
+        try:
+            sock.bind(("0.0.0.0", 0))
+        except socket.error:
+            # Only socket failures are translated; a bare 'except' would
+            # also swallow KeyboardInterrupt and SystemExit.
+            raise Exception("Could not find a free port")
+        # Must be read before the socket is closed.
+        port = sock.getsockname()[1]
+
+    _ports[name] = port
+    return port
+
+
+def run_server(host, port, ssl_port, test_mode, cherrypy_port=None,
+               model=None, environment='development'):
+    """
+    Start a wok server in a daemon thread and return the Server instance.
+
+    Ports are picked through get_free_port() when 'cherrypy_port' or
+    'ssl_port' is None. The call returns only after the cherrypy engine has
+    reached the STARTED state.
+    """
+    if cherrypy_port is None:
+        cherrypy_port = get_free_port('cherrypy_port')
+    if ssl_port is None:
+        ssl_port = get_free_port('https')
+
+    # The server takes its settings as attributes of one object, so a
+    # throw-away class carrying them is created and instantiated.
+    options = {
+        'host': host,
+        'port': port,
+        'ssl_port': ssl_port,
+        'cherrypy_port': cherrypy_port,
+        'max_body_size': '4*1024',
+        'ssl_cert': '',
+        'ssl_key': '',
+        'test': test_mode,
+        'access_log': '/dev/null',
+        'error_log': '/dev/null',
+        'environment': environment,
+        'log_level': 'debug',
+    }
+    args = type('_', (object,), options)()
+    if model is not None:
+        args.model = model
+
+    server = wok.server.Server(args)
+    worker = threading.Thread(target=server.start)
+    worker.daemon = True
+    worker.start()
+    cherrypy.engine.wait(cherrypy.engine.states.STARTED)
+    return server
+
+
+def silence_server():
+    """
+    Keep the server status messages off stdout by switching cherrypy to
+    its "embedded" environment.
+    """
+    quiet_settings = {"environment": "embedded"}
+    cherrypy.config.update(quiet_settings)
+
+
+def running_as_root():
+    """Tell whether the effective user ID of this process is 0."""
+    effective_uid = os.geteuid()
+    return effective_uid == 0
+
+
+def _request(conn, path, data, method, headers):
+    """
+    Send one request through 'conn' and return its response object.
+
+    When 'headers' is None, JSON 'Content-Type' and 'Accept' headers are
+    used. Unless an 'AUTHORIZATION' header is supplied, HTTP basic
+    credentials built from the first entry of fake_user are added. A dict
+    passed in by the caller is never modified.
+    """
+    if headers is None:
+        headers = {'Content-Type': 'application/json',
+                   'Accept': 'application/json'}
+    else:
+        # Work on a copy so the credentials added below do not leak into
+        # the dict owned by the caller.
+        headers = dict(headers)
+
+    if 'AUTHORIZATION' not in headers:
+        user, pw = fake_user.items()[0]
+        headers['AUTHORIZATION'] = "Basic " + base64.b64encode(
+            "%s:%s" % (user, pw))
+
+    conn.request(method, path, data, headers)
+    return conn.getresponse()
+
+
+def request(host, port, path, data=None, method='GET', headers=None):
+    """
+    Open an HTTPS connection to host:port, send the request and return the
+    response obtained from _request().
+
+    When httplib.HTTPSConnection accepts a 'context' argument, an
+    unverified SSL context is passed to it.
+    """
+    init_args = inspect.getargspec(httplib.HTTPSConnection.__init__).args
+
+    conn_kwargs = {}
+    if "context" in init_args:
+        conn_kwargs['context'] = ssl._create_unverified_context()
+
+    conn = httplib.HTTPSConnection(host, port, **conn_kwargs)
+    return _request(conn, path, data, method, headers)
+
+
+def get_remote_iso_path():
+    """
+    Look up, in the fedora.json distro file found under the kimchi plugin
+    configuration directory, the first ISO entry whose 'os_arch' equals the
+    host architecture and return its 'path'.
+
+    An empty string is returned when no entry matches.
+    """
+    host_arch = os.uname()[4]
+    distro_file = os.path.join(PluginPaths('kimchi').conf_dir, 'distros.d',
+                               'fedora.json')
+
+    with open(distro_file) as fedora_isos:
+        # The file holds a list of dicts, one per ISO.
+        iso_entries = json.load(fedora_isos)
+
+    for entry in iso_entries:
+        if entry.get('os_arch') == host_arch:
+            return entry.get('path')
+
+    return ''
+
+
+class FakeUser(User):
+    """
+    User implementation for the tests: credentials are checked against the
+    module-level 'fake_user' dict.
+    """
+    auth_type = "fake"
+    # When true, get_roles() reports the 'admin' role for every tab.
+    # patch_auth() overwrites this flag.
+    sudo = True
+
+    def __init__(self, username):
+        """
+        Build the user dict for 'username': no groups and the 'user' role
+        for every tab.
+        """
+        self.user = {}
+        self.user[USER_NAME] = username
+        self.user[USER_GROUPS] = None
+        self.user[USER_ROLES] = dict.fromkeys(tabs, 'user')
+
+    def get_groups(self):
+        """Return the first three system group names in sorted order."""
+        return sorted(group.gr_name for group in grp.getgrall())[:3]
+
+    def get_roles(self):
+        """
+        Return the role of the user per tab; every tab is switched to
+        'admin' first when the 'sudo' flag is set.
+        """
+        if self.sudo:
+            self.user[USER_ROLES] = dict.fromkeys(tabs, 'admin')
+        return self.user[USER_ROLES]
+
+    def get_user(self):
+        """Return the dict holding the name, groups and roles of the user."""
+        return self.user
+
+    @staticmethod
+    def authenticate(username, password, service="passwd"):
+        """
+        Compare 'password' with the one stored for 'username' in fake_user.
+
+        'service' is accepted but not used. Returns True when the passwords
+        match and False otherwise; raises OperationFailed (WOKAUTH0001E)
+        when 'username' is not a key of fake_user.
+        """
+        try:
+            return fake_user[username] == password
+        except KeyError as e:
+            # Report the name which was actually tried; the literal string
+            # 'username' used to be sent here. e.args[0] carries the same
+            # value as the deprecated e.message.
+            raise OperationFailed("WOKAUTH0001E", {'username': username,
+                                                   'code': e.args[0]})
+
+
+def patch_auth(sudo=True):
+    """
+    Switch the configured authentication method to "fake", which checks
+    credentials against an internal dict of users and passwords, and record
+    on FakeUser whether users are to be treated as sudoers.
+    """
+    FakeUser.sudo = sudo
+    config.set("authentication", "method", "fake")
+
+
+def normalize_xml(xml_str):
+    """
+    Parse 'xml_str' with blank text removal enabled and return the tree
+    serialized back by etree.tostring().
+    """
+    parser = etree.XMLParser(remove_blank_text=True)
+    root = etree.fromstring(xml_str, parser)
+    return etree.tostring(root)
+
+
+def wait_task(task_lookup, taskid, timeout=10):
+    """
+    Poll task_lookup(taskid) up to 'timeout' times, sleeping one second
+    after each poll which reports the "running" status.
+
+    Returns as soon as another status is seen; an error is logged when the
+    task is still running after the last poll.
+    """
+    for _attempt in range(timeout):
+        info = task_lookup(taskid)
+        if info['status'] != "running":
+            return
+        wok_log.info("Waiting task %s, message: %s",
+                     taskid, info['message'])
+        time.sleep(1)
+
+    wok_log.error("Timeout while process long-run task, "
+                  "try to increase timeout value.")
+
+
+# The action functions in the model backend raise a NotFoundError exception if
+# the element is not found. In some tests these functions are called after the
+# element has already been deleted (when the test finishes correctly), so
+# NotFoundError is raised and the rollback breaks. To avoid that, this wrapper
+# ignores NotFoundError.
+def rollback_wrapper(func, resource, *args):
+    """
+    Call func(resource, *args) and ignore NotFoundError; the result of
+    'func' is discarded.
+    """
+    try:
+        func(resource, *args)
+    except NotFoundError:
+        # The resource is already gone, so nothing is left to roll back.
+        pass
+
+
+# This function is used to test storage volume upload.
+# If we used self.request, we would have to encode the multipart form data
+# ourselves; the requests lib takes care of the encoding, so use it instead.
+def fake_auth_header():
+    """
+    Return request headers accepting JSON and carrying HTTP basic
+    credentials built from the first entry of fake_user.
+    """
+    user, pw = fake_user.items()[0]
+    credentials = base64.b64encode("%s:%s" % (user, pw))
+    return {'Accept': 'application/json',
+            'AUTHORIZATION': "Basic " + credentials}
--
2.4.3
More information about the Kimchi-devel
mailing list