Merge "Impl multitenancy support"

This commit is contained in:
Jenkins 2013-10-03 14:38:25 +00:00 committed by Gerrit Code Review
commit 12b470ab2b
7 changed files with 82 additions and 55 deletions

View File

@ -75,6 +75,7 @@ class ConductorManager(db_base.Base):
return
for node_group in node_groups:
node_group["tenant_id"] = context.tenant_id
self._populate_node_group(context, node_group)
def _cleanup_node_group(self, node_group):
@ -161,6 +162,7 @@ class ConductorManager(db_base.Base):
"""Create a Node Group from the values dictionary."""
values = copy.deepcopy(values)
self._populate_node_group(context, values)
values['tenant_id'] = context.tenant_id
return self.db.node_group_add(context, cluster, values)
def node_group_update(self, context, node_group, values):
@ -178,6 +180,7 @@ class ConductorManager(db_base.Base):
"""Create an Instance from the values dictionary."""
values = copy.deepcopy(values)
values = _apply_defaults(values, INSTANCE_DEFAULTS)
values['tenant_id'] = context.tenant_id
return self.db.instance_add(context, node_group, values)
def instance_update(self, context, instance, values):

View File

@ -172,7 +172,7 @@ class NodeGroupTemplateResource(Resource, objects.NodeGroupTemplate):
class InstanceResource(Resource, objects.Instance):
_filter_fields = ['node_group_id']
_filter_fields = ['tenant_id', 'node_group_id']
class NodeGroupResource(Resource, objects.NodeGroup):
@ -181,7 +181,7 @@ class NodeGroupResource(Resource, objects.NodeGroup):
'node_group_template': (NodeGroupTemplateResource, None)
}
_filter_fields = ['id', 'cluster_id', 'cluster_template_id']
_filter_fields = ['id', 'tenant_id', 'cluster_id', 'cluster_template_id']
class ClusterTemplateResource(Resource, objects.ClusterTemplate):

View File

@ -37,20 +37,20 @@ def get_backend():
return sys.modules[__name__]
def model_query(model, context, session=None, project_only=None):
def model_query(model, context, session=None, project_only=True):
"""Query helper.
:param model: base model to query
:param context: context to query under
:param project_only: if present and context is user-type, then restrict
query to match the context's project_id.
query to match the context's tenant_id.
"""
session = session or get_session()
query = session.query(model)
if project_only:
query = query.filter_by(tenant_id=context.project_id)
if project_only and not context.is_admin:
query = query.filter_by(tenant_id=context.tenant_id)
return query
@ -188,7 +188,6 @@ def cluster_destroy(context, cluster_id):
session = get_session()
with session.begin():
cluster = _cluster_get(context, session, cluster_id)
if not cluster:
raise ex.NotFoundException(cluster_id,
"Cluster id '%s' not found!")
@ -207,6 +206,11 @@ def node_group_add(context, cluster_id, values):
session = get_session()
with session.begin():
cluster = _cluster_get(context, session, cluster_id)
if not cluster:
raise ex.NotFoundException(cluster_id,
"Cluster id '%s' not found!")
node_group = m.NodeGroup()
node_group.update({"cluster_id": cluster_id})
node_group.update(values)
@ -219,14 +223,18 @@ def node_group_update(context, node_group_id, values):
session = get_session()
with session.begin():
node_group = _node_group_get(context, session, node_group_id)
if not node_group:
raise ex.NotFoundException(node_group_id,
"Node Group id '%s' not found!")
node_group.update(values)
def node_group_remove(context, node_group_id):
session = get_session()
with session.begin():
node_group = _node_group_get(context, session, node_group_id)
if not node_group:
raise ex.NotFoundException(node_group_id,
"Node Group id '%s' not found!")
@ -245,6 +253,11 @@ def instance_add(context, node_group_id, values):
session = get_session()
with session.begin():
node_group = _node_group_get(context, session, node_group_id)
if not node_group:
raise ex.NotFoundException(node_group_id,
"Node Group id '%s' not found!")
instance = m.Instance()
instance.update({"node_group_id": node_group_id})
instance.update(values)
@ -260,6 +273,10 @@ def instance_update(context, instance_id, values):
session = get_session()
with session.begin():
instance = _instance_get(context, session, instance_id)
if not instance:
raise ex.NotFoundException(instance_id,
"Instance id '%s' not found!")
instance.update(values)
@ -267,7 +284,6 @@ def instance_remove(context, instance_id):
session = get_session()
with session.begin():
instance = _instance_get(context, session, instance_id)
if not instance:
raise ex.NotFoundException(instance_id,
"Instance id '%s' not found!")
@ -285,6 +301,10 @@ def append_volume(context, instance_id, volume_id):
session = get_session()
with session.begin():
instance = _instance_get(context, session, instance_id)
if not instance:
raise ex.NotFoundException(instance_id,
"Instance id '%s' not found!")
instance.volumes.append(volume_id)
@ -292,6 +312,10 @@ def remove_volume(context, instance_id, volume_id):
session = get_session()
with session.begin():
instance = _instance_get(context, session, instance_id)
if not instance:
raise ex.NotFoundException(instance_id,
"Instance id '%s' not found!")
instance.volumes.remove(volume_id)
@ -344,7 +368,6 @@ def cluster_template_destroy(context, cluster_template_id):
with session.begin():
cluster_template = _cluster_template_get(context, session,
cluster_template_id)
if not cluster_template:
raise ex.NotFoundException(cluster_template_id,
"Cluster Template id '%s' not found!")
@ -387,7 +410,6 @@ def node_group_template_destroy(context, node_group_template_id):
with session.begin():
node_group_template = _node_group_template_get(context, session,
node_group_template_id)
if not node_group_template:
raise ex.NotFoundException(
node_group_template_id,
@ -429,7 +451,6 @@ def data_source_destroy(context, data_source_id):
session = get_session()
with session.begin():
data_source = _data_source_get(context, session, data_source_id)
if not data_source:
raise ex.NotFoundException(data_source_id,
"Data Source id '%s' not found!")
@ -490,7 +511,6 @@ def job_execution_destroy(context, job_execution_id):
session = get_session()
with session.begin():
job_ex = _job_execution_get(context, session, job_execution_id)
if not job_ex:
raise ex.NotFoundException(job_execution_id,
"JobExecution id '%s' not found!")
@ -514,6 +534,14 @@ def job_get_all(context):
return query.all()
def _append_job_binaries(context, session, from_list, to_list):
for job_binary_id in from_list:
job_binary = model_query(
m.JobBinary, context, session).filter_by(id=job_binary_id).first()
if job_binary is not None:
to_list.append(job_binary)
def job_create(context, values):
mains = values.pop("mains", [])
libs = values.pop("libs", [])
@ -528,19 +556,8 @@ def job_create(context, values):
job.mains = []
job.libs = []
try:
for main in mains:
query = model_query(m.JobBinary,
context, session).filter_by(id=main)
job_binary = query.first()
if job_binary is not None:
job.mains.append(job_binary)
for lib in libs:
query = model_query(m.JobBinary,
context, session).filter_by(id=lib)
job_binary = query.first()
if job_binary is not None:
job.libs.append(job_binary)
_append_job_binaries(context, session, mains, job.mains)
_append_job_binaries(context, session, libs, job.libs)
job.save(session=session)
except db_exc.DBDuplicateEntry as e:
@ -567,15 +584,19 @@ def job_destroy(context, job_id):
session = get_session()
with session.begin():
job = _job_get(context, session, job_id)
if not job:
raise ex.NotFoundException(job_id,
"Job id '%s' not found!")
session.delete(job)
## JobBinary ops
def _job_binary_get(context, session, job_binary_id):
query = model_query(m.JobBinary, context, session)
return query.filter_by(id=job_binary_id).first()
def job_binary_get_all(context):
"""Returns JobBinary objects that do not contain a data field
@ -591,8 +612,7 @@ def job_binary_get(context, job_binary_id):
The data column uses deferred loadling.
"""
query = model_query(m.JobBinary, context).filter_by(id=job_binary_id)
return query.first()
return _job_binary_get(context, get_session(), job_binary_id)
def job_binary_create(context, values):
@ -612,35 +632,37 @@ def job_binary_create(context, values):
return job_binary
def _check_job_binary_referenced(session, id):
def _check_job_binary_referenced(ctx, session, job_binary_id):
args = {"JobBinary_id": job_binary_id}
mains = model_query(m.mains_association, ctx, session,
project_only=False).filter_by(**args)
libs = model_query(m.libs_association, ctx, session,
project_only=False).filter_by(**args)
args = {"JobBinary_id": id}
return model_query(m.mains_association,
None, session).filter_by(**args).first() is not None or\
model_query(m.libs_association,
None, session).filter_by(**args).first() is not None
return mains.first() is not None or libs.first() is not None
def job_binary_destroy(context, job_binary_id):
session = get_session()
with session.begin():
job_binary = model_query(m.JobBinary,
context,
session).filter_by(id=job_binary_id).first()
job_binary = _job_binary_get(context, session, job_binary_id)
if not job_binary:
raise ex.NotFoundException(job_binary_id,
"JobBinary id '%s' not found!")
if _check_job_binary_referenced(session, job_binary.id):
if _check_job_binary_referenced(context, session, job_binary_id):
raise ex.DeletionFailed("JobBinary is referenced"
"and cannot be deleted")
session.delete(job_binary)
## JobBinaryInternal ops
def _job_binary_internal_get(context, session, job_binary_internal_id):
query = model_query(m.JobBinaryInternal, context, session)
return query.filter_by(id=job_binary_internal_id).first()
def job_binary_internal_get_all(context):
"""Returns JobBinaryInternal objects that do not contain a data field
@ -656,15 +678,14 @@ def job_binary_internal_get(context, job_binary_internal_id):
The data column uses deferred loadling.
"""
query = model_query(m.JobBinaryInternal, context).filter_by(
id=job_binary_internal_id)
return query.first()
return _job_binary_internal_get(context, get_session(),
job_binary_internal_id)
def job_binary_internal_get_raw_data(context, job_binary_internal_id):
"""Returns only the data field for the specified JobBinaryInternal."""
query = model_query(m.JobBinaryInternal, context).options(
sa.orm.undefer("data"))
query = model_query(m.JobBinaryInternal, context)
query = query.options(sa.orm.undefer("data"))
res = query.filter_by(id=job_binary_internal_id).first()
if res is not None:
res = res.data
@ -691,13 +712,10 @@ def job_binary_internal_create(context, values):
def job_binary_internal_destroy(context, job_binary_internal_id):
session = get_session()
with session.begin():
b_intrnl = model_query(m.JobBinaryInternal,
context
).filter_by(id=job_binary_internal_id).first()
if not b_intrnl:
job_binary_internal = _job_binary_internal_get(context, session,
job_binary_internal_id)
if not job_binary_internal:
raise ex.NotFoundException(job_binary_internal_id,
"JobBinaryInternal id '%s' not found!")
session.delete(b_intrnl)
session.delete(job_binary_internal)

View File

@ -85,6 +85,7 @@ class NodeGroup(mb.SavannaBase):
id = _id_column()
name = sa.Column(sa.String(80), nullable=False)
tenant_id = sa.Column(sa.String(36))
flavor_id = sa.Column(sa.String(36), nullable=False)
image_id = sa.Column(sa.String(36))
node_processes = sa.Column(st.JsonListType())
@ -121,6 +122,7 @@ class Instance(mb.SavannaBase):
)
id = _id_column()
tenant_id = sa.Column(sa.String(36))
node_group_id = sa.Column(sa.String(36), sa.ForeignKey('node_groups.id'))
instance_id = sa.Column(sa.String(36))
instance_name = sa.Column(sa.String(80), nullable=False)
@ -194,6 +196,7 @@ class TemplatesRelation(mb.SavannaBase):
__tablename__ = 'templates_relations'
id = _id_column()
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
flavor_id = sa.Column(sa.String(36), nullable=False)
image_id = sa.Column(sa.String(36))

View File

@ -106,7 +106,7 @@ def scale_cluster(cluster, node_group_id_map, plugin):
_await_networks(instances)
cluster = conductor.cluster_get(context, cluster)
cluster = conductor.cluster_get(ctx, cluster)
volumes.attach_to_instances(get_instances(cluster, instance_ids))

View File

@ -109,6 +109,7 @@ class ClusterTest(test_base.ConductorManagerTestCase):
ng.pop("volumes_size")
ng.pop("volumes_per_node")
ng.pop("floating_ip_pool")
ng.pop("tenant_id")
self.assertListEqual(SAMPLE_CLUSTER["node_groups"],
cl_db_obj["node_groups"])
@ -212,6 +213,7 @@ class ClusterTest(test_base.ConductorManagerTestCase):
if ng["id"] != ng_id:
continue
ng.pop('tenant_id')
self.assertEqual(count + 1, ng["count"])
self.assertEqual("additional_vm",
ng["instances"][0]["instance_name"])
@ -225,7 +227,7 @@ class ClusterTest(test_base.ConductorManagerTestCase):
instance_id = self._add_instance(ctx, ng_id)
self.api.instance_update(context, instance_id,
self.api.instance_update(ctx, instance_id,
{"management_ip": "1.1.1.1"})
cluster_db_obj = self.api.cluster_get(ctx, _id)

View File

@ -174,6 +174,7 @@ class ClusterTemplates(test_base.ConductorManagerTestCase):
ng.pop("created_at")
ng.pop("updated_at")
ng.pop("id")
ng.pop("tenant_id")
self.assertEqual(ng.pop("cluster_template_id"), clt_db_obj_id)
ng.pop("image_id")
ng.pop("node_configs")