Fix behaviour of enable/disable in OVN network log

Previously, only the first log object created that associated to a
certain ACL would be able to make changes to the True/False property of
that ACL. This patch makes the driver to take in consideration each log
object created to enable or disable an ACL logging status. A functional
test is added so as to ensure correct behaviour of this feature.

Closes-Bug: #1996780
Change-Id: Ib9663495f30562f79babef163729a0c43812089d
Signed-off-by: Elvira García <egarciar@redhat.com>
This commit is contained in:
Elvira García 2022-11-10 00:47:53 +01:00
parent 970f9fbafa
commit f629b77d3c
2 changed files with 102 additions and 4 deletions

View File

@ -302,6 +302,46 @@ class OVNDriver(base.DriverBase):
if not self.network_logging_supported(self.ovn_nb):
raise LoggingNotSupported()
def _unset_disabled_acls(self, context, log_obj, ovn_txn):
"""Check if we need to disable any ACLs after an update.
Will return True if there were more logs, and False if there was
nothing to check.
:param context: current running context information
:param log_obj: a log_object which was updated
:returns: True if there were other logs enabled, otherwise False.
"""
if log_obj.enabled:
return False
pgs = self._pgs_from_log_obj(context, log_obj)
other_logs = [log for log in self._get_logs(context)
if log.id != log_obj.id and log.enabled]
if not other_logs:
return False
if log_obj.event == log_const.ALL_EVENT:
acls_to_check = pgs[0]["acls"].copy()
if not acls_to_check:
return True
for log in other_logs:
for acl in self._pgs_from_log_obj(context, log)[0]["acls"]:
if acl in acls_to_check:
acls_to_check.remove(acl)
if not acls_to_check:
return True
acls_to_remove = [{"name": pgs[0]["name"], "acls": acls_to_check}]
self._remove_acls_log(acls_to_remove, ovn_txn)
else:
all_events = set([log.event for log in other_logs
if (not log.resource_id or
log.resource_id == log_obj.resource_id)])
if (log_const.ALL_EVENT not in all_events and
log_obj.event not in all_events):
self._remove_acls_log(pgs, ovn_txn)
return True
def update_log(self, context, log_obj):
"""Update a log_obj invocation.
@ -311,11 +351,13 @@ class OVNDriver(base.DriverBase):
"""
LOG.debug("Update_log %s", log_obj)
pgs = self._pgs_from_log_obj(context, log_obj)
actions_enabled = self._acl_actions_enabled(log_obj)
with self.ovn_nb.transaction(check_error=True) as ovn_txn:
self._set_acls_log(pgs, ovn_txn, actions_enabled,
utils.ovn_name(log_obj.id))
if not self._unset_disabled_acls(context, log_obj, ovn_txn):
pgs = self._pgs_from_log_obj(context, log_obj)
actions_enabled = self._acl_actions_enabled(log_obj)
self._set_acls_log(pgs, ovn_txn, actions_enabled,
utils.ovn_name(log_obj.id))
def delete_log(self, context, log_obj):
"""Delete a log_obj invocation.

View File

@ -338,3 +338,59 @@ class LogApiTestCaseComplex(LogApiTestCaseBase):
self._add_logs_then_remove(
log_const.DROP_EVENT, log_const.ACCEPT_EVENT, sg=self.sg3,
sgrs=self.sg3rs)
def test_disable_logs(self):
# This test ensures that acls are correctly disabled when having
# multiple log objects.
# Check there are no acls with their logging active
sgrs = self.sg1rs
self._check_sgrs(sgrs, is_enabled=False)
self._check_acl_log_drop(is_enabled=False)
# Add accept log object
log_data1 = self._log_data(sg_id=self.sg1)
event1 = log_const.ACCEPT_EVENT
log_data1['log']['event'] = event1
log_obj1 = self.log_plugin.create_log(self.ctxt, log_data1)
self._check_acl_log_drop(is_enabled=False)
self._check_sgrs(sgrs=sgrs, is_enabled=True)
# Add drop log object
log_data2 = self._log_data(sg_id=self.sg1)
event2 = log_const.DROP_EVENT
log_data2['log']['event'] = event2
log_obj2 = self.log_plugin.create_log(self.ctxt, log_data2)
self._check_acl_log_drop(is_enabled=True)
self._check_sgrs(sgrs=sgrs, is_enabled=True)
# Disable drop log object and check it worked correctly
log_data2['log']['enabled'] = False
self.log_plugin.update_log(self.ctxt, log_obj2['id'], log_data2)
self._check_acl_log_drop(is_enabled=False)
self._check_sgrs(sgrs=sgrs, is_enabled=True)
# Enable drop log and create all log object
log_data2['log']['enabled'] = True
self.log_plugin.update_log(self.ctxt, log_obj2['id'], log_data2)
self._check_acl_log_drop(is_enabled=True)
self._check_sgrs(sgrs=sgrs, is_enabled=True)
log_data3 = self._log_data(sg_id=self.sg1)
log_data3['log']['event'] = log_const.ALL_EVENT
log_obj3 = self.log_plugin.create_log(self.ctxt, log_data3)
self._check_sgrs(sgrs=sgrs, is_enabled=True)
self._check_acl_log_drop(is_enabled=True)
# Disable all log object and check all acls are still enabled (because
# of the other objects)
log_data3['log']['enabled'] = False
self.log_plugin.update_log(self.ctxt, log_obj3['id'], log_data3)
self._check_sgrs(sgrs=sgrs, is_enabled=True)
self._check_acl_log_drop(is_enabled=True)
# Disable accept log object and only drop traffic gets logged
log_data1['log']['enabled'] = False
self.log_plugin.update_log(self.ctxt, log_obj1['id'], log_data1)
self._check_sgrs(sgrs=sgrs, is_enabled=False)
self._check_acl_log_drop(is_enabled=True)