Migration_Scripts for Resource_sync.

Create two tables sync_job and resource_sync.
Sync_job contains job details of which id is the primary-key.
id is taken as a foreign-key and is also a part of composite primary key
along with Resource,Region off the resource_sync table.
Also created the api-calls with which interaction between the
application and the database occurs.
Added Test_cases for the same.

Change-Id: Iba1c789715aa7c7d00a7a2ec23a449c72a757ac1
This commit is contained in:
ubuntu 2017-01-30 18:11:11 +05:30 committed by Goutham Pratapa
parent d3cf6a46eb
commit ac58c2a3e4
8 changed files with 464 additions and 7 deletions

View File

@ -36,6 +36,10 @@ NEUTRON_QUOTA_FIELDS = ("network",
"security_group_rule",
)
SYNC_STATUS = "IN_PROGRESS"
RPC_API_VERSION = "1.0"
TOPIC_KB_ENGINE = "kingbird-engine"
JOB_SUCCESS = "SUCCESS"

View File

@ -93,6 +93,10 @@ class QuotaClassNotFound(NotFound):
message = _("Quota class %(class_name) doesn't exist.")
class JobNotFound(NotFound):
message = _("Job doesn't exist.")
class ConnectionRefused(KingbirdException):
message = _("Connection to the service endpoint is refused")

View File

@ -148,3 +148,42 @@ def service_get(context, service_id):
def service_get_all(context):
return IMPL.service_get_all(context)
def sync_job_create(context, job_id):
return IMPL.sync_job_create(context, job_id)
def sync_job_list(context, action=None):
return IMPL.sync_job_list(context, action)
def sync_job_status(context, job_id):
return IMPL.sync_job_status(context, job_id)
def sync_job_update(context, job_id, status):
return IMPL.sync_job_status(context, job_id, status)
def sync_job_delete(context, job_id):
return IMPL.sync_job_delete(context, job_id)
def resource_sync_create(context, job, region, resource,
resource_type):
return IMPL.resource_sync_create(context, job, region, resource,
resource_type)
def resource_sync_update(context, job_id, region, resource, status):
return IMPL.resource_sync_update(context, job_id, region, resource,
status)
def resource_sync_status(context, job_id):
return IMPL.resource_sync_status(context, job_id)
def resource_sync_list_by_job(context, job_id):
return IMPL.resource_sync_list_by_job(context, job_id)

View File

