Netlink library to delete conntrack entries

This patch proposes Netlink library to manage conntrack
entries. Netlink library will be used by ConntrackNetlink Driver.

Partial-Bug: #1664294
Co-Authored-By: Nguyen Phuong An <AnNP@vn.fujitsu.com>
Change-Id: Ied00e1badaac87c017a77f782bcd8f3bab072c46
This commit is contained in:
Cuong Nguyen 2017-02-23 18:06:29 +07:00
parent 462511d09a
commit b03caed15a
4 changed files with 703 additions and 0 deletions

View File

@ -0,0 +1,63 @@
# Copyright (c) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Some parts are based on python-conntrack:
# Copyright (c) 2009-2011,2015 Andrew Grigorev <andrew@ei-grad.ru>
import socket
CONNTRACK = 0
NFCT_O_PLAIN = 0
NFCT_OF_TIME_BIT = 1
NFCT_OF_TIME = 1 << NFCT_OF_TIME_BIT
NFCT_Q_DESTROY = 2
NFCT_Q_FLUSH = 4
NFCT_Q_DUMP = 5
NFCT_T_DESTROY_BIT = 2
NFCT_T_DESTROY = 1 << NFCT_T_DESTROY_BIT
ATTR_IPV4_SRC = 0
ATTR_IPV4_DST = 1
ATTR_IPV6_SRC = 4
ATTR_IPV6_DST = 5
ATTR_PORT_SRC = 8
ATTR_PORT_DST = 9
ATTR_ICMP_TYPE = 12
ATTR_ICMP_CODE = 13
ATTR_ICMP_ID = 14
ATTR_L3PROTO = 15
ATTR_L4PROTO = 17
NFCT_T_NEW_BIT = 0
NFCT_T_NEW = 1 << NFCT_T_NEW_BIT
NFCT_T_UPDATE_BIT = 1
NFCT_T_UPDATE = 1 << NFCT_T_UPDATE_BIT
NFCT_T_DESTROY_BIT = 2
NFCT_T_DESTROY = 1 << NFCT_T_DESTROY_BIT
NFCT_T_ALL = NFCT_T_NEW | NFCT_T_UPDATE | NFCT_T_DESTROY
NFCT_CB_CONTINUE = 1
NFCT_CB_FAILURE = -1
NFNL_SUBSYS_CTNETLINK = 0
BUFFER = 1024
IPVERSION_SOCKET = {4: socket.AF_INET, 6: socket.AF_INET6}

View File

