The repository URLs have a specific syntax and should point to a valid
destination. Currently, Kimchi does not check those constraints before
creating and updating a repository.
Add URL validation before creating or updating a repository. Each URL
must use one of the supported protocols (http://, https://, ftp:// or
file://) and must point to a valid destination: a file:// URL must
refer to an existing path on the local system, while any other URL
must refer to an available remote location.
Signed-off-by: Crístian Viana <vianac(a)linux.vnet.ibm.com>
---
src/kimchi/mockmodel.py | 25 ++++++++++++++++++++++---
src/kimchi/repositories.py | 11 +++++++++++
src/kimchi/utils.py | 15 +++++++++++++++
tests/test_model.py | 42 ++++++++++++++++++++++++++++++++++++++++--
tests/test_rest.py | 39 ++++++++++++++++++++++++++++++++++++++-
5 files changed, 126 insertions(+), 6 deletions(-)
diff --git a/src/kimchi/mockmodel.py b/src/kimchi/mockmodel.py
index a42f2dd..4f267bb 100644
--- a/src/kimchi/mockmodel.py
+++ b/src/kimchi/mockmodel.py
@@ -52,7 +52,7 @@ from kimchi.model.storageservers import STORAGE_SERVERS
from kimchi.model.utils import get_vm_name
from kimchi.objectstore import ObjectStore
from kimchi.screenshot import VMScreenshot
-from kimchi.utils import pool_name_from_uri
+from kimchi.utils import pool_name_from_uri, validate_repo_url
from kimchi.utils import template_name_from_uri
from kimchi.vmtemplate import VMTemplate
@@ -1241,13 +1241,22 @@ class MockRepositories(object):
# Create and enable the repository
repo_id = params['repo_id']
config = params.get('config', {})
+ baseurl = params.get('baseurl')
+ mirrorlist = config.get('mirrorlist', "")
+
+ if baseurl:
+ validate_repo_url(baseurl)
+
+ if mirrorlist:
+ validate_repo_url(mirrorlist)
+
repo = {'repo_id': repo_id,
- 'baseurl': params.get('baseurl'),
+ 'baseurl': baseurl,
'enabled': True,
'config': {'repo_name': config.get('repo_name',
repo_id),
'gpgkey': config.get('gpgkey', []),
'gpgcheck': True,
- 'mirrorlist': config.get('mirrorlist', "")}
+ 'mirrorlist': mirrorlist}
}
self._repos[repo_id] = repo
@@ -1292,6 +1301,16 @@ class MockRepositories(object):
if repo_id not in self._repos.keys():
raise NotFoundError("KCHREPOS0012E", {'repo_id': repo_id})
+ baseurl = params.get('baseurl', None)
+ config = params.get('config', {})
+ mirrorlist = config.get('mirrorlist', None)
+
+ if baseurl:
+ validate_repo_url(baseurl)
+
+ if mirrorlist:
+ validate_repo_url(mirrorlist)
+
info = self._repos[repo_id]
info.update(params)
del self._repos[repo_id]
diff --git a/src/kimchi/repositories.py b/src/kimchi/repositories.py
index 414f146..5a0c58a 100644
--- a/src/kimchi/repositories.py
+++ b/src/kimchi/repositories.py
@@ -27,6 +27,7 @@ from kimchi.basemodel import Singleton
from kimchi.config import kimchiLock
from kimchi.exception import InvalidOperation
from kimchi.exception import OperationFailed, NotFoundError, MissingParameter
+from kimchi.utils import validate_repo_url
class Repositories(object):
@@ -177,6 +178,12 @@ class YumRepo(object):
if not baseurl and not mirrorlist:
raise MissingParameter("KCHREPOS0013E")
+ if baseurl:
+ validate_repo_url(baseurl)
+
+ if mirrorlist:
+ validate_repo_url(mirrorlist)
+
repo_id = params.get('repo_id', None)
if repo_id is None:
repo_id = "kimchi_repo_%s" % str(int(time.time() * 1000))
@@ -260,12 +267,14 @@ class YumRepo(object):
mirrorlist = config.get('mirrorlist', None)
if baseurl is not None:
+ validate_repo_url(baseurl)
entry.baseurl = baseurl
if mirrorlist == '':
mirrorlist = None
if mirrorlist is not None:
+ validate_repo_url(mirrorlist)
entry.mirrorlist = mirrorlist
entry.id = params.get('repo_id', repo_id)
@@ -414,6 +423,8 @@ class AptRepo(object):
dist = config['dist']
comps = config.get('comps', [])
+ validate_repo_url(uri)  # fail fast, before kimchiLock is acquired
+
kimchiLock.acquire()
try:
repos = self._get_repos()
diff --git a/src/kimchi/utils.py b/src/kimchi/utils.py
index c070c00..ad458ac 100644
--- a/src/kimchi/utils.py
+++ b/src/kimchi/utils.py
@@ -280,3 +280,18 @@ def probe_file_permission_as_user(file, user):
p.start()
p.join()
return queue.get()
+
+def validate_repo_url(url):
+    """Raise InvalidParameter unless 'url' is a supported, reachable URL."""
+    # partition() always returns three items, so a URL without '://' (even
+    # a bare 'file') is rejected cleanly instead of raising IndexError.
+    scheme, sep, path = url.partition('://')
+    if not sep or scheme not in ('http', 'https', 'ftp', 'file'):
+        raise InvalidParameter("KCHREPOS0002E")
+
+    if scheme == 'file':
+        found = os.path.exists(path)
+    else:
+        found = check_url_path(url)
+    if not found:
+        raise InvalidParameter("KCHUTILS0001E", {'uri': url})
diff --git a/tests/test_model.py b/tests/test_model.py
index 9cfa312..85b11fa 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -1189,7 +1189,7 @@ class ModelTests(unittest.TestCase):
'baseurl': 'http://www.fedora.org'},
{'repo_id': 'fedora-updates-fake',
'config':
- {'mirrorlist': 'http://www.fedora.org/updates',
+ {'mirrorlist': 'http://www.fedoraproject.org',
'gpgkey':
'file:///tmp/KEY-fedora-updates-fake-19'}}]
deb_repos = [{'baseurl':
'http://br.archive.ubuntu.com/kimchi/fake',
@@ -1207,6 +1207,27 @@ class ModelTests(unittest.TestCase):
# skip test case
return
+ invalid_urls = [
+ 'www.fedora.org', # missing protocol
+ '://www.fedora.org', # missing protocol
+ 'http://www.fedora', # invalid domain name
+ 'file:///home/userdoesnotexist' # invalid path
+ ]
+
+ # create repositories with invalid baseurl
+ for url in invalid_urls:
+ repo = { 'repo_id': 'repo_fake',
+ 'baseurl': url,
+ 'config': { 'dist': 'quantal' } }
+ self.assertRaises(InvalidParameter, inst.repositories_create, repo)
+
+ # create repositories with invalid mirrorlist
+ for url in invalid_urls:
+ repo = { 'repo_id': 'repo_fake',
+ 'config': { 'mirrorlist': url,
+ 'dist': 'quantal' } }
+ self.assertRaises(InvalidParameter, inst.repositories_create, repo)
+
for repo in test_repos:
system_host_repos = len(inst.repositories_get_list())
repo_id = inst.repositories_create(repo)
@@ -1242,7 +1263,7 @@ class ModelTests(unittest.TestCase):
yum_repo = {'repo_id': 'fedora-fake',
'baseurl': 'http://www.fedora.org'}
- yum_new_repo = {'baseurl': 'http://www.fedora.org/updates'}
+ yum_new_repo = {'baseurl': 'http://www.fedoraproject.org'}
deb_repo = {'baseurl':
'http://br.archive.ubuntu.com/kimchi/fake',
'config': {'dist': 'quantal'}}
@@ -1266,6 +1287,23 @@ class ModelTests(unittest.TestCase):
host_repos = inst.repositories_get_list()
self.assertEquals(system_host_repos + 1, len(host_repos))
+ invalid_urls = [
+ 'www.fedora.org', # missing protocol
+ '://www.fedora.org', # missing protocol
+ 'http://www.fedora', # invalid domain name
+ 'file:///home/userdoesnotexist' # invalid path
+ ]
+
+ # update repositories with invalid baseurl
+ for url in invalid_urls:
+ wrong_repo = { 'baseurl': url }
+ self.assertRaises(InvalidParameter, inst.repository_update, repo_id, wrong_repo)
+
+ # update repositories with invalid mirrorlist
+ for url in invalid_urls:
+ wrong_repo = { 'config': { 'mirrorlist': url } }
+ self.assertRaises(InvalidParameter, inst.repository_update, repo_id, wrong_repo)
+
new_repo_id = inst.repository_update(repo_id, new_repo)
repo_info = inst.repository_lookup(new_repo_id)
diff --git a/tests/test_rest.py b/tests/test_rest.py
index 1455205..bcb85f5 100644
--- a/tests/test_rest.py
+++ b/tests/test_rest.py
@@ -1771,6 +1771,27 @@ class RestTests(unittest.TestCase):
# Already have one repo in Kimchi's system
self.assertEquals(1, len(json.loads(resp.read())))
+ invalid_urls = [
+ 'www.fedora.org', # missing protocol
+ '://www.fedora.org', # missing protocol
+ 'http://www.fedora', # invalid domain name
+ 'file:///home/userdoesnotexist' # invalid path
+ ]
+
+ # Create repositories with invalid baseurl
+ for url in invalid_urls:
+ repo = { 'repo_id': 'fedora-fake', 'baseurl': url }
+ req = json.dumps(repo)
+ resp = self.request(base_uri, req, 'POST')
+ self.assertEquals(400, resp.status)
+
+ # Create repositories with invalid mirrorlist
+ for url in invalid_urls:
+ repo = { 'repo_id': 'fedora-fake', 'mirrorlist': url }
+ req = json.dumps(repo)
+ resp = self.request(base_uri, req, 'POST')
+ self.assertEquals(400, resp.status)
+
# Create a repository
repo = {'repo_id': 'fedora-fake',
'baseurl': 'http://www.fedora.org'}
@@ -1782,9 +1803,25 @@ class RestTests(unittest.TestCase):
res = json.loads(self.request('%s/fedora-fake' % base_uri).read())
verify_repo(repo, res)
+ # Update repositories with invalid baseurl
+ for url in invalid_urls:
+ params = {}
+ params['baseurl'] = url
+ resp = self.request('%s/fedora-fake' % base_uri, json.dumps(params),
+ 'PUT')
+ self.assertEquals(400, resp.status)
+
+ # Update repositories with invalid mirrorlist
+ for url in invalid_urls:
+ params = {}
+ params['mirrorlist'] = url
+ resp = self.request('%s/fedora-fake' % base_uri, json.dumps(params),
+ 'PUT')
+ self.assertEquals(400, resp.status)
+
# Update the repository
params = {}
- params['baseurl'] = repo['baseurl'] = 'http://www.fedora.org/update'
+ params['baseurl'] = repo['baseurl'] = 'http://www.fedoraproject.org'
resp = self.request('%s/fedora-fake' % base_uri, json.dumps(params),
'PUT')
--
1.9.3