[OVN] Fix overlapping security group objects not correctly applied

If a security group object modified an Access Control List in OVN, it
could not be modified by another object. This change prevents the driver
from associating more ACLs than needed if the network log object created
is to log the dropped traffic. Functional tests consider the existence
of events now.

Closes-Bug: #1956763
Change-Id: Ib392b46308aaf76386a747870c5600790a268d51
This commit is contained in:
Elvira García 2021-12-17 12:29:44 +01:00
parent 69ccb24be4
commit 522837a97f
2 changed files with 81 additions and 4 deletions

View File

@ -215,9 +215,23 @@ class OVNDriver(base.DriverBase):
"""
if not log_obj.resource_id and not log_obj.target_id:
# No sg, no port: return all pgs
return self._pgs_all()
# No sg, no port, ALL: return all pgs
if log_obj.event == log_const.ALL_EVENT:
return self._pgs_all()
try:
pg_drop = self.ovn_nb.lookup("Port_Group",
ovn_const.OVN_DROP_PORT_GROUP_NAME)
# No sg, no port, DROP: return DROP pg
if log_obj.event == log_const.DROP_EVENT:
return [{"name": pg_drop.name,
"acls": [r.uuid for r in pg_drop.acls]}]
# No sg, no port, ACCEPT: return all except DROP pg
pgs = self._pgs_all()
pgs.remove({"name": pg_drop.name,
"acls": [r.uuid for r in pg_drop.acls]})
return pgs
except idlutils.RowNotFound:
pass
pgs = []
# include special pg_drop to log DROP and ALL actions
if not log_obj.event or log_obj.event in (log_const.DROP_EVENT,
@ -229,6 +243,8 @@ class OVNDriver(base.DriverBase):
"acls": [r.uuid for r in pg.acls]})
except idlutils.RowNotFound:
pass
if log_obj.event == log_const.DROP_EVENT:
return pgs
if log_obj.resource_id:
try:

View File

@ -41,7 +41,8 @@ class LogApiTestCaseBase(functional_base.TestOVNFunctionalBase):
'resource_type': 'security_group',
'description': 'test net log',
'name': 'logme',
'enabled': enabled}
'enabled': enabled,
'event': log_const.ALL_EVENT}
if sg_id:
log_data['resource_id'] = sg_id
if port_id:
@ -152,6 +153,14 @@ class LogApiTestCaseComplex(LogApiTestCaseBase):
self.assertEqual(is_enabled, acl.log)
return acl
def _check_acl_log_drop(self, is_enabled=True):
acls = self.nb_api.get_port_group(
ovn_const.OVN_DROP_PORT_GROUP_NAME).acls
self.assertTrue(acls)
for acl in acls:
self.assertEqual(is_enabled, acl.log)
return acls
def _check_sgrs(self, sgrs=None, is_enabled=True):
if not sgrs:
sgrs = self.sgrs
@ -277,3 +286,55 @@ class LogApiTestCaseComplex(LogApiTestCaseBase):
self._check_sgrs(is_enabled=False)
self.assertEqual([],
self.nb_api.meter_list().execute(check_error=True))
def _add_logs_then_remove(self, event1, event2, sg=None, sgrs=None):
# Events were previously not correctly applied on ACLs. This test
# ensures that each event log only the necessary acls
drop_true_events = (log_const.DROP_EVENT, log_const.ALL_EVENT)
accept_true_events = (log_const.ALL_EVENT, log_const.ACCEPT_EVENT)
# Check there are no acls with their logging active
self._check_sgrs(sgrs=sgrs, is_enabled=False)
self._check_acl_log_drop(is_enabled=False)
# Add first log object
log_data1 = self._log_data(sg_id=sg)
log_data1['log']['event'] = event1
log_obj1 = self.log_plugin.create_log(self.ctxt, log_data1)
self._check_acl_log_drop(is_enabled=event1 in drop_true_events)
self._check_sgrs(sgrs=sgrs, is_enabled=event1 in accept_true_events)
# Add second log object
log_data2 = self._log_data(sg_id=sg)
log_data2['log']['event'] = event2
log_obj2 = self.log_plugin.create_log(self.ctxt, log_data2)
self._check_acl_log_drop(is_enabled=(event1 in drop_true_events or
event2 in drop_true_events))
self._check_sgrs(sgrs=sgrs, is_enabled=(event1 in accept_true_events or
event2 in accept_true_events))
# Delete second log object
self.log_plugin.delete_log(self.ctxt, log_obj2['id'])
self._check_acl_log_drop(is_enabled=event1 in drop_true_events)
self._check_sgrs(sgrs=sgrs, is_enabled=event1 in accept_true_events)
# Delete first log object
self.log_plugin.delete_log(self.ctxt, log_obj1['id'])
self._check_sgrs(sgrs=sgrs, is_enabled=False)
self._check_acl_log_drop(is_enabled=False)
def test_events_all_sg(self):
self._add_logs_then_remove(log_const.DROP_EVENT, log_const.ALL_EVENT)
self._add_logs_then_remove(
log_const.ACCEPT_EVENT, log_const.DROP_EVENT)
self._add_logs_then_remove(
log_const.DROP_EVENT, log_const.ACCEPT_EVENT)
def test_events_one_sg(self):
self._add_logs_then_remove(log_const.DROP_EVENT, log_const.ALL_EVENT,
sg=self.sg1, sgrs=self.sg1rs)
self._add_logs_then_remove(
log_const.ACCEPT_EVENT, log_const.DROP_EVENT, sg=self.sg2,
sgrs=self.sg2rs)
self._add_logs_then_remove(
log_const.DROP_EVENT, log_const.ACCEPT_EVENT, sg=self.sg3,
sgrs=self.sg3rs)