Adding updates to quark-agent to fix race condition

These updates to redis_base allow quark-agent to check for a potential
race condition that can occur from the time quark-agent acknowledges a
security group rule set needs to be updated and when it acknowledges
that rule set has actually been applied. quark-agent will now store
security group rules before and after it has executed scripts on the
hypervisor. If the rule sets do not match, that means an update has
occurred while the hypervisor scripts were executing. quark-agent will
not ack those changes and allows the next cycle to pick them up.

Change-Id: Ieae13f7b22b8e463cba2ccce82fb94699838926a
Closes-Bug: 1713860
This commit is contained in:
Kyle Haley 2017-08-03 11:30:32 -07:00
parent 5546679525
commit 500817ab95
5 changed files with 261 additions and 55 deletions

View File

@ -88,7 +88,8 @@ def partition_vifs(xapi_client, interfaces, security_group_states):
continue
vif_has_groups = vif in security_group_states
if vif.tagged and vif_has_groups and security_group_states[vif]:
if vif.tagged and vif_has_groups and\
security_group_states[vif][sg_cli.SECURITY_GROUP_ACK]:
# Already ack'd these groups and VIF is tagged, reapply.
# If it's not tagged, fall through and have it self-heal
continue
@ -106,9 +107,46 @@ def partition_vifs(xapi_client, interfaces, security_group_states):
return added, updated, removed
def ack_groups(client, groups):
def ack_groups(client, groups, ack=True):
if len(groups) > 0:
client.update_group_states_for_vifs(groups, True)
client.update_group_states_for_vifs(groups, ack)
def get_groups_to_ack(groups_to_ack, init_sg_states, curr_sg_states):
"""Compares initial security group rules with current sg rules.
Given the groups that were successfully returned from
xapi_client.update_interfaces call, compare initial and current
security group rules to determine if an update occurred during
the window that the xapi_client.update_interfaces was executing.
Return a list of vifs whose security group rules have not changed.
"""
security_groups_changed = []
# Compare current security group rules with initial rules.
for vif in groups_to_ack:
initial_state = init_sg_states[vif][sg_cli.SECURITY_GROUP_HASH_ATTR]
current_state = curr_sg_states[vif][sg_cli.SECURITY_GROUP_HASH_ATTR]
bad_match_msg = ('security group rules were changed for vif "%s" while'
' executing xapi_client.update_interfaces.'
' Will not ack rule.' % vif)
# If lists are different lengths, they're automatically different.
if len(initial_state) != len(current_state):
security_groups_changed.append(vif)
LOG.info(bad_match_msg)
elif len(initial_state) > 0:
# Compare rules in equal length lists.
for rule in current_state:
if rule not in initial_state:
security_groups_changed.append(vif)
LOG.info(bad_match_msg)
break
# Only ack groups whose rules have not changed since update. If
# rules do not match, do not add them to ret so the change
# can be picked up on the next cycle.
ret = [group for group in groups_to_ack
if group not in security_groups_changed]
return ret
def run():
@ -139,6 +177,16 @@ def run():
sg_states)
xapi_client.update_interfaces(new_sg, updated_sg, removed_sg)
groups_to_ack = [v for v in new_sg + updated_sg if v.success]
# NOTE(quade): This solves a race condition where a security group
# rule may have changed between the time the sg_states were called
# and when they were officially ack'd. It functions as a compare
# and set. This is a fix until we get onto a proper messaging
# queue. NCP-2287
sg_sts_curr = groups_client.get_security_group_states(interfaces)
groups_to_ack = get_groups_to_ack(groups_to_ack, sg_states,
sg_sts_curr)
# This list will contain all the security group rules that do not
# match
ack_groups(groups_client, groups_to_ack)
except Exception:

View File

@ -160,6 +160,14 @@ class ClientBase(object):
values = pipe.execute()
return values
@handle_connection_error
def get_fields_all(self, keys):
with self._client.slave.pipeline() as pipe:
for key in keys:
pipe.hgetall(key)
values = pipe.execute()
return values
@handle_connection_error
def set_fields(self, keys, field, value):
with self._client.master.pipeline() as pipe:

View File

