Don't include trust IDs in Alarm action output

It was originally envisaged that we would allow users to specify trust
IDs in trust+*:// alarm action URLs, and only create trusts internally
when none was specified. This made storage of trust IDs in the username
field of the URL part of the API interface, and it made sense that the
user would have access to the IDs both for trusts they had created
themselves and those created internally.

However, this proved to be a security hole that was closed by
cb90d3ad47, and now we forbid users from
passing in a trust ID. Any trust IDs that appear in the URL were created
internally.

This causes issues when updating alarms because aodhclient uses a
read-modify-write pattern to do so. The action URLs output from a GET
may contain trust IDs, while the input to a PUT is forbidden from
containing them. This means the data is not round-trippable, so updates
even to unrelated fields fail.

Since storage of the trust ID in action URLs is purely an internal
implementation detail at this point, scrub it out of data returned by
the REST API. This means the output of e.g. a GET will match the
original input data, as the user probably expects, and can be fed back
into a PUT successfully.

Change-Id: I230c0d80a9440c7d002b9a623048ef715a9df743
Closes-Bug: #1737186
Related-Bug: #1649333
(cherry picked from commit 92182de328)
This commit is contained in:
Zane Bitter 2018-03-22 17:12:43 -04:00
parent f549faea0e
commit 149d3ad219
2 changed files with 40 additions and 5 deletions

View File

@ -269,6 +269,19 @@ class Alarm(base.Base):
self.time_constraints = [AlarmTimeConstraint(**tc)
for tc in time_constraints]
@classmethod
def from_db_model_scrubbed(cls, m):
# Return an Alarm from a DB model with trust IDs scrubbed from actions
data = m.as_dict()
for field in ('ok_actions', 'alarm_actions',
'insufficient_data_actions'):
if data.get(field) is not None:
data[field] = [cls._scrub_action_url(action)
for action in data[field]]
return cls(**data)
@staticmethod
def validate(alarm):
Alarm.check_rule(alarm)
@ -380,6 +393,17 @@ class Alarm(base.Base):
def _is_trust_url(url):
return url.scheme.startswith('trust+')
@staticmethod
def _scrub_action_url(action):
"""Remove trust ID from a URL."""
url = netutils.urlsplit(action)
if Alarm._is_trust_url(url):
netloc = url.netloc.rsplit('@', 1)[-1]
url = urlparse.SplitResult(url.scheme, netloc,
url.path, url.query,
url.fragment)
return url.geturl()
def _get_existing_trust_ids(self):
for action in itertools.chain(self.ok_actions or [],
self.alarm_actions or [],
@ -590,7 +614,7 @@ class AlarmController(rest.RestController):
@wsme_pecan.wsexpose(Alarm)
def get(self):
"""Return this alarm."""
return Alarm.from_db_model(self._enforce_rbac('get_alarm'))
return Alarm.from_db_model_scrubbed(self._enforce_rbac('get_alarm'))
@wsme_pecan.wsexpose(Alarm, body=Alarm)
def put(self, data):
@ -642,7 +666,7 @@ class AlarmController(rest.RestController):
if v != old_alarm[k] and k not in
['timestamp', 'state_timestamp'])
self._record_change(change, now, on_behalf_of=alarm.project_id)
return Alarm.from_db_model(alarm)
return Alarm.from_db_model_scrubbed(alarm)
@wsme_pecan.wsexpose(None, status_code=204)
def delete(self):
@ -805,7 +829,7 @@ class AlarmsController(rest.RestController):
alarm = conn.create_alarm(alarm_in)
self._record_creation(conn, change, alarm.alarm_id, now)
v2_utils.set_resp_location_hdr("/alarms/" + alarm.alarm_id)
return Alarm.from_db_model(alarm)
return Alarm.from_db_model_scrubbed(alarm)
@wsme_pecan.wsexpose([Alarm], [base.Query], [str], int, str)
def get_all(self, q=None, sort=None, limit=None, marker=None):
@ -829,5 +853,5 @@ class AlarmsController(rest.RestController):
if sort or limit or marker:
kwargs['pagination'] = v2_utils.get_pagination_options(
sort, limit, marker, models.Alarm)
return [Alarm.from_db_model(m)
return [Alarm.from_db_model_scrubbed(m)
for m in pecan.request.storage.get_alarms(**kwargs)]

View File

@ -1207,6 +1207,10 @@ class TestAlarms(TestAlarmsBase):
else:
self.fail("Alarm not found")
data = self._get_alarm(alarm.alarm_id)
self.assertEqual(
['trust+http://my.server:1234/foo'], data['ok_actions'])
with mock.patch('aodh.keystone_client.get_client') as client:
client.return_value = mock.Mock(
auth_ref=mock.Mock(user_id='my_user'))
@ -1419,9 +1423,16 @@ class TestAlarms(TestAlarmsBase):
self.put_json('/alarms/%s' % data['alarm_id'],
params=data,
headers=self.auth_headers)
for alarm in list(self.alarm_conn.get_alarms()):
if alarm.alarm_id == data['alarm_id']:
self.assertEqual(
['trust+http://5678:delete@something/ok'],
alarm.ok_actions)
break
data = self._get_alarm('a')
self.assertEqual(
['trust+http://5678:delete@something/ok'], data['ok_actions'])
['trust+http://something/ok'], data['ok_actions'])
data.update({'ok_actions': ['http://no-trust-something/ok']})