[Kimchi-devel] [PATCH 1/3] Issue #377: Validate repository URLs

Crístian Viana vianac at linux.vnet.ibm.com
Mon Aug 4 13:19:36 UTC 2014


The repository URLs have a specific syntax and should point to a valid
destination. Currently, Kimchi does not check those constraints before
creating and updating a repository.

Add URL validation before creating or updating a repository. The URLs
must use one of the supported protocols (http://, https://, ftp:// or
file://) and must point to a valid destination: if the protocol is
file://, the URL should point to an existing path on the local system;
otherwise, it should point to a reachable remote location.

Signed-off-by: Crístian Viana <vianac at linux.vnet.ibm.com>
---
 src/kimchi/mockmodel.py    | 41 ++++++++++++++++++++++++++++++++++++++---
 src/kimchi/repositories.py | 29 ++++++++++++++++++++++++++++-
 tests/test_model.py        | 42 ++++++++++++++++++++++++++++++++++++++++--
 tests/test_rest.py         | 39 ++++++++++++++++++++++++++++++++++++++-
 4 files changed, 144 insertions(+), 7 deletions(-)

diff --git a/src/kimchi/mockmodel.py b/src/kimchi/mockmodel.py
index a42f2dd..f093bce 100644
--- a/src/kimchi/mockmodel.py
+++ b/src/kimchi/mockmodel.py
@@ -52,7 +52,7 @@ from kimchi.model.storageservers import STORAGE_SERVERS
 from kimchi.model.utils import get_vm_name
 from kimchi.objectstore import ObjectStore
 from kimchi.screenshot import VMScreenshot
-from kimchi.utils import pool_name_from_uri
+from kimchi.utils import check_url_path, pool_name_from_uri
 from kimchi.utils import template_name_from_uri
 from kimchi.vmtemplate import VMTemplate
 
@@ -60,6 +60,22 @@ from kimchi.vmtemplate import VMTemplate
 fake_user = {'admin': 'letmein!'}
 
 
+def _validate_repo_url(url):
+    scheme, sep, path = url.partition('://')  # scheme, '://', rest of URL
+    # A URL without a non-empty "scheme://" prefix is malformed.
+    if not sep or not scheme:
+        raise InvalidParameter("KCHREPOS0002E")
+    # Remote URLs go through check_url_path(); file:// paths must exist.
+    if scheme in ['http', 'https', 'ftp']:
+        if not check_url_path(url):
+            raise InvalidParameter("KCHUTILS0001E", {'uri': url})
+    elif scheme == 'file':
+        if not os.path.exists(path):
+            raise InvalidParameter("KCHUTILS0001E", {'uri': url})
+    else:
+        raise InvalidParameter("KCHREPOS0002E")
+
+
 class MockModel(object):
     def __init__(self, objstore_loc=None):
         self.reset()
@@ -1241,13 +1257,22 @@ class MockRepositories(object):
         # Create and enable the repository
         repo_id = params['repo_id']
         config = params.get('config', {})
+        baseurl = params.get('baseurl')
+        mirrorlist = config.get('mirrorlist', "")
+
+        if baseurl:
+            _validate_repo_url(baseurl)
+
+        if mirrorlist:
+            _validate_repo_url(mirrorlist)
+
         repo = {'repo_id': repo_id,
-                'baseurl': params.get('baseurl'),
+                'baseurl': baseurl,
                 'enabled': True,
                 'config': {'repo_name': config.get('repo_name', repo_id),
                            'gpgkey': config.get('gpgkey', []),
                            'gpgcheck': True,
-                           'mirrorlist': config.get('mirrorlist', "")}
+                           'mirrorlist': mirrorlist}
                 }
 
         self._repos[repo_id] = repo
@@ -1292,6 +1317,16 @@ class MockRepositories(object):
         if repo_id not in self._repos.keys():
             raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
 
+        baseurl = params.get('baseurl', None)
+        config = params.get('config', {})
+        mirrorlist = config.get('mirrorlist', None)
+
+        if baseurl:
+            _validate_repo_url(baseurl)
+
+        if mirrorlist:
+            _validate_repo_url(mirrorlist)
+
         info = self._repos[repo_id]
         info.update(params)
         del self._repos[repo_id]