@ -188,18 +188,29 @@ class SecurityGroupsClient(redis_base.ClientBase):
vif_keys = [self.vif_key(vif.device_id, vif.mac_address)
for vif in interfaces]
security_groups = self.get_fields(vif_keys, SECURITY_GROUP_ACK)
# Retrieve all fields associated with this key, which should be
# 'security groups ack' and 'security group rules'.
sec_grp_all = self.get_fields_all(vif_keys)
ret = {}
for vif, security_group_ack in zip(interfaces, security_groups):
if security_group_ack:
security_group_ack = security_group_ack.lower()
if "true" in security_group_ack:
ret[vif] = True
elif "false" in security_group_ack:
ret[vif] = False
# Associate the vif with the fields in a dictionary
for vif, group in zip(interfaces, sec_grp_all):
if group:
ret[vif] = {SECURITY_GROUP_ACK: None,
SECURITY_GROUP_HASH_ATTR: []}
temp_ack = group[SECURITY_GROUP_ACK].lower()
temp_rules = group[SECURITY_GROUP_HASH_ATTR]
if temp_rules:
temp_rules = json.loads(temp_rules)
ret[vif][SECURITY_GROUP_HASH_ATTR] = temp_rules["rules"]
if "true" in temp_ack:
ret[vif][SECURITY_GROUP_ACK] = True
elif "false" in temp_ack:
ret[vif][SECURITY_GROUP_ACK] = False
else:
LOG.debug("Skipping bad ack value %s" % security_group_ack)
ret.pop(vif, None)
LOG.debug("Skipping bad ack value %s" % temp_ack)
return ret
@utils.retry_loop(3)

View File

