# Copyright 2012 OpenStack Foundation # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import io import sys from unittest import mock from oslo_utils import encodeutils from requests import Response # NOTE(jokke): simplified transition to py3, behaves like py2 xrange import testtools from glanceclient.common import utils REQUEST_ID = 'req-1234' def create_response_obj_with_req_id(req_id): resp = Response() resp.headers['x-openstack-request-id'] = req_id return resp class TestUtils(testtools.TestCase): def test_make_size_human_readable(self): self.assertEqual("106B", utils.make_size_human_readable(106)) self.assertEqual("1000kB", utils.make_size_human_readable(1024000)) self.assertEqual("1MB", utils.make_size_human_readable(1048576)) self.assertEqual("1.4GB", utils.make_size_human_readable(1476395008)) self.assertEqual("9.3MB", utils.make_size_human_readable(9761280)) self.assertEqual("0B", utils.make_size_human_readable(None)) def test_get_new_file_size(self): size = 98304 file_obj = io.StringIO('X' * size) try: self.assertEqual(size, utils.get_file_size(file_obj)) # Check that get_file_size didn't change original file position. self.assertEqual(0, file_obj.tell()) finally: file_obj.close() def test_get_consumed_file_size(self): size, consumed = 98304, 304 file_obj = io.StringIO('X' * size) file_obj.seek(consumed) try: self.assertEqual(size, utils.get_file_size(file_obj)) # Check that get_file_size didn't change original file position. self.assertEqual(consumed, file_obj.tell()) finally: file_obj.close() def test_prettytable(self): class Struct(object): def __init__(self, **entries): self.__dict__.update(entries) # test that the prettytable output is wellformatted (left-aligned) columns = ['ID', 'Name'] val = ['Name1', 'another', 'veeeery long'] images = [Struct(**{'id': i ** 16, 'name': val[i]}) for i in range(len(val))] saved_stdout = sys.stdout try: sys.stdout = output_list = io.StringIO() utils.print_list(images, columns) sys.stdout = output_dict = io.StringIO() utils.print_dict({'K': 'k', 'Key': 'veeeeeeeeeeeeeeeeeeeeeeee' 'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' 'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' 'eeeeeeeeeeeery long value'}, max_column_width=60) finally: sys.stdout = saved_stdout self.assertEqual('''\ +-------+--------------+ | ID | Name | +-------+--------------+ | | Name1 | | 1 | another | | 65536 | veeeery long | +-------+--------------+ ''', output_list.getvalue()) self.assertEqual('''\ +----------+--------------------------------------------------------------+ | Property | Value | +----------+--------------------------------------------------------------+ | K | k | | Key | veeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee | | | eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee | | | ery long value | +----------+--------------------------------------------------------------+ ''', output_dict.getvalue()) def test_print_list_with_list_no_unicode(self): class Struct(object): def __init__(self, **entries): self.__dict__.update(entries) # test for removing 'u' from lists in print_list output columns = ['ID', 'Tags'] images = [Struct(**{'id': 'b8e1c57e-907a-4239-aed8-0df8e54b8d2d', 'tags': ['Name1', 'Tag_123', 'veeeery long']})] saved_stdout = sys.stdout try: sys.stdout = output_list = io.StringIO() utils.print_list(images, columns) finally: sys.stdout = saved_stdout self.assertEqual('''\ +--------------------------------------+--------------------------------------+ | ID | Tags | +--------------------------------------+--------------------------------------+ | b8e1c57e-907a-4239-aed8-0df8e54b8d2d | ['Name1', 'Tag_123', 'veeeery long'] | +--------------------------------------+--------------------------------------+ ''', output_list.getvalue()) def test_print_image_virtual_size_available(self): image = {'id': '42', 'virtual_size': 1337} saved_stdout = sys.stdout try: sys.stdout = output_list = io.StringIO() utils.print_image(image) finally: sys.stdout = saved_stdout self.assertEqual('''\ +--------------+-------+ | Property | Value | +--------------+-------+ | id | 42 | | virtual_size | 1337 | +--------------+-------+ ''', output_list.getvalue()) def test_print_image_virtual_size_not_available(self): image = {'id': '42', 'virtual_size': None} saved_stdout = sys.stdout try: sys.stdout = output_list = io.StringIO() utils.print_image(image) finally: sys.stdout = saved_stdout self.assertEqual('''\ +--------------+---------------+ | Property | Value | +--------------+---------------+ | id | 42 | | virtual_size | Not available | +--------------+---------------+ ''', output_list.getvalue()) def test_schema_args_with_list_types(self): # NOTE(flaper87): Regression for bug # https://bugs.launchpad.net/python-glanceclient/+bug/1401032 def schema_getter(_type='string', enum=False): prop = { 'type': ['null', _type], 'description': 'Test schema', } prop_readonly = { 'type': ['null', _type], 'readOnly': True, 'description': 'Test schema read-only', } if enum: prop['enum'] = [None, 'opt-1', 'opt-2'] prop_readonly['enum'] = [None, 'opt-ro-1', 'opt-ro-2'] def actual_getter(): return { 'additionalProperties': False, 'required': ['name'], 'name': 'test_schema', 'properties': { 'test': prop, 'readonly-test': prop_readonly, } } return actual_getter def dummy_func(): pass decorated = utils.schema_args(schema_getter())(dummy_func) self.assertEqual(len(decorated.__dict__['arguments']), 1) arg, opts = decorated.__dict__['arguments'][0] self.assertIn('--test', arg) self.assertEqual(encodeutils.safe_decode, opts['type']) decorated = utils.schema_args(schema_getter('integer'))(dummy_func) arg, opts = decorated.__dict__['arguments'][0] self.assertIn('--test', arg) self.assertEqual(int, opts['type']) decorated = utils.schema_args(schema_getter(enum=True))(dummy_func) arg, opts = decorated.__dict__['arguments'][0] self.assertIn('--test', arg) self.assertEqual(encodeutils.safe_decode, opts['type']) self.assertIn('None, opt-1, opt-2', opts['help']) # Make sure we use strict checking for boolean values. decorated = utils.schema_args(schema_getter('boolean'))(dummy_func) arg, opts = decorated.__dict__['arguments'][0] type_function = opts['type'] self.assertEqual(type_function('False'), False) self.assertEqual(type_function('True'), True) self.assertRaises(ValueError, type_function, 'foo') def test_iterable_closes(self): # Regression test for bug 1461678. def _iterate(i): for chunk in i: raise(IOError) data = io.StringIO('somestring') data.close = mock.Mock() i = utils.IterableWithLength(data, 10) self.assertRaises(IOError, _iterate, i) data.close.assert_called_with() def test_safe_header(self): self.assertEqual(('somekey', 'somevalue'), utils.safe_header('somekey', 'somevalue')) self.assertEqual(('somekey', None), utils.safe_header('somekey', None)) for sensitive_header in utils.SENSITIVE_HEADERS: (name, value) = utils.safe_header( sensitive_header, encodeutils.safe_encode('somestring')) self.assertEqual(sensitive_header, name) self.assertTrue(value.startswith("{SHA1}")) (name, value) = utils.safe_header(sensitive_header, None) self.assertEqual(sensitive_header, name) self.assertIsNone(value) def test_generator_proxy(self): def _test_decorator(): i = 1 resp = create_response_obj_with_req_id(REQUEST_ID) while True: yield i, resp i += 1 gen_obj = _test_decorator() proxy = utils.GeneratorProxy(gen_obj) # Proxy object should succeed in behaving as the # wrapped object self.assertIsInstance(proxy, type(gen_obj)) # Initially request_ids should be empty self.assertEqual([], proxy.request_ids) # Only after we have started iterating we should # see non-empty `request_ids` property proxy.next() self.assertEqual([REQUEST_ID], proxy.request_ids) # Even after multiple iterations `request_ids` property # should only contain one request id proxy.next() proxy.next() self.assertEqual(1, len(proxy.request_ids)) def test_request_id_proxy(self): def test_data(val): resp = create_response_obj_with_req_id(REQUEST_ID) return val, resp # Object of any type except decorator can be passed to test_data proxy = utils.RequestIdProxy(test_data(11)) # Verify that proxy object has a property `request_ids` and it is # a list of one request id self.assertEqual([REQUEST_ID], proxy.request_ids)