[kimchi-devel][PATCHv3 0/6] Upload storage volume

From: Royce Lv <lvroyce@linux.vnet.ibm.com>

Known issue:
When the API is used to send requests without setting up a session, the cherrypy server reports an error after hundreds of calls because of the race condition described here:
https://bitbucket.org/cherrypy/cherrypy/pull-request/50/fix-race-condition-i...
It works well when a session is set up.

v3>v1,
Use libvirt upload api instead of r/w file
Use lock to synchronize upload to prevent

Royce Lv (6):
  Update docs and json schema of storage volume upload
  Update controller to make update accept formdata params
  Add lock facility for storage volume upload
  Update model for storage volume update
  Fix incomplete record when uploading
  update test case for storage volume upload

 docs/API.md                        |  7 ++--
 src/kimchi/API.json                | 19 +++++++++++
 src/kimchi/control/base.py         |  6 ++--
 src/kimchi/i18n.py                 |  4 +++
 src/kimchi/isoinfo.py              |  5 ++-
 src/kimchi/mockmodel.py            |  8 ++---
 src/kimchi/model/storagevolumes.py | 61 +++++++++++++++-------------------
 src/kimchi/model/utils.py          | 46 ++++++++++++++++++++++++++
 tests/test_model.py                | 67 ++++++++++++++++++++++++++++++++++++++
 9 files changed, 177 insertions(+), 46 deletions(-)

--
2.1.0

From: Royce Lv <lvroyce@linux.vnet.ibm.com>

File upload will use the same REST api as 'capacity' type when creating storage volume.
Uploading will be implemented by following storage volume update.

Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com>
---
 docs/API.md         |  7 +++++--
 src/kimchi/API.json | 19 +++++++++++++++++++
 src/kimchi/i18n.py  |  1 +
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/docs/API.md b/docs/API.md
index 5c4ccd3..d61e9b1 100644
--- a/docs/API.md
+++ b/docs/API.md
@@ -474,13 +474,12 @@ A interface represents available network interface on VM.
            in the defined Storage Pool
 * **POST**: Create a new Storage Volume in the Storage Pool
             The return resource is a task resource * See Resource: Task *
-            Only one of 'file', 'capacity', 'url' can be specified.
+            Only one of 'capacity', 'url' can be specified.
     * name: The name of the Storage Volume
     * type: The type of the defined Storage Volume
     * capacity: The total space which can be used to store volumes
                 The unit is bytes
     * format: The format of the defined Storage Volume
-    * file: File to be uploaded, passed through form data
     * url: URL to be downloaded

 ### Resource: Storage Volume
@@ -508,6 +507,10 @@ A interface represents available network interface on VM.

 * **DELETE**: Remove the Storage Volume
 * **POST**: *See Storage Volume Actions*
+* **PUT**: Upload storage volume chunk
+    * index: Chunk index of the slice in file.
+    * chunk_size: Chunk size of the slice in Bytes.
+    * chunk: Actual data of uploaded file

 **Actions (POST):**

diff --git a/src/kimchi/API.json b/src/kimchi/API.json
index 0cfa20c..52ce4c3 100644
--- a/src/kimchi/API.json
+++ b/src/kimchi/API.json
@@ -221,6 +221,25 @@
                 }
             }
         },
+        "storagevolume_update": {
+            "type": "object",
+            "properties": {
+                "chunk": {
+                    "description": "Upload storage volume chunk"
+                },
+                "index": {
+                    "description": "Chunk index of uploaded storage volume",
+                    "type": "string",
+                    "error": "KCHVOL0024E"
+                },
+                "chunk_size": {
+                    "description": "Chunk size of uploaded storage volume",
+                    "type": "string",
+                    "error": "KCHVOL0024E"
+                }
+            },
+            "additionalProperties": false
+        },
         "vms_create": {
             "type": "object",
             "error": "KCHVM0016E",
diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py
index 4eccc3e..0a31cd5 100644
--- a/src/kimchi/i18n.py
+++ b/src/kimchi/i18n.py
@@ -210,6 +210,7 @@ messages = {
     "KCHVOL0021E": _("Storage volume URL must be http://, https://, ftp:// or ftps://."),
     "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."),
     "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"),
+    "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"),

     "KCHIFACE0001E": _("Interface %(name)s does not exist"),

--
2.1.0

On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
File upload will use the same REST api as 'capacity' type when creating storage volume. Uploading will be implemented by following storage volume update.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- docs/API.md | 7 +++++-- src/kimchi/API.json | 19 +++++++++++++++++++ src/kimchi/i18n.py | 1 + 3 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/docs/API.md b/docs/API.md index 5c4ccd3..d61e9b1 100644 --- a/docs/API.md +++ b/docs/API.md @@ -474,13 +474,12 @@ A interface represents available network interface on VM. in the defined Storage Pool * **POST**: Create a new Storage Volume in the Storage Pool The return resource is a task resource * See Resource: Task * - Only one of 'file', 'capacity', 'url' can be specified. + Only one of 'capacity', 'url' can be specified.
What happens when I want to upload a file?
* name: The name of the Storage Volume * type: The type of the defined Storage Volume * capacity: The total space which can be used to store volumes The unit is bytes * format: The format of the defined Storage Volume - * file: File to be uploaded, passed through form data * url: URL to be downloaded
### Resource: Storage Volume @@ -508,6 +507,10 @@ A interface represents available network interface on VM.
* **DELETE**: Remove the Storage Volume * **POST**: *See Storage Volume Actions* +* **PUT**: Upload storage volume chunk + * index: Chunk index of the slice in file. + * chunk_size: Chunk size of the slice in Bytes. + * chunk: Actual data of uploaded file
Will the upload function use only PUT requests? A PUT request updates an existing entry, which will not be the case the first time.
**Actions (POST):**
diff --git a/src/kimchi/API.json b/src/kimchi/API.json index 0cfa20c..52ce4c3 100644 --- a/src/kimchi/API.json +++ b/src/kimchi/API.json @@ -221,6 +221,25 @@ } } }, + "storagevolume_update": { + "type": "object", + "properties": { + "chunk": { + "description": "Upload storage volume chunk" + }, + "index": { + "description": "Chunk index of uploaded storage volume", + "type": "string", + "error": "KCHVOL0024E" + }, + "chunk_size": { + "description": "Chunk size of uploaded storage volume", + "type": "string", + "error": "KCHVOL0024E" + } + },
I think all of this data is required, right?
+ "additionalProperties": false + }, "vms_create": { "type": "object", "error": "KCHVM0016E", diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index 4eccc3e..0a31cd5 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -210,6 +210,7 @@ messages = { "KCHVOL0021E": _("Storage volume URL must be http://, https://, ftp:// or ftps://."), "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."), "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"), + "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"),
"KCHIFACE0001E": _("Interface %(name)s does not exist"),
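
The question above about required fields can be illustrated with a small check. This is only a sketch in Python 3 and not Kimchi code: `REQUIRED`, `INTEGER_FIELDS` and `validate_chunk_params()` are hypothetical names, and the rule that all three fields are mandatory is the reviewer's suggestion, not something the posted schema enforces yet.

```python
# Hypothetical stand-in for the "storagevolume_update" parameters.
# None of these names exist in Kimchi; they only illustrate the review point.
REQUIRED = ("chunk", "index", "chunk_size")
INTEGER_FIELDS = ("index", "chunk_size")


def validate_chunk_params(params):
    """Return a list of problems found in the form data of one PUT request."""
    problems = []
    for name in REQUIRED:
        if name not in params:
            problems.append("missing required field: %s" % name)
    for name in sorted(set(params) - set(REQUIRED)):
        # Mirrors "additionalProperties": false in the posted schema.
        problems.append("unexpected field: %s" % name)
    for name in INTEGER_FIELDS:
        value = params.get(name)
        # Form data arrives as strings, so accept only non-negative
        # decimal strings here.
        if value is not None and not str(value).isdigit():
            problems.append("%s must be an integer" % name)
    return problems
```

An empty list means the request would be accepted under these assumptions; each entry otherwise names one reason to reject it.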

On 02/09/2015 08:45 AM, Aline Manera wrote:
On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
File upload will use the same REST api as 'capacity' type when creating storage volume. Uploading will be implemented by following storage volume update.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- docs/API.md | 7 +++++-- src/kimchi/API.json | 19 +++++++++++++++++++ src/kimchi/i18n.py | 1 + 3 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/docs/API.md b/docs/API.md index 5c4ccd3..d61e9b1 100644 --- a/docs/API.md +++ b/docs/API.md @@ -474,13 +474,12 @@ A interface represents available network interface on VM. in the defined Storage Pool * **POST**: Create a new Storage Volume in the Storage Pool The return resource is a task resource * See Resource: Task * - Only one of 'file', 'capacity', 'url' can be specified. + Only one of 'capacity', 'url' can be specified.
What happens when I want to upload a file?
Use a POST with "capacity" first, and then multiple "PUT" requests to transfer data to this volume. I will add this to the commit message in the next version.
* name: The name of the Storage Volume * type: The type of the defined Storage Volume * capacity: The total space which can be used to store volumes The unit is bytes * format: The format of the defined Storage Volume - * file: File to be uploaded, passed through form data * url: URL to be downloaded
### Resource: Storage Volume @@ -508,6 +507,10 @@ A interface represents available network interface on VM.
* **DELETE**: Remove the Storage Volume * **POST**: *See Storage Volume Actions* +* **PUT**: Upload storage volume chunk + * index: Chunk index of the slice in file. + * chunk_size: Chunk size of the slice in Bytes. + * chunk: Actual data of uploaded file
Will the upload function use only PUT requests? A PUT request updates an existing entry, which will not be the case the first time.
**Actions (POST):**
diff --git a/src/kimchi/API.json b/src/kimchi/API.json index 0cfa20c..52ce4c3 100644 --- a/src/kimchi/API.json +++ b/src/kimchi/API.json @@ -221,6 +221,25 @@ } } }, + "storagevolume_update": { + "type": "object", + "properties": { + "chunk": { + "description": "Upload storage volume chunk" + }, + "index": { + "description": "Chunk index of uploaded storage volume", + "type": "string", + "error": "KCHVOL0024E" + }, + "chunk_size": { + "description": "Chunk size of uploaded storage volume", + "type": "string", + "error": "KCHVOL0024E" + } + },
I think all of this data is required, right?
ACK, will fix this.
+ "additionalProperties": false + }, "vms_create": { "type": "object", "error": "KCHVM0016E", diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index 4eccc3e..0a31cd5 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -210,6 +210,7 @@ messages = { "KCHVOL0021E": _("Storage volume URL must be http://, https://, ftp:// or ftps://."), "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."), "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"), + "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"),
"KCHIFACE0001E": _("Interface %(name)s does not exist"),
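
The flow described in this reply (one POST with "capacity", then several PUT requests) could look roughly like this on the client side. It is a Python 3 sketch under assumptions: `create_request()` and `plan_chunk_uploads()` are hypothetical helpers, "index" is taken to be a sequential chunk number starting at 0, and the last chunk is allowed to be shorter than the others. The thread does not confirm either of those two details. No HTTP call is made; the functions only build the parameters each request would carry.

```python
import io


def create_request(name, capacity):
    """Parameters for the initial POST that allocates the volume (bytes)."""
    return {"name": name, "capacity": capacity}


def plan_chunk_uploads(stream, chunk_size):
    """Yield the form fields of one PUT request per chunk read from stream."""
    index = 0
    while True:
        data = stream.read(chunk_size)
        if not data:
            break
        # The posted schema declares index and chunk_size as strings.
        yield {"index": str(index),
               "chunk_size": str(len(data)),
               "chunk": data}
        index += 1


# Example: a 10-byte "file" sent in chunks of at most 4 bytes.
payload = io.BytesIO(b"abcdefghij")
post_body = create_request("demo.img", 10)
put_bodies = list(plan_chunk_uploads(payload, 4))
```

Here `put_bodies` holds three entries, one per PUT request, in the order they would be sent.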

From: Royce Lv <lvroyce@linux.vnet.ibm.com>

When update does not accept params in base class, cherrypy will raise error that extra params are provided in body.
So allow update function to accept params.

Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com>
---
 src/kimchi/control/base.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/kimchi/control/base.py b/src/kimchi/control/base.py
index 484e9b9..1a40af8 100644
--- a/src/kimchi/control/base.py
+++ b/src/kimchi/control/base.py
@@ -151,7 +151,7 @@ class Resource(object):
             raise cherrypy.HTTPError(400, e.message)

     @cherrypy.expose
-    def index(self):
+    def index(self, *args, **kargs):
         method = validate_method(('GET', 'DELETE', 'PUT'),
                                  self.role_key, self.admin_methods)

@@ -162,7 +162,7 @@ class Resource(object):

             return {'GET': self.get,
                     'DELETE': self.delete,
-                    'PUT': self.update}[method]()
+                    'PUT': self.update}[method](*args, **kargs)
         except InvalidOperation, e:
             raise cherrypy.HTTPError(400, e.message)
         except InvalidParameter, e:
@@ -189,7 +189,7 @@ class Resource(object):

         return user_name in users or len(set(user_groups) & set(groups)) > 0

-    def update(self):
+    def update(self, *args, **kargs):
         try:
             update = getattr(self.model, model_fn(self, 'update'))
         except AttributeError:
--
2.1.0

Reviewed-by: Aline Manera <alinefm@linux.vnet.ibm.com>

On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
When update does not accept params in base class, cherrypy will raise error that extra params are provided in body. So allow update function to accept params.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/control/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/src/kimchi/control/base.py b/src/kimchi/control/base.py index 484e9b9..1a40af8 100644 --- a/src/kimchi/control/base.py +++ b/src/kimchi/control/base.py @@ -151,7 +151,7 @@ class Resource(object): raise cherrypy.HTTPError(400, e.message)
@cherrypy.expose - def index(self): + def index(self, *args, **kargs): method = validate_method(('GET', 'DELETE', 'PUT'), self.role_key, self.admin_methods)
@@ -162,7 +162,7 @@ class Resource(object):
return {'GET': self.get, 'DELETE': self.delete, - 'PUT': self.update}[method]() + 'PUT': self.update}[method](*args, **kargs) except InvalidOperation, e: raise cherrypy.HTTPError(400, e.message) except InvalidParameter, e: @@ -189,7 +189,7 @@ class Resource(object):
return user_name in users or len(set(user_groups) & set(groups)) > 0
- def update(self): + def update(self, *args, **kargs): try: update = getattr(self.model, model_fn(self, 'update')) except AttributeError:
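
The effect of this patch can be reproduced without cherrypy. The sketch below is Python 3 and heavily simplified: the `Resource` and `OldResource` classes are stand-ins written for illustration, and the HTTP method is passed in explicitly instead of being read from the request as the real controller does. It only shows why a handler that takes no parameters fails once form fields are forwarded to it.

```python
class Resource(object):
    """Simplified stand-in for the patched controller (no cherrypy)."""

    def get(self, *args, **kargs):
        return "get"

    def delete(self, *args, **kargs):
        return "delete"

    def update(self, *args, **kargs):
        # Form fields sent with the PUT arrive here as keyword arguments.
        return ("update", args, kargs)

    def index(self, method, *args, **kargs):
        handlers = {"GET": self.get,
                    "DELETE": self.delete,
                    "PUT": self.update}
        return handlers[method](*args, **kargs)


class OldResource(Resource):
    """Pre-patch signature: update() accepts no parameters."""

    def update(self):
        return "update"


patched = Resource().index("PUT", index="0", chunk_size="4")

try:
    OldResource().index("PUT", index="0", chunk_size="4")
    old_failed = False
except TypeError:
    # Extra parameters cannot be delivered to a parameterless update().
    old_failed = True
```

In this sketch `get()` and `delete()` also accept extra arguments so that the shared dispatch line works for every method. Whether the real handlers need the same treatment is not covered by the posted diff.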

From: Royce Lv <lvroyce@linux.vnet.ibm.com>

This lock facility guarantees at most 5 concurrent volume uploads and makes sure locks are created and reclaimed when needed.

Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com>
---
 src/kimchi/i18n.py        |  1 +
 src/kimchi/model/utils.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 47 insertions(+)

diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py
index 0a31cd5..af912bd 100644
--- a/src/kimchi/i18n.py
+++ b/src/kimchi/i18n.py
@@ -211,6 +211,7 @@ messages = {
     "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."),
     "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"),
     "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"),
+    "KCHVOL0026E": _("Inconsistent upload count"),

     "KCHIFACE0001E": _("Interface %(name)s does not exist"),

diff --git a/src/kimchi/model/utils.py b/src/kimchi/model/utils.py
index 9896289..010966a 100644
--- a/src/kimchi/model/utils.py
+++ b/src/kimchi/model/utils.py
@@ -19,6 +19,7 @@

 import libvirt
 import socket
+import threading
 import urlparse
 from lxml import etree, objectify
 from lxml.builder import E, ElementMaker
@@ -28,6 +29,11 @@ from kimchi.model.featuretests import FeatureTests

 KIMCHI_META_URL = "https://github.com/kimchi-project/kimchi"
 KIMCHI_NAMESPACE = "kimchi"
+UPLOAD_THREADS = 5  # Concurrent upload volume counts at the same time
+
+upload_semaphore = threading.BoundedSemaphore(value=UPLOAD_THREADS)
+upload_lock_pool = dict()
+pool_lock = threading.Lock()


 def get_vm_name(vm_name, t_name, name_list):
@@ -161,3 +167,43 @@ def get_metadata_node(dom, tag, metadata_support, mode="current"):
         if node is not None:
             return etree.tostring(node)
     return ""
+
+
+class UpdateLock(object):
+    def __init__(self):
+        self.ref_cnt = 0
+        self.lock = threading.Lock()
+
+    def get_lock(self):
+        self.ref_cnt += 1
+        return self.lock
+
+    def release_lock(self):
+        delete = False
+        self.ref_cnt -= 1
+        if (self.ref_cnt == 0):
+            delete = True
+        return delete
+
+
+def get_vol_update_lock(vol_path):
+    # upload_semaphore controls the max upload count
+    upload_semaphore.acquire()
+
+    # pool lock make sure lock list get/store action is atomic
+    with pool_lock:
+        vol_lock = upload_lock_pool.get(vol_path)
+        if vol_lock:
+            return vol_lock.get_lock()
+        if len(upload_lock_pool.keys()) > (UPLOAD_THREADS - 1):
+            raise OperationFailed("KCHVOL0026E")
+        lock = upload_lock_pool[vol_path] = UpdateLock()
+        return lock.get_lock()
+
+
+def release_vol_update_lock(vol_path):
+    with pool_lock:
+        vol_lock = upload_lock_pool.get(vol_path)
+        if vol_lock.release_lock():
+            upload_lock_pool.pop(vol_path, None)
+    upload_semaphore.release()
--
2.1.0

On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
This lock facility guarantees at most 5 concurrent volume uploads and makes sure locks are created and reclaimed when needed.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/model/utils.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+)
diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index 0a31cd5..af912bd 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -211,6 +211,7 @@ messages = { "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."), "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"), "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"),
+ "KCHVOL0026E": _("Inconsistent upload count"),
What does it mean for the user?
"KCHIFACE0001E": _("Interface %(name)s does not exist"),
diff --git a/src/kimchi/model/utils.py b/src/kimchi/model/utils.py index 9896289..010966a 100644 --- a/src/kimchi/model/utils.py +++ b/src/kimchi/model/utils.py @@ -19,6 +19,7 @@
import libvirt import socket +import threading import urlparse from lxml import etree, objectify from lxml.builder import E, ElementMaker @@ -28,6 +29,11 @@ from kimchi.model.featuretests import FeatureTests
KIMCHI_META_URL = "https://github.com/kimchi-project/kimchi" KIMCHI_NAMESPACE = "kimchi" +UPLOAD_THREADS = 5 # Concurrent upload volume counts at the same time + +upload_semaphore = threading.BoundedSemaphore(value=UPLOAD_THREADS) +upload_lock_pool = dict() +pool_lock = threading.Lock()
def get_vm_name(vm_name, t_name, name_list): @@ -161,3 +167,43 @@ def get_metadata_node(dom, tag, metadata_support, mode="current"): if node is not None: return etree.tostring(node) return "" + + +class UpdateLock(object): + def __init__(self): + self.ref_cnt = 0 + self.lock = threading.Lock() +
+ def get_lock(self): + self.ref_cnt += 1 + return self.lock +
It always returns the lock without checking whether the maximum number has been reached. So what is it for?
+ def release_lock(self): + delete = False + self.ref_cnt -= 1 + if (self.ref_cnt == 0): + delete = True + return delete +
So the first time I use release_lock(), self.ref_cnt will be zero, so self.ref_cnt -= 1 turns it into -1 and it returns False.
+ +def get_vol_update_lock(vol_path): + # upload_semaphore controls the max upload count + upload_semaphore.acquire() + + # pool lock make sure lock list get/store action is atomic + with pool_lock: + vol_lock = upload_lock_pool.get(vol_path)
+ if vol_lock: + return vol_lock.get_lock() + if len(upload_lock_pool.keys()) > (UPLOAD_THREADS - 1): + raise OperationFailed("KCHVOL0026E")
When will the exception be raised? From my view, it will always return the lock in the "if" right above.
Is the lock for the pool or the volume? It is confusing to review.
+ lock = upload_lock_pool[vol_path] = UpdateLock() + return lock.get_lock() + + +def release_vol_update_lock(vol_path): + with pool_lock: + vol_lock = upload_lock_pool.get(vol_path) + if vol_lock.release_lock(): + upload_lock_pool.pop(vol_path, None) + upload_semaphore.release()

OK, let me explain this lock scheme.

1. Every volume being uploaded needs a lock (a thread lock) so that nobody else manipulates the volume at the same time, which avoids competing writes.
2. Since we cannot spend too much bandwidth on uploads, we need to limit the number of concurrently uploading volumes to a reasonable number (set to 5 here).

If every volume were created with its own lock, as 1. requires, while 2. allows only 5 volumes to upload at a time, resources would be wasted. So we have a lock pool called "upload_lock_pool" that stores only the locks of active uploads. That is, when a volume upload request comes in, a lock is allocated and stored in this pool. When the lock for volume "A" is no longer used (all blocking PUT requests have finished and the lock's "ref_cnt" is 0), it is deleted to make room in "upload_lock_pool" for "PUT" requests to other volumes.

So the scenarios are:
a. An upload request comes in, the concurrent thread count is less than 5, and the lock pool does not have a lock for this volume yet. Kimchi allocates a volume lock for it to use.
b. An upload request comes in, the concurrent thread count is less than 5, and the lock pool already has a lock for this volume. This means another request is using the lock, so this thread just increments the lock's ref_cnt and waits on the lock until it can get it.
c. An upload request comes in and the concurrent thread count would exceed 5. The request blocks on the semaphore for the pool.

The lock pool itself needs a lock because allocating a lock in the pool and deleting a lock from it are write operations, so the pool must be protected against read/write races.

On 02/09/2015 09:49 AM, Aline Manera wrote:
On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
This lock facility guarantees at most 5 concurrent volume uploads and makes sure locks are created and reclaimed when needed.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/model/utils.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+)
diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index 0a31cd5..af912bd 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -211,6 +211,7 @@ messages = { "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."), "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"), "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"),
+ "KCHVOL0026E": _("Inconsistent upload count"),
What does it mean for the user?
"KCHIFACE0001E": _("Interface %(name)s does not exist"),
diff --git a/src/kimchi/model/utils.py b/src/kimchi/model/utils.py index 9896289..010966a 100644 --- a/src/kimchi/model/utils.py +++ b/src/kimchi/model/utils.py @@ -19,6 +19,7 @@
import libvirt import socket +import threading import urlparse from lxml import etree, objectify from lxml.builder import E, ElementMaker @@ -28,6 +29,11 @@ from kimchi.model.featuretests import FeatureTests
KIMCHI_META_URL = "https://github.com/kimchi-project/kimchi" KIMCHI_NAMESPACE = "kimchi" +UPLOAD_THREADS = 5 # Concurrent upload volume counts at the same time + +upload_semaphore = threading.BoundedSemaphore(value=UPLOAD_THREADS) +upload_lock_pool = dict() +pool_lock = threading.Lock()
def get_vm_name(vm_name, t_name, name_list): @@ -161,3 +167,43 @@ def get_metadata_node(dom, tag, metadata_support, mode="current"): if node is not None: return etree.tostring(node) return "" + + +class UpdateLock(object): + def __init__(self): + self.ref_cnt = 0 + self.lock = threading.Lock() +
+ def get_lock(self): + self.ref_cnt += 1 + return self.lock +
It always returns the lock without checking whether the maximum number has been reached. So what is it for?
+ def release_lock(self): + delete = False + self.ref_cnt -= 1 + if (self.ref_cnt == 0): + delete = True + return delete +
So the first time I use release_lock(), self.ref_cnt will be zero, so self.ref_cnt -= 1 turns it into -1 and it returns False.
+ +def get_vol_update_lock(vol_path): + # upload_semaphore controls the max upload count + upload_semaphore.acquire() + + # pool lock make sure lock list get/store action is atomic + with pool_lock: + vol_lock = upload_lock_pool.get(vol_path)
+ if vol_lock: + return vol_lock.get_lock() + if len(upload_lock_pool.keys()) > (UPLOAD_THREADS - 1): + raise OperationFailed("KCHVOL0026E")
When will the exception be raised? From my view, it will always return the lock in the "if" right above.
Is the lock for the pool or the volume? It is confusing to review.
+ lock = upload_lock_pool[vol_path] = UpdateLock() + return lock.get_lock() + + +def release_vol_update_lock(vol_path): + with pool_lock: + vol_lock = upload_lock_pool.get(vol_path) + if vol_lock.release_lock(): + upload_lock_pool.pop(vol_path, None) + upload_semaphore.release()
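
The lock scheme explained at the top of this message can be tried out in isolation. The following is a minimal Python 3 sketch of scenarios a and b plus the reclaim step. It reuses the names from the patch but folds `get_lock()`/`release_lock()` into the two module functions and leaves out the KCHVOL0026E consistency check, so it is an approximation of the design and not the posted code.

```python
import threading

UPLOAD_THREADS = 5  # limit taken from the patch

upload_semaphore = threading.BoundedSemaphore(value=UPLOAD_THREADS)
upload_lock_pool = {}          # vol_path -> UpdateLock
pool_lock = threading.Lock()   # guards upload_lock_pool


class UpdateLock(object):
    """A per-volume lock plus a count of the requests that refer to it."""

    def __init__(self):
        self.ref_cnt = 0
        self.lock = threading.Lock()


def get_vol_update_lock(vol_path):
    upload_semaphore.acquire()        # scenario c: the 6th request blocks here
    with pool_lock:
        vol_lock = upload_lock_pool.get(vol_path)
        if vol_lock is None:          # scenario a: first request for the volume
            vol_lock = upload_lock_pool[vol_path] = UpdateLock()
        vol_lock.ref_cnt += 1         # scenario b: later requests share the lock
        return vol_lock.lock


def release_vol_update_lock(vol_path):
    with pool_lock:
        vol_lock = upload_lock_pool[vol_path]
        vol_lock.ref_cnt -= 1
        if vol_lock.ref_cnt == 0:     # last user is gone: reclaim the entry
            del upload_lock_pool[vol_path]
    upload_semaphore.release()


# Two requests for the same volume share one lock object. The pool entry
# disappears only after both of them have released it.
first = get_vol_update_lock("/pool/A")
second = get_vol_update_lock("/pool/A")
shared = first is second
release_vol_update_lock("/pool/A")
still_pooled = "/pool/A" in upload_lock_pool
release_vol_update_lock("/pool/A")
reclaimed = "/pool/A" not in upload_lock_pool
```

As in the patch, the returned lock is not acquired by `get_vol_update_lock()` itself. The caller is expected to take it around the actual write.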

On 03/03/2015 04:50, Royce Lv wrote:
OK, let me explain this lock scheme.
1. Every volume being uploaded needs a lock (a thread lock) so that nobody else manipulates the volume at the same time, which avoids competing writes.
2. Since we cannot spend too much bandwidth on uploads, we need to limit the number of concurrently uploading volumes to a reasonable number (set to 5 here).
If every volume were created with its own lock, as 1. requires, while 2. allows only 5 volumes to upload at a time, resources would be wasted.
So we have a lock pool called "upload_lock_pool" that stores only the locks of active uploads. That is, when a volume upload request comes in, a lock is allocated and stored in this pool. When the lock for volume "A" is no longer used (all blocking PUT requests have finished and the lock's "ref_cnt" is 0), it is deleted to make room in "upload_lock_pool" for "PUT" requests to other volumes.
So the scenarios are:
a. An upload request comes in, the concurrent thread count is less than 5, and the lock pool does not have a lock for this volume yet. Kimchi allocates a volume lock for it to use.
b. An upload request comes in, the concurrent thread count is less than 5, and the lock pool already has a lock for this volume. This means another request is using the lock, so this thread just increments the lock's ref_cnt and waits on the lock until it can get it.
c. An upload request comes in and the concurrent thread count would exceed 5. The request blocks on the semaphore for the pool.
The lock pool itself needs a lock because allocating a lock in the pool and deleting a lock from it are write operations, so the pool must be protected against read/write races.
Ok - Thanks for explaining! I think I have a better idea on it now. See my comments below.
On 02/09/2015 09:49 AM, Aline Manera wrote:
On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
This lock facility guarantees at most 5 concurrent volume uploads and makes sure locks are created and reclaimed when needed.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/model/utils.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+)
diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index 0a31cd5..af912bd 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -211,6 +211,7 @@ messages = { "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."), "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"), "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"),
+ "KCHVOL0026E": _("Inconsistent upload count"),
What does it mean for the user?
If this message will be shown to the user, we need to make it more detailed.
"KCHIFACE0001E": _("Interface %(name)s does not exist"),
diff --git a/src/kimchi/model/utils.py b/src/kimchi/model/utils.py index 9896289..010966a 100644 --- a/src/kimchi/model/utils.py +++ b/src/kimchi/model/utils.py @@ -19,6 +19,7 @@
import libvirt import socket +import threading import urlparse from lxml import etree, objectify from lxml.builder import E, ElementMaker @@ -28,6 +29,11 @@ from kimchi.model.featuretests import FeatureTests
KIMCHI_META_URL = "https://github.com/kimchi-project/kimchi" KIMCHI_NAMESPACE = "kimchi" +UPLOAD_THREADS = 5 # Concurrent upload volume counts at the same time + +upload_semaphore = threading.BoundedSemaphore(value=UPLOAD_THREADS) +upload_lock_pool = dict() +pool_lock = threading.Lock()
def get_vm_name(vm_name, t_name, name_list): @@ -161,3 +167,43 @@ def get_metadata_node(dom, tag, metadata_support, mode="current"): if node is not None: return etree.tostring(node) return "" + + +class UpdateLock(object): + def __init__(self): + self.ref_cnt = 0 + self.lock = threading.Lock() +
+ def get_lock(self): + self.ref_cnt += 1 + return self.lock +
It always returns the lock without checking whether the maximum number has been reached. So what is it for?
The maximum number check must be done here, and the lock must not be returned when the maximum is reached.
+ def release_lock(self): + delete = False + self.ref_cnt -= 1 + if (self.ref_cnt == 0): + delete = True + return delete +
So the first time I use release_lock(), self.ref_cnt will be zero, so self.ref_cnt -= 1 turns it into -1 and it returns False.
+ +def get_vol_update_lock(vol_path): + # upload_semaphore controls the max upload count + upload_semaphore.acquire() + + # pool lock make sure lock list get/store action is atomic + with pool_lock: + vol_lock = upload_lock_pool.get(vol_path)
+ if vol_lock: + return vol_lock.get_lock()
+ if len(upload_lock_pool.keys()) > (UPLOAD_THREADS - 1): + raise OperationFailed("KCHVOL0026E")
This maximum number check must be the first thing done when returning the lock. As I suggested above, we should do that in the get_lock() function.
When will the exception be raised? From my view, it will always return the lock in the "if" right above.
Is the lock for the pool or the volume? It is confusing to review.
+ lock = upload_lock_pool[vol_path] = UpdateLock() + return lock.get_lock() + +
+def release_vol_update_lock(vol_path): + with pool_lock: + vol_lock = upload_lock_pool.get(vol_path)
If, for any reason, I call this function before getting the lock, it will fail because vol_lock will be None. I suggest adding a check for that to avoid problems.
+ if vol_lock.release_lock(): + upload_lock_pool.pop(vol_path, None) + upload_semaphore.release()
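
The two release-side concerns raised in this review (a ref_cnt that can drop below zero, and a missing pool entry) could be guarded roughly as follows. This is a Python 3 sketch and not a proposed patch: raising `RuntimeError` and returning a boolean are arbitrary illustrative choices, and the semaphore handling is left out to keep the example short.

```python
import threading

upload_lock_pool = {}          # vol_path -> UpdateLock
pool_lock = threading.Lock()   # guards upload_lock_pool


class UpdateLock(object):
    def __init__(self):
        self.ref_cnt = 0
        self.lock = threading.Lock()

    def get_lock(self):
        self.ref_cnt += 1
        return self.lock

    def release_lock(self):
        # A release without a matching get is a caller bug, so refuse to
        # let the counter go negative.
        if self.ref_cnt <= 0:
            raise RuntimeError("release_lock() called without get_lock()")
        self.ref_cnt -= 1
        return self.ref_cnt == 0


def release_vol_update_lock(vol_path):
    """Return True if a pool entry was found and released, else False."""
    with pool_lock:
        vol_lock = upload_lock_pool.get(vol_path)
        if vol_lock is None:
            # Nothing was ever acquired for this path; avoid the
            # AttributeError on None that the review points out.
            return False
        if vol_lock.release_lock():
            upload_lock_pool.pop(vol_path, None)
    return True
```

With these guards, calling `release_vol_update_lock()` for an unknown path is a harmless no-op, while an unbalanced `release_lock()` fails loudly instead of leaving ref_cnt at -1.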

On 03/03/2015 12:07 PM, Aline Manera wrote:
Ok - Thanks for explaining! I think I have a better idea on it now.
See my comments below.
On 02/09/2015 09:49 AM, Aline Manera wrote:
On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
This lock facility guarantees at most 5 concurrent volume uploads and makes sure locks are created and reclaimed when needed.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/model/utils.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+)
diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index 0a31cd5..af912bd 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -211,6 +211,7 @@ messages = { "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."), "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"), "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"),
+ "KCHVOL0026E": _("Inconsistent upload count"),
What does it mean for the user?
If this message will be shown to the user, we need to make it more detailed.
It is an internal error; see the comments below.
"KCHIFACE0001E": _("Interface %(name)s does not exist"),
diff --git a/src/kimchi/model/utils.py b/src/kimchi/model/utils.py index 9896289..010966a 100644 --- a/src/kimchi/model/utils.py +++ b/src/kimchi/model/utils.py @@ -19,6 +19,7 @@
import libvirt import socket +import threading import urlparse from lxml import etree, objectify from lxml.builder import E, ElementMaker @@ -28,6 +29,11 @@ from kimchi.model.featuretests import FeatureTests
KIMCHI_META_URL = "https://github.com/kimchi-project/kimchi" KIMCHI_NAMESPACE = "kimchi" +UPLOAD_THREADS = 5 # Concurrent upload volume counts at the same time + +upload_semaphore = threading.BoundedSemaphore(value=UPLOAD_THREADS) +upload_lock_pool = dict() +pool_lock = threading.Lock()
 def get_vm_name(vm_name, t_name, name_list):
@@ -161,3 +167,43 @@ def get_metadata_node(dom, tag, metadata_support, mode="current"):
         if node is not None:
             return etree.tostring(node)
     return ""
+
+
+class UpdateLock(object):
+    def __init__(self):
+        self.ref_cnt = 0
+        self.lock = threading.Lock()
+
+    def get_lock(self):
+        self.ref_cnt += 1
+        return self.lock
+
It always returns the lock without checking whether the maximum number has been reached. So what is it for?
The maximum number verification must be done here, and the lock must not be returned when the maximum is reached.
The upper limit is checked by upload_semaphore.acquire() before get_lock() is called. This ref_cnt is used to decide whether the volume lock needs to be reclaimed.
+    def release_lock(self):
+        delete = False
+        self.ref_cnt -= 1
+        if (self.ref_cnt == 0):
+            delete = True
+        return delete
+
So the first time I use release_lock(), self.ref_cnt will be zero, so self.ref_cnt -= 1 turns it to -1 and it returns False.
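To check the counter arithmetic in this comment, here is a small trace of the class. It is a sketch that condenses release_lock() to a single comparison, and the variable names `paired` and `unpaired` are illustrative only. It assumes the normal path always calls get_lock() before release_lock(), as get_vol_update_lock() and release_vol_update_lock() do in the patch.

```python
import threading


class UpdateLock(object):
    # Same logic as the class in the patch, with release_lock() condensed.
    def __init__(self):
        self.ref_cnt = 0
        self.lock = threading.Lock()

    def get_lock(self):
        self.ref_cnt += 1
        return self.lock

    def release_lock(self):
        self.ref_cnt -= 1
        return self.ref_cnt == 0


# Paired use: get_lock() first, then release_lock().
paired = UpdateLock()
paired.get_lock()                       # ref_cnt goes from 0 to 1
paired_result = paired.release_lock()   # ref_cnt goes from 1 to 0

# Unpaired use: release_lock() without a prior get_lock().
unpaired = UpdateLock()
unpaired_result = unpaired.release_lock()   # ref_cnt goes from 0 to -1
```

In the paired case release_lock() returns True and the lock can be reclaimed. The -1 case only arises when release_lock() is called on a lock that was never handed out.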
+
+def get_vol_update_lock(vol_path):
+    # upload_semaphore controls the max upload count
+    upload_semaphore.acquire()
+
+    # pool lock make sure lock list get/store action is atomic
+    with pool_lock:
+        vol_lock = upload_lock_pool.get(vol_path)
+        if vol_lock:
+            return vol_lock.get_lock()
+        if len(upload_lock_pool.keys()) > (UPLOAD_THREADS - 1):
+            raise OperationFailed("KCHVOL0026E")
This maximum number verification must be the first thing done when returning the lock. As I suggested above, we should do that in the get_lock() function.
This checks whether, for some reason, the count in upload_semaphore is below the upper limit but the number of locks exceeds it. That would mean someone called get_vol_update_lock() but forgot to call release_vol_update_lock(), or an unhandled exception was raised before release_vol_update_lock().
When will the exception be raised? From my view, it will always return the lock in the "if" right above.
Is the lock for the pool or the volume? It is confusing to review.
+        lock = upload_lock_pool[vol_path] = UpdateLock()
+        return lock.get_lock()
+
+
+def release_vol_update_lock(vol_path):
+    with pool_lock:
+        vol_lock = upload_lock_pool.get(vol_path)
If, for any reason, I call this function before getting the lock, it will fail because vol_lock will be None. I suggest adding this kind of check to avoid problems.
ACK.
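One way the acknowledged check could look is sketched below. This is an assumption about a possible follow-up, not code from the thread: the pool is simplified to map a path to a bare reference count, and the boolean return value is hypothetical.

```python
import threading

upload_semaphore = threading.BoundedSemaphore(5)
upload_lock_pool = {}         # vol_path -> ref_cnt (lock object omitted here)
pool_lock = threading.Lock()


def release_vol_update_lock(vol_path):
    """Return True if a reference was released, False if none was held."""
    with pool_lock:
        ref_cnt = upload_lock_pool.get(vol_path)
        if ref_cnt is None:
            # Nothing was acquired for this path, so leave the semaphore
            # alone; releasing a full BoundedSemaphore raises ValueError.
            return False
        if ref_cnt <= 1:
            upload_lock_pool.pop(vol_path)
        else:
            upload_lock_pool[vol_path] = ref_cnt - 1
    upload_semaphore.release()
    return True
```

Returning early keeps the semaphore count and the pool contents in step even when a caller releases a path it never acquired.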
+        if vol_lock.release_lock():
+            upload_lock_pool.pop(vol_path, None)
+    upload_semaphore.release()

From: Royce Lv <lvroyce@linux.vnet.ibm.com> Storage update abandoned async task because mem copy is not long lasting task and does not need to use task to track its status. So update just do the data upload, and following storage volume lookup will take care of current upload progress. Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/model/storagevolumes.py | 61 +++++++++++++++++--------------------- 2 files changed, 28 insertions(+), 34 deletions(-) diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index af912bd..af0dc4f 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -211,6 +211,7 @@ messages = { "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."), "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"), "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"), + "KCHVOL0025E": _("Upload volume fails. Details: %(err)s"), "KCHVOL0026E": _("Inconsistent upload count"), "KCHIFACE0001E": _("Interface %(name)s does not exist"), diff --git a/src/kimchi/model/storagevolumes.py b/src/kimchi/model/storagevolumes.py index cb79966..ff99972 100644 --- a/src/kimchi/model/storagevolumes.py +++ b/src/kimchi/model/storagevolumes.py @@ -34,6 +34,7 @@ from kimchi.isoinfo import IsoImage from kimchi.model.diskutils import get_disk_ref_cnt from kimchi.model.storagepools import StoragePoolModel from kimchi.model.tasks import TaskModel +from kimchi.model.utils import get_vol_update_lock, release_vol_update_lock from kimchi.utils import add_task, get_next_clone_name, get_unique_file_name from kimchi.utils import kimchi_log from kimchi.xmlutils.utils import xpath_get_text @@ -58,7 +59,7 @@ class StorageVolumesModel(object): self.task = TaskModel(**kargs) def create(self, pool_name, params): - vol_source = ['file', 'url', 'capacity'] + vol_source = ['url', 'capacity'] name = params.get('name') @@ -89,9 +90,7 @@ class StorageVolumesModel(object): 
# if 'name' is omitted - except for the methods listed in # 'REQUIRE_NAME_PARAMS' - the default volume name will be the # file/URL basename. - if create_param == 'file': - name = os.path.basename(params['file'].filename) - elif create_param == 'url': + if create_param == 'url': name = os.path.basename(params['url']) else: name = 'upload-%s' % int(time.time()) @@ -120,36 +119,6 @@ class StorageVolumesModel(object): taskid = add_task(targeturi, create_func, self.objstore, params) return self.task.lookup(taskid) - def _create_volume_with_file(self, cb, params): - pool_name = params.pop('pool') - dir_path = StoragePoolModel( - conn=self.conn, objstore=self.objstore).lookup(pool_name)['path'] - file_path = os.path.join(dir_path, params['name']) - if os.path.exists(file_path): - raise InvalidParameter('KCHVOL0001E', {'name': params['name']}) - - upload_file = params['file'] - f_len = upload_file.fp.length - try: - size = 0 - with open(file_path, 'wb') as f: - while True: - data = upload_file.file.read(READ_CHUNK_SIZE) - if not data: - break - size += len(data) - f.write(data) - cb('%s/%s' % (size, f_len)) - except Exception as e: - raise OperationFailed('KCHVOL0007E', - {'name': params['name'], - 'pool': pool_name, - 'err': e.message}) - - # Refresh to make sure volume can be found in following lookup - StoragePoolModel.get_storagepool(pool_name, self.conn).refresh(0) - cb('OK', True) - def _create_volume_with_capacity(self, cb, params): pool_name = params.pop('pool') vol_xml = """ @@ -464,6 +433,30 @@ class StorageVolumeModel(object): cb('OK', True) + def update(self, pool, name, params): + vol = StorageVolumeModel.get_storagevolume(pool, name, self.conn) + vol_path = vol.path() + + chunk_size = int(params['chunk_size']) + index = int(params['index']) + pos = chunk_size * index + lock = get_vol_update_lock(vol_path) + try: + with lock: + st = self.conn.get().newStream(0) + vol.upload(st, pos, chunk_size) + st.send(params['chunk'].fullvalue()) + st.finish() + except 
Exception as e: + st and st.abort() + raise OperationFailed('KCHVOL0025E', + {'err': e.message}) + finally: + release_vol_update_lock(vol_path) + + # Refresh to make sure volume can be found in following lookup + StoragePoolModel.get_storagepool(pool, self.conn).refresh(0) + class IsoVolumesModel(object): def __init__(self, **kargs): -- 2.1.0

On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
Storage update abandoned async task because mem copy is not long lasting task and does not need to use task to track its status. So update just do the data upload, and following storage volume lookup will take care of current upload progress.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/model/storagevolumes.py | 61 +++++++++++++++++--------------------- 2 files changed, 28 insertions(+), 34 deletions(-)
diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index af912bd..af0dc4f 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -211,6 +211,7 @@ messages = { "KCHVOL0022E": _("Unable to access file %(url)s. Please, check it."), "KCHVOL0023E": _("Unable to clone storage volume '%(name)s' in pool '%(pool)s'. Details: %(err)s"), "KCHVOL0024E": _("Upload volume chunk index, size and total size must be integer"), + "KCHVOL0025E": _("Upload volume fails. Details: %(err)s"), "KCHVOL0026E": _("Inconsistent upload count"),
"KCHIFACE0001E": _("Interface %(name)s does not exist"), diff --git a/src/kimchi/model/storagevolumes.py b/src/kimchi/model/storagevolumes.py index cb79966..ff99972 100644 --- a/src/kimchi/model/storagevolumes.py +++ b/src/kimchi/model/storagevolumes.py @@ -34,6 +34,7 @@ from kimchi.isoinfo import IsoImage from kimchi.model.diskutils import get_disk_ref_cnt from kimchi.model.storagepools import StoragePoolModel from kimchi.model.tasks import TaskModel +from kimchi.model.utils import get_vol_update_lock, release_vol_update_lock from kimchi.utils import add_task, get_next_clone_name, get_unique_file_name from kimchi.utils import kimchi_log from kimchi.xmlutils.utils import xpath_get_text @@ -58,7 +59,7 @@ class StorageVolumesModel(object): self.task = TaskModel(**kargs)
def create(self, pool_name, params): - vol_source = ['file', 'url', 'capacity'] + vol_source = ['url', 'capacity']
name = params.get('name')
@@ -89,9 +90,7 @@ class StorageVolumesModel(object): # if 'name' is omitted - except for the methods listed in # 'REQUIRE_NAME_PARAMS' - the default volume name will be the # file/URL basename. - if create_param == 'file': - name = os.path.basename(params['file'].filename) - elif create_param == 'url': + if create_param == 'url': name = os.path.basename(params['url']) else: name = 'upload-%s' % int(time.time()) @@ -120,36 +119,6 @@ class StorageVolumesModel(object): taskid = add_task(targeturi, create_func, self.objstore, params) return self.task.lookup(taskid)
- def _create_volume_with_file(self, cb, params): - pool_name = params.pop('pool') - dir_path = StoragePoolModel( - conn=self.conn, objstore=self.objstore).lookup(pool_name)['path'] - file_path = os.path.join(dir_path, params['name']) - if os.path.exists(file_path): - raise InvalidParameter('KCHVOL0001E', {'name': params['name']}) - - upload_file = params['file'] - f_len = upload_file.fp.length - try: - size = 0 - with open(file_path, 'wb') as f: - while True: - data = upload_file.file.read(READ_CHUNK_SIZE) - if not data: - break - size += len(data) - f.write(data) - cb('%s/%s' % (size, f_len)) - except Exception as e: - raise OperationFailed('KCHVOL0007E', - {'name': params['name'], - 'pool': pool_name, - 'err': e.message}) - - # Refresh to make sure volume can be found in following lookup - StoragePoolModel.get_storagepool(pool_name, self.conn).refresh(0) - cb('OK', True) - def _create_volume_with_capacity(self, cb, params): pool_name = params.pop('pool') vol_xml = """ @@ -464,6 +433,30 @@ class StorageVolumeModel(object):
cb('OK', True)
+    def update(self, pool, name, params):
Prior to the PUT requests we must have a POST to create the volume.
+        vol = StorageVolumeModel.get_storagevolume(pool, name, self.conn)
+        vol_path = vol.path()
+
+        chunk_size = int(params['chunk_size'])
+        index = int(params['index'])
+        pos = chunk_size * index
+        lock = get_vol_update_lock(vol_path)
+        try:
+            with lock:
+                st = self.conn.get().newStream(0)
+                vol.upload(st, pos, chunk_size)
+                st.send(params['chunk'].fullvalue())
+                st.finish()
+        except Exception as e:
+            st and st.abort()
+            raise OperationFailed('KCHVOL0025E',
+                                  {'err': e.message})
+        finally:
+            release_vol_update_lock(vol_path)
+
+        # Refresh to make sure volume can be found in following lookup
+        StoragePoolModel.get_storagepool(pool, self.conn).refresh(0)
+
class IsoVolumesModel(object): def __init__(self, **kargs):
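Since update() computes the write position as chunk_size * index, a client has to keep chunk_size constant across all chunks of one file, and only the last chunk may carry fewer bytes. A minimal sketch of that arithmetic follows; the helper `chunk_plan` and its parameters are hypothetical and do not appear in the patch.

```python
def chunk_plan(total_size, chunk_size):
    """Yield (index, pos, length) for each chunk of a total_size-byte file."""
    index = 0
    while index * chunk_size < total_size:
        pos = chunk_size * index                    # same formula as update()
        length = min(chunk_size, total_size - pos)  # last chunk may be short
        yield index, pos, length
        index += 1


# A 5000-byte file sent in 2000-byte chunks, as in the test's chunk size.
plan = list(chunk_plan(total_size=5000, chunk_size=2000))
```

Here `plan` holds three entries, with offsets 0, 2000 and 4000, and the final chunk covering the remaining 1000 bytes.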

From: Royce Lv <lvroyce@linux.vnet.ibm.com> When uploading an ISO file, the ISO distro record may be incomplete because uploading in progress. "call_stack":"Traceback (most recent call last): File \"./src/kimchi/model/storagevolumes.py\", line 304, in lookup iso_img = IsoImage(path) File \"./src/kimchi/isoinfo.py\", line 149, in __init__ self._scan() File \"./src/kimchi/isoinfo.py\", line 437, in _scan self._scan_el_torito(data) File \"./src/kimchi/isoinfo.py\", line 218, in _scan_el_torito ident, csum, key55, keyAA) = self._unpack(fmt, tmp_data) File \"./src/kimchi/isoinfo.py\", line 181, in _unpack return s.unpack(data[:s.size]) error: unpack requires a string argument of length 32\n" So wrap reading record with error handling so that error will not occur in uploading. Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/isoinfo.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index af0dc4f..00e4a7d 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -67,6 +67,7 @@ messages = { "to file access control lists for '%(user)s' user if possible, or add the " "'%(user)s' to the ISO path group, or (not recommended) 'chmod -R o+x 'path_to_iso'." "Details: %(err)s" ), + "KCHISO0009E": _("Incomplete record while reading ISO %(filename)s."), "KCHIMG0001E": _("An error occurred when probing image OS information."), "KCHIMG0002E": _("No OS information found in given image."), diff --git a/src/kimchi/isoinfo.py b/src/kimchi/isoinfo.py index 1e36a6d..18aa38e 100644 --- a/src/kimchi/isoinfo.py +++ b/src/kimchi/isoinfo.py @@ -147,7 +147,10 @@ class IsoImage(object): self.remote = self._is_iso_remote() self.volume_id = None self.bootable = False - self._scan() + try: + self._scan() + except: + raise IsoFormatError('KCHISO0009E', {'filename': self.path}) def _is_iso_remote(self): if os.path.isfile(self.path): -- 2.1.0

On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
When uploading an ISO file, the ISO distro record may be incomplete because uploading in progress. "call_stack":"Traceback (most recent call last): File \"./src/kimchi/model/storagevolumes.py\", line 304, in lookup iso_img = IsoImage(path) File \"./src/kimchi/isoinfo.py\", line 149, in __init__ self._scan() File \"./src/kimchi/isoinfo.py\", line 437, in _scan self._scan_el_torito(data) File \"./src/kimchi/isoinfo.py\", line 218, in _scan_el_torito ident, csum, key55, keyAA) = self._unpack(fmt, tmp_data) File \"./src/kimchi/isoinfo.py\", line 181, in _unpack return s.unpack(data[:s.size]) error: unpack requires a string argument of length 32\n" So wrap reading record with error handling so that error will not occur in uploading.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/isoinfo.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-)
diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index af0dc4f..00e4a7d 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -67,6 +67,7 @@ messages = { "to file access control lists for '%(user)s' user if possible, or add the " "'%(user)s' to the ISO path group, or (not recommended) 'chmod -R o+x 'path_to_iso'." "Details: %(err)s" ),
+ "KCHISO0009E": _("Incomplete record while reading ISO %(filename)s."),
Why and when will the user get this error message? If the user is uploading a file and wants to see its progress, we should be smart enough not to flood them with errors.
"KCHIMG0001E": _("An error occurred when probing image OS information."), "KCHIMG0002E": _("No OS information found in given image."), diff --git a/src/kimchi/isoinfo.py b/src/kimchi/isoinfo.py index 1e36a6d..18aa38e 100644 --- a/src/kimchi/isoinfo.py +++ b/src/kimchi/isoinfo.py @@ -147,7 +147,10 @@ class IsoImage(object): self.remote = self._is_iso_remote() self.volume_id = None self.bootable = False - self._scan() + try: + self._scan() + except: + raise IsoFormatError('KCHISO0009E', {'filename': self.path})
def _is_iso_remote(self): if os.path.isfile(self.path):

On 02/09/2015 09:52 AM, Aline Manera wrote:
On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
When uploading an ISO file, the ISO distro record may be incomplete because uploading in progress. "call_stack":"Traceback (most recent call last): File \"./src/kimchi/model/storagevolumes.py\", line 304, in lookup iso_img = IsoImage(path) File \"./src/kimchi/isoinfo.py\", line 149, in __init__ self._scan() File \"./src/kimchi/isoinfo.py\", line 437, in _scan self._scan_el_torito(data) File \"./src/kimchi/isoinfo.py\", line 218, in _scan_el_torito ident, csum, key55, keyAA) = self._unpack(fmt, tmp_data) File \"./src/kimchi/isoinfo.py\", line 181, in _unpack return s.unpack(data[:s.size]) error: unpack requires a string argument of length 32\n" So wrap reading record with error handling so that error will not occur in uploading.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/isoinfo.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-)
diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index af0dc4f..00e4a7d 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -67,6 +67,7 @@ messages = { "to file access control lists for '%(user)s' user if possible, or add the " "'%(user)s' to the ISO path group, or (not recommended) 'chmod -R o+x 'path_to_iso'." "Details: %(err)s" ),
+ "KCHISO0009E": _("Incomplete record while reading ISO %(filename)s."),
Why and when will the user get this error message?
This happens when self._scan() tries to read the El Torito and primary volume records but only part of the data has been written.
If the user is uploading a file and wants to see its progress, we should be smart enough not to flood them with errors.
The user won't be flooded with this error. The previous error was raised when calling storagevolume lookup; once it is wrapped in IsoFormatError, storagevolume lookup ignores it and reports the volume's distro and version as "unknown".
    try:
        iso_img = IsoImage(path)
        os_distro, os_version = iso_img.probe()
        bootable = True
    except IsoFormatError:
        bootable = False
"KCHIMG0001E": _("An error occurred when probing image OS information."), "KCHIMG0002E": _("No OS information found in given image."), diff --git a/src/kimchi/isoinfo.py b/src/kimchi/isoinfo.py index 1e36a6d..18aa38e 100644 --- a/src/kimchi/isoinfo.py +++ b/src/kimchi/isoinfo.py @@ -147,7 +147,10 @@ class IsoImage(object): self.remote = self._is_iso_remote() self.volume_id = None self.bootable = False - self._scan() + try: + self._scan() + except: + raise IsoFormatError('KCHISO0009E', {'filename': self.path})
def _is_iso_remote(self): if os.path.isfile(self.path):

On 03/03/2015 05:13, Royce Lv wrote:
On 02/09/2015 09:52 AM, Aline Manera wrote:
On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
When uploading an ISO file, the ISO distro record may be incomplete because uploading in progress. "call_stack":"Traceback (most recent call last): File \"./src/kimchi/model/storagevolumes.py\", line 304, in lookup iso_img = IsoImage(path) File \"./src/kimchi/isoinfo.py\", line 149, in __init__ self._scan() File \"./src/kimchi/isoinfo.py\", line 437, in _scan self._scan_el_torito(data) File \"./src/kimchi/isoinfo.py\", line 218, in _scan_el_torito ident, csum, key55, keyAA) = self._unpack(fmt, tmp_data) File \"./src/kimchi/isoinfo.py\", line 181, in _unpack return s.unpack(data[:s.size]) error: unpack requires a string argument of length 32\n" So wrap reading record with error handling so that error will not occur in uploading.
Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/i18n.py | 1 + src/kimchi/isoinfo.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-)
diff --git a/src/kimchi/i18n.py b/src/kimchi/i18n.py index af0dc4f..00e4a7d 100644 --- a/src/kimchi/i18n.py +++ b/src/kimchi/i18n.py @@ -67,6 +67,7 @@ messages = { "to file access control lists for '%(user)s' user if possible, or add the " "'%(user)s' to the ISO path group, or (not recommended) 'chmod -R o+x 'path_to_iso'." "Details: %(err)s" ),
+ "KCHISO0009E": _("Incomplete record while reading ISO %(filename)s."),
Why and when will the user get this error message?
This happens when self._scan() tries to read the El Torito and primary volume records but only part of the data has been written.
If the user is uploading a file and wants to see its progress, we should be smart enough not to flood them with errors.
The user won't be flooded with this error. The previous error was raised when calling storagevolume lookup; once it is wrapped in IsoFormatError, storagevolume lookup ignores it and reports the volume's distro and version as "unknown".
    try:
        iso_img = IsoImage(path)
        os_distro, os_version = iso_img.probe()
        bootable = True
    except IsoFormatError:
        bootable = False
Good! Thanks for explaining! =)
"KCHIMG0001E": _("An error occurred when probing image OS information."), "KCHIMG0002E": _("No OS information found in given image."), diff --git a/src/kimchi/isoinfo.py b/src/kimchi/isoinfo.py index 1e36a6d..18aa38e 100644 --- a/src/kimchi/isoinfo.py +++ b/src/kimchi/isoinfo.py @@ -147,7 +147,10 @@ class IsoImage(object): self.remote = self._is_iso_remote() self.volume_id = None self.bootable = False - self._scan() + try: + self._scan() + except: + raise IsoFormatError('KCHISO0009E', {'filename': self.path})
def _is_iso_remote(self): if os.path.isfile(self.path):
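The flow discussed in this exchange, where a short read is turned into IsoFormatError and the lookup then treats the volume as not bootable, can be sketched on its own. Everything here is a stand-in: `IsoFormatError` is redefined locally, and `read_record` and `lookup` are hypothetical helpers rather than Kimchi functions. The sketch also catches struct.error specifically, whereas the patch uses a bare except; that narrowing is a choice of this example.

```python
import struct


class IsoFormatError(Exception):
    """Stand-in for Kimchi's IsoFormatError so the sketch runs on its own."""


def read_record(data, fmt="<32s"):
    # Hypothetical parser: unpack one fixed-size record from raw bytes.
    s = struct.Struct(fmt)
    try:
        return s.unpack(data[:s.size])
    except struct.error:
        # Fewer bytes than the record needs, e.g. upload still in progress.
        raise IsoFormatError("KCHISO0009E")


def lookup(data):
    # Mirrors the shape of the storagevolume lookup snippet quoted above.
    try:
        read_record(data)
        bootable = True
    except IsoFormatError:
        bootable = False
    return {"bootable": bootable}
```

With a complete 32-byte record lookup() reports the volume as bootable; with a truncated one it reports it as not bootable instead of propagating the unpack error.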

From: Royce Lv <lvroyce@linux.vnet.ibm.com> Signed-off-by: Royce Lv <lvroyce@linux.vnet.ibm.com> --- src/kimchi/mockmodel.py | 8 ++---- tests/test_model.py | 67 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 6 deletions(-) diff --git a/src/kimchi/mockmodel.py b/src/kimchi/mockmodel.py index 413ac5d..3d9a919 100644 --- a/src/kimchi/mockmodel.py +++ b/src/kimchi/mockmodel.py @@ -274,17 +274,13 @@ class MockModel(Model): conn.storagePoolDefineXML(ET.tostring(root), 0) def _mock_storagevolumes_create(self, pool, params): - vol_source = ['file', 'url', 'capacity'] + vol_source = ['url', 'capacity'] index_list = list(i for i in range(len(vol_source)) if vol_source[i] in params) create_param = vol_source[index_list[0]] name = params.get('name') if name is None: - if create_param == 'file': - name = os.path.basename(params['file'].filename) - del params['file'] - params['capacity'] = 1024 - elif create_param == 'url': + if create_param == 'url': name = os.path.basename(params['url']) del params['url'] params['capacity'] = 1024 diff --git a/tests/test_model.py b/tests/test_model.py index a1aa612..a153d03 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -18,6 +18,7 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +from cherrypy._cpreqbody import Entity import grp import os import platform @@ -1613,6 +1614,72 @@ class ModelTests(unittest.TestCase): # remove files creates inst.repository_delete(repo_id) + @unittest.skipUnless(utils.running_as_root(), 'Must be run as root') + def test_volume_upload(self): + class fake_header(object): + def elements(self, key): + return None + + def __getattr__(self, key): + try: + return self.__getattribute__(key) + except AttributeError: + return lambda x, y: None + + inst = model.Model(objstore_loc=self.tmp_store) + vol_path = os.path.abspath('./run_tests.sh') + + def do_upload(tmp_file, 
vol_path, url): + index = 0 + chunk_size = 2 * 1000 + with open(vol_path, 'rb') as fd: + while True: + with open(tmp_file, 'wb') as tmp_fd: + fd.seek(index * chunk_size) + data = fd.read(chunk_size) + tmp_fd.write(data) + + # only file open for read can be handled by cherrypy + with open(tmp_file, 'rb') as tmp_fd: + # Hack cherrypy entity object and pass it to volume upload + entity = Entity(None, fake_header()) + entity.file = tmp_fd + param = {'index': str(index), + 'chunk_size': str(chunk_size), + 'chunk': entity} + inst.storagevolume_update(pool, params['name'], param) + vol_info = inst.storagevolume_lookup(pool, params['name']) + index = index + 1 + if len(data) < chunk_size: + return vol_info + + # Create a volume with raw format first, following upload will override it. + params = {'capacity': os.path.getsize(vol_path), + 'format': 'raw', + 'name': os.path.basename(vol_path)} + pool = 'default' + + with RollbackContext() as rollback: + task_response = inst.storagevolumes_create(pool, params) + rollback.prependDefer(inst.storagevolume_delete, pool, + params['name']) + taskid = task_response['id'] + vol_uri = task_response['target_uri'] + inst.task_wait(taskid) + self.assertEquals('finished', inst.task_lookup(taskid)['status']) + + f = tempfile.NamedTemporaryFile(delete=False) + rollback.prependDefer(os.remove, f.name) + resp = do_upload(f.name, vol_path, vol_uri) + + with open(vol_path) as vol_file: + vol_content = vol_file.read() + + with open(resp['path']) as copy_file: + cp_content = copy_file.read() + + self.assertEquals(vol_content, cp_content) + class BaseModelTests(unittest.TestCase): class FoosModel(object): -- 2.1.0

Could you exemplify the REST API calls to get a file uploaded? I suppose they are:
POST /storagepools/<pool>/storagevolumes/ {filename: <name>, size: <total size>}
And then several PUT requests:
PUT /storagepools/<pool>/storagevolumes/<name> {chunk: ...}
On 28/01/2015 11:20, lvroyce@linux.vnet.ibm.com wrote:
From: Royce Lv <lvroyce@linux.vnet.ibm.com>
known issue: When using api to send requests without setting up session, cherrypy server will report error after hundred of calls, because race condition below: https://bitbucket.org/cherrypy/cherrypy/pull-request/50/fix-race-condition-i... But it works well with session setup. v3>v1, Use libvirt upload api instead of r/w file Use lock to synchronize upload to prevent Royce Lv (6): Update docs and json schema of storage volume upload Update controller to make update accept formdata params Add lock facility for storage volume upload Update model for storage volume update Fix incomplete record when uploading update test case for storage volume upload
docs/API.md | 7 ++-- src/kimchi/API.json | 19 +++++++++++ src/kimchi/control/base.py | 6 ++-- src/kimchi/i18n.py | 4 +++ src/kimchi/isoinfo.py | 5 ++- src/kimchi/mockmodel.py | 8 ++--- src/kimchi/model/storagevolumes.py | 61 +++++++++++++++------------------- src/kimchi/model/utils.py | 46 ++++++++++++++++++++++++++ tests/test_model.py | 67 ++++++++++++++++++++++++++++++++++++++ 9 files changed, 177 insertions(+), 46 deletions(-)
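As a tentative answer to the question above, the call sequence can be pieced together from the API.md change and the test case in this series: a POST with name, capacity and format creates the volume, then one PUT per chunk carries index, chunk_size and the chunk itself as form data. The sketch below only lists those requests without sending anything; `upload_requests` is a hypothetical helper, the paths follow the ones proposed in the question, and the author did not confirm this sequence in the thread.

```python
def upload_requests(pool, name, total_size, chunk_size):
    """List the (method, path, fields) a client would send, in order."""
    base = "/storagepools/%s/storagevolumes/" % pool
    # Step 1: POST creates an empty volume with the final capacity.
    requests = [("POST", base,
                 {"name": name, "capacity": total_size, "format": "raw"})]
    # Step 2: one PUT per chunk; the chunk bytes themselves would be sent
    # as form data next to these two fields and are left out here.
    index = 0
    while index * chunk_size < total_size:
        requests.append(("PUT", base + name,
                         {"index": index, "chunk_size": chunk_size}))
        index += 1
    return requests


calls = upload_requests("default", "disk.img", total_size=5000, chunk_size=2000)
```

For a 5000-byte file and 2000-byte chunks this gives one POST followed by three PUT requests with index 0, 1 and 2.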

Royce,
Could you rebase, apply the reviews and resend this patch set? I want to have it for Kimchi 1.5.
Thanks, Aline Manera

On 04/02/2015 11:25 PM, Aline Manera wrote:
Royce,
Could you rebase, apply the reviews and resend this patch set? I want to have it for Kimchi 1.5
Thanks, Aline Manera
I will, thanks, Aline.