We are going to drop the group file. As a preparatory step, define each
test's groups in the test file itself.
The patch is generated by:
    cd tests/qemu-iotests
    grep '^[0-9]\{3\} ' group | while read line; do
        file=$(awk '{print $1}' <<< "$line");
        groups=$(sed -e 's/^... //' <<< "$line");
        awk "NR==2{print \"# group: $groups\"}1" $file > tmp;
        cat tmp > $file;
    done
Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Message-Id: <20210116134424.82867-7-vsementsov@virtuozzo.com>
Signed-off-by: Eric Blake <eblake@redhat.com>
		
	
		
			
				
	
	
		
			262 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			262 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
# group: rw
 | 
						|
#
 | 
						|
# Tests for internal snapshot.
 | 
						|
#
 | 
						|
# Copyright (C) 2013 IBM, Inc.
 | 
						|
#
 | 
						|
# Based on 055.
 | 
						|
#
 | 
						|
# This program is free software; you can redistribute it and/or modify
 | 
						|
# it under the terms of the GNU General Public License as published by
 | 
						|
# the Free Software Foundation; either version 2 of the License, or
 | 
						|
# (at your option) any later version.
 | 
						|
#
 | 
						|
# This program is distributed in the hope that it will be useful,
 | 
						|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
# GNU General Public License for more details.
 | 
						|
#
 | 
						|
# You should have received a copy of the GNU General Public License
 | 
						|
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
#
 | 
						|
 | 
						|
import time
 | 
						|
import os
 | 
						|
import iotests
 | 
						|
from iotests import qemu_img, qemu_io
 | 
						|
 | 
						|
# Prefix that _setUp combines with the drive index ('%s%d') to form the
# device name recorded in each expectation entry, e.g. 'drive0'.
test_drv_base_name = 'drive'
 | 
						|
 | 
						|