@ -13,40 +13,173 @@
# License for the specific language governing permissions and limitations
#
import contextlib
import mock
from quark.agent import agent
from quark.agent import xapi
from quark.cache import security_groups_client as sg
from quark.tests import test_base
class TestAgentPartitionVifs(test_base.TestBase):
@mock.patch("quark.agent.xapi.XapiClient._session")
def test_partition_vifs(self, sess):
def _vif_rec(mac, tagged):
rec = {"MAC": mac, "other_config": {}}
if tagged:
rec["other_config"] = {"security_groups": "enabled"}
return rec
def setUp(self):
self.vif_recs = [self._vif_rec(1, False), self._vif_rec(2, True),
self._vif_rec(3, True), self._vif_rec(4, False),
self._vif_rec(5, False)]
self.interfaces = [xapi.VIF("added", self.vif_recs[0], "added_ref"),
xapi.VIF("updated", self.vif_recs[1],
"updated_ref"),
xapi.VIF("removed", self.vif_recs[2],
"removed_ref"),
xapi.VIF("no groups", self.vif_recs[3],
"no groups ref"),
xapi.VIF("self heal", self.vif_recs[4],
"self heal ref")]
self.ack = sg.SECURITY_GROUP_ACK
self.rules = sg.SECURITY_GROUP_HASH_ATTR
self.sg_states = {self.interfaces[0]: {self.ack: False,
self.rules: []},
self.interfaces[1]: {self.ack: False,
self.rules: []},
self.interfaces[4]: {self.ack: True,
self.rules: []}}
self.client = sg.SecurityGroupsClient()
vif_recs = [_vif_rec(1, False), _vif_rec(2, True), _vif_rec(3, True),
_vif_rec(4, False), _vif_rec(5, False)]
@contextlib.contextmanager
def _stubs(self, interfaces, sg_states, vif_recs):
with mock.patch("quark.cache.security_groups_client."
"SecurityGroupsClient.get_security_group_states")\
as sec_grp_client,\
mock.patch("quark.agent.xapi.XapiClient._session"):
sec_grp_client.side_effect = sg_states
yield interfaces, sg_states, vif_recs
interfaces = [xapi.VIF("added", vif_recs[0], "added_ref"),
xapi.VIF("updated", vif_recs[1], "updated_ref"),
xapi.VIF("removed", vif_recs[2], "removed_ref"),
xapi.VIF("no groups", vif_recs[3], "no groups ref"),
xapi.VIF("self heal", vif_recs[4], "self heal ref")]
def _vif_rec(self, mac, tagged):
rec = {"MAC": mac, "other_config": {}}
if tagged:
rec["other_config"] = {"security_groups": "enabled"}
return rec
sg_states = {interfaces[0]: False, interfaces[1]: False,
interfaces[4]: True}
def test_partition_vifs(self):
with self._stubs(self.interfaces, self.sg_states, self.vif_recs)\
as (interfaces, sg_states, vif_recs):
xapi_client = xapi.XapiClient()
added, updated, removed = agent.partition_vifs(xapi_client,
interfaces,
sg_states)
self.assertEqual(added, [interfaces[0], interfaces[4]])
self.assertEqual(updated, [interfaces[1]])
self.assertEqual(removed, [interfaces[2]])
xapi_client = xapi.XapiClient()
def test_get_groups_to_ack_rule_mismatch_init_empty(self):
sg_states = [
{self.interfaces[0]: {self.ack: False, self.rules: []}},
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blah": "blech"}]}}]
with self._stubs(self.interfaces, sg_states, self.vif_recs)\
as (interfaces, sg_states, vif_recs):
init_grps = self.client.get_security_group_states(interfaces)
groups_to_ack = [self.interfaces[0]]
curr_grps = self.client.get_security_group_states(interfaces)
gta = agent.get_groups_to_ack(groups_to_ack, init_grps, curr_grps)
self.assertEqual([], gta)
added, updated, removed = agent.partition_vifs(xapi_client,
interfaces,
sg_states)
def test_get_groups_to_ack_rule_mismatch_init_populated(self):
sg_states = [
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blah": "blech"}]}},
{self.interfaces[0]: {self.ack: False, self.rules: []}}]
with self._stubs(self.interfaces, sg_states, self.vif_recs)\
as (interfaces, sg_states, vif_recs):
init_grps = self.client.get_security_group_states(interfaces)
groups_to_ack = [self.interfaces[0]]
curr_grps = self.client.get_security_group_states(interfaces)
gta = agent.get_groups_to_ack(groups_to_ack, init_grps, curr_grps)
self.assertEqual([], gta)
self.assertEqual(added, [interfaces[0], interfaces[4]])
self.assertEqual(updated, [interfaces[1]])
self.assertEqual(removed, [interfaces[2]])
def test_get_groups_to_ack_rule_mismatch_both_populated(self):
sg_states = [
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blah": "blech"}]}},
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blech": "blah"}]}}]
with self._stubs(self.interfaces, sg_states, self.vif_recs)\
as (interfaces, sg_states, vif_recs):
init_grps = self.client.get_security_group_states(interfaces)
groups_to_ack = [self.interfaces[0]]
curr_grps = self.client.get_security_group_states(interfaces)
gta = agent.get_groups_to_ack(groups_to_ack, init_grps, curr_grps)
self.assertEqual([], gta)
def test_get_groups_to_ack_rule_mismatch_both_populated_multi(self):
sg_states = [
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blah": "blech"},
{"blech": "blah"}]}},
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blech": "blah"}]}}]
with self._stubs(self.interfaces, sg_states, self.vif_recs)\
as (interfaces, sg_states, vif_recs):
init_grps = self.client.get_security_group_states(interfaces)
groups_to_ack = [self.interfaces[0]]
curr_grps = self.client.get_security_group_states(interfaces)
gta = agent.get_groups_to_ack(groups_to_ack, init_grps, curr_grps)
self.assertEqual([], gta)
sg_states = [
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blech": "blah"}]}},
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blah": "blech"},
{"blech": "blah"}]}}]
with self._stubs(self.interfaces, sg_states, self.vif_recs)\
as (interfaces, sg_states, vif_recs):
init_grps = self.client.get_security_group_states(interfaces)
groups_to_ack = [self.interfaces[0]]
curr_grps = self.client.get_security_group_states(interfaces)
gta = agent.get_groups_to_ack(groups_to_ack, init_grps, curr_grps)
self.assertEqual([], gta)
def test_get_groups_to_ack_rule_match_both_empty(self):
sg_states = [{self.interfaces[0]: {self.ack: False, self.rules: []}},
{self.interfaces[0]: {self.ack: False, self.rules: []}}]
with self._stubs(self.interfaces, sg_states, self.vif_recs)\
as (interfaces, sg_states, vif_recs):
init_grps = self.client.get_security_group_states(interfaces)
groups_to_ack = [self.interfaces[0]]
curr_grps = self.client.get_security_group_states(interfaces)
self.assertEqual(curr_grps, init_grps)
gta = agent.get_groups_to_ack(groups_to_ack, init_grps, curr_grps)
self.assertEqual(groups_to_ack, gta)
def test_get_groups_to_ack_rule_match_both_populated(self):
sg_states = [
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blah": "blech"}]}},
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blah": "blech"}]}}]
with self._stubs(self.interfaces, sg_states, self.vif_recs)\
as (interfaces, sg_states, vif_recs):
init_grps = self.client.get_security_group_states(interfaces)
groups_to_ack = [self.interfaces[0]]
curr_grps = self.client.get_security_group_states(interfaces)
self.assertEqual(curr_grps, init_grps)
gta = agent.get_groups_to_ack(groups_to_ack, init_grps, curr_grps)
self.assertEqual(groups_to_ack, gta)
def test_get_groups_to_ack_rule_match_both_populated_multi(self):
sg_states = [
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blah": "blech"},
{"blech": "blah"}]}},
{self.interfaces[0]: {self.ack: False, self.rules:
[{"blech": "blah"},
{"blah": "blech"}]}}]
with self._stubs(self.interfaces, sg_states, self.vif_recs)\
as (interfaces, sg_states, vif_recs):
init_grps = self.client.get_security_group_states(interfaces)
groups_to_ack = [self.interfaces[0]]
curr_grps = self.client.get_security_group_states(interfaces)
gta = agent.get_groups_to_ack(groups_to_ack, init_grps, curr_grps)
self.assertEqual(groups_to_ack, gta)

