Merge "Don't include trust IDs in Alarm action output"

This commit is contained in:
Zuul 2018-04-23 13:17:10 +00:00 committed by Gerrit Code Review
commit 2ecb6e63f5
2 changed files with 40 additions and 5 deletions

View File

@ -269,6 +269,19 @@ class Alarm(base.Base):
self.time_constraints = [AlarmTimeConstraint(**tc)
for tc in time_constraints]
@classmethod
def from_db_model_scrubbed(cls, m):
# Return an Alarm from a DB model with trust IDs scrubbed from actions
data = m.as_dict()
for field in ('ok_actions', 'alarm_actions',
'insufficient_data_actions'):
if data.get(field) is not None:
data[field] = [cls._scrub_action_url(action)
for action in data[field]]
return cls(**data)
@staticmethod
def validate(alarm):
Alarm.check_rule(alarm)
@ -380,6 +393,17 @@ class Alarm(base.Base):
def _is_trust_url(url):
return url.scheme.startswith('trust+')
@staticmethod
def _scrub_action_url(action):
"""Remove trust ID from a URL."""
url = netutils.urlsplit(action)
if Alarm._is_trust_url(url):
netloc = url.netloc.rsplit('@', 1)[-1]
url = urlparse.SplitResult(url.scheme, netloc,
url.path, url.query,
url.fragment)
return url.geturl()
def _get_existing_trust_ids(self):
for action in itertools.chain(self.ok_actions or [],
self.alarm_actions or [],
@ -590,7 +614,7 @@ class AlarmController(rest.RestController):
@wsme_pecan.wsexpose(Alarm)
def get(self):
"""Return this alarm."""
return Alarm.from_db_model(self._enforce_rbac('get_alarm'))
return Alarm.from_db_model_scrubbed(self._enforce_rbac('get_alarm'))
@wsme_pecan.wsexpose(Alarm, body=Alarm)
def put(self, data):
@ -642,7 +666,7 @@ class AlarmController(rest.RestController):
if v != old_alarm[k] and k not in
['timestamp', 'state_timestamp'])
self._record_change(change, now, on_behalf_of=alarm.project_id)
return Alarm.from_db_model(alarm)
return Alarm.from_db_model_scrubbed(alarm)
@wsme_pecan.wsexpose(None, status_code=204)
def delete(self):
@ -805,7 +829,7 @@ class AlarmsController(rest.RestController):
alarm = conn.create_alarm(alarm_in)
self._record_creation(conn, change, alarm.alarm_id, now)
v2_utils.set_resp_location_hdr("/alarms/" + alarm.alarm_id)
return Alarm.from_db_model(alarm)
return Alarm.from_db_model_scrubbed(alarm)
@wsme_pecan.wsexpose([Alarm], [base.Query], [str], int, str)
def get_all(self, q=None, sort=None, limit=None, marker=None):
@ -829,5 +853,5 @@ class AlarmsController(rest.RestController):
if sort or limit or marker:
kwargs['pagination'] = v2_utils.get_pagination_options(
sort, limit, marker, models.Alarm)
return [Alarm.from_db_model(m)
return [Alarm.from_db_model_scrubbed(m)
for m in pecan.request.storage.get_alarms(**kwargs)]

View File

@ -1207,6 +1207,10 @@ class TestAlarms(TestAlarmsBase):
else:
self.fail("Alarm not found")
data = self._get_alarm(alarm.alarm_id)
self.assertEqual(
['trust+http://my.server:1234/foo'], data['ok_actions'])
with mock.patch('aodh.keystone_client.get_client') as client:
client.return_value = mock.Mock(
auth_ref=mock.Mock(user_id='my_user'))
@ -1419,9 +1423,16 @@ class TestAlarms(TestAlarmsBase):
self.put_json('/alarms/%s' % data['alarm_id'],
params=data,
headers=self.auth_headers)
for alarm in list(self.alarm_conn.get_alarms()):
if alarm.alarm_id == data['alarm_id']:
self.assertEqual(
['trust+http://5678:delete@something/ok'],
alarm.ok_actions)
break
data = self._get_alarm('a')
self.assertEqual(
['trust+http://5678:delete@something/ok'], data['ok_actions'])
['trust+http://something/ok'], data['ok_actions'])
data.update({'ok_actions': ['http://no-trust-something/ok']})