class ImageSnapshotTestCase(iotests.QMPTestCase):
    """Shared fixture and helpers for the internal snapshot tests.

    self.expect holds one dict per test drive with the keys:
      'image'                  - path of the image file
      'device'                 - device name used in QMP commands
      'snapshots'              - list of expected snapshot dicts
      'snapshots_name_counter' - next numeric suffix for a snapshot name
    """

    # 120 MB; handed to qemu_img('create', ...) as the size argument.
    image_len = 120 * 1024 * 1024

    def __init__(self, *args):
        # tearDown iterates over this list, so it has to exist from the
        # moment the test case object is constructed.
        self.expect = []
        super().__init__(*args)

    def _setUp(self, test_img_base_name, image_num):
        """Create image_num images, add each as a drive and launch the VM.

        test_img_base_name: image file name prefix; the index is appended.
        image_num: number of images (and drives) to create.
        """
        self.vm = iotests.VM()
        for i in range(image_num):
            img = os.path.join(iotests.test_dir,
                               '%s%d' % (test_img_base_name, i))
            qemu_img('create', '-f', iotests.imgfmt, img, str(self.image_len))
            self.vm.add_drive(img)
            self.expect.append({
                'image': img,
                'device': '%s%d' % (test_drv_base_name, i),
                'snapshots': [],
                'snapshots_name_counter': 0,
            })
        self.vm.launch()

    def tearDown(self):
        """Shut the VM down and delete every image recorded in self.expect."""
        self.vm.shutdown()
        for dev_expect in self.expect:
            os.remove(dev_expect['image'])

    def createSnapshotInTransaction(self, snapshot_num, abort=False):
        """Create snapshot_num snapshots on every drive in one transaction.

        snapshot_num: number of snapshots requested per drive.
        abort: when true, an 'abort' action is appended, the transaction
            is expected to fail with GenericError, and self.expect is
            left unchanged. Otherwise the transaction is expected to
            return {} and the new snapshot names are recorded.
        """
        actions = []
        for dev_expect in self.expect:
            num = dev_expect['snapshots_name_counter']
            for _ in range(snapshot_num):
                name = '%s_sn%d' % (dev_expect['device'], num)
                num += 1
                if not abort:
                    dev_expect['snapshots'].append({'name': name})
                actions.append({
                    'type': 'blockdev-snapshot-internal-sync',
                    'data': {'device': dev_expect['device'],
                             'name': name},
                })
            # An aborted transaction must not consume snapshot names.
            if not abort:
                dev_expect['snapshots_name_counter'] = num

        if abort:
            actions.append({'type': 'abort', 'data': {}})

        result = self.vm.qmp('transaction', actions=actions)

        if abort:
            self.assert_qmp(result, 'error/class', 'GenericError')
        else:
            self.assert_qmp(result, 'return', {})

    def verifySnapshotInfo(self):
        """Compare the 'query-block' reply with self.expect.

        Each expected device must be reported, with exactly as many
        snapshots as expected, and every expected snapshot name must be
        present. As a side effect each expected snapshot dict is updated
        with the fields that were reported for it; deleteSnapshot relies
        on this to look snapshots up by 'id'.
        """
        result = self.vm.qmp('query-block')

        for dev_expect in self.expect:
            # 1. Find the returned image value and snapshot info
            image_result = next(
                (dev['inserted']['image'] for dev in result['return']
                 if dev['device'] == dev_expect['device']),
                None)
            self.assertIsNotNone(image_result)
            # Do not consider zero snapshot case now
            sn_list_result = image_result['snapshots']
            sn_list_expect = dev_expect['snapshots']

            # 2. Verify it with expect
            self.assertEqual(len(sn_list_result), len(sn_list_expect))

            for sn_expect in sn_list_expect:
                sn_result = next(
                    (sn for sn in sn_list_result
                     if sn['name'] == sn_expect['name']),
                    None)
                self.assertIsNotNone(sn_result)
                # Fill in the detail info
                sn_expect.update(sn_result)

    def deleteSnapshot(self, device, id=None, name=None):
        """Delete one expected snapshot of device, selected by id and/or name.

        device: device name as stored in self.expect.
        id, name: selection criteria; at least one must be given, and
            when both are given both must match the same snapshot.

        The QMP reply must equal the expected snapshot dict, which is then
        dropped from the expectation list. The test fails if the device
        or the snapshot is not part of self.expect.
        """
        self.assertTrue(id is not None or name is not None)

        # Fill in the detail info include ID
        self.verifySnapshotInfo()

        # Find the expected snapshot list of the requested device
        sn_list_expect = next(
            (dev_expect['snapshots'] for dev_expect in self.expect
             if dev_expect['device'] == device),
            None)
        self.assertIsNotNone(sn_list_expect)

        # Only the criteria the caller supplied are matched locally and
        # forwarded to QMP, so one code path covers all three variants.
        criteria = {}
        if id is not None:
            criteria['id'] = id
        if name is not None:
            criteria['name'] = name

        sn_expect = next(
            (sn for sn in sn_list_expect
             if all(sn[key] == value for key, value in criteria.items())),
            None)
        self.assertIsNotNone(sn_expect)

        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                             device=device, **criteria)
        self.assert_qmp(result, 'return', sn_expect)
        sn_list_expect.remove(sn_expect)
 | 
						|
 | 
						|
class TestSingleTransaction(ImageSnapshotTestCase):
    """Internal snapshot creation via 'transaction' on a single drive."""

    def setUp(self):
        self._setUp('test_a.img', 1)

    def _assertCreateFails(self, device, name):
        """Request one internal snapshot in a transaction; expect an error.

        device: value sent as the action's 'device'.
        name: value sent as the action's 'name'.
        """
        actions = [{'type': 'blockdev-snapshot-internal-sync',
                    'data': {'device': device, 'name': name}}]
        result = self.vm.qmp('transaction', actions=actions)
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_create(self):
        self.createSnapshotInTransaction(1)
        self.verifySnapshotInfo()

    def test_error_name_empty(self):
        # An empty snapshot name must be rejected.
        self._assertCreateFails(self.expect[0]['device'], '')

    def test_error_device(self):
        # 'drive_error' is not one of the drives added by _setUp.
        self._assertCreateFails('drive_error', 'a')

    def test_error_exist(self):
        self.createSnapshotInTransaction(1)
        self.verifySnapshotInfo()
        # Reuse the *name* of the snapshot just created. Sending the whole
        # snapshot dict here would not exercise the duplicate-name check.
        existing_name = self.expect[0]['snapshots'][0]['name']
        self._assertCreateFails(self.expect[0]['device'], existing_name)
 | 
						|
 | 
						|
class TestMultipleTransaction(ImageSnapshotTestCase):
    """Snapshot transactions spanning two drives."""

    def setUp(self):
        # Two images, so every transaction carries actions for two devices.
        self._setUp(test_img_base_name='test_b.img', image_num=2)

    def test_create(self):
        # Three snapshots per drive, all in a single transaction.
        self.createSnapshotInTransaction(snapshot_num=3)
        self.verifySnapshotInfo()

    def test_abort(self):
        # First commit two snapshots per drive ...
        self.createSnapshotInTransaction(snapshot_num=2)
        self.verifySnapshotInfo()
        # ... then run an aborted transaction; the second verification
        # checks that the reported snapshots still match the expectation.
        self.createSnapshotInTransaction(snapshot_num=3, abort=True)
        self.verifySnapshotInfo()
 | 
						|
 | 
						|
class TestSnapshotDelete(ImageSnapshotTestCase):
    """Tests for 'blockdev-snapshot-delete-internal-sync' on one drive."""

    def setUp(self):
        self._setUp(test_img_base_name='test_c.img', image_num=1)

    def _assertDeleteFails(self, **arguments):
        """Send the delete command with arguments; expect GenericError."""
        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
                             **arguments)
        self.assert_qmp(result, 'error/class', 'GenericError')

    def test_delete_with_id(self):
        self.createSnapshotInTransaction(2)
        self.verifySnapshotInfo()
        drive = self.expect[0]
        # Select the first snapshot by id only.
        self.deleteSnapshot(drive['device'], id=drive['snapshots'][0]['id'])
        self.verifySnapshotInfo()

    def test_delete_with_name(self):
        self.createSnapshotInTransaction(3)
        self.verifySnapshotInfo()
        drive = self.expect[0]
        # Select the second snapshot by name only.
        self.deleteSnapshot(drive['device'],
                            name=drive['snapshots'][1]['name'])
        self.verifySnapshotInfo()

    def test_delete_with_id_and_name(self):
        self.createSnapshotInTransaction(4)
        self.verifySnapshotInfo()
        drive = self.expect[0]
        # Select the third snapshot by both of its identifiers.
        target = drive['snapshots'][2]
        self.deleteSnapshot(drive['device'],
                            id=target['id'],
                            name=target['name'])
        self.verifySnapshotInfo()

    def test_error_device(self):
        # 'drive_error' is not one of the drives added by _setUp.
        self._assertDeleteFails(device='drive_error', id='0')

    def test_error_no_id_and_name(self):
        # Neither id nor name is supplied.
        self._assertDeleteFails(device=self.expect[0]['device'])

    def test_error_snapshot_not_exist(self):
        self.createSnapshotInTransaction(2)
        self.verifySnapshotInfo()
        drive = self.expect[0]
        first, second = drive['snapshots'][0], drive['snapshots'][1]
        # The id comes from one snapshot and the name from another.
        self._assertDeleteFails(device=drive['device'],
                                id=first['id'],
                                name=second['name'])
 | 
						|
 | 
						|
if __name__ == '__main__':
    # Restrict the run to the qcow2 format over the file protocol.
    iotests.main(supported_fmts=['qcow2'], supported_protocols=['file'])
 |