260 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
		
		
			
		
	
	
			260 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
|   | #!/usr/bin/env python | ||
|  | # | ||
|  | # Tests for internal snapshot. | ||
|  | # | ||
|  | # Copyright (C) 2013 IBM, Inc. | ||
|  | # | ||
|  | # Based on 055. | ||
|  | # | ||
|  | # This program is free software; you can redistribute it and/or modify | ||
|  | # it under the terms of the GNU General Public License as published by | ||
|  | # the Free Software Foundation; either version 2 of the License, or | ||
|  | # (at your option) any later version. | ||
|  | # | ||
|  | # This program is distributed in the hope that it will be useful, | ||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||
|  | # GNU General Public License for more details. | ||
|  | # | ||
|  | # You should have received a copy of the GNU General Public License | ||
|  | # along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||
|  | # | ||
|  | 
 | ||
|  | import time | ||
|  | import os | ||
|  | import iotests | ||
|  | from iotests import qemu_img, qemu_io | ||
|  | 
 | ||
|  | test_drv_base_name = 'drive' | ||
|  | 
 | ||
|  | class ImageSnapshotTestCase(iotests.QMPTestCase): | ||
|  |     image_len = 120 * 1024 * 1024 # MB | ||
|  | 
 | ||
|  |     def __init__(self, *args): | ||
|  |         self.expect = [] | ||
|  |         super(ImageSnapshotTestCase, self).__init__(*args) | ||
|  | 
 | ||
|  |     def _setUp(self, test_img_base_name, image_num): | ||
|  |         self.vm = iotests.VM() | ||
|  |         for i in range(0, image_num): | ||
|  |             filename = '%s%d' % (test_img_base_name, i) | ||
|  |             img = os.path.join(iotests.test_dir, filename) | ||
|  |             device = '%s%d' % (test_drv_base_name, i) | ||
|  |             qemu_img('create', '-f', iotests.imgfmt, img, str(self.image_len)) | ||
|  |             self.vm.add_drive(img) | ||
|  |             self.expect.append({'image': img, 'device': device, | ||
|  |                                 'snapshots': [], | ||
|  |                                 'snapshots_name_counter': 0}) | ||
|  |         self.vm.launch() | ||
|  | 
 | ||
|  |     def tearDown(self): | ||
|  |         self.vm.shutdown() | ||
|  |         for dev_expect in self.expect: | ||
|  |             os.remove(dev_expect['image']) | ||
|  | 
 | ||
|  |     def createSnapshotInTransaction(self, snapshot_num, abort = False): | ||
|  |         actions = [] | ||
|  |         for dev_expect in self.expect: | ||
|  |             num = dev_expect['snapshots_name_counter'] | ||
|  |             for j in range(0, snapshot_num): | ||
|  |                 name = '%s_sn%d' % (dev_expect['device'], num) | ||
|  |                 num = num + 1 | ||
|  |                 if abort == False: | ||
|  |                     dev_expect['snapshots'].append({'name': name}) | ||
|  |                     dev_expect['snapshots_name_counter'] = num | ||
|  |                 actions.append({ | ||
|  |                     'type': 'blockdev-snapshot-internal-sync', | ||
|  |                     'data': { 'device': dev_expect['device'], | ||
|  |                               'name': name }, | ||
|  |                 }) | ||
|  | 
 | ||
|  |         if abort == True: | ||
|  |             actions.append({ | ||
|  |                 'type': 'abort', | ||
|  |                 'data': {}, | ||
|  |             }) | ||
|  | 
 | ||
|  |         result = self.vm.qmp('transaction', actions = actions) | ||
|  | 
 | ||
|  |         if abort == True: | ||
|  |             self.assert_qmp(result, 'error/class', 'GenericError') | ||
|  |         else: | ||
|  |             self.assert_qmp(result, 'return', {}) | ||
|  | 
 | ||
|  |     def verifySnapshotInfo(self): | ||
|  |         result = self.vm.qmp('query-block') | ||
|  | 
 | ||