@ -0,0 +1,249 @@
# Copyright (c) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Some parts are based on python-conntrack:
# Copyright (c) 2009-2011,2015 Andrew Grigorev <andrew@ei-grad.ru>
import ctypes
from ctypes import util
from oslo_log import log as logging
from neutron_lib import constants
from neutron_fwaas._i18n import _, _LW
from neutron_fwaas import privileged
from neutron_fwaas.privileged import netlink_constants as nl_constants
from neutron_fwaas.privileged import utils as fwaas_utils
LOG = logging.getLogger(__name__)
nfct = ctypes.CDLL(util.find_library('netfilter_conntrack'))
libc = ctypes.CDLL(util.find_library('libc.so.6'))
IP_VERSIONS = [constants.IP_VERSION_4, constants.IP_VERSION_6]
DATA_CALLBACK = None
ATTR_POSITIONS = {
'icmp': [('type', 6), ('code', 7), ('src', 4), ('dst', 5), ('id', 8)],
'tcp': [('sport', 7), ('dport', 8), ('src', 5), ('dst', 6)],
'udp': [('sport', 6), ('dport', 7), ('src', 4), ('dst', 5)]
}
TARGET = {'src': {4: nl_constants.ATTR_IPV4_SRC,
6: nl_constants.ATTR_IPV6_SRC},
'dst': {4: nl_constants.ATTR_IPV4_DST,
6: nl_constants.ATTR_IPV6_DST},
'ipversion': {4: nl_constants.ATTR_L3PROTO,
6: nl_constants.ATTR_L3PROTO},
'protocol': {4: nl_constants.ATTR_L4PROTO,
6: nl_constants.ATTR_L4PROTO},
'code': {4: nl_constants.ATTR_ICMP_CODE,
6: nl_constants.ATTR_ICMP_CODE},
'type': {4: nl_constants.ATTR_ICMP_TYPE,
6: nl_constants.ATTR_ICMP_TYPE},
'id': {4: nl_constants.ATTR_ICMP_ID,
6: nl_constants.ATTR_ICMP_ID},
'sport': {4: nl_constants.ATTR_PORT_SRC,
6: nl_constants.ATTR_PORT_SRC},
'dport': {4: nl_constants.ATTR_PORT_DST,
6: nl_constants.ATTR_PORT_DST}}
NFCT_CALLBACK = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int,
ctypes.c_void_p, ctypes.c_void_p)
class ConntrackOpenFailedExit(SystemExit):
"""Raised if we fail to open a new conntrack or conntrack handler"""
class ConntrackManager(object):
def __init__(self, family_socket=None):
self.family_socket = family_socket
self.set_functions = {
'src': {4: nfct.nfct_set_attr_u32,
6: nfct.nfct_set_attr_u64},
'dst': {4: nfct.nfct_set_attr_u32,
6: nfct.nfct_set_attr_u64},
'ipversion': {4: nfct.nfct_set_attr_u8,
6: nfct.nfct_set_attr_u8},
'protocol': {4: nfct.nfct_set_attr_u8,
6: nfct.nfct_set_attr_u8},
'type': {4: nfct.nfct_set_attr_u8,
6: nfct.nfct_set_attr_u8},
'code': {4: nfct.nfct_set_attr_u8,
6: nfct.nfct_set_attr_u8},
'id': {4: nfct.nfct_set_attr_u16,
6: nfct.nfct_set_attr_u16},
'sport': {4: nfct.nfct_set_attr_u16,
6: nfct.nfct_set_attr_u16},
'dport': {4: nfct.nfct_set_attr_u16,
6: nfct.nfct_set_attr_u16}, }
self.converters = {'src': libc.inet_addr,
'dst': libc.inet_addr,
'ipversion': nl_constants.IPVERSION_SOCKET.get,
'protocol': constants.IP_PROTOCOL_MAP.get,
'code': int,
'type': int,
'id': libc.htons,
'sport': libc.htons,
'dport': libc.htons, }
def list_entries(self):
entries = []
raw_entry = ctypes.create_string_buffer(nl_constants.BUFFER)
@NFCT_CALLBACK
def callback(type_, conntrack, data):
nfct.nfct_snprintf(raw_entry, nl_constants.BUFFER,
conntrack, type_,
nl_constants.NFCT_O_PLAIN,
nl_constants.NFCT_OF_TIME)
entries.append(raw_entry.value)
return nl_constants.NFCT_CB_CONTINUE
self._callback_register(nl_constants.NFCT_T_ALL,
callback, DATA_CALLBACK)
data_ref = self._get_ref(self.family_socket or
nl_constants.IPVERSION_SOCKET[4])
self._query(nl_constants.NFCT_Q_DUMP, data_ref)
return entries
def delete_entries(self, entries):
conntrack = nfct.nfct_new()
try:
for entry in entries:
self._set_attributes(conntrack, entry)
self._query(nl_constants.NFCT_Q_DESTROY, conntrack)
except Exception as e:
msg = _("Failed to delete conntrack entries %s") % e
LOG.critical(msg)
raise ConntrackOpenFailedExit(msg)
finally:
nfct.nfct_destroy(conntrack)
def flush_entries(self):
data_ref = self._get_ref(self.family_socket or
nl_constants.IPVERSION_SOCKET[4])
self._query(nl_constants.NFCT_Q_FLUSH, data_ref)
def _query(self, query_type, query_data):
result = nfct.nfct_query(self.conntrack_handler, query_type,
query_data)
if result == nl_constants.NFCT_CB_FAILURE:
LOG.warning(_LW("Netlink query failed"))
def _set_attributes(self, conntrack, entry):
ipversion = entry.get('ipversion', 4)
for attr, value in entry.items():
set_function = self.set_functions[attr][ipversion]
target = TARGET[attr][ipversion]
converter = self.converters[attr]
set_function(conntrack, target, converter(value))
def _callback_register(self, message_type, callback_func, data):
nfct.nfct_callback_register(self.conntrack_handler,
message_type, callback_func, data)
def _get_ref(self, data):
return ctypes.byref(ctypes.c_int(data))
def __enter__(self):
self.conntrack_handler = nfct.nfct_open(
nl_constants.CONNTRACK,
nl_constants.NFNL_SUBSYS_CTNETLINK)
if not self.conntrack_handler:
msg = _("Failed to open new conntrack handler")
LOG.critical(msg)
raise ConntrackOpenFailedExit(msg)
return self
def __exit__(self, *args):
nfct.nfct_close(self.conntrack_handler)
def _parse_entry(entry, ipversion):
"""Parse entry from text to Python tuple
:param entry: conntrack entry in text
:param ipversion: ipversion 4 or 6
:return: conntrack entry in Python tuple
example: (4, 'tcp', '1', '2', '1.1.1.1', '2.2.2.2')
The attributes are ordered to be easy to compare with other entries
and compare with firewall rule
"""
protocol = entry[1]
parsed_entry = [ipversion, protocol]
for attr, position in ATTR_POSITIONS[protocol]:
val = entry[position].partition('=')[2]
parsed_entry.append(int(val) if attr in ['sport', 'dport', 'type',
'code', 'id'] else val)
return tuple(parsed_entry)
@privileged.default.entrypoint
def flush_entries(namespace=None):
"""Delete all conntrack entries
:param namespace: namespace to delete conntrack entries
:return: None
"""
with fwaas_utils.in_namespace(namespace):
for ipversion in IP_VERSIONS:
with ConntrackManager(nl_constants.IPVERSION_SOCKET[ipversion]) \
as conntrack:
conntrack.flush_entries()
@privileged.default.entrypoint
def list_entries(namespace=None):
"""List and parse all conntrack entries
:param namespace: namespace to get conntrack entries
:return: sorted list of conntrack entries in Python tuple
example: [(4, 'icmp', '8', '0', '1.1.1.1', '2.2.2.2', '1234'),
(4, 'tcp', '1', '2', '1.1.1.1', '2.2.2.2')]
"""
parsed_entries = []
with fwaas_utils.in_namespace(namespace):
for ipversion in IP_VERSIONS:
with ConntrackManager(nl_constants.IPVERSION_SOCKET[ipversion]) \
as conntrack:
raw_entries = conntrack.list_entries()
for raw_entry in raw_entries:
parsed_entry = _parse_entry(raw_entry.split(), ipversion)
parsed_entries.append(parsed_entry)
return sorted(parsed_entries)
@privileged.default.entrypoint
def delete_entries(entries, namespace=None):
"""Delete selected entries
:param entries: list of parsed (as tuple) entries to delete
:param namespace: namespace to delete conntrack entries
:return: None
"""
entry_args = []
for entry in entries:
entry_arg = {'ipversion': entry[0], 'protocol': entry[1]}
for idx, attr in enumerate(ATTR_POSITIONS[entry_arg['protocol']]):
entry_arg[attr[0]] = entry[idx + 2]
entry_args.append(entry_arg)
with fwaas_utils.in_namespace(namespace):
with ConntrackManager() as conntrack:
conntrack.delete_entries(entry_args)