View File

@ -266,38 +266,44 @@ class TestRedisForAgent(test_base.TestBase):
self.addCleanup(patch.stop)
@mock.patch(
"quark.cache.security_groups_client.SecurityGroupsClient.get_fields")
def test_get_security_group_states_empty(self, mock_get_fields):
"quark.cache.security_groups_client."
"SecurityGroupsClient.get_fields_all")
def test_get_security_group_states_empty(self, get_fields_all):
rc = sg_client.SecurityGroupsClient()
mock_get_fields.return_value = []
get_fields_all.return_value = []
group_states = rc.get_security_group_states([])
mock_get_fields.assert_called_once_with([],
sg_client.SECURITY_GROUP_ACK)
self.assertEqual(group_states, {})
@mock.patch(
"quark.cache.security_groups_client.SecurityGroupsClient.get_fields")
"quark.cache.security_groups_client."
"SecurityGroupsClient.get_fields_all")
def test_get_security_group_states_nonempty(self, mock_get_fields):
rc = sg_client.SecurityGroupsClient()
mock_get_fields.return_value = [
None,
'{}',
'{"%s": False}' % sg_client.SECURITY_GROUP_ACK,
'{"%s": True}' % sg_client.SECURITY_GROUP_ACK,
'{"%s": "1-2-3"}' % sg_client.SECURITY_GROUP_ACK]
{},
{sg_client.SECURITY_GROUP_ACK: "False",
sg_client.SECURITY_GROUP_HASH_ATTR:
'{"rules":[{"direction": "up"}]}'},
{sg_client.SECURITY_GROUP_ACK: "True",
sg_client.SECURITY_GROUP_HASH_ATTR: []},
{sg_client.SECURITY_GROUP_ACK: '1-2-3',
sg_client.SECURITY_GROUP_HASH_ATTR: []}]
recs = [{"MAC": 2}, {"MAC": 4}, {"MAC": 6}, {"MAC": 8}, {"MAC": 0}]
new_interfaces = ([VIF(1, recs[0], 9), VIF(3, recs[1], 0),
VIF(5, recs[2], 1), VIF(7, recs[3], 2),
new_interfaces = ([VIF(1, recs[0], 9),
VIF(3, recs[1], 0),
VIF(5, recs[2], 1),
VIF(7, recs[3], 2),
VIF(9, recs[4], 3)])
valid_response = {new_interfaces[2]:
{sg_client.SECURITY_GROUP_ACK: False,
sg_client.SECURITY_GROUP_HASH_ATTR:
[{"direction": "up"}]},
new_interfaces[3]:
{sg_client.SECURITY_GROUP_ACK: True,
sg_client.SECURITY_GROUP_HASH_ATTR: []}}
group_states = rc.get_security_group_states(new_interfaces)
mock_get_fields.assert_called_once_with(
["1.000000000002", "3.000000000004", "5.000000000006",
"7.000000000008", "9.000000000000"],
sg_client.SECURITY_GROUP_ACK)
self.assertEqual(group_states, {new_interfaces[2]: False,
new_interfaces[3]: True})
self.assertEqual(valid_response, group_states)