|  |         # Verify each expected result | ||
|  |         for dev_expect in self.expect: | ||
|  |             # 1. Find the returned image value and snapshot info | ||
|  |             image_result = None | ||
|  |             for device in result['return']: | ||
|  |                 if device['device'] == dev_expect['device']: | ||
|  |                     image_result = device['inserted']['image'] | ||
|  |                     break | ||
|  |             self.assertTrue(image_result != None) | ||
|  |             # Do not consider zero snapshot case now | ||
|  |             sn_list_result = image_result['snapshots'] | ||
|  |             sn_list_expect = dev_expect['snapshots'] | ||
|  | 
 | ||
|  |             # 2. Verify it with expect | ||
|  |             self.assertTrue(len(sn_list_result) == len(sn_list_expect)) | ||
|  | 
 | ||
|  |             for sn_expect in sn_list_expect: | ||
|  |                 sn_result = None | ||
|  |                 for sn in sn_list_result: | ||
|  |                     if sn_expect['name'] == sn['name']: | ||
|  |                         sn_result = sn | ||
|  |                         break | ||
|  |                 self.assertTrue(sn_result != None) | ||
|  |                 # Fill in the detail info | ||
|  |                 sn_expect.update(sn_result) | ||
|  | 
 | ||
|  |     def deleteSnapshot(self, device, id = None, name = None): | ||
|  |         sn_list_expect = None | ||
|  |         sn_expect = None | ||
|  | 
 | ||
|  |         self.assertTrue(id != None or name != None) | ||
|  | 
 | ||
|  |         # Fill in the detail info include ID | ||
|  |         self.verifySnapshotInfo() | ||
|  | 
 | ||
|  |         #find the expected snapshot list | ||
|  |         for dev_expect in self.expect: | ||
|  |             if dev_expect['device'] == device: | ||
|  |                 sn_list_expect = dev_expect['snapshots'] | ||
|  |                 break | ||
|  |         self.assertTrue(sn_list_expect != None) | ||
|  | 
 | ||
|  |         if id != None and name != None: | ||
|  |             for sn in sn_list_expect: | ||
|  |                 if sn['id'] == id and sn['name'] == name: | ||
|  |                     sn_expect = sn | ||
|  |                     result = \ | ||
|  |                           self.vm.qmp('blockdev-snapshot-delete-internal-sync', | ||
|  |                                       device = device, | ||
|  |                                       id = id, | ||
|  |                                       name = name) | ||
|  |                     break | ||
|  |         elif id != None: | ||
|  |             for sn in sn_list_expect: | ||
|  |                 if sn['id'] == id: | ||
|  |                     sn_expect = sn | ||
|  |                     result = \ | ||
|  |                           self.vm.qmp('blockdev-snapshot-delete-internal-sync', | ||
|  |                                       device = device, | ||
|  |                                       id = id) | ||
|  |                     break | ||
|  |         else: | ||
|  |             for sn in sn_list_expect: | ||
|  |                 if sn['name'] == name: | ||
|  |                     sn_expect = sn | ||
|  |                     result = \ | ||
|  |                           self.vm.qmp('blockdev-snapshot-delete-internal-sync', | ||
|  |                                       device = device, | ||
|  |                                       name = name) | ||
|  |                     break | ||
|  | 
 | ||
|  |         self.assertTrue(sn_expect != None) | ||
|  | 
 | ||
|  |         self.assert_qmp(result, 'return', sn_expect) | ||
|  |         sn_list_expect.remove(sn_expect) | ||
|  | 
 | ||
|  | class TestSingleTransaction(ImageSnapshotTestCase): | ||
|  |     def setUp(self): | ||
|  |         self._setUp('test_a.img', 1) | ||
|  | 
 | ||
|  |     def test_create(self): | ||
|  |         self.createSnapshotInTransaction(1) | ||
|  |         self.verifySnapshotInfo() | ||
|  | 
 | ||
|  |     def test_error_name_empty(self): | ||
|  |         actions = [{'type': 'blockdev-snapshot-internal-sync', | ||
|  |                     'data': { 'device': self.expect[0]['device'], | ||
|  |                               'name': '' }, | ||
|  |                   }] | ||
|  |         result = self.vm.qmp('transaction', actions = actions) | ||
|  |         self.assert_qmp(result, 'error/class', 'GenericError') | ||
|  | 
 | ||
|  |     def test_error_device(self): | ||
|  |         actions = [{'type': 'blockdev-snapshot-internal-sync', | ||
|  |                     'data': { 'device': 'drive_error', | ||
|  |                               'name': 'a' }, | ||
|  |                   }] | ||
|  |         result = self.vm.qmp('transaction', actions = actions) | ||
|  |         self.assert_qmp(result, 'error/class', 'DeviceNotFound') | ||
|  | 
 | ||
|  |     def test_error_exist(self): | ||
|  |         self.createSnapshotInTransaction(1) | ||
|  |         self.verifySnapshotInfo() | ||
|  |         actions = [{'type': 'blockdev-snapshot-internal-sync', | ||
|  |                     'data': { 'device': self.expect[0]['device'], | ||
|  |                               'name': self.expect[0]['snapshots'][0] }, | ||
|  |                   }] | ||
|  |         result = self.vm.qmp('transaction', actions = actions) | ||
|  |         self.assert_qmp(result, 'error/class', 'GenericError') | ||
|  | 
 | ||
|  | class TestMultipleTransaction(ImageSnapshotTestCase): | ||
|  |     def setUp(self): | ||
|  |         self._setUp('test_b.img', 2) | ||
|  | 
 | ||
|  |     def test_create(self): | ||
|  |         self.createSnapshotInTransaction(3) | ||
|  |         self.verifySnapshotInfo() | ||
|  | 
 | ||
|  |     def test_abort(self): | ||
|  |         self.createSnapshotInTransaction(2) | ||
|  |         self.verifySnapshotInfo() | ||
|  |         self.createSnapshotInTransaction(3, abort = True) | ||
|  |         self.verifySnapshotInfo() | ||
|  | 
 | ||
|  | class TestSnapshotDelete(ImageSnapshotTestCase): | ||
|  |     def setUp(self): | ||
|  |         self._setUp('test_c.img', 1) | ||
|  | 
 | ||
|  |     def test_delete_with_id(self): | ||
|  |         self.createSnapshotInTransaction(2) | ||
|  |         self.verifySnapshotInfo() | ||
|  |         self.deleteSnapshot(self.expect[0]['device'], | ||
|  |                             id = self.expect[0]['snapshots'][0]['id']) | ||
|  |         self.verifySnapshotInfo() | ||
|  | 
 | ||
|  |     def test_delete_with_name(self): | ||
|  |         self.createSnapshotInTransaction(3) | ||
|  |         self.verifySnapshotInfo() | ||
|  |         self.deleteSnapshot(self.expect[0]['device'], | ||
|  |                             name = self.expect[0]['snapshots'][1]['name']) | ||
|  |         self.verifySnapshotInfo() | ||
|  | 
 | ||
|  |     def test_delete_with_id_and_name(self): | ||
|  |         self.createSnapshotInTransaction(4) | ||
|  |         self.verifySnapshotInfo() | ||
|  |         self.deleteSnapshot(self.expect[0]['device'], | ||
|  |                             id = self.expect[0]['snapshots'][2]['id'], | ||
|  |                             name = self.expect[0]['snapshots'][2]['name']) | ||
|  |         self.verifySnapshotInfo() | ||
|  | 
 | ||
|  | 
 | ||
|  |     def test_error_device(self): | ||
|  |         result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', | ||
|  |                               device = 'drive_error', | ||
|  |                               id = '0') | ||
|  |         self.assert_qmp(result, 'error/class', 'DeviceNotFound') | ||
|  | 
 | ||
|  |     def test_error_no_id_and_name(self): | ||
|  |         result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', | ||
|  |                               device = self.expect[0]['device']) | ||
|  |         self.assert_qmp(result, 'error/class', 'GenericError') | ||
|  | 
 | ||
|  |     def test_error_snapshot_not_exist(self): | ||
|  |         self.createSnapshotInTransaction(2) | ||
|  |         self.verifySnapshotInfo() | ||
|  |         result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', | ||
|  |                               device = self.expect[0]['device'], | ||
|  |                               id = self.expect[0]['snapshots'][0]['id'], | ||
|  |                               name = self.expect[0]['snapshots'][1]['name']) | ||
|  |         self.assert_qmp(result, 'error/class', 'GenericError') | ||
|  | 
 | ||
|  | if __name__ == '__main__': | ||
|  |     iotests.main(supported_fmts=['qcow2']) |