View File

@ -0,0 +1,115 @@
# Copyright (c) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import neutron_fwaas.privileged.netlink_lib as nl_lib
from neutron.agent.linux import utils as linux_utils
from neutron.tests.common import net_helpers
from neutron.tests.functional import base as functional_base
CONNTRACK_CMDS = (
['conntrack', '-I', '-p', 'tcp',
'-s', '1.1.1.1', '-d', '2.2.2.2',
'--sport', '1', '--dport', '2',
'--state', 'ESTABLISHED', '--timeout', '1234'],
['conntrack', '-I', '-p', 'udp',
'-s', '1.1.1.1', '-d', '2.2.2.2',
'--sport', '1', '--dport', '2',
'--timeout', '1234'],
['conntrack', '-I', '-p', 'icmp',
'-s', '1.1.1.1', '-d', '2.2.2.2',
'--icmp-type', '8', '--icmp-code', '0', '--icmp-id', '3333',
'--timeout', '1234'],
)
class NetlinkLibTestCase(functional_base.BaseSudoTestCase):
"""Functional test for netlink_lib: List, delete, flush conntrack entries.
For each function, first we add a specific namespace, then create real
conntrack entries. netlink_lib function will do list, delete and flush
these entries. This class will test this netlink_lib function work
as expected.
"""
def _create_entries(self, namespace, conntrack_cmds):
for cmd in conntrack_cmds:
exec_cmd = ['ip', 'netns', 'exec', namespace] + cmd
try:
linux_utils.execute(exec_cmd,
run_as_root=True,
check_exit_code=True,
extra_ok_codes=[1])
except RuntimeError:
raise Exception('Error while creating entry')
def test_list_entries(self):
namespace = self.useFixture(net_helpers.NamespaceFixture()).name
self._create_entries(namespace, CONNTRACK_CMDS)
expected = (
(4, 'icmp', 8, 0, '1.1.1.1', '2.2.2.2', 3333),
(4, 'tcp', 1, 2, '1.1.1.1', '2.2.2.2'),
(4, 'udp', 1, 2, '1.1.1.1', '2.2.2.2')
)
entries_list = nl_lib.list_entries(namespace=namespace)
self.assertEqual(expected, entries_list)
def _delete_entry(self, delete_entries, remain_entries):
namespace = self.useFixture(net_helpers.NamespaceFixture()).name
self._create_entries(namespace, CONNTRACK_CMDS)
nl_lib.delete_entries(namespace=namespace, entries=delete_entries)
entries_list = nl_lib.list_entries(namespace)
self.assertEqual(remain_entries, entries_list)
def test_delete_icmp_entry(self):
icmp_entry = [(4, 'icmp', 8, 0, '1.1.1.1', '2.2.2.2', 3333)]
remain_entries = (
(4, 'tcp', 1, 2, '1.1.1.1', '2.2.2.2'),
(4, 'udp', 1, 2, '1.1.1.1', '2.2.2.2'),
)
self._delete_entry(icmp_entry, remain_entries)
def test_delete_tcp_entry(self):
tcp_entry = [(4, 'tcp', 1, 2, '1.1.1.1', '2.2.2.2')]
remain_entries = (
(4, 'icmp', 8, 0, '1.1.1.1', '2.2.2.2', 3333),
(4, 'udp', 1, 2, '1.1.1.1', '2.2.2.2')
)
self._delete_entry(tcp_entry, remain_entries)
def test_delete_udp_entry(self):
udp_entry = [(4, 'udp', 1, 2, '1.1.1.1', '2.2.2.2')]
remain_entries = (
(4, 'icmp', 8, 0, '1.1.1.1', '2.2.2.2', 3333),
(4, 'tcp', 1, 2, '1.1.1.1', '2.2.2.2')
)
self._delete_entry(udp_entry, remain_entries)
def test_delete_multiple_entries(self):
delete_entries = (
(4, 'icmp', 8, 0, '1.1.1.1', '2.2.2.2', 3333),
(4, 'tcp', 1, 2, '1.1.1.1', '2.2.2.2'),
(4, 'udp', 1, 2, '1.1.1.1', '2.2.2.2')
)
remain_entries = ()
self._delete_entry(delete_entries, remain_entries)
def test_flush_entries(self):
namespace = self.useFixture(net_helpers.NamespaceFixture()).name
self._create_entries(namespace, CONNTRACK_CMDS)
nl_lib.flush_entries(namespace)
entries_list = nl_lib.list_entries(namespace)
self.assertEqual((), entries_list)