@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
'''
"""
Implementation of SQLAlchemy backend.
'''
"""
import sys
import threading
@ -28,6 +28,7 @@ from oslo_utils import timeutils
from sqlalchemy.orm import joinedload_all
from kingbird.common import consts
from kingbird.common import exceptions as exception
from kingbird.common.i18n import _
from kingbird.db.sqlalchemy import migration
@ -83,7 +84,7 @@ def _session(context):
def is_admin_context(context):
"""Indicates if the request context is an administrator."""
"""Indicate if the request context is an administrator."""
if not context:
LOG.warning(_('Use of empty request context is deprecated'),
DeprecationWarning)
@ -92,7 +93,7 @@ def is_admin_context(context):
def is_user_context(context):
"""Indicates if the request context is a normal user."""
"""Indicate if the request context is a normal user."""
if not context:
return False
if context.is_admin:
@ -107,7 +108,6 @@ def require_admin_context(f):
The first argument to the wrapped function must be the context.
"""
def wrapper(*args, **kwargs):
if not is_admin_context(args[0]):
raise exception.AdminRequired()
@ -123,8 +123,8 @@ def require_context(f):
:py:func:`authorize_project_context` and
:py:func:`authorize_user_context`.
The first argument to the wrapped function must be the context.
"""
"""
def wrapper(*args, **kwargs):
if not is_admin_context(args[0]) and not is_user_context(args[0]):
raise exception.NotAuthorized()
@ -382,3 +382,144 @@ def service_get(context, service_id):
def service_get_all(context):
return model_query(context, models.Service).all()
##########################
@require_context
def sync_job_create(context, job_id):
with write_session() as session:
sjc = models.SyncJob()
sjc.id = job_id
sjc.user_id = context.user
sjc.project_id = context.project
session.add(sjc)
return sjc
@require_context
def sync_job_list(context, action=None):
if action == 'active':
rows = model_query(context, models.SyncJob). \
filter_by(user_id=context.user, project_id=context.project,
sync_status=consts.SYNC_STATUS). \
all()
else:
rows = model_query(context, models.SyncJob). \
filter_by(user_id=context.user, project_id=context.project). \
all()
output = list()
for row in rows:
result = dict()
result['id'] = row.id
result['sync_status'] = row.sync_status
result['updated_at'] = row.updated_at
output.append(result)
return output
@require_context
def sync_job_status(context, job_id):
row = model_query(context, models.SyncJob).\
filter_by(id=job_id, user_id=context.user,
project_id=context.project).first()
if not row:
raise exception.JobNotFound()
status = row.sync_status
return status
@require_context
def sync_job_update(context, job_id, status):
with write_session() as session:
sync_job_ref = session.query(models.SyncJob). \
filter_by(id=job_id).first()
if not sync_job_ref:
raise exception.JobNotFound()
values = dict()
values['sync_status'] = status
sync_job_ref.update(values)
@require_context
def sync_job_delete(context, job_id):
with write_session() as session:
parent_job = model_query(context, models.SyncJob). \
filter_by(id=job_id, user_id=context.user,
project_id=context.project).first()
if parent_job:
child_jobs = model_query(context, models.ResourceSync). \
filter_by(job_id=parent_job.id).all()
if not child_jobs:
raise exception.JobNotFound()
for child_job in child_jobs:
session.delete(child_job)
session.delete(parent_job)
else:
raise exception.JobNotFound()
##########################
@require_context
def resource_sync_create(context, job, region, resource,
resource_type):
if not job:
raise exception.JobNotFound()
with write_session() as session:
rsc = models.ResourceSync()
rsc.sync_job = job
rsc.resource = resource
rsc.region = region
rsc.resource_type = resource_type
session.add(rsc)
return rsc
@require_context
def resource_sync_update(context, job_id, region, resource, status):
with write_session() as session:
resource_sync_ref = session.query(models.ResourceSync).\
filter_by(job_id=job_id, region=region, resource=resource).\
first()
if not resource_sync_ref:
raise exception.JobNotFound()
values = dict()
values['sync_status'] = status
resource_sync_ref.update(values)
return resource_sync_ref
@require_context
def resource_sync_status(context, job_id):
rows = model_query(context, models.ResourceSync).\
filter_by(job_id=job_id).all()
output = list()
if not rows:
raise exception.JobNotFound()
for row in rows:
output.append(row.sync_status)
return output
@require_context
def resource_sync_list_by_job(context, job_id):
parent_row = model_query(context, models.SyncJob).\
filter_by(id=job_id, user_id=context.user,
project_id=context.project).first()
if not parent_row:
raise exception.JobNotFound()
rows = model_query(context, models.ResourceSync).\
filter_by(job_id=parent_row.id).all()
output = list()
if not rows:
raise exception.JobNotFound()
for row in rows:
result = dict()
result['region'] = row.region
result['resource'] = row.resource
result['resource_type'] = row.resource_type
result['sync_status'] = row.sync_status
result['updated_at'] = row.updated_at.isoformat()
result['created_at'] = row.created_at.isoformat()
output.append(result)
return output

View File

@ -0,0 +1,62 @@
# Copyright (c) 2017 Ericsson AB.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
from kingbird.common import consts
def upgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine
sync_job = sqlalchemy.Table(
'sync_job', meta,
sqlalchemy.Column('id', sqlalchemy.String(36),
primary_key=True),
sqlalchemy.Column('sync_status', sqlalchemy.String(length=36),
default=consts.SYNC_STATUS, nullable=False),
sqlalchemy.Column('user_id', sqlalchemy.String(36),
nullable=False),
sqlalchemy.Column('project_id', sqlalchemy.String(36),
nullable=False),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted', sqlalchemy.Integer),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
resource_sync = sqlalchemy.Table(
'resource_sync', meta,
sqlalchemy.Column('job_id', sqlalchemy.String(36),
sqlalchemy.ForeignKey('sync_job.id'),
primary_key=True),
sqlalchemy.Column('region', sqlalchemy.String(36),
primary_key=True),
sqlalchemy.Column('resource', sqlalchemy.String(36),
primary_key=True),
sqlalchemy.Column('resource_type', sqlalchemy.String(36),
nullable=False),
sqlalchemy.Column('sync_status', sqlalchemy.String(36),
default=consts.SYNC_STATUS, nullable=False),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted', sqlalchemy.Integer),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
sync_job.create()
resource_sync.create()

View File

@ -21,6 +21,10 @@ from oslo_db.sqlalchemy import models
from sqlalchemy.orm import session as orm_session
from sqlalchemy import (Column, Integer, String, Boolean, schema)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from kingbird.common import consts
BASE = declarative_base()
@ -35,6 +39,7 @@ class KingbirdBase(models.ModelBase,
models.SoftDeleteMixin,
models.TimestampMixin):
"""Base class for Kingbird Models."""
__table_args__ = {'mysql_engine': 'InnoDB'}
def expire(self, session=None, attrs=None):
@ -142,3 +147,38 @@ class Service(BASE, KingbirdBase):
disabled = Column(Boolean, default=False)
disabled_reason = Column(String(255))
class SyncJob(BASE, KingbirdBase):
"""Kingbird Sync Job registry"""
__tablename__ = 'sync_job'
id = Column('id', String(36), primary_key=True)
sync_status = Column(String(36), default=consts.SYNC_STATUS,
nullable=False)
job_relation = relationship('ResourceSync', backref='sync_job')
user_id = Column('user_id', String(36), nullable=False)
project_id = Column('project_id', String(36), nullable=False)
class ResourceSync(BASE, KingbirdBase):
"""ResourceSync_registry"""
__tablename__ = 'resource_sync'
job_id = Column('job_id', String(36),
ForeignKey('sync_job.id'), primary_key=True)
region = Column('region', String(36), primary_key=True)
resource = Column('resource', String(36), primary_key=True)
resource_type = Column('resource_type', String(36), nullable=False)
sync_status = Column(String(36), default=consts.SYNC_STATUS,
nullable=False)

View File

@ -0,0 +1,167 @@
# Copyright (c) 2017 Ericsson AB
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_db
import sqlalchemy
from oslo_config import cfg
from oslo_db import options
from kingbird.common import config
from kingbird.common import consts
from kingbird.common import exceptions
from kingbird.db import api as api
from kingbird.db.sqlalchemy import api as db_api
from kingbird.tests import base
from kingbird.tests import utils
config.register_options()
get_engine = api.get_engine
UUID1 = utils.UUID1
UUID2 = utils.UUID2
class DBAPIResourceSyncTest(base.KingbirdTestCase):
def setup_dummy_db(self):
options.cfg.set_defaults(options.database_opts,
sqlite_synchronous=False)
options.set_defaults(cfg.CONF, connection="sqlite://",
sqlite_db='kingbird.db')
engine = get_engine()
db_api.db_sync(engine)
engine.connect()
@staticmethod
def reset_dummy_db():
engine = get_engine()
meta = sqlalchemy.MetaData()
meta.reflect(bind=engine)
for table in reversed(meta.sorted_tables):
if table.name == 'migrate_version':
continue
engine.execute(table.delete())
@staticmethod
def sync_job_create(ctxt, **kwargs):
return db_api.sync_job_create(ctxt, **kwargs)
@staticmethod
def resource_sync_create(ctxt, **kwargs):
return db_api.resource_sync_create(ctxt, **kwargs)
def setUp(self):
super(DBAPIResourceSyncTest, self).setUp()
self.setup_dummy_db()
self.addCleanup(self.reset_dummy_db)
self.ctx = utils.dummy_context()
def test_create_sync_job(self):
job = self.sync_job_create(self.ctx, job_id=UUID1)
self.assertIsNotNone(job)
self.assertEqual(consts.SYNC_STATUS, job.sync_status)
created_job = db_api.sync_job_list(self.ctx, "active")
self.assertEqual(consts.SYNC_STATUS, created_job[0].get('sync_status'))
def test_primary_key_sync_job(self):
self.sync_job_create(self.ctx, job_id=UUID1)
self.assertRaises(oslo_db.exception.DBDuplicateEntry,
self.sync_job_create, self.ctx, job_id=UUID1)
def test_sync_job_update(self):
job = self.sync_job_create(self.ctx, job_id=UUID1)
self.assertIsNotNone(job)
db_api.sync_job_update(self.ctx, UUID1, consts.JOB_SUCCESS)
updated_job = db_api.sync_job_list(self.ctx)
self.assertEqual(consts.JOB_SUCCESS, updated_job[0].get('sync_status'))
def test_active_jobs(self):
job = self.sync_job_create(self.ctx, job_id=UUID1)
self.assertIsNotNone(job)
query = db_api.sync_job_list(self.ctx, 'active')
self.assertEqual(query[0].get('sync_status'), job.sync_status)
def test_sync_job_status(self):
job = self.sync_job_create(self.ctx, job_id=UUID1)
self.assertIsNotNone(job)
query = db_api.sync_job_status(self.ctx, job_id=UUID1)
self.assertEqual(query, consts.SYNC_STATUS)
def test_update_invalid_job(self):
job = self.sync_job_create(self.ctx, job_id=UUID1)
self.assertIsNotNone(job)
self.assertRaises(exceptions.JobNotFound,
db_api.sync_job_update, self.ctx, 'fake_job',
consts.JOB_SUCCESS)
def test_resource_sync_create(self):
job = self.sync_job_create(self.ctx, job_id=UUID1)
resource_sync_create = self.resource_sync_create(
self.ctx, job=job, region='Fake_region',
resource='fake_key', resource_type='keypair')
self.assertIsNotNone(resource_sync_create)
self.assertEqual(consts.SYNC_STATUS, resource_sync_create.sync_status)
def test_resource_sync_status(self):
job = self.sync_job_create(self.ctx, job_id=UUID1)
resource_sync_create = self.resource_sync_create(
self.ctx, job=job, region='Fake_region',
resource='fake_key', resource_type='keypair')
self.assertIsNotNone(resource_sync_create)
status = db_api.resource_sync_status(self.ctx, job.id)
self.assertEqual(consts.SYNC_STATUS, status[0])
def test_resource_sync_update(self):
job = self.sync_job_create(self.ctx, job_id=UUID1)
resource_sync_create = self.resource_sync_create(
self.ctx, job=job, region='Fake_region',
resource='fake_key', resource_type='keypair')
self.assertIsNotNone(resource_sync_create)
self.assertEqual(consts.SYNC_STATUS, resource_sync_create.sync_status)
db_api.resource_sync_update(
self.ctx, job.id, 'Fake_region', 'fake_key', consts.JOB_SUCCESS)
updated_job = db_api.resource_sync_list_by_job(self.ctx, job.id)
self.assertEqual(consts.JOB_SUCCESS, updated_job[0].get('sync_status'))
def test_foreign_key(self):
job = self.sync_job_create(self.ctx, job_id=UUID1)
self.assertIsNotNone(job)
resource_sync_create = self.resource_sync_create(
self.ctx, job=job, region='Fake_region', resource='fake_key',
resource_type='keypair')
self.assertIsNotNone(resource_sync_create)
self.assertEqual(job.id, resource_sync_create.job_id)
def test_delete_sync_job(self):
job_id = UUID1
job = self.sync_job_create(self.ctx, job_id=UUID1)
self.assertIsNotNone(job)
self.resource_sync_create(
self.ctx, job=job, region='Fake_region', resource='fake_key',
resource_type='keypair')
db_api.sync_job_delete(self.ctx, job_id)
updated_job = db_api.sync_job_list(self.ctx)
self.assertEqual(0, len(updated_job))
def test_composite_primary_key(self):
job = self.sync_job_create(self.ctx, job_id=UUID1)
self.resource_sync_create(
self.ctx, job=job, region='Fake_region', resource='fake_key',
resource_type='keypair')
self.assertRaises(oslo_db.exception.DBDuplicateEntry,
self.resource_sync_create, self.ctx, job=job,
region='Fake_region', resource='fake_key',
resource_type='keypair')

View File

@ -85,7 +85,7 @@ def dummy_context(user='test_username', tenant='test_project_id',
return context.RequestContext.from_dict({
'auth_token': 'abcd1234',
'user': user,
'tenant': tenant,
'project': tenant,
'is_admin': True,
'region_name': region_name
})