python3: fix netlink_lib delete_entries

libc and netfilter_conntrack calls, as C bindings, do not work with
python3 strings.
This fixes netlink_lib by using bytes type for addresses manipulation.
Update corresponding unit test by removing str() conversions.

Closes-bug: #1779170
Change-Id: Id1ddc6cdf6b6c094eaac8284193606d765de4295
This commit is contained in:
Van Hung Pham 2018-07-11 14:05:07 +07:00
parent ca1c3b76c1
commit a3be36cd6e
2 changed files with 32 additions and 31 deletions

View File

@ -110,8 +110,9 @@ class ConntrackManager(object):
6: nfct.nfct_set_attr_u16},
'dport': {4: nfct.nfct_set_attr_u16,
6: nfct.nfct_set_attr_u16}, }
self.converters = {'src': str,
'dst': str,
self.converters = {'src': bytes,
'dst': bytes,
'ipversion': nl_constants.IPVERSION_SOCKET.get,
'protocol': constants.IP_PROTOCOL_MAP.get,
'code': int,
@ -169,7 +170,7 @@ class ConntrackManager(object):
dest = ctypes.create_string_buffer(
nl_constants.IPVERSION_BUFFER[addr_family])
libc.inet_pton(nl_constants.IPVERSION_SOCKET[addr_family],
source, dest)
source.encode('utf-8'), dest)
return dest.raw
def _set_attributes(self, conntrack, entry):

View File

@ -110,12 +110,12 @@ class NetlinkLibTestCase(base.BaseTestCase):
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
str(conntrack._convert_text_to_binary(
FAKE_ENTRY['src'], 4))),
conntrack._convert_text_to_binary(
FAKE_ENTRY['src'], 4)),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
str(conntrack._convert_text_to_binary(
FAKE_ENTRY['dst'], 4))),
conntrack._convert_text_to_binary(
FAKE_ENTRY['dst'], 4)),
]
nl_lib.nfct.nfct_set_attr.assert_has_calls(calls, any_order=True)
nl_lib.nfct.nfct_destroy.assert_called_once()
@ -152,12 +152,12 @@ class NetlinkLibTestCase(base.BaseTestCase):
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_IPV6_SRC,
str(conntrack._convert_text_to_binary(
FAKE_ENTRY['src'], 6))),
conntrack._convert_text_to_binary(
FAKE_ENTRY['src'], 6)),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV6_DST,
str(conntrack._convert_text_to_binary(
FAKE_ENTRY['dst'], 6))),
conntrack._convert_text_to_binary(
FAKE_ENTRY['dst'], 6)),
]
nl_lib.nfct.nfct_set_attr.assert_has_calls(calls, any_order=True)
nl_lib.nfct.nfct_destroy.assert_called_once()
@ -192,12 +192,12 @@ class NetlinkLibTestCase(base.BaseTestCase):
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
str(conntrack._convert_text_to_binary(
FAKE_UDP_ENTRY['src'], 4))),
conntrack._convert_text_to_binary(
FAKE_UDP_ENTRY['src'], 4)),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
str(conntrack._convert_text_to_binary(
FAKE_UDP_ENTRY['dst'], 4))),
conntrack._convert_text_to_binary(
FAKE_UDP_ENTRY['dst'], 4)),
]
nl_lib.nfct.nfct_set_attr.assert_has_calls(calls, any_order=True)
nl_lib.nfct.nfct_destroy.assert_called_once()
@ -232,12 +232,12 @@ class NetlinkLibTestCase(base.BaseTestCase):
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
str(conntrack._convert_text_to_binary(
FAKE_TCP_ENTRY['src'], 4))),
conntrack._convert_text_to_binary(
FAKE_TCP_ENTRY['src'], 4)),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
str(conntrack._convert_text_to_binary(
FAKE_TCP_ENTRY['dst'], 4))),
conntrack._convert_text_to_binary(
FAKE_TCP_ENTRY['dst'], 4)),
]
nl_lib.nfct.nfct_set_attr.assert_has_calls(calls, any_order=True)
nl_lib.nfct.nfct_destroy.assert_called_once()
@ -301,28 +301,28 @@ class NetlinkLibTestCase(base.BaseTestCase):
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
str(conntrack._convert_text_to_binary(
FAKE_TCP_ENTRY['src'], 4))),
conntrack._convert_text_to_binary(
FAKE_TCP_ENTRY['src'], 4)),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
str(conntrack._convert_text_to_binary(
FAKE_TCP_ENTRY['dst'], 4))),
conntrack._convert_text_to_binary(
FAKE_TCP_ENTRY['dst'], 4)),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
str(conntrack._convert_text_to_binary(
FAKE_UDP_ENTRY['src'], 4))),
conntrack._convert_text_to_binary(
FAKE_UDP_ENTRY['src'], 4)),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
str(conntrack._convert_text_to_binary(
FAKE_UDP_ENTRY['dst'], 4))),
conntrack._convert_text_to_binary(
FAKE_UDP_ENTRY['dst'], 4)),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
str(conntrack._convert_text_to_binary(
FAKE_ENTRY['src'], 4))),
conntrack._convert_text_to_binary(
FAKE_ENTRY['src'], 4)),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
str(conntrack._convert_text_to_binary(
FAKE_UDP_ENTRY['dst'], 4))),
conntrack._convert_text_to_binary(
FAKE_UDP_ENTRY['dst'], 4)),
]
nl_lib.nfct.nfct_set_attr.assert_has_calls(calls, any_order=True)
nl_lib.nfct.nfct_destroy.assert_called_once()