diff --git a/src/kimchi/repositories.py b/src/kimchi/repositories.py
index 414f146..5ac8d1e 100644
--- a/src/kimchi/repositories.py
+++ b/src/kimchi/repositories.py
@@ -25,8 +25,25 @@ from ConfigParser import ConfigParser
 
 from kimchi.basemodel import Singleton
 from kimchi.config import kimchiLock
-from kimchi.exception import InvalidOperation
+from kimchi.exception import InvalidOperation, InvalidParameter
 from kimchi.exception import OperationFailed, NotFoundError, MissingParameter
+from kimchi.utils import check_url_path
+
+
+def _validate_repo_url(url):
+    scheme, sep, path = url.partition('://')  # scheme, '://', rest of URL
+    # A URL without a non-empty "scheme://" prefix is malformed.
+    if not sep or not scheme:
+        raise InvalidParameter("KCHREPOS0002E")
+    # Remote URLs go through check_url_path(); file:// paths must exist.
+    if scheme in ['http', 'https', 'ftp']:
+        if not check_url_path(url):
+            raise InvalidParameter("KCHUTILS0001E", {'uri': url})
+    elif scheme == 'file':
+        if not os.path.exists(path):
+            raise InvalidParameter("KCHUTILS0001E", {'uri': url})
+    else:
+        raise InvalidParameter("KCHREPOS0002E")
 
 
 class Repositories(object):
@@ -177,6 +194,12 @@ class YumRepo(object):
         if not baseurl and not mirrorlist:
             raise MissingParameter("KCHREPOS0013E")
 
+        if baseurl:
+            _validate_repo_url(baseurl)
+
+        if mirrorlist:
+            _validate_repo_url(mirrorlist)
+
         repo_id = params.get('repo_id', None)
         if repo_id is None:
             repo_id = "kimchi_repo_%s" % str(int(time.time() * 1000))
@@ -260,12 +283,14 @@ class YumRepo(object):
         mirrorlist = config.get('mirrorlist', None)
 
         if baseurl is not None:
+            _validate_repo_url(baseurl)
             entry.baseurl = baseurl
 
         if mirrorlist == '':
             mirrorlist = None
 
         if mirrorlist is not None:
+            _validate_repo_url(mirrorlist)
             entry.mirrorlist = mirrorlist
 
         entry.id = params.get('repo_id', repo_id)
@@ -414,6 +439,8 @@ class AptRepo(object):
         dist = config['dist']
         comps = config.get('comps', [])
 
+        _validate_repo_url(uri)
+
         kimchiLock.acquire()
         try:
             repos = self._get_repos()
diff --git a/tests/test_model.py b/tests/test_model.py
index 9cfa312..85b11fa 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -1189,7 +1189,7 @@ class ModelTests(unittest.TestCase):
                       'baseurl': 'http://www.fedora.org'},
                      {'repo_id': 'fedora-updates-fake',
                       'config':
-                      {'mirrorlist': 'http://www.fedora.org/updates',
+                      {'mirrorlist': 'http://www.fedoraproject.org',
                        'gpgkey': 'file:///tmp/KEY-fedora-updates-fake-19'}}]
 
         deb_repos = [{'baseurl': 'http://br.archive.ubuntu.com/kimchi/fake',
@@ -1207,6 +1207,27 @@ class ModelTests(unittest.TestCase):
             # skip test case
             return
 
+        invalid_urls = [
+                'www.fedora.org',               # missing protocol
+                '://www.fedora.org',            # missing protocol
+                'http://www.fedora',            # invalid domain name
+                'file:///home/userdoesnotexist' # invalid path
+        ]
+
+        # create repositories with invalid baseurl
+        for url in invalid_urls:
+            repo = { 'repo_id': 'repo_fake',
+                     'baseurl': url,
+                     'config': { 'dist': 'quantal' } }
+            self.assertRaises(InvalidParameter, inst.repositories_create, repo)
+
+        # create repositories with invalid mirrorlist
+        for url in invalid_urls:
+            repo = { 'repo_id': 'repo_fake',
+                     'config': { 'mirrorlist': url,
+                                 'dist': 'quantal' } }
+            self.assertRaises(InvalidParameter, inst.repositories_create, repo)
+
         for repo in test_repos:
             system_host_repos = len(inst.repositories_get_list())
             repo_id = inst.repositories_create(repo)
@@ -1242,7 +1263,7 @@ class ModelTests(unittest.TestCase):
 
         yum_repo = {'repo_id': 'fedora-fake',
                     'baseurl': 'http://www.fedora.org'}
-        yum_new_repo = {'baseurl': 'http://www.fedora.org/updates'}
+        yum_new_repo = {'baseurl': 'http://www.fedoraproject.org'}
 
         deb_repo = {'baseurl': 'http://br.archive.ubuntu.com/kimchi/fake',
                     'config': {'dist': 'quantal'}}
@@ -1266,6 +1287,23 @@ class ModelTests(unittest.TestCase):
         host_repos = inst.repositories_get_list()
         self.assertEquals(system_host_repos + 1, len(host_repos))
 
+        invalid_urls = [
+                'www.fedora.org',               # missing protocol
+                '://www.fedora.org',            # missing protocol
+                'http://www.fedora',            # invalid domain name
+                'file:///home/userdoesnotexist' # invalid path
+        ]
+
+        # update repositories with invalid baseurl
+        for url in invalid_urls:
+            wrong_repo = { 'baseurl': url }
+            self.assertRaises(InvalidParameter, inst.repository_update, repo_id, wrong_repo)
+
+        # update repositories with invalid mirrorlist
+        for url in invalid_urls:
+            wrong_repo = { 'config': { 'mirrorlist': url } }
+            self.assertRaises(InvalidParameter, inst.repository_update, repo_id, wrong_repo)
+
         new_repo_id = inst.repository_update(repo_id, new_repo)
         repo_info = inst.repository_lookup(new_repo_id)
 
diff --git a/tests/test_rest.py b/tests/test_rest.py
index 1455205..bcb85f5 100644
--- a/tests/test_rest.py
+++ b/tests/test_rest.py
@@ -1771,6 +1771,27 @@ class RestTests(unittest.TestCase):
         # Already have one repo in Kimchi's system
         self.assertEquals(1, len(json.loads(resp.read())))
 
+        invalid_urls = [
+                'www.fedora.org',               # missing protocol
+                '://www.fedora.org',            # missing protocol
+                'http://www.fedora',            # invalid domain name
+                'file:///home/userdoesnotexist' # invalid path
+        ]
+
+        # Create repositories with invalid baseurl
+        for url in invalid_urls:
+            repo = { 'repo_id': 'fedora-fake', 'baseurl': url }
+            req = json.dumps(repo)
+            resp = self.request(base_uri, req, 'POST')
+            self.assertEquals(400, resp.status)
+
+        # Create repositories with invalid mirrorlist
+        for url in invalid_urls:
+            repo = { 'repo_id': 'fedora-fake', 'mirrorlist': url }
+            req = json.dumps(repo)
+            resp = self.request(base_uri, req, 'POST')
+            self.assertEquals(400, resp.status)
+
         # Create a repository
         repo = {'repo_id': 'fedora-fake',
                 'baseurl': 'http://www.fedora.org'}
@@ -1782,9 +1803,25 @@ class RestTests(unittest.TestCase):
         res = json.loads(self.request('%s/fedora-fake' % base_uri).read())
         verify_repo(repo, res)
 
+        # Update repositories with invalid baseurl
+        for url in invalid_urls:
+            params = {}
+            params['baseurl'] = url
+            resp = self.request('%s/fedora-fake' % base_uri, json.dumps(params),
+                                'PUT')
+            self.assertEquals(400, resp.status)
+
+        # Update repositories with invalid mirrorlist
+        for url in invalid_urls:
+            params = {}
+            params['mirrorlist'] = url
+            resp = self.request('%s/fedora-fake' % base_uri, json.dumps(params),
+                                'PUT')
+            self.assertEquals(400, resp.status)
+
         # Update the repository
         params = {}
-        params['baseurl'] = repo['baseurl'] = 'http://www.fedora.org/update'
+        params['baseurl'] = repo['baseurl'] = 'http://www.fedoraproject.org'
         resp = self.request('%s/fedora-fake' % base_uri, json.dumps(params),
                             'PUT')
 
-- 
1.9.3




More information about the Kimchi-devel mailing list