Validate parent_id is neither id nor a descendant

A device's parent_id represents the id of another device. A parent_id
cannot be a device's own id nor can it be the id of a descendant.

This commit adds a validator decorator to ensure that the parent_id
supplied meets the requirements before the database is updated. Failure
to properly validate the parent_id can cause the database to become
unresponsive.

If an invalid parent_id is supplied the API will return a 400 Bad
Request.

Change-Id: I81f6bff5bf64b7c441fef08a8e03731cc17bb2f1
Closes-bug: 1666536
This commit is contained in:
git-harry 2017-03-09 10:45:43 +00:00
parent cbb246111b
commit a690e1997c
7 changed files with 487 additions and 3 deletions

View File

@ -603,7 +603,10 @@ def hosts_update(context, host_id, values):
project_only=True)
query = query.filter_by(id=host_id)
host_ref = query.with_for_update().one()
host_ref.update(values)
try:
host_ref.update(values)
except exceptions.ParentIDError as e:
raise exceptions.BadRequest(message=str(e))
host_ref.save(session)
return host_ref
@ -900,7 +903,10 @@ def network_devices_update(context, network_device_id, values):
query = query.filter_by(type='network_devices')
query = query.filter_by(id=network_device_id)
network_device_ref = query.with_for_update().one()
network_device_ref.update(values)
try:
network_device_ref.update(values)
except exceptions.ParentIDError as e:
raise exceptions.BadRequest(message=str(e))
network_device_ref.save(session)
return network_device_ref

View File

@ -23,12 +23,13 @@ from sqlalchemy import (
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.ext.declarative.api import _declarative_constructor
from sqlalchemy.orm import backref, object_mapper, relationship
from sqlalchemy.orm import backref, object_mapper, relationship, validates
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy_utils.types.ip_address import IPAddressType
from sqlalchemy_utils.types.json import JSONType
from sqlalchemy_utils.types.uuid import UUIDType
from craton import exceptions
from craton.db.api import Blame
@ -394,6 +395,25 @@ class Device(Base, VariableMixin):
children = relationship(
'Device', backref=backref('parent', remote_side=[id]))
@validates("parent_id")
def validate_parent_id(self, _, parent_id):
if parent_id is None:
return parent_id
elif parent_id == self.id:
msg = (
"A device cannot be its own parent. The id for '{name}'"
" cannot be used as its parent_id."
).format(name=self.name)
raise exceptions.ParentIDError(msg)
elif parent_id in (descendant.id for descendant in self.descendants):
msg = (
"A device cannot have a descendant as its parent. The"
" parent_id for '{name}' cannot be set to the id '{bad_id}'."
).format(name=self.name, bad_id=parent_id)
raise exceptions.ParentIDError(msg)
else:
return parent_id
@property
def ancestors(self):
lineage = []

View File

@ -81,3 +81,7 @@ class NotFound(Base):
class UnknownException(Base):
code = 500
class ParentIDError(ValueError):
pass

View File

@ -96,6 +96,101 @@ class APIV1HostTest(DeviceTestBase, APIV1ResourceWithVariablesTestCase):
"('updated_at' was unexpected)"]
self.assertEqual(host.json()['errors'], msg)
def test_create_with_parent_id(self):
parent = self.create_host(
name='test1',
cloud=self.cloud,
region=self.region,
hosttype='server',
ip_address='192.168.1.1',
)
child = self.create_host(
name='test2',
cloud=self.cloud,
region=self.region,
hosttype='server',
ip_address='192.168.1.2',
parent_id=parent['id'],
)
self.assertEqual(parent['id'], child['parent_id'])
def test_update_with_parent_id(self):
parent = self.create_host(
name='test1',
cloud=self.cloud,
region=self.region,
hosttype='server',
ip_address='192.168.1.1',
)
child = self.create_host(
name='test2',
cloud=self.cloud,
region=self.region,
hosttype='server',
ip_address='192.168.1.2',
)
self.assertIsNone(child['parent_id'])
url = '{}/v1/hosts/{}'.format(self.url, child['id'])
child_update_resp = self.put(
url, data={'parent_id': parent['id']}
)
self.assertEqual(200, child_update_resp.status_code)
child_update = child_update_resp.json()
self.assertEqual(parent['id'], child_update['parent_id'])
def test_update_with_parent_id_equal_id_fails(self):
host = self.create_host(
name='test1',
cloud=self.cloud,
region=self.region,
hosttype='server',
ip_address='192.168.1.1',
)
url = '{}/v1/hosts/{}'.format(self.url, host['id'])
host_update_resp = self.put(
url, data={'parent_id': host['id']}
)
self.assertEqual(400, host_update_resp.status_code)
def test_update_with_parent_id_equal_descendant_id_fails(self):
parent = self.create_host(
name='test1',
cloud=self.cloud,
region=self.region,
hosttype='server',
ip_address='192.168.1.1',
)
self.assertIsNone(parent['parent_id'])
child = self.create_host(
name='test2',
cloud=self.cloud,
region=self.region,
hosttype='server',
ip_address='192.168.1.2',
parent_id=parent['id'],
)
self.assertEqual(parent['id'], child['parent_id'])
grandchild = self.create_host(
name='test3',
cloud=self.cloud,
region=self.region,
hosttype='server',
ip_address='192.168.1.3',
parent_id=child['id'],
)
self.assertEqual(child['id'], grandchild['parent_id'])
url = '{}/v1/hosts/{}'.format(self.url, parent['id'])
parent_update_resp = self.put(
url, data={'parent_id': grandchild['id']}
)
self.assertEqual(400, parent_update_resp.status_code)
def test_get_all_hosts_with_details(self):
region_vars = {'x': 'y'}
region = self.create_region(name='region1', variables=region_vars)

View File

@ -0,0 +1,101 @@
from craton.tests.functional import DeviceTestBase
class APIV1NetworkDeviceTest(DeviceTestBase):
resource = 'network-devices'
def test_create_with_parent_id(self):
parent = self.create_network_device(
name='test1',
cloud=self.cloud,
region=self.region,
device_type='switch',
ip_address='192.168.1.1',
)
child = self.create_network_device(
name='test2',
cloud=self.cloud,
region=self.region,
device_type='switch',
ip_address='192.168.1.2',
parent_id=parent['id'],
)
self.assertEqual(parent['id'], child['parent_id'])
def test_update_with_parent_id(self):
parent = self.create_network_device(
name='test1',
cloud=self.cloud,
region=self.region,
device_type='switch',
ip_address='192.168.1.1',
)
child = self.create_network_device(
name='test2',
cloud=self.cloud,
region=self.region,
device_type='switch',
ip_address='192.168.1.2',
)
self.assertIsNone(child['parent_id'])
url = '{}/v1/network-devices/{}'.format(self.url, child['id'])
child_update_resp = self.put(
url, data={'parent_id': parent['id']}
)
self.assertEqual(200, child_update_resp.status_code)
child_update = child_update_resp.json()
self.assertEqual(parent['id'], child_update['parent_id'])
def test_update_with_parent_id_equal_id_fails(self):
network_device = self.create_network_device(
name='test1',
cloud=self.cloud,
region=self.region,
device_type='switch',
ip_address='192.168.1.1',
)
url = '{}/v1/network-devices/{}'.format(self.url, network_device['id'])
network_device_update_resp = self.put(
url, data={'parent_id': network_device['id']}
)
self.assertEqual(400, network_device_update_resp.status_code)
def test_update_with_parent_id_equal_descendant_id_fails(self):
parent = self.create_network_device(
name='test1',
cloud=self.cloud,
region=self.region,
device_type='switch',
ip_address='192.168.1.1',
)
self.assertIsNone(parent['parent_id'])
child = self.create_network_device(
name='test2',
cloud=self.cloud,
region=self.region,
device_type='switch',
ip_address='192.168.1.2',
parent_id=parent['id'],
)
self.assertEqual(parent['id'], child['parent_id'])
grandchild = self.create_network_device(
name='test3',
cloud=self.cloud,
region=self.region,
device_type='switch',
ip_address='192.168.1.3',
parent_id=child['id'],
)
self.assertEqual(child['id'], grandchild['parent_id'])
url = '{}/v1/network-devices/{}'.format(self.url, parent['id'])
parent_update_resp = self.put(
url, data={'parent_id': grandchild['id']}
)
self.assertEqual(400, parent_update_resp.status_code)

View File

@ -544,3 +544,131 @@ class HostsDBTestCase(BaseDevicesDBTestCase):
res, _ = dbapi.hosts_get_all(self.context, filters,
default_pagination)
self.assertEqual(len(res), 0)
def test_hosts_create_sets_parent_id(self):
project_id = self.make_project('project_1')
cloud_id = self.make_cloud(project_id, 'cloud_1')
region_id = self.make_region(project_id, cloud_id, 'region_1')
parent_id = self.make_host(
project_id, cloud_id, region_id, '1.www.example.com',
IPAddress(u'10.1.2.101'), 'server'
)
child = dbapi.hosts_create(
self.context,
{
'project_id': project_id,
'cloud_id': cloud_id,
'region_id': region_id,
'hostname': '2.www.example.com',
'ip_address': IPAddress(u'10.1.2.102'),
'device_type': 'server',
'parent_id': parent_id,
}
)
self.assertEqual(parent_id, child.parent_id)
def test_hosts_update_sets_parent_id(self):
project_id = self.make_project('project_1')
cloud_id = self.make_cloud(project_id, 'cloud_1')
region_id = self.make_region(project_id, cloud_id, 'region_1')
parent_id = self.make_host(
project_id, cloud_id, region_id, '1.www.example.com',
IPAddress(u'10.1.2.101'), 'server'
)
child = dbapi.hosts_create(
self.context,
{
'project_id': project_id,
'cloud_id': cloud_id,
'region_id': region_id,
'hostname': '2.www.example.com',
'ip_address': IPAddress(u'10.1.2.102'),
'device_type': 'server',
'parent_id': None,
}
)
self.assertIsNone(child.parent_id)
child_update = dbapi.hosts_update(
self.context,
child.id,
{
'parent_id': parent_id,
}
)
self.assertEqual(parent_id, child_update.parent_id)
def test_hosts_update_fails_when_parent_id_set_to_own_id(self):
project_id = self.make_project('project_1')
cloud_id = self.make_cloud(project_id, 'cloud_1')
region_id = self.make_region(project_id, cloud_id, 'region_1')
host1 = dbapi.hosts_create(
self.context,
{
'project_id': project_id,
'cloud_id': cloud_id,
'region_id': region_id,
'hostname': '1.www.example.com',
'ip_address': IPAddress(u'10.1.2.101'),
'device_type': 'server',
'parent_id': None,
}
)
self.assertRaises(
exceptions.BadRequest,
dbapi.hosts_update,
self.context,
host1.id,
{
'parent_id': host1.id,
}
)
def test_hosts_update_fails_when_parent_set_to_descendant(self):
project_id = self.make_project('project_1')
cloud_id = self.make_cloud(project_id, 'cloud_1')
region_id = self.make_region(project_id, cloud_id, 'region_1')
parent = dbapi.hosts_create(
self.context,
{
'project_id': project_id,
'cloud_id': cloud_id,
'region_id': region_id,
'hostname': '1.www.example.com',
'ip_address': IPAddress(u'10.1.2.101'),
'device_type': 'server',
'parent_id': None,
}
)
child = dbapi.hosts_create(
self.context,
{
'project_id': project_id,
'cloud_id': cloud_id,
'region_id': region_id,
'hostname': '2.www.example.com',
'ip_address': IPAddress(u'10.1.2.102'),
'device_type': 'server',
'parent_id': parent.id,
}
)
grandchild = dbapi.hosts_create(
self.context,
{
'project_id': project_id,
'cloud_id': cloud_id,
'region_id': region_id,
'hostname': '3.www.example.com',
'ip_address': IPAddress(u'10.1.2.103'),
'device_type': 'server',
'parent_id': child.id,
}
)
self.assertRaises(
exceptions.BadRequest,
dbapi.hosts_update,
self.context,
parent.id,
{
'parent_id': grandchild.id,
}
)

View File

@ -327,6 +327,136 @@ class NetworkDevicesDBTestCase(base.DBTestCase):
ndevice = dbapi.network_devices_get_by_id(self.context, ndevice.id)
self.assertEqual(ndevice.labels, {"jerry"})
def test_network_devices_create_sets_parent_id(self):
parent = dbapi.network_devices_create(
self.context,
{
'project_id': project_id1,
'cloud_id': cloud_id1,
'region_id': 1,
'name': '1.www.example.com',
'ip_address': '10.1.2.102',
'device_type': 'switch',
}
)
child = dbapi.network_devices_create(
self.context,
{
'project_id': project_id1,
'cloud_id': cloud_id1,
'region_id': 1,
'name': '2.www.example.com',
'ip_address': '10.1.2.102',
'device_type': 'switch',
'parent_id': parent.id,
}
)
self.assertEqual(parent.id, child.parent_id)
def test_network_devices_update_sets_parent_id(self):
parent = dbapi.network_devices_create(
self.context,
{
'project_id': project_id1,
'cloud_id': cloud_id1,
'region_id': 1,
'name': '1.www.example.com',
'ip_address': '10.1.2.102',
'device_type': 'switch',
}
)
child = dbapi.network_devices_create(
self.context,
{
'project_id': project_id1,
'cloud_id': cloud_id1,
'region_id': 1,
'name': '2.www.example.com',
'ip_address': '10.1.2.102',
'device_type': 'switch',
'parent_id': None,
}
)
self.assertIsNone(child.parent_id)
child_update = dbapi.network_devices_update(
self.context,
child.id,
{
'parent_id': parent.id,
}
)
self.assertEqual(parent.id, child_update.parent_id)
def test_network_devices_update_fails_when_parent_id_set_to_own_id(self):
network_device1 = dbapi.network_devices_create(
self.context,
{
'project_id': project_id1,
'cloud_id': cloud_id1,
'region_id': 1,
'name': '1.www.example.com',
'ip_address': '10.1.2.101',
'device_type': 'switch',
'parent_id': None,
}
)
self.assertRaises(
exceptions.BadRequest,
dbapi.network_devices_update,
self.context,
network_device1.id,
{
'parent_id': network_device1.id,
}
)
def test_network_devices_update_fails_when_parent_set_to_descendant(self):
parent = dbapi.network_devices_create(
self.context,
{
'project_id': project_id1,
'cloud_id': cloud_id1,
'region_id': 1,
'name': '1.www.example.com',
'ip_address': '10.1.2.101',
'device_type': 'switch',
'parent_id': None,
}
)
child = dbapi.network_devices_create(
self.context,
{
'project_id': project_id1,
'cloud_id': cloud_id1,
'region_id': 1,
'name': '2.www.example.com',
'ip_address': '10.1.2.102',
'device_type': 'switch',
'parent_id': parent.id,
}
)
grandchild = dbapi.network_devices_create(
self.context,
{
'project_id': project_id1,
'cloud_id': cloud_id1,
'region_id': 1,
'name': '3.www.example.com',
'ip_address': '10.1.2.103',
'device_type': 'switch',
'parent_id': child.id,
}
)
self.assertRaises(
exceptions.BadRequest,
dbapi.network_devices_update,
self.context,
parent.id,
{
'parent_id': grandchild.id,
}
)
class NetworkInterfacesDBTestCase(base.DBTestCase):