diff --git a/designate/tests/test_utils.py b/designate/tests/test_utils.py deleted file mode 100644 index 7ffacee57..000000000 --- a/designate/tests/test_utils.py +++ /dev/null @@ -1,194 +0,0 @@ -# Copyright 2012 Managed I.T. -# -# Author: Kiall Mac Innes -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import os -import functools -import tempfile - -import six -import testtools -from mock import Mock -from jinja2 import Template - -from designate.tests import TestCase -from designate import exceptions -from designate import utils - - -class TestUtils(TestCase): - def test_resource_string(self): - name = ['templates', 'bind9-zone.jinja2'] - - resource_string = utils.resource_string(*name) - - self.assertIsNotNone(resource_string) - - def test_resource_string_missing(self): - name = 'invalid.jinja2' - - with testtools.ExpectedException(exceptions.ResourceNotFound): - utils.resource_string(name) - - def test_resource_string_empty_args(self): - with testtools.ExpectedException(ValueError): - utils.resource_string() - - def test_load_schema_missing(self): - with testtools.ExpectedException(exceptions.ResourceNotFound): - utils.load_schema('v1', 'missing') - - def test_load_template(self): - name = 'bind9-zone.jinja2' - - template = utils.load_template(name) - - self.assertIsInstance(template, Template) - - def test_load_template_keep_trailing_newline(self): - name = 'bind9-zone.jinja2' - template = utils.load_template(name) - self.assertTrue(template.environment.keep_trailing_newline) - - def test_load_template_missing(self): - name = 'invalid.jinja2' - - with testtools.ExpectedException(exceptions.ResourceNotFound): - utils.load_template(name) - - def test_render_template(self): - template = Template("Hello {{name}}") - - result = utils.render_template(template, name="World") - - self.assertEqual('Hello World', result) - - def test_render_template_to_file(self): - output_path = tempfile.mktemp() - - template = Template("Hello {{name}}") - - utils.render_template_to_file(template, output_path=output_path, - name="World") - - self.assertTrue(os.path.exists(output_path)) - - try: - with open(output_path, 'r') as fh: - self.assertEqual('Hello World', fh.read()) - finally: - os.unlink(output_path) - - def test_increment_serial(self): - ret_serial = utils.increment_serial(serial=20) - self.assertGreater(ret_serial, 20) - - def test_is_uuid_like(self): - uuid_str = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa' - self.assertTrue(utils.is_uuid_like(uuid_str)) - uuid_str = '678' - self.assertFalse(utils.is_uuid_like(uuid_str)) - - def test_split_host_port(self): - host_port = "abc:abc" - host, port = utils.split_host_port(host_port) - self.assertEqual((host, port), ("abc:abc", 53)) - - host_port = "abc:25" - host, port = utils.split_host_port(host_port) - self.assertEqual((host, port), ("abc", 25)) - - def test_get_paging_params_invalid_limit(self): - context = Mock() - for value in [9223372036854775809, -1]: - with testtools.ExpectedException(exceptions.InvalidLimit): - utils.get_paging_params(context, {'limit': value}, []) - - def test_get_paging_params_max_limit(self): - context = Mock() - self.config(max_limit_v2=1000, group='service:api') - result = utils.get_paging_params(context, {'limit': "max"}, []) - self.assertEqual(result[1], 1000) - - def test_get_paging_params_invalid_sort_dir(self): - context = Mock() - with testtools.ExpectedException(exceptions.InvalidSortDir): - utils.get_paging_params(context, {'sort_dir': "dsc"}, []) - - def test_get_paging_params_invalid_sort_key(self): - context = Mock() - with testtools.ExpectedException(exceptions.InvalidSortKey): - utils.get_paging_params(context, {'sort_key': "dsc"}, - ['asc', 'desc']) - - -def def_method(f, *args, **kwargs): - @functools.wraps(f) - def new_method(self): - return f(self, *args, **kwargs) - return new_method - - -def parameterized_class(cls): - """A class decorator for running parameterized test cases. - Mark your class with @parameterized_class. - Mark your test cases with @parameterized. - """ - test_functions = { - k: v for k, v in vars(cls).items() if k.startswith('test') - } - for name, f in test_functions.items(): - if not hasattr(f, '_test_data'): - continue - - # remove the original test function from the class - delattr(cls, name) - - # add a new test function to the class for each entry in f._test_data - for tag, args in f._test_data.items(): - new_name = "{0}_{1}".format(f.__name__, tag) - if hasattr(cls, new_name): - raise Exception( - "Parameterized test case '{0}.{1}' created from '{0}.{2}' " - "already exists".format(cls.__name__, new_name, name)) - - # Using `def new_method(self): f(self, **args)` is not sufficient - # (all new_methods use the same args value due to late binding). - # Instead, use this factory function. - new_method = def_method(f, **args) - - # To add a method to a class, available for all instances: - # MyClass.method = types.MethodType(f, None, MyClass) - setattr(cls, new_name, six.create_unbound_method(new_method, cls)) - return cls - - -def parameterized(data): - """A function decorator for parameterized test cases. - Example: - @parameterized({ - 'zero': dict(val=0), - 'one': dict(val=1), - }) - def test_val(self, val): - self.assertEqual(self.get_val(), val) - The above will generate two test cases: - `test_val_zero` which runs with val=0 - `test_val_one` which runs with val=1 - :param data: A dictionary that looks like {tag: {arg1: val1, ...}} - """ - def wrapped(f): - f._test_data = data - return f - return wrapped diff --git a/designate/tests/unit/test_utils.py b/designate/tests/unit/test_utils.py index 14d69695a..0285e18eb 100644 --- a/designate/tests/unit/test_utils.py +++ b/designate/tests/unit/test_utils.py @@ -11,19 +11,407 @@ # under the License. import random +import jinja2 import mock import oslotest.base +from oslo_concurrency import processutils +from oslo_config import cfg +from oslo_config import fixture as cfg_fixture +from oslo_utils import timeutils +from designate import exceptions from designate import utils from designate.tests import fixtures +CONF = cfg.CONF -class TestSocket(oslotest.base.BaseTestCase): + +class TestUtils(oslotest.base.BaseTestCase): def setUp(self): - super(TestSocket, self).setUp() + super(TestUtils, self).setUp() self.stdlog = fixtures.StandardLogging() + self.useFixture(cfg_fixture.Config(CONF)) self.useFixture(self.stdlog) + @mock.patch('os.path.exists') + @mock.patch('os.path.abspath') + def test_find_config(self, mock_abspath, mock_path_exists): + CONF.set_override('pybasedir', '/tmp/workspace/designate') + + mock_path_exists.side_effect = [True, False, False, False, False] + mock_abspath.return_value = '/tmp/designate/designate.conf' + + config_files = utils.find_config('designate.conf') + + self.assertEqual(['/tmp/designate/designate.conf'], config_files) + mock_abspath.assert_called_once() + + @mock.patch.object(processutils, 'execute') + def test_execute(self, mock_execute): + mock_execute.return_value = ('designate.conf\npools.yaml\n', '') + + out, err = utils.execute( + '/bin/ls', '/etc/designate/', + run_as_root=False + ) + + mock_execute.assert_called_once_with( + '/bin/ls', '/etc/designate/', + root_helper='sudo designate-rootwrap /etc/designate/rootwrap.conf', + run_as_root=False + ) + + self.assertEqual('designate.conf\npools.yaml\n', out) + self.assertFalse(err) + + @mock.patch.object(processutils, 'execute') + def test_execute_with_rootwrap(self, mock_execute): + CONF.set_override('root_helper', 'sudo designate-test') + + mock_execute.return_value = ('designate.conf\npools.yaml\n', '') + + out, err = utils.execute( + '/bin/ls', '/etc/designate/', + run_as_root=True + ) + + mock_execute.assert_called_once_with( + '/bin/ls', '/etc/designate/', + root_helper='sudo designate-test', + run_as_root=True + ) + + self.assertEqual('designate.conf\npools.yaml\n', out) + self.assertFalse(err) + + def test_deep_dict_merge(self): + a = { + 'a': {'dns': 'record'}, + 'b': 'b', + 'c': 'c', + } + + b = { + 'a': {'domain': 'zone'}, + 'c': 1, + 'd': 'd', + } + + self.assertEqual( + { + 'a': { + 'dns': 'record', 'domain': 'zone' + }, + 'b': 'b', 'c': 1, 'd': 'd' + }, + utils.deep_dict_merge(a, b) + ) + + def test_deep_dict_merge_not_dict(self): + result = utils.deep_dict_merge(dict(), list()) + + self.assertIsInstance(result, list) + + def test_get_proxies(self): + CONF.set_override('no_proxy', 'example.com', 'proxy') + CONF.set_override('http_proxy', 'example.org', 'proxy') + CONF.set_override('https_proxy', 'example.net', 'proxy') + + result = utils.get_proxies() + + self.assertEqual(['example.com'], result.get('no_proxy')) + self.assertEqual('example.org', result.get('http')) + self.assertEqual('example.net', result.get('https')) + + def test_get_proxies_default_values(self): + result = utils.get_proxies() + + self.assertIsNone(result.get('no_proxy')) + self.assertIsNone(result.get('http')) + self.assertIsNone(result.get('https')) + + def test_get_proxies_with_no_proxy(self): + CONF.set_override('no_proxy', 'example.org', 'proxy') + + result = utils.get_proxies() + + self.assertEqual(['example.org'], result.get('no_proxy')) + self.assertIsNone(result.get('http')) + self.assertIsNone(result.get('https')) + + def test_get_proxies_with_http_proxy(self): + CONF.set_override('http_proxy', 'example.org', 'proxy') + + result = utils.get_proxies() + + self.assertIsNone(result.get('no_proxy')) + self.assertEqual('example.org', result.get('http')) + self.assertEqual('example.org', result.get('https')) + + def test_get_proxies_with_https_proxy(self): + CONF.set_override('https_proxy', 'example.org', 'proxy') + + result = utils.get_proxies() + + self.assertIsNone(result.get('no_proxy')) + self.assertIsNone(result.get('http')) + self.assertEqual('example.org', result.get('https')) + + def test_resource_string(self): + resource_string = utils.resource_string( + 'templates', 'bind9-zone.jinja2' + ) + + self.assertIsNotNone(resource_string) + + def test_resource_string_missing(self): + self.assertRaisesRegex( + exceptions.ResourceNotFound, + 'Could not find the requested resource', + utils.resource_string, 'invalid.jinja2' + ) + + def test_resource_string_empty_args(self): + self.assertRaises( + ValueError, + utils.resource_string + ) + + def test_load_schema_missing(self): + self.assertRaisesRegex( + exceptions.ResourceNotFound, + 'Could not find the requested resource', + utils.load_schema, 'v1', 'missing' + ) + + @mock.patch.object(utils, 'resource_string') + def test_load_template(self, mock_resource_string): + mock_resource_string.return_value = 'Hello {{name}}'.encode('utf-8') + + template = utils.load_template('bind9-zone.jinja2') + + self.assertIsInstance(template, jinja2.Template) + + @mock.patch.object(utils, 'resource_string') + def test_load_template_keep_trailing_newline(self, mock_resource_string): + mock_resource_string.return_value = 'Hello {{name}}'.encode('utf-8') + + template = utils.load_template('bind9-zone.jinja2') + + self.assertTrue(template.environment.keep_trailing_newline) + + def test_load_template_missing(self): + self.assertRaises( + exceptions.ResourceNotFound, + utils.load_template, 'invalid.jinja2' + ) + + def test_render_template(self): + template = jinja2.Template('Hello {{name}}') + + result = utils.render_template(template, name='World') + + self.assertEqual('Hello World', result) + + @mock.patch('six.moves.builtins.open', new_callable=mock.mock_open) + @mock.patch('os.path.exists') + def test_render_template_to_file(self, mock_exists, mock_open): + mock_exists.return_value = True + + output_path = '/tmp/designate/resources/templates/hello.jinja2' + + template = jinja2.Template('Hello {{name}}') + + utils.render_template_to_file( + template, makedirs=False, output_path=output_path, name='World' + ) + + mock_open.assert_called_once_with(output_path, 'w') + mock_open().write.assert_called_once_with('Hello World') + + @mock.patch('six.moves.builtins.open', new_callable=mock.mock_open) + @mock.patch('os.path.exists') + @mock.patch('os.makedirs') + def test_render_template_to_file_with_makedirs(self, mock_makedirs, + mock_exists, + mock_open): + mock_exists.return_value = False + + output_path = '/tmp/designate/resources/templates/hello.jinja2' + + template = jinja2.Template('Hello {{name}}') + + utils.render_template_to_file( + template, makedirs=True, output_path=output_path, name='World' + ) + + mock_makedirs.assert_called_once_with( + '/tmp/designate/resources/templates' + ) + mock_open.assert_called_once_with(output_path, 'w') + mock_open().write.assert_called_once_with('Hello World') + + @mock.patch.object(timeutils, 'utcnow_ts') + def test_increment_serial_lower_than_ts(self, mock_utcnow_ts): + mock_utcnow_ts.return_value = 1561698354 + + ret_serial = utils.increment_serial(serial=1) + + self.assertEqual(1561698354, ret_serial) + + @mock.patch.object(timeutils, 'utcnow_ts') + def test_increment_serial_higher_than_ts(self, mock_utcnow_ts): + mock_utcnow_ts.return_value = 1561698354 + + ret_serial = utils.increment_serial(serial=1561698354 * 2) + + self.assertEqual(1561698354 * 2 + 1, ret_serial) + + def test_is_uuid_like(self): + self.assertTrue( + utils.is_uuid_like('ce9fcd6b-d546-4397-8a49-8ceaec37cb64') + ) + + def test_is_not_uuid_like(self): + self.assertFalse(utils.is_uuid_like('678')) + + def test_split_host_port(self): + host, port = utils.split_host_port('abc:25') + self.assertEqual(('abc', 25), (host, port)) + + def test_split_host_port_with_invalid_port(self): + host, port = utils.split_host_port('abc:abc') + self.assertEqual(('abc:abc', 53), (host, port)) + + def test_get_paging_params(self): + CONF.set_override('default_limit_v2', 100, 'service:api') + + context = mock.Mock() + params = { + 'updated_at': None, + 'created_at': '2019-06-28T04:17:34.000000', + 'pattern': 'blacklisted.com.', + 'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea', + } + + marker, limit, sort_key, sort_dir = utils.get_paging_params( + context, params, ['created_at', 'id', 'updated_at', 'pattern'] + ) + + self.assertIsNone(marker) + self.assertEqual(100, limit) + self.assertIsNone(sort_key) + self.assertIsNone(sort_dir) + + def test_get_paging_params_without_sort_keys(self): + CONF.set_override('default_limit_v2', 0, 'service:api') + + context = mock.Mock() + params = { + 'updated_at': None, + 'created_at': '2019-06-28T04:17:34.000000', + 'pattern': 'blacklisted.com.', + 'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea', + } + + marker, limit, sort_key, sort_dir = utils.get_paging_params( + context, params, sort_keys=None + ) + + self.assertIsNone(marker) + self.assertEqual(0, limit) + self.assertIsNone(sort_key) + self.assertIsNone(sort_dir) + + def test_get_paging_params_sort_by_tenant_id(self): + CONF.set_override('default_limit_v2', 100, 'service:api') + + context = mock.Mock() + context.all_tenants = True + params = { + 'updated_at': None, + 'created_at': '2019-06-28T04:17:34.000000', + 'pattern': 'blacklisted.com.', + 'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea', + 'sort_key': 'tenant_id', + } + + marker, limit, sort_key, sort_dir = utils.get_paging_params( + context, params, + ['created_at', 'id', 'updated_at', 'pattern', 'tenant_id'] + ) + + self.assertIsNone(marker) + self.assertEqual(100, limit) + self.assertEqual('tenant_id', sort_key) + self.assertIsNone(sort_dir) + + def test_get_paging_params_sort_tenant_without_all_tenants(self): + CONF.set_override('default_limit_v2', 100, 'service:api') + + context = mock.Mock() + context.all_tenants = False + params = { + 'updated_at': None, + 'created_at': '2019-06-28T04:17:34.000000', + 'pattern': 'blacklisted.com.', + 'id': 'f6663a98-281e-4cea-b0c3-3bc425e086ea', + 'sort_key': 'tenant_id', + } + + marker, limit, sort_key, sort_dir = utils.get_paging_params( + context, params, + ['created_at', 'id', 'updated_at', 'pattern', 'tenant_id'] + ) + + self.assertIsNone(marker) + self.assertEqual(100, limit) + self.assertIsNone(sort_key) + self.assertIsNone(sort_dir) + + def test_get_paging_params_invalid_limit(self): + context = mock.Mock() + + self.assertRaises( + exceptions.InvalidLimit, + utils.get_paging_params, + context, {'limit': 9223372036854775809}, [] + ) + + self.assertRaises( + exceptions.InvalidLimit, + utils.get_paging_params, + context, {'limit': -1}, [] + ) + + def test_get_paging_params_max_limit(self): + CONF.set_override('max_limit_v2', 1000, 'service:api') + + context = mock.Mock() + + result = utils.get_paging_params(context, {'limit': 'max'}, []) + + self.assertEqual(result[1], 1000) + + def test_get_paging_params_invalid_sort_dir(self): + context = mock.Mock() + + self.assertRaisesRegex( + exceptions.InvalidSortDir, + 'Unknown sort direction, must be', + utils.get_paging_params, context, {'sort_dir': 'dsc'}, [] + ) + + def test_get_paging_params_invalid_sort_key(self): + context = mock.Mock() + + self.assertRaisesRegex( + exceptions.InvalidSortKey, + 'sort key must be one of', + utils.get_paging_params, context, {'sort_key': 'dsc'}, + ['asc', 'desc'] + ) + @mock.patch('socket.socket') def test_bind_tcp(self, mock_sock_impl): mock_sock = mock.MagicMock() @@ -57,6 +445,19 @@ class TestSocket(oslotest.base.BaseTestCase): self.stdlog.logger.output ) + @mock.patch('socket.socket') + def test_bind_tcp_without_reuse_port(self, mock_sock_impl): + mock_sock = mock.MagicMock() + mock_sock_impl.return_value = mock_sock + mock_sock.setsockopt.side_effect = [None, None, AttributeError, None] + + utils.bind_tcp('127.0.0.1', 53, 100, 1) + + self.assertIn( + 'SO_REUSEPORT not available, ignoring.', + self.stdlog.logger.output + ) + @mock.patch('socket.socket') def test_bind_udp(self, mock_sock_impl): mock_sock = mock.MagicMock() @@ -87,3 +488,16 @@ class TestSocket(oslotest.base.BaseTestCase): 'Listening on UDP port %(port)d' % {'port': random_port}, self.stdlog.logger.output ) + + @mock.patch('socket.socket') + def test_bind_udp_without_reuse_port(self, mock_sock_impl): + mock_sock = mock.MagicMock() + mock_sock_impl.return_value = mock_sock + mock_sock.setsockopt.side_effect = [None, AttributeError] + + utils.bind_udp('127.0.0.1', 53) + + self.assertIn( + 'SO_REUSEPORT not available, ignoring.', + self.stdlog.logger.output + ) diff --git a/designate/tests/unit/utils.py b/designate/tests/unit/utils.py new file mode 100644 index 000000000..8226f62cf --- /dev/null +++ b/designate/tests/unit/utils.py @@ -0,0 +1,78 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import functools + +import six + + +def def_method(f, *args, **kwargs): + @functools.wraps(f) + def new_method(self): + return f(self, *args, **kwargs) + + return new_method + + +def parameterized_class(cls): + """A class decorator for running parameterized test cases. + Mark your class with @parameterized_class. + Mark your test cases with @parameterized. + """ + test_functions = { + k: v for k, v in vars(cls).items() if k.startswith('test') + } + for name, f in test_functions.items(): + if not hasattr(f, '_test_data'): + continue + + # remove the original test function from the class + delattr(cls, name) + + # add a new test function to the class for each entry in f._test_data + for tag, args in f._test_data.items(): + new_name = "{0}_{1}".format(f.__name__, tag) + if hasattr(cls, new_name): + raise Exception( + "Parameterized test case '{0}.{1}' created from '{0}.{2}' " + "already exists".format(cls.__name__, new_name, name)) + + # Using `def new_method(self): f(self, **args)` is not sufficient + # (all new_methods use the same args value due to late binding). + # Instead, use this factory function. + new_method = def_method(f, **args) + + # To add a method to a class, available for all instances: + # MyClass.method = types.MethodType(f, None, MyClass) + setattr(cls, new_name, six.create_unbound_method(new_method, cls)) + return cls + + +def parameterized(data): + """A function decorator for parameterized test cases. + Example: + @parameterized({ + 'zero': dict(val=0), + 'one': dict(val=1), + }) + def test_val(self, val): + self.assertEqual(self.get_val(), val) + The above will generate two test cases: + `test_val_zero` which runs with val=0 + `test_val_one` which runs with val=1 + :param data: A dictionary that looks like {tag: {arg1: val1, ...}} + """ + + def wrapped(f): + f._test_data = data + return f + + return wrapped diff --git a/designate/tests/unit/workers/test_zone_tasks.py b/designate/tests/unit/workers/test_zone_tasks.py index d1b4df5df..020792ea8 100644 --- a/designate/tests/unit/workers/test_zone_tasks.py +++ b/designate/tests/unit/workers/test_zone_tasks.py @@ -19,9 +19,9 @@ import oslotest.base from oslo_config import cfg from oslo_config import fixture as cfg_fixture -import designate.tests.test_utils as utils from designate import exceptions from designate import objects +from designate.tests.unit import utils from designate.worker import processing from designate.worker import utils as wutils from designate.worker.tasks import zone diff --git a/designate/utils.py b/designate/utils.py index fe32fb8f8..84e4f2ea3 100644 --- a/designate/utils.py +++ b/designate/utils.py @@ -140,55 +140,6 @@ def execute(*cmd, **kw): root_helper=root_helper, **kw) -def get_item_properties(item, fields, mixed_case_fields=None, formatters=None): - """Return a tuple containing the item properties. - - :param item: a single item resource (e.g. Server, Tenant, etc) - :param fields: tuple of strings with the desired field names - :param mixed_case_fields: tuple of field names to preserve case - :param formatters: dictionary mapping field names to callables - to format the values - """ - row = [] - mixed_case_fields = mixed_case_fields or [] - formatters = formatters or {} - - for field in fields: - if field in formatters: - row.append(formatters[field](item)) - else: - if field in mixed_case_fields: - field_name = field.replace(' ', '_') - else: - field_name = field.lower().replace(' ', '_') - if not hasattr(item, field_name) and \ - (isinstance(item, dict) and field_name in item): - data = item[field_name] - else: - data = getattr(item, field_name, '') - if data is None: - data = '' - row.append(data) - return tuple(row) - - -def get_columns(data): - """ - Some row's might have variable count of columns, ensure that we have the - same. - - :param data: Results in [{}, {]}] - """ - columns = set() - - def _seen(col): - columns.add(str(col)) - - six.moves.map(lambda item: six.moves.map(_seen, - list(six.iterkeys(item))), data) - return list(columns) - - def increment_serial(serial=0): # This provides for *roughly* unix timestamp based serial numbers new_serial = timeutils.utcnow_ts() @@ -199,44 +150,6 @@ def increment_serial(serial=0): return new_serial -def quote_string(string): - inparts = string.split(' ') - outparts = [] - tmp = None - - for part in inparts: - if part == '': - continue - elif part[0] == '"' and part[-1:] == '"' and part[-2:] != '\\"': - # Handle Quoted Words - outparts.append(part.strip('"')) - elif part[0] == '"': - # Handle Start of Quoted Sentance - tmp = part[1:] - elif tmp is not None and part[-1:] == '"' and part[-2:] != '\\"': - # Handle End of Quoted Sentance - tmp += " " + part.strip('"') - outparts.append(tmp) - tmp = None - elif tmp is not None: - # Handle Middle of Quoted Sentance - tmp += " " + part - else: - # Handle Standalone words - outparts.append(part) - - if tmp is not None: - # Handle unclosed quoted strings - outparts.append(tmp) - - # This looks odd, but both calls are necessary to ensure the end results - # is always consistent. - outparts = [o.replace('\\"', '"') for o in outparts] - outparts = [o.replace('"', '\\"') for o in outparts] - - return '"' + '" "'.join(outparts) + '"' - - def deep_dict_merge(a, b): if not isinstance(b, dict): return b @@ -324,14 +237,6 @@ def get_proxies(): return proxies -def extract_priority_from_data(recordset_type, record): - priority, data = None, record['data'] - if recordset_type in ('MX', 'SRV'): - priority, _, data = record['data'].partition(" ") - priority = int(priority) - return priority, data - - def cache_result(function): """A function decorator to cache the result of the first call, every additional call will simply return the cached value.