
From: Royce Lv <lvroyce@linux.vnet.ibm.com> This lock facility guarentees 5 concurrent volume upload and make sure locks are created and reclaimed when needed. Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/model/utils.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index c012a62..b712c16 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -216,6 +216,7 @@ messages = { "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."), "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"), "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"), + "KCHVOL0026E": _("Inconsistent upload count"), "KCHIFACE0001E": _("Interface %(name)s does not exist"), diff --git a/src/kimchi/model/utils.py b/src/kimchi/model/utils.py index b2739b2..b089be5 100644 --- a/src/kimchi/model/utils.py +++ b/src/kimchi/model/utils.py @@ -19,6 +19,7 @@ import libvirt import socket +import threading import urlparse from lxml import etree, objectify from lxml.builder import E, ElementMaker @@ -28,6 +29,11 @@ from kimchi.model.featuretests import FeatureTests KIMCHI_META_URL = "https://github.com/kimchi-project/kimchi" KIMCHI_NAMESPACE = "kimchi" +UPLOAD_THREADS = 5 # Concurrent upload volume counts at the same time + +upload_semaphore = threading.BoundedSemaphore(value=UPLOAD_THREADS) +upload_lock_pool = dict() +pool_lock = threading.Lock() def get_vm_name(vm_name, t_name, name_list): @@ -162,3 +168,43 @@ def get_metadata_node(dom, tag, metadata_support, mode="current"): if node is not None: return etree.tostring(node) return "" + + +class UpdateLock(object): + def __init__(self): + self.ref_cnt = 0 + self.lock = threading.Lock() + + def get_lock(self): + self.ref_cnt += 1 + return self.lock + + def release_lock(self): + delete = False + self.ref_cnt -= 1 + if (self.ref_cnt == 0): + delete = True + return delete + + +def get_vol_update_lock(vol_path): + # upload_semaphore controls the max upload count + upload_semaphore.acquire() + + # pool lock make sure lock list get/store action is atomic + with pool_lock: + vol_lock = upload_lock_pool.get(vol_path) + if vol_lock: + return vol_lock.get_lock() + if len(upload_lock_pool.keys()) > (UPLOAD_THREADS - 1): + raise OperationFailed("KCHVOL0026E") + lock = upload_lock_pool[vol_path] = UpdateLock() + return lock.get_lock() + + +def release_vol_update_lock(vol_path): + with pool_lock: + vol_lock = upload_lock_pool.get(vol_path) + if vol_lock.release_lock(): + upload_lock_pool.pop(vol_path, None) + upload_semaphore.release() -- 2.1.0