On 02/13/2014 04:01 AM, Paulo Vital wrote:
Update model and mockmodel to support backend opertions.
Add new file implementing backend operations. There are two new classes:
1) Repositories (object): Class to represent and operate with repositories
information in Kimchi's perspective. It's agnostic to host;s package management
system, and can execute all operations necessary: add repository, get all
repositories list, get information about one repository, update a repository,
enable and disable a repository and remove a repository. This class will load
in runtime the necessary classes to work with the host's package management:
YumRepo for YUM systems based and AptRepo for APT systems based (support for
this last one will be submited in a close future);
2) YumRepo (object): Class to represent and operate with YUM repositories.
Loaded only on those systems that supports YUM, it's responsible to connect,
collect and provide information of YUM repositories in the system. Also it's
responsible to create/delete the files in disk to maintain the repositories in
system after disconnection.
Signed-off-by: Paulo Vital <pvital(a)linux.vnet.ibm.com>
---
src/kimchi/mockmodel.py | 140 +++++++++++
src/kimchi/model/host.py | 56 ++++-
src/kimchi/repositories.py | 590 +++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 785 insertions(+), 1 deletion(-)
create mode 100644 src/kimchi/repositories.py
diff --git a/src/kimchi/mockmodel.py b/src/kimchi/mockmodel.py
index 4e276eb..0c578c8 100644
--- a/src/kimchi/mockmodel.py
+++ b/src/kimchi/mockmodel.py
@@ -77,6 +77,7 @@ class MockModel(object):
self._mock_interfaces = self.dummy_interfaces()
self.next_taskid = 1
self.storagepool_activate('default')
+ self.host_repositories = MockRepositories()
def _static_vm_update(self, dom, params):
state = dom.info['state']
@@ -665,6 +666,44 @@ class MockModel(object):
'display_proxy_port':
kconfig.get('display', 'display_proxy_port')}
+ def repositories_get_list(self):
+ return [repo_id for repo_id in
+ self.host_repositories.getRepositories().keys()]
You can return: self._host_repositories.getRepositories().keys() directly
+
+ def repositories_create(self, params):
+ repo_id = params.get('repo_id', None)
+
+ # create a repo_id based on baseurl if not given by user
+ if repo_id is None:
+ repo_id = "kimchi_repo_%s" % int(time.time())
+ params.update({'repo_id': repo_id})
+
+ if repo_id in self.repositories_get_list():
+ raise InvalidOperation("Repository %s already exists." % repo_id)
You need to try to find a useful repo_id for the user.
Otherwise, we will raise an error that he will not be aware as he didn't
provide the repo_id
if repo_id is None:
repo_id = "kimchi_repo_%s" % int(time.time())
while repo_id in self.repositories_get_list():
repo_id = "kimchi_repo_%s" % int(time.time())
Also I'd suggest to use datetime() instead of time() so we can avoid
getting a duplicated repo_id
+ self.host_repositories.addRepository(params)
+ return repo_id
+
+ def repository_lookup(self, repo_id):
+ return self.host_repositories.getRepository(repo_id)
+
+ def repository_delete(self, repo_id):
+ return self.host_repositories.removeRepository(repo_id)
+
+ def repository_enable(self, repo_id):
+ if not self.host_repositories.enableRepository(repo_id):
+ raise OperationFailed("Could not enable repository
%s." % repo_id)
+
+ def repository_disable(self, repo_id):
+ if not self.host_repositories.disableRepository(repo_id):
+ raise OperationFailed("Could not disable repository %s." %
repo_id)
+
+ def repository_update(self, repo_id, params):
+ try:
+ self.host_repositories.updateRepository(repo_id, params)
+ except:
+ raise OperationFailed("Could not update repository %s." % repo_id)
You need to adjust the errors messages according to the refactor
exception patches
+ return repo_id
+
class MockVMTemplate(VMTemplate):
def __init__(self, args, mockmodel_inst=None):
@@ -825,6 +864,107 @@ class MockVMScreenshot(VMScreenshot):
image.save(thumbnail)
+class MockRepositories(object):
+ def __init__(self):
+ self._repo_storage = {{"repo_id": "kimchi_repo_1392167832",
+ "gpgkey": None,
+ "enabled": True,
+ "baseurl": "http://www.fedora.org",
+ "url_args": None,
+ "gpgcheck": True,
+ "is_mirror": False,
+ "repo_name":
"kimchi_repo_1392167832"}}
+
+ def addRepository(self, params={}):
+ if not params:
+ raise MissingParameter("No parameters to add repository.")
This exception will never be raised as you created a default repo_id and
add it to params
+
+ # Check if already exist a repository with the same repo_id.
+ repo_id = params.get('repo_id')
+ if repo_id in self.getRepositories():
+ raise InvalidParameter("Repository %s already exists." % repo_id)
You can remove it as you create a repo_id above
+
+ # Create and enable the repository
+ repo = {'repo_id': repo_id,
+ 'repo_name': params.get('repo_name', repo_id),
+ 'baseurl': params.get('baseurl', None),
None? baseurl is a required parameter. You can assume it is in params
otherwise we will hide a issue
+ 'url_args': params.get('url_args',
None),
+ 'enabled': True,
+ 'gpgkey': params.get('gpgkey', None),
+ 'is_mirror': params.get('is_mirror', False)}
+
+ if repo['gpgkey'] is not None:
+ repo['gpgcheck'] = True
+ else:
+ repo['gpgcheck'] = False
+
+ self._repo_storage[repo_id] = repo
+
+ def getRepositories(self):
+ return self._repo_storage
+
+ def getRepository(self, repo_id):
+ if not repo_id in self._repo_storage.keys():
+ raise NotFoundError("Repository %s does not exists." % repo_id)
+
+ repo = self._repo_storage[repo_id]
+ if (isinstance(repo['baseurl'], list)) and (len(repo['baseurl'])
> 0):
+ repo['baseurl'] = repo['baseurl'][0]
+
+ return repo
+
+ def enabledRepositories(self):
+ enabled_repos = []
+ for repo_id in self._repo_storage.keys():
+ if self._repo_storage[repo_id]['enabled']:
+ enabled_repos.append(repo_id)
+ return enabled_repos
+
+ def enableRepository(self, repo_id):
+ # Check if repo_id is already enabled
+ if repo_id in self.enabledRepositories():
+ raise NotFoundError("There is no disabled repository called %s."
%
+ repo_id)
+
+ try:
+ repo = self.getRepository(repo_id)
+ repo['enabled'] = True
+ self.updateRepository(repo_id, repo)
+ return True
+ except:
+ raise OperationFailed("Could not enable repository %s" % repo_id)
+
+ def disableRepository(self, repo_id):
+ # Check if repo_id is already disabled
+ if not repo_id in self.enabledRepositories():
+ raise NotFoundError("There is no enabled repository called %s." %
+ repo_id)
+
+ try:
+ repo = self.getRepository(repo_id)
+ repo['enabled'] = False
+ self.updateRepository(repo_id, repo)
+ return True
+ except:
+ raise OperationFailed("Could not disable repository %s" %
repo_id)
+
+ def updateRepository(self, repo_id, new_repo={}):
+ if (len(new_repo) == 0):
+ raise InvalidParameter("No parameters to update repository.")
+
+ repo = self._repo_storage[repo_id]
+ repo.update(new_repo)
+ del self._repo_storage[repo_id]
+ self._repo_storage[repo_id] = repo
+
+ def removeRepository(self, repo_id):
+ if not repo_id in self._repo_storage.keys():
+ raise NotFoundError("There is no repository called %s." %
repo_id)
+
+ del self._repo_storage[repo_id]
+ return True
+
+
def get_mock_environment():
model = MockModel()
for i in xrange(5):
diff --git a/src/kimchi/model/host.py b/src/kimchi/model/host.py
index a3d9e38..414ac6a 100644
--- a/src/kimchi/model/host.py
+++ b/src/kimchi/model/host.py
@@ -31,8 +31,9 @@ from cherrypy.process.plugins import BackgroundTask
from kimchi import disks
from kimchi import netinfo
from kimchi.basemodel import Singleton
-from kimchi.exception import NotFoundError, OperationFailed
+from kimchi.exception import InvalidOperation, NotFoundError, OperationFailed
from kimchi.model.vms import DOM_STATE_MAP
+from kimchi.repositories import Repositories
from kimchi.utils import kimchi_log
@@ -199,3 +200,56 @@ class PartitionModel(object):
raise NotFoundError("Partition %s not found in the host"
% name)
return disks.get_partition_details(name)
+
+
+class RepositoriesModel(object):
+ __metaclass__ = Singleton
Just a suggestion: make the Repositories() a Singleton. As we did for
update system patches
+
+ def __init__(self, **kargs):
+ self.host_repositories = Repositories()
+
+ def get_list(self):
+ return [repo_id for repo_id in
+ self.host_repositories.getRepositories().keys()]
return self.host_repositories.getRepositories().keys() directly - it is
already a list
+
+ def create(self, params):
+ repo_id = params.get('repo_id', None)
+
+ # create a repo_id based on baseurl if not given by user
+ if repo_id is None:
+ repo_id = "kimchi_repo_%s" % int(time.time())
+ params.update({'repo_id': repo_id})
+
+ if repo_id in self.get_list():
+ raise InvalidOperation("Repository %s already exists." % repo_id)
Same I comment in mockmodel.
You need to find a valid repo_id for the user. Otherwise we will raise
an error regarding to a param he didn't provide
+ self.host_repositories.addRepository(params)
+ return repo_id
+
+ def get_repositories(self):
+ return self.host_repositories
If you make Repositories() a Singleton class you don't need this function
> +
> +
> +class RepositoryModel(object):
> + def __init__(self, **kargs):
> + self._repositories = RepositoriesModel().get_repositories()
> +
> + def lookup(self, repo_id):
> + return self._repositories.getRepository(repo_id)
> +
> + def enable(self, repo_id):
> + if not self._repositories.enableRepository(repo_id):
> + raise OperationFailed("Could not enable repository %s." %
repo_id)
> +
> + def disable(self, repo_id):
> + if not self._repositories.disableRepository(repo_id):
> + raise OperationFailed("Could not disable repository %s." %
repo_id)
> +
> + def update(self, repo_id, params):
> + try:
> + self._repositories.updateRepository(repo_id, params)
> + except:
> + raise OperationFailed("Could not update repository %s." %
repo_id)
> + return repo_id
> +
> + def delete(self, repo_id):
> + return self._repositories.removeRepository(repo_id)
> diff --git a/src/kimchi/repositories.py b/src/kimchi/repositories.py
> new file mode 100644
> index 0000000..cc885ba
> --- /dev/null
> +++ b/src/kimchi/repositories.py
> @@ -0,0 +1,590 @@
> +#
> +# Project Kimchi
> +#
> +# Copyright IBM, Corp. 2014
> +#
> +# Authors:
> +# Paulo Vital <pvital(a)linux.vnet.ibm.com>
> +#
> +# This library is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU Lesser General Public
> +# License as published by the Free Software Foundation; either
> +# version 2.1 of the License, or (at your option) any later version.
> +#
> +# This library is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +# Lesser General Public License for more details.
> +#
> +# You should have received a copy of the GNU Lesser General Public
> +# License along with this library; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> +
> +import os
> +import platform
> +
> +from kimchi.exception import InvalidOperation, InvalidParameter
> +from kimchi.exception import OperationFailed, NotFoundError, MissingParameter
> +
> +YUM_DISTROS = ['fedora', 'red hat enterprise linux',
> + 'red hat enterprise linux server', 'opensuse ',
> + 'suse linux enterprise server ']
> +APT_DISTROS = ['debian', 'ubuntu']
> +
> +
> +class Repositories(object):
> + """
> + Class to represent and operate with repositories information.
> + """
> + def __init__(self):
> + # This stores all repositories for Kimchi perspective. It's a
> + # dictionary of dictionaries, in the format {<repo_id>: {repo}},
> + # where:
> + # repo = {'repo_id': <string>, 'repo_name':
<string>,
> + # 'baseurl': ([<string>], None), 'url_args':
([<string>, None),
> + # 'enabled': True/False, 'gpgcheck': True/False,
> + # 'gpgkey': ([<string>], None),
> + # 'is_mirror': True/False}
> +
> + self._repo_storage = {}
> +
> + self._distro = platform.linux_distribution()[0].lower()
> + if (self._distro in YUM_DISTROS):
> + self._pkg_mnger = YumRepo()
> + elif (self._distro in APT_DISTROS):
> + self._pkg_mnger = AptRepo()
> + else:
> + raise InvalidOperation("Distro not supported.")
> +
> + if self._pkg_mnger:
> + # update the self._repo_storage with system's repositories
> + self._scanSystemRepositories()
> +
> + def _scanSystemRepositories(self):
> + """
> + Update repositories._repo_storage with system's (host) repositories.
> + """
> + # Call system pkg_mnger to get the repositories as list of dict.
> + for repo in self._pkg_mnger.getRepositoriesList():
> + self.addRepository(repo)
> +
> + def addRepository(self, params={}):
> + """
> + Add and enable a new repository into repositories._repo_storage.
> + """
> + if not params:
> + raise MissingParameter("No parameters to add repository.")
+
+ # Check if already exist a repository with the same repo_id.
+ repo_id = params.get('repo_id')
+ if repo_id in self.getRepositories():
+ raise InvalidParameter("Repository %s already exists." % repo_id)
+
+ # Create and enable the repository
+ repo = {'repo_id': repo_id,
+ 'repo_name': params.get('repo_name', repo_id),
+ 'baseurl': params.get('baseurl', None),
> +
'url_args': params.get('url_args', None),
> + 'enabled': True,
> + 'gpgkey': params.get('gpgkey', None),
> + 'is_mirror': params.get('is_mirror', False)}
> +
> + if repo['gpgkey'] is not None:
> + repo['gpgcheck'] = True
> + else:
> + repo['gpgcheck'] = False
> +
> + self._repo_storage[repo_id] = repo
> +
> + # Check in self._pkg_mnger if the repository already exists there
> + if not repo_id in [irepo['repo_id'] for irepo in
> + self._pkg_mnger.getRepositoriesList()]:
> + self._pkg_mnger.addRepo(repo)
> +
> + def getRepositories(self):
> + """
> + Return a dictionary with all Kimchi's repositories. Each element uses
> + the format {<repo_id>: {repo}}, where repo is a dictionary in the
> + repositories.Repositories() format.
> + """
> + return self._repo_storage
> +
> + def getRepository(self, repo_id):
> + """
> + Return a dictionary with all info from a given repository ID.
> + """
> + if not repo_id in self._repo_storage.keys():
> + raise NotFoundError("Repository %s does not exists." %
repo_id)
> +
> + repo = self._repo_storage[repo_id]
> + if (isinstance(repo['baseurl'], list)) and
(len(repo['baseurl']) > 0):
> + repo['baseurl'] = repo['baseurl'][0]
> +
> + return repo
> +
> + def getRepositoryFromPkgMnger(self, repo_id):
> + """
> + Return a dictionary with all info from a given repository ID.
> + All info come from self._pkg_mnger.getRepo().
> + """
> + return self._pkg_mnger.getRepo(repo_id)
> +
> + def enabledRepositories(self):
> + """
> + Return a list with enabled repositories IDs.
> + """
> + enabled_repos = []
> + for repo_id in self._repo_storage.keys():
> + if self._repo_storage[repo_id]['enabled']:
> + enabled_repos.append(repo_id)
> + return enabled_repos
> +
> + def enableRepository(self, repo_id):
> + """
> + Enable a repository.
> + """
> + # Check if repo_id is already enabled
> + if repo_id in self.enabledRepositories():
> + raise NotFoundError("There is no disabled repository called
%s." %
> + repo_id)
> +
> + try:
> + repo = self.getRepository(repo_id)
> + repo['enabled'] = True
> + self.updateRepository(repo_id, repo)
> + self._pkg_mnger.enableRepo(repo_id)
> + return True
> + except:
> + raise OperationFailed("Could not enable repository %s" %
repo_id)
> +
> + def disableRepository(self, repo_id):
> + """
> + Disable a given repository.
> + """
> + # Check if repo_id is already disabled
> + if not repo_id in self.enabledRepositories():
> + raise NotFoundError("There is no enabled repository called
%s." %
> + repo_id)
> +
> + try:
> + repo = self.getRepository(repo_id)
> + repo['enabled'] = False
> + self.updateRepository(repo_id, repo)
> + self._pkg_mnger.disableRepo(repo_id)
> + return True
> + except:
> + raise OperationFailed("Could not disable repository %s" %
repo_id)
> +
> + def updateRepository(self, repo_id, new_repo={}):
> + """
> + Update the information of a given repository.
> + The input is the repo_id of the repository to be updated and a dict
> + with the information to be updated.
> + """
> + if (len(new_repo) == 0):
> + raise InvalidParameter("No parameters to update repository.")
> +
> + repo = self._repo_storage[repo_id]
> + repo.update(new_repo)
> +
> + del self._repo_storage[repo_id]
> + self._repo_storage[repo_id] = repo
> + self._pkg_mnger.updateRepo(repo_id, self._repo_storage[repo_id])
> +
> + def removeRepository(self, repo_id):
> + """
> + Remove a given repository
> + """
> + if not repo_id in self._repo_storage.keys():
> + raise NotFoundError("There is no repository called %s." %
repo_id)
> +
> + del self._repo_storage[repo_id]
> + self._pkg_mnger.removeRepo(repo_id)
> + return True
> +
> +
> +class YumRepo(object):
> + """
> + Class to represent and operate with YUM repositories.
> + It's loaded only on those systems listed at YUM_DISTROS and loads necessary
> + modules in runtime.
> + """
> + def __init__(self):
> + self._yb = getattr(__import__('yum'), 'YumBase')()
> + self._repos = self._yb.repos
> + self._conf = self._yb.conf
> + self._enabled_repos = self._repos.listEnabled()
> +
> + def getRepositoriesList(self):
> + """
> + Return a list of dictionaries in the repositories.Repositories() format
> + """
> + repo_list = []
> + for repo in self.enabledRepos():
> + irepo = {}
> + irepo['repo_id'] = repo.id
> + irepo['repo_name'] = repo.name
> + irepo['url_args'] = None,
> + irepo['enabled'] = repo.enabled
> + irepo['gpgcheck'] = repo.gpgcheck
> + irepo['gpgkey'] = repo.gpgkey
> + if len(repo.baseurl) > 0:
> + irepo['baseurl'] = repo.baseurl
> + irepo['is_mirror'] = False
> + else:
> + irepo['baseurl'] = [repo.mirrorlist]
> + irepo['is_mirror'] = True
> + repo_list.append(irepo)
> + return repo_list
> +
> + def addRepo(self, repo={}):
> + """
> + Add a given repository in repositories.Repositories() format to YumBase
> + """
> + if len(repo) == 0:
> + raise InvalidParameter("No parameters to add repository.")
> +
> + # At least one base url, or one mirror, must be given.
> + # baseurls must be a list of strings specifying the urls
> + # mirrorlist must be a list of strings specifying a list of mirrors
> + # Here we creates the lists, or set as None
> + if repo['is_mirror']:
> + mirrors = repo['baseurl']
> + baseurl = None
> + else:
> + baseurl = [repo['baseurl']]
> + mirrors = None
> +
> + self._yb.add_enable_repo(repo['repo_id'], baseurl, mirrors,
> + name=repo['repo_name'],
> + gpgcheck=repo['gpgcheck'],
> + gpgkey=[repo['gpgkey']])
> +
> + # write a repo file in the system with repo{} information.
> + self._write2disk(repo)
> +
> + def getRepo(self, repo_id):
> + """
> + Return a dictionary in the repositories.Repositories() of the given
> + repository ID format with the information of a YumRepository object.
> + """
> + try:
> + repo = self._repos.getRepo(repo_id)
> + irepo = {}
> + irepo['repo_id'] = repo.id
> + irepo['repo_name'] = repo.name
> + irepo['url_args'] = None,
> + irepo['enabled'] = repo.enabled
> + irepo['gpgcheck'] = repo.gpgcheck
> + irepo['gpgkey'] = repo.gpgkey
> + if len(repo.baseurl) > 0:
> + irepo['baseurl'] = repo.baseurl
> + irepo['is_mirror'] = False
> + else:
> + irepo['baseurl'] = [repo.mirrorlist]
> + irepo['is_mirror'] = True
> + return irepo
> + except:
> + raise OperationFailed("Repository %s does not exists." %
repo_id)
> +
> + def enabledRepos(self):
> + """
> + Return a list with enabled YUM repositories IDs
> + """
> + return self._enabled_repos
> +
> + def isRepoEnable(self, repo_id):
> + """
> + Return if a given repository ID is enabled or not
> + """
> + for repo in self.enabledRepos():
> + if repo_id == repo.id:
> + return True
> + return False
> +
> + def enableRepo(self, repo_id):
> + """
> + Enable a given repository
> + """
> + try:
> + self._repos.getRepo(repo_id).enable()
> + self._repos.doSetup()
> + return True
> + except:
> + raise OperationFailed("Could not enable repository %s." %
repo_id)
> +
> + def disableRepo(self, repo_id):
> + """
> + Disable a given repository
> + """
> + try:
> + self._repos.getRepo(repo_id).disable()
> + self._repos.doSetup()
> + return True
> + except:
> + raise OperationFailed("Could not disable repository %s." %
repo_id)
> +
> + def updateRepo(self, repo_id, repo={}):
> + """
> + Update a given repository in repositories.Repositories() format
> + """
> + if len(repo) == 0:
> + raise MissingParameter("No parameters to update repository.")
> +
> + self._repos.delete(repo_id)
> + self.addRepo(repo)
> +
> + def removeRepo(self, repo_id):
> + """
> + Remove a given repository
> + """
> + self._repos.delete(repo_id)
> + self._removefromdisk(repo_id)
> +
> + def _write2disk(self, repo={}):
> + """
> + Write repository info into disk.
> + """
> + # Get a list with all reposdir configured in system's YUM.
> + conf_dir = self._conf.reposdir
> + if not conf_dir:
> + raise NotFoundError("There is no YUM configuration
directory.")
> +
> + if len(repo) == 0:
> + raise InvalidParameter("No parameters to create a new repo
file.")
> +
> + # Generate the content to be wrote.
> + repo_content = '[%s]\n' % repo['repo_id']
> + repo_content = repo_content + 'name=%s\n' %
repo['repo_name']
> +
> + if isinstance(repo['baseurl'], list):
> + link = repo['baseurl'][0]
> + else:
> + link = repo['baseurl']
> +
> + if repo['is_mirror']:
> + repo_content = repo_content + 'mirrorlist=%s\n' % link
> + else:
> + repo_content = repo_content + 'baseurl=%s\n' % link
> +
> + if repo['enabled']:
> + repo_content = repo_content + 'enabled=1\n'
> + else:
> + repo_content = repo_content + 'enabled=0\n'
> +
> + if repo['gpgcheck']:
> + repo_content = repo_content + 'gpgcheck=1\n'
> + else:
> + repo_content = repo_content + 'gpgcheck=0\n'
> +
> + if repo['gpgkey']:
> + if isinstance(repo['gpgkey'], list):
> + link = repo['gpgkey'][0]
> + else:
> + link = repo['gpgkey']
> + repo_content = repo_content + 'gpgckey=%s\n' % link
> +
> + # Scan for the confdirs and write the file in the first available
> + # directory in the system. YUM will scan each confdir for repo files
> + # and load it contents, so we can write in the first available dir.
> + for dir in conf_dir:
> + if os.path.isdir(dir):
> + repo_file = dir + '/%s.repo' % repo['repo_id']
> + if os.path.isfile(repo_file):
> + os.remove(repo_file)
> +
> + try:
> + with open(repo_file, 'w') as fd:
> + fd.write(repo_content)
> + fd.close()
> + except:
> + raise OperationFailed("Could not write repo file %s"
%
> + repo_file)
> + break
> + return True
> +
> + def _removefromdisk(self, repo_id):
> + """
> + Delete the repo file from disk of a given repository
> + """
> + conf_dir = self._conf.reposdir
> + if not conf_dir:
> + raise NotFoundError("There is no YUM configuration
directory.")
> +
> + for dir in conf_dir:
> + if os.path.isdir(dir):
> + repo_file = dir + '/%s.repo' % repo_id
> + if os.path.isfile(repo_file):
> + os.remove(repo_file)
> +
> + return True
> +
> +
> +class AptRepo(object):
> + """
> + Class to represent and operate with YUM repositories.
> + It's loaded only on those systems listed at YUM_DISTROS and loads necessary
> + modules in runtime.
> + """
> + def __init__(self):
> + getattr(__import__('apt_pkg'), 'init_config')()
> + getattr(__import__('apt_pkg'), 'init_system')()
> + self._config = getattr(__import__('apt_pkg'), 'config')
> + self._etc_slist = '/%s%s' % (self._config.get('Dir::Etc'),
> + self._config.get('Dir::Etc::sourcelist'))
> + self._etc_sparts = '/%s%s' % (self._config.get('Dir::Etc'),
> + self._config.get('Dir::Etc::sourceparts'))
> +
> +
> + module = __import__('aptsources.sourceslist', globals(), locals(),
> + ['SourcesList'], -1)
> + self._repos = getattr(module, 'SourcesList')()
> +
> + def getRepositoriesList(self):
> + """
> + Return a list of dictionaries in the repositories.Repositories() format
> + """
> + repo_list = []
> + for repo in self.enabledRepos():
> + irepo = {}
> + if repo.file == self._etc_slist:
> + id = "%s%s" % (repo.uri.split("//")[1],
repo.dist)
> + else:
> + id = repo.file.split('/')[-1].split('.')[0]
> + irepo['repo_id'] = id
> + irepo['baseurl'] = repo.uri
> + list = [repo.dist]
> + for comp in repo.comps:
> + list.append(comp)
> + irepo['url_args'] = list
> + irepo['enabled'] = True
> + irepo['is_mirror'] = False
> + irepo['gpgcheck'] = False
> + irepo['gpgkey'] = None
> + repo_list.append(irepo)
> + return repo_list
> +
> + def addRepo(self, repo={}):
> + """
> + Add a given repository in repositories.Repositories() format to APT
> + """
> + if len(repo) == 0:
> + raise InvalidParameter("No parameters to add repository.")
> +
> + if repo['url_args'] is not None:
> + dist = repo['url_args'][0]
> + args = repo['url_args'][1:]
> + else:
> + dist = None
> + args = []
> +
> + _file = '%s/%s.list' % (self._etc_sparts, repo['repo_id'])
> +
> + self._repos.add('deb', repo['baseurl'], dist, args,
file=_file)
> + self._repos.save()
> +
> + def getRepo(self, repo_id):
> + """
> + Return a dictionary in the repositories.Repositories() format of the
> + given repository ID with the information of a SourceEntry object.
> + """
> + for repo in self._repos:
> + irepo = {}
> + if repo.file == self._etc_slist:
> + id = "%s%s" % (repo.uri.split("//")[1],
repo.dist)
> + else:
> + id = repo.file.split('/')[-1].split('.')[0]
> + irepo['repo_id'] = id
> + irepo['baseurl'] = repo.uri
> + list = [repo.dist]
> + for comp in repo.comps:
> + list.append(comp)
> + irepo['url_args'] = list
> + irepo['enabled'] = repo.enabled
> + irepo['is_mirror'] = False
> + irepo['gpgcheck'] = False
> + irepo['gpgkey'] = None
> + return irepo
> + raise OperationFailed("Repository %s does not exists." % repo_id)
> +
> + def enabledRepos(self):
> + """
> + Return a list with enabled APT repositories
> + """
> + enabled_repos = []
> + self._repos.refresh()
> + for repo in self._repos:
> + if (len(repo.str()) > 3) and (not repo.disabled):
> + if repo.type == 'deb':
> + enabled_repos.append(repo)
> + return enabled_repos
> +
> + def enableRepo(self, repo_id):
> + """
> + Enable a given repository
> + """
> + try:
> + lrepo = self._genSourceLine(repo_id)
> + self._repos.refresh()
> + for repo in self._repos:
> + if repo.disabled and (lrepo == repo.line):
> + repo.set_enabled('True')
> + self._repos.save()
> + return True
> + except:
> + raise OperationFailed("Could not enable repository %s." %
repo_id)
> +
> + def disableRepo(self, repo_id):
> + """
> + Disable a given repository
> + """
> + try:
> + lrepo = self._genSourceLine(repo_id)
> + self._repos.refresh()
> + for repo in self._repos:
> + if (not repo.disabled) and (lrepo == repo.line):
> + repo.set_enabled('False')
> + self._repos.save()
> + return True
> + except:
> + raise OperationFailed("Could not disable repository %s." %
repo_id)
> +
> + def updateRepo(self, repo_id, repo={}):
> + """
> + Update a given repository in repositories.Repositories() format
> + """
> + if len(repo) == 0:
> + raise MissingParameter("No parameters to update repository.")
> +
> + self.removeRepo(repo_id)
> + self.addRepo(repo)
> +
> + def removeRepo(self, repo_id):
> + """
> + Remove a given repository
> + """
> + try:
> + lrepo = self._genSourceLine(repo_id)
> + self._repos.refresh()
> + for repo in self._repos:
> + if lrepo == repo.line:
> + self._repos.remove(lrepo)
> + self._repos.save()
> + except:
> + raise OperationFailed("Could not remove repository %s." %
repo_id)
> +
> + def _genSourceLine(self, repo_id):
> + """
> + Generate a source.list line from repo_id information.
> + """
> + line = ''
> + repo = self.getRepo(repo_id)
> + if repo['enabled']:
> + line = 'deb '
> + else:
> + line = '#deb '
> + line = line + repo['baseurl'] + ' '
> + for i in range(0, len(repo['url_args'])):
> + line = line + repo['url_args'][i] + ' '
> + line = line + '\n'
> + return line