On 22/04/2015 05:02, lvroyce(a)linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce(a)linux.vnet.ibm.com>
This lock facility guarentees 5 concurrent volume upload
and make sure locks are created and reclaimed when needed.
Signed-off-by: Royce Lv <lvroyce(a)linux.vnet.ibm.com>
---
src/kimchi/i18n.py | 1 +
src/kimchi/model/utils.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 47 insertions(+)
diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py
index c012a62..b712c16 100644
--- a/src/kimchi/i18n.py
+++ b/src/kimchi/i18n.py
@@ -216,6 +216,7 @@ messages = {
"KCHVOL0022E": _("Unable to access file %(url)s. Please, check
it."),
"KCHVOL0023E": _("Unable to clone storage volume '%(name)s'
in pool '%(pool)s'. Details: %(err)s"),
"KCHVOL0024E": _("Upload volume chunk index, size and total size
must be integer"),
+ "KCHVOL0026E": _("Inconsistent upload count"),
A more detailed message would be useful to we really know what is going
wrong.
"KCHIFACE0001E": _("Interface %(name)s does not
exist"),
diff --git a/src/kimchi/model/utils.py b/src/kimchi/model/utils.py
index b2739b2..b089be5 100644
--- a/src/kimchi/model/utils.py
+++ b/src/kimchi/model/utils.py
The code below is only related to storage volume upload feature so I
suggest to move it to model/storagevolumes.py
The model/utils.py should be used only for generic matters.
@@ -19,6 +19,7 @@
import libvirt
import socket
+import threading
import urlparse
from lxml import etree, objectify
from lxml.builder import E, ElementMaker
@@ -28,6 +29,11 @@ from kimchi.model.featuretests import FeatureTests
KIMCHI_META_URL = "https://github.com/kimchi-project/kimchi"
KIMCHI_NAMESPACE = "kimchi"
+UPLOAD_THREADS = 5 # Concurrent upload volume counts at the same time
+
+upload_semaphore = threading.BoundedSemaphore(value=UPLOAD_THREADS)
+upload_lock_pool = dict()
+pool_lock = threading.Lock()
def get_vm_name(vm_name, t_name, name_list):
@@ -162,3 +168,43 @@ def get_metadata_node(dom, tag, metadata_support,
mode="current"):
if node is not None:
return etree.tostring(node)
return ""
+
+
+class UpdateLock(object):
+ def __init__(self):
+ self.ref_cnt = 0
+ self.lock = threading.Lock()
+
+ def get_lock(self):
+ self.ref_cnt += 1
+ return self.lock
+
+ def release_lock(self):
+ delete = False
+ self.ref_cnt -= 1
+ if (self.ref_cnt == 0):
+ delete = True
+ return delete
+
+
+def get_vol_update_lock(vol_path):
+ # upload_semaphore controls the max upload count
+ upload_semaphore.acquire()
+
+ # pool lock make sure lock list get/store action is atomic
+ with pool_lock:
+ vol_lock = upload_lock_pool.get(vol_path)
+ if vol_lock:
+ return vol_lock.get_lock()
+ if len(upload_lock_pool.keys()) > (UPLOAD_THREADS - 1):
+ raise OperationFailed("KCHVOL0026E")
+ lock = upload_lock_pool[vol_path] = UpdateLock()
+ return lock.get_lock()
+
+
+def release_vol_update_lock(vol_path):
+ with pool_lock:
+ vol_lock = upload_lock_pool.get(vol_path)
+ if vol_lock.release_lock():
+ upload_lock_pool.pop(vol_path, None)
+ upload_semaphore.release()