View File

@ -0,0 +1,276 @@
# Copyright (c) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from neutron_lib import constants
from neutron_fwaas.privileged import netlink_constants as nl_constants
from neutron_fwaas.privileged import netlink_lib as nl_lib
from neutron_fwaas.tests import base
FAKE_ENTRY = {'ipversion': 4, 'protocol': 'icmp',
'type': '8', 'code': '0', 'id': 1234,
'src': '1.1.1.1', 'dst': '2.2.2.2'}
FAKE_TCP_ENTRY = {'ipversion': 4, 'protocol': 'tcp',
'sport': 1, 'dport': 2,
'src': '1.1.1.1', 'dst': '2.2.2.2'}
FAKE_UDP_ENTRY = {'ipversion': 4, 'protocol': 'udp',
'sport': 1, 'dport': 2,
'src': '1.1.1.1', 'dst': '2.2.2.2'}
class NetlinkLibTestCase(base.BaseTestCase):
def setUp(self):
super(NetlinkLibTestCase, self).setUp()
nl_lib.nfct = mock.Mock()
nl_lib.libc = mock.Mock()
def test_open_new_conntrack_handler_failed(self):
nl_lib.nfct.nfct_open.return_value = None
with testtools.ExpectedException(nl_lib.ConntrackOpenFailedExit):
with nl_lib.ConntrackManager():
nl_lib.nfct.nfct_open.assert_called_once()
nl_lib.nfct.nfct_close.assert_not_called()
def test_open_new_conntrack_handler_pass(self):
with nl_lib.ConntrackManager():
nl_lib.nfct.nfct_open.assert_called_once()
nl_lib.nfct.nfct_close.assert_called_once()
def test_conntrack_list_entries(self):
with nl_lib.ConntrackManager() as conntrack:
nl_lib.nfct.nfct_open.assert_called_once()
conntrack.list_entries()
nl_lib.nfct.nfct_callback_register.assert_called_once()
nl_lib.nfct.nfct_query.assert_called_once()
nl_lib.nfct.nfct_close.assert_called_once()
def test_conntrack_flush_entries(self):
with nl_lib.ConntrackManager() as conntrack:
nl_lib.nfct.nfct_open.assert_called_once()
conntrack.flush_entries()
nl_lib.nfct.nfct_query.assert_called_once()
nl_lib.nfct.nfct_close.assert_called_once()
def test_conntrack_new_failed(self):
nl_lib.nfct.nfct_new.return_value = None
with nl_lib.ConntrackManager() as conntrack:
nl_lib.nfct.nfct_open.assert_called_once()
conntrack.delete_entries([FAKE_ENTRY])
nl_lib.nfct.nfct_new.assert_called_once()
nl_lib.nfct.nfct_destroy.assert_called_once()
nl_lib.nfct.nfct_close.assert_called_once()
def test_conntrack_delete_icmp_entry(self):
conntrack_filter = mock.Mock()
nl_lib.nfct.nfct_new.return_value = conntrack_filter
with nl_lib.ConntrackManager() as conntrack:
nl_lib.nfct.nfct_open.assert_called_once()
conntrack.delete_entries([FAKE_ENTRY])
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_L3PROTO,
nl_constants.IPVERSION_SOCKET[4]),
mock.call(conntrack_filter,
nl_constants.ATTR_L4PROTO,
constants.IP_PROTOCOL_MAP['icmp']),
mock.call(conntrack_filter,
nl_constants.ATTR_ICMP_CODE,
int(FAKE_ENTRY['code'])),
mock.call(conntrack_filter,
nl_constants.ATTR_ICMP_TYPE,
int(FAKE_ENTRY['type']))
]
nl_lib.nfct.nfct_set_attr_u8.assert_has_calls(calls,
any_order=True)
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_ICMP_ID,
nl_lib.libc.htons(FAKE_ENTRY['id'])),
]
nl_lib.nfct.nfct_set_attr_u16.assert_has_calls(calls)
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
nl_lib.libc.inet_addr(FAKE_ENTRY['src'])),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
nl_lib.libc.inet_addr(FAKE_ENTRY['dst'])),
]
nl_lib.nfct.nfct_set_attr_u32.assert_has_calls(calls,
any_order=True)
nl_lib.nfct.nfct_destroy.assert_called_once()
nl_lib.nfct.nfct_close.assert_called_once()
def test_conntrack_delete_udp_entry(self):
conntrack_filter = mock.Mock()
nl_lib.nfct.nfct_new.return_value = conntrack_filter
with nl_lib.ConntrackManager() as conntrack:
nl_lib.nfct.nfct_open.assert_called_once()
conntrack.delete_entries([FAKE_UDP_ENTRY])
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_L3PROTO,
nl_constants.IPVERSION_SOCKET[4]),
mock.call(conntrack_filter,
nl_constants.ATTR_L4PROTO,
constants.IP_PROTOCOL_MAP['udp'])
]
nl_lib.nfct.nfct_set_attr_u8.assert_has_calls(calls,
any_order=True)
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_PORT_SRC,
nl_lib.libc.htons(FAKE_UDP_ENTRY['sport'])),
mock.call(conntrack_filter,
nl_constants.ATTR_PORT_DST,
nl_lib.libc.htons(FAKE_UDP_ENTRY['dport']))
]
nl_lib.nfct.nfct_set_attr_u16.assert_has_calls(calls,
any_order=True)
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
nl_lib.libc.inet_addr(FAKE_UDP_ENTRY['src'])),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
nl_lib.libc.inet_addr(FAKE_UDP_ENTRY['dst'])),
]
nl_lib.nfct.nfct_set_attr_u32.assert_has_calls(calls,
any_order=True)
nl_lib.nfct.nfct_destroy.assert_called_once()
nl_lib.nfct.nfct_close.assert_called_once()
def test_conntrack_delete_tcp_entry(self):
conntrack_filter = mock.Mock()
nl_lib.nfct.nfct_new.return_value = conntrack_filter
with nl_lib.ConntrackManager() as conntrack:
nl_lib.nfct.nfct_open.assert_called_once()
conntrack.delete_entries([FAKE_TCP_ENTRY])
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_L3PROTO,
nl_constants.IPVERSION_SOCKET[4]),
mock.call(conntrack_filter,
nl_constants.ATTR_L4PROTO,
constants.IP_PROTOCOL_MAP['tcp'])
]
nl_lib.nfct.nfct_set_attr_u8.assert_has_calls(calls,
any_order=True)
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_PORT_SRC,
nl_lib.libc.htons(FAKE_TCP_ENTRY['sport'])),
mock.call(conntrack_filter,
nl_constants.ATTR_PORT_DST,
nl_lib.libc.htons(FAKE_TCP_ENTRY['dport']))
]
nl_lib.nfct.nfct_set_attr_u16.assert_has_calls(calls,
any_order=True)
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
nl_lib.libc.inet_addr(FAKE_TCP_ENTRY['src'])),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
nl_lib.libc.inet_addr(FAKE_TCP_ENTRY['dst'])),
]
nl_lib.nfct.nfct_set_attr_u32.assert_has_calls(calls,
any_order=True)
nl_lib.nfct.nfct_destroy.assert_called_once()
nl_lib.nfct.nfct_close.assert_called_once()
def test_conntrack_delete_entries(self):
conntrack_filter = mock.Mock()
nl_lib.nfct.nfct_new.return_value = conntrack_filter
with nl_lib.ConntrackManager() as conntrack:
nl_lib.nfct.nfct_open.assert_called_once()
conntrack.delete_entries([FAKE_ENTRY,
FAKE_TCP_ENTRY,
FAKE_UDP_ENTRY])
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_L3PROTO,
nl_constants.IPVERSION_SOCKET[4]),
mock.call(conntrack_filter,
nl_constants.ATTR_L4PROTO,
constants.IP_PROTOCOL_MAP['tcp']),
mock.call(conntrack_filter,
nl_constants.ATTR_L3PROTO,
nl_constants.IPVERSION_SOCKET[4]),
mock.call(conntrack_filter,
nl_constants.ATTR_L4PROTO,
constants.IP_PROTOCOL_MAP['udp']),
mock.call(conntrack_filter,
nl_constants.ATTR_L3PROTO,
nl_constants.IPVERSION_SOCKET[4]),
mock.call(conntrack_filter,
nl_constants.ATTR_L4PROTO,
constants.IP_PROTOCOL_MAP['icmp']),
mock.call(conntrack_filter,
nl_constants.ATTR_ICMP_CODE,
int(FAKE_ENTRY['code'])),
mock.call(conntrack_filter,
nl_constants.ATTR_ICMP_TYPE,
int(FAKE_ENTRY['type']))
]
nl_lib.nfct.nfct_set_attr_u8.assert_has_calls(calls,
any_order=True)
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_PORT_SRC,
nl_lib.libc.htons(FAKE_TCP_ENTRY['sport'])),
mock.call(conntrack_filter,
nl_constants.ATTR_PORT_DST,
nl_lib.libc.htons(FAKE_TCP_ENTRY['dport'])),
mock.call(conntrack_filter,
nl_constants.ATTR_PORT_SRC,
nl_lib.libc.htons(FAKE_UDP_ENTRY['sport'])),
mock.call(conntrack_filter,
nl_constants.ATTR_PORT_DST,
nl_lib.libc.htons(FAKE_UDP_ENTRY['dport'])),
mock.call(conntrack_filter,
nl_constants.ATTR_ICMP_ID,
nl_lib.libc.htons(FAKE_ENTRY['id'])),
]
nl_lib.nfct.nfct_set_attr_u16.assert_has_calls(calls,
any_order=True)
calls = [
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
nl_lib.libc.inet_addr(FAKE_TCP_ENTRY['src'])),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
nl_lib.libc.inet_addr(FAKE_TCP_ENTRY['dst'])),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
nl_lib.libc.inet_addr(FAKE_UDP_ENTRY['src'])),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
nl_lib.libc.inet_addr(FAKE_UDP_ENTRY['dst'])),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_SRC,
nl_lib.libc.inet_addr(FAKE_ENTRY['src'])),
mock.call(conntrack_filter,
nl_constants.ATTR_IPV4_DST,
nl_lib.libc.inet_addr(FAKE_ENTRY['dst'])),
]
nl_lib.nfct.nfct_set_attr_u32.assert_has_calls(calls,
any_order=True)
nl_lib.nfct.nfct_destroy.assert_called_once()
nl_lib.nfct.nfct_close.assert_called_once()