FIX-1237: make register_myself in the manager resilient to duplicate registrations

Change-Id: I8f7623dd3b1717799efb8d7a378bb09af6efda12
Kanagaraj Manickam 2016-03-23 15:25:02 +05:30
parent eca5fb1b5c
commit 0a765fe3ae
7 changed files with 165 additions and 107 deletions

View File

@@ -10,4 +10,4 @@ rabbit_hosts = 172.241.0.101
connection = mysql+pymysql://root:password@172.241.0.101/namos?charset=utf8
[conductor]
workers=10
workers=1

View File

@@ -50,7 +50,8 @@ def main():
launcher = os_service.launch(CONF, mgr, CONF.conductor.workers)
# TODO(mrkanag) Namos is not registering the RPC backend, fix it !
# namos.register_myself()
import os_namos
os_namos.register_myself()
launcher.wait()

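For orientation, the conductor now registers itself with namos right after the workers are launched; the previously commented-out namos.register_myself() is replaced by the os-namos client call. Below is a minimal sketch of that flow, assuming oslo.service's process launcher and that os_namos.register_myself() takes no arguments (as it is called above); the conductor option registration and the mgr parameter are stand-ins, not taken from this commit.

# Sketch only: mirrors the new main() above. os_namos.register_myself() is
# assumed to self-discover this process's registration info.
from oslo_config import cfg
from oslo_service import service as os_service

CONF = cfg.CONF
CONF.register_opts([cfg.IntOpt('workers', default=1)], group='conductor')

def start(mgr):
    # mgr is expected to be an oslo_service.service.Service instance
    launcher = os_service.launch(CONF, mgr, CONF.conductor.workers)
    import os_namos
    os_namos.register_myself()  # register this conductor with namos after launch
    launcher.wait()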
View File

@@ -63,6 +63,12 @@ class NotFound(NamosException):
http_status_code = 404
class AlreadyExist(NamosException):
msg_fmt = ("%(model)s %(name)s already exists")
error_code = 0x01002
http_status_code = 403
class RegionNotFound(NotFound):
msg_fmt = ("Region %(region_id)s does not found")
error_code = 0x01001

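AlreadyExist is the pivot of this change: instead of looking a record up first and creating it only when missing, the manager now attempts the create and falls back to a lookup when the database reports a duplicate. A minimal, generic sketch of that idiom follows; the helper callables are placeholders, not namos APIs.

# Illustrative only: create-first, fetch-on-conflict.
class AlreadyExist(Exception):
    """Stand-in for namos.common.exception.AlreadyExist."""

def get_or_create(create, get_existing):
    try:
        return create()           # optimistic INSERT
    except AlreadyExist:
        return get_existing()     # another worker won the race; reuse its row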
View File

@@ -129,12 +129,6 @@ class ServiceProcessor(object):
def process_service(self, context):
# Service Node
try:
# TODO(mrkanag) should this be a region-specific search?
node = db_api.service_node_get_by_name(
context,
self.registration_info.get('fqdn'))
LOG.info('Service node %s is existing' % node)
except exception.ServiceNodeNotFound:
# TODO(mrkanag) region_id is hard-coded, fix it !
# use a proper node name instead of fqdn
node = db_api.service_node_create(
@@ -144,14 +138,15 @@ class ServiceProcessor(object):
region_id='f7dcd175-27ef-46b5-997f-e6e572f320b0'))
LOG.info('Service node %s is created' % node)
except exception.AlreadyExist:
# TODO(mrkanag) should this be a region-specific search?
node = db_api.service_node_get_by_name(
context,
self.registration_info.get('fqdn'))
LOG.info('Service node %s is existing' % node)
# Service
try:
service = db_api.service_get_by_name(
context,
self.registration_info.get('project_name'))
LOG.info('Service %s is existing' % service)
except exception.ServiceNotFound:
s_id = 'b9c2549f-f685-4bc2-92e9-ba8af9c18591'
service = db_api.service_create(
context,
@@ -161,53 +156,36 @@ class ServiceProcessor(object):
keystone_service_id=s_id))
LOG.info('Service %s is created' % service)
except exception.AlreadyExist:
service = db_api.service_get_by_name(
context,
self.registration_info.get('project_name'))
LOG.info('Service %s is existing' % service)
# Service Component
service_components = \
db_api.service_component_get_all_by_node_for_service(
context,
node_id=node.id,
service_id=service.id,
name=self.registration_info['prog_name']
)
if len(service_components) == 1:
service_component = service_components[0]
LOG.info('Service Component %s is existing' % service_component)
# TODO(mrkanag) what to do when service_components size is > 1
else:
try:
service_component = db_api.service_component_create(
context,
dict(name=self.registration_info['prog_name'],
node_id=node.id,
service_id=service.id))
LOG.info('Service Component %s is created' % service_component)
except exception.AlreadyExist:
service_components = \
db_api.service_component_get_all_by_node_for_service(
context,
node_id=node.id,
service_id=service.id,
name=self.registration_info['prog_name']
)
if len(service_components) == 1:
service_component = service_components[0]
LOG.info('Service Component %s is existing' %
service_component)
# TODO(mrkanag) what to do when service_components size is > 1
# Service Worker
# TODO(mrkanag) Find a way to purge dead service workers.
# Once each service is enabled with heart beating to namos,
# purging can be done when the heart beat stops. This can be
# done from openstack.common.service.py
service_workers = \
db_api.service_worker_get_by_host_for_service_component(
context,
service_component_id=service_component.id,
host=self.registration_info['host']
)
if len(service_workers) == 1:
service_worker = \
db_api.service_worker_update(
context,
service_workers[0].id,
dict(
pid=self.registration_info['pid'],
name='%s@%s' % (self.registration_info['pid'],
service_component.name)
))
LOG.info('Service Worker %s is existing and is updated'
% service_worker)
# TODO(mrkanag) what to do when service_workers size is > 1
else:
try:
service_worker = db_api.service_worker_create(
context,
# TODO(mrkanag) Fix the name and device driver properly!
@@ -217,6 +195,31 @@ class ServiceProcessor(object):
host=self.registration_info['host'],
service_component_id=service_component.id))
LOG.info('Service Worker %s is created' % service_worker)
except exception.AlreadyExist:
# TODO(mrkanag) Find a way to purge dead service workers.
# Once each service is enabled with heart beating to namos,
# purging can be done when the heart beat stops. This can be
# done from openstack.common.service.py
service_workers = \
db_api.service_worker_get_by_host_for_service_component(
context,
service_component_id=service_component.id,
host=self.registration_info['host']
)
if len(service_workers) == 1:
service_worker = \
db_api.service_worker_update(
context,
service_workers[0].id,
dict(
pid=self.registration_info['pid'],
name='%s@%s' % (self.registration_info['pid'],
service_component.name)
))
LOG.info('Service Worker %s is existing and is updated'
% service_worker)
# TODO(mrkanag) what to do when service_workers size is > 1
# Config
# TODO(mrkanag) Optimize the config like per service_component
@@ -224,18 +227,20 @@ class ServiceProcessor(object):
for cfg_name, cfg_obj in self.registration_info[
'config_dict'].iteritems():
cfg_obj['service_worker_id'] = service_worker.id
configs = db_api.config_get_by_name_for_service_worker(
context,
service_worker_id=cfg_obj['service_worker_id'],
name=cfg_obj['name'])
if len(configs) == 1:
config = db_api.config_update(context,
configs[0].id,
cfg_obj)
LOG.info("Config %s is existing and is updated" % config)
else:
try:
config = db_api.config_create(context, cfg_obj)
LOG.info("Config %s is created" % config)
except exception.AlreadyExist:
configs = db_api.config_get_by_name_for_service_worker(
context,
service_worker_id=cfg_obj['service_worker_id'],
name=cfg_obj['name'])
if len(configs) == 1:
config = db_api.config_update(context,
configs[0].id,
cfg_obj)
LOG.info("Config %s is existing and is updated" % config)
return service_worker.id
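Taken together, these hunks reverse the old lookup-then-create order for the service node, service, service component, service worker, and config records: each create is attempted first, and exception.AlreadyExist triggers the lookup (and, for workers and configs, an update). A condensed sketch of the worker step follows, reusing the db_api call names from the diff; the import paths and the exact keys passed to service_worker_create are assumed for illustration, not taken verbatim from this commit.

# Condensed sketch of the service-worker step above; import paths are assumed.
from namos.common import exception
from namos.db import api as db_api

def register_worker(context, reg_info, service_component):
    worker_name = '%s@%s' % (reg_info['pid'], service_component.name)
    try:
        return db_api.service_worker_create(
            context,
            dict(name=worker_name,
                 pid=reg_info['pid'],
                 host=reg_info['host'],
                 service_component_id=service_component.id))
    except exception.AlreadyExist:
        # the new (host, service_component_id) unique constraint keeps this to one row
        workers = db_api.service_worker_get_by_host_for_service_component(
            context,
            service_component_id=service_component.id,
            host=reg_info['host'])
        return db_api.service_worker_update(
            context,
            workers[0].id,
            dict(pid=reg_info['pid'], name=worker_name))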
@@ -367,11 +372,6 @@ class DriverProcessor(object):
# Device
device_name = self._get_value(device_cfg['name'])
try:
device = db_api.device_get_by_name(
context,
device_name)
LOG.info('Device %s is existing' % device)
except exception.DeviceNotFound:
# TODO(mrkanag) region_id is hard-coded, fix it !
# Set the right status as well
device = db_api.device_create(
@@ -381,8 +381,13 @@ class DriverProcessor(object):
region_id='f7dcd175-27ef-46b5-997f-e6e572f320b0'))
LOG.info('Device %s is created' % device)
except exception.AlreadyExist:
device = db_api.device_get_by_name(
context,
device_name)
LOG.info('Device %s is existing' % device)
# Handle child devices
# TODO(mrkanag) Properly handle child devices
if child_device_cfg is not None:
for d_name in self._get_value(child_device_cfg['key']):
base_name = self._get_value(child_device_cfg['base_name'])
@@ -406,16 +411,7 @@ class DriverProcessor(object):
LOG.info('Device %s is created' % device)
# Device Endpoint
device_endpoints = db_api.device_endpoint_get_by_device_type(
context,
device_id=device.id,
type=endpoint_type,
name=device_endpoint_name)
if len(device_endpoints) >= 1:
device_endpoint = device_endpoints[0]
LOG.info('Device Endpoint %s is existing' %
device_endpoints[0])
else:
try:
for k, v in connection_cfg.iteritems():
connection[k] = self._get_value(k)
@@ -426,15 +422,19 @@ class DriverProcessor(object):
type=endpoint_type,
device_id=device.id))
LOG.info('Device Endpoint %s is created' % device_endpoint)
except exception.AlreadyExist:
device_endpoints = db_api.device_endpoint_get_by_device_type(
context,
device_id=device.id,
type=endpoint_type,
name=device_endpoint_name)
if len(device_endpoints) >= 1:
device_endpoint = device_endpoints[0]
LOG.info('Device Endpoint %s is existing' %
device_endpoints[0])
# Device Driver Class
try:
device_driver_class = db_api.device_driver_class_get_by_name(
context,
driver_name)
LOG.info('Device Driver Class %s is existing' %
device_driver_class)
except exception.DeviceDriverClassNotFound:
device_driver_class = db_api.device_driver_class_create(
context,
dict(name=driver_name,
@@ -446,21 +446,15 @@ class DriverProcessor(object):
extra=driver_def.get('extra')))
LOG.info('Device Driver Class %s is created' %
device_driver_class)
except exception.AlreadyExist:
device_driver_class = db_api.device_driver_class_get_by_name(
context,
driver_name)
LOG.info('Device Driver Class %s is existing' %
device_driver_class)
# Device Driver
device_drivers = \
db_api.device_driver_get_by_device_endpoint_service_worker(
context,
device_id=device.id,
endpoint_id=device_endpoint.id,
device_driver_class_id=device_driver_class.id,
service_worker_id=self.service_worker_id
)
if len(device_drivers) >= 1:
device_driver = device_drivers[0]
LOG.info('Device Driver %s is existing' %
device_driver)
else:
try:
device_driver = db_api.device_driver_create(
context,
dict(device_id=device.id,
@@ -471,6 +465,19 @@ class DriverProcessor(object):
)
LOG.info('Device Driver %s is created' %
device_driver)
except exception.AlreadyExist:
device_drivers = \
db_api.device_driver_get_by_device_endpoint_service_worker(
context,
device_id=device.id,
endpoint_id=device_endpoint.id,
device_driver_class_id=device_driver_class.id,
service_worker_id=self.service_worker_id
)
if len(device_drivers) >= 1:
device_driver = device_drivers[0]
LOG.info('Device Driver %s is existing' %
device_driver)
if __name__ == '__main__':

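DriverProcessor gets the same treatment for devices, device endpoints, device driver classes, and device drivers; where the fallback query can return a list, the first row is used, which the new unique constraints (for example device_endpoint's (device_id, type)) keep unambiguous. The common shape, as a small generic sketch with placeholder callables:

# Generic shape of the fallbacks above: on AlreadyExist, re-query and take the
# first match; create() and lookup() stand in for the db_api calls at each step.
def create_or_first(create, lookup, already_exist_exc):
    try:
        return create()
    except already_exist_exc:
        rows = lookup()
        return rows[0] if rows else None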
View File

@@ -34,7 +34,7 @@ def upgrade():
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Uuid(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False, unique=True),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.Column('extra', sa.Json(), nullable=True),
sa.Column('keystone_service_id', sa.Uuid(length=36), nullable=False),
@@ -47,7 +47,7 @@ def upgrade():
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Uuid(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False, unique=True),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.Column('extra', sa.Json(), nullable=True),
sa.Column('python_class', sa.String(length=256), nullable=False),
@@ -62,7 +62,7 @@ def upgrade():
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Uuid(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False, unique=True),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.Column('extra', sa.Json(), nullable=True),
sa.Column('keystone_region_id', sa.String(length=255), nullable=False),
@@ -75,7 +75,7 @@ def upgrade():
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Uuid(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False, unique=True),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.Column('status', sa.String(length=64), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
@@ -94,7 +94,7 @@ def upgrade():
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Uuid(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False, unique=True),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('extra', sa.Json(), nullable=True),
@@ -110,7 +110,7 @@ def upgrade():
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Uuid(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False, unique=True),
sa.Column('extra', sa.Json(), nullable=True),
sa.Column('device_id', sa.Uuid(length=36), nullable=True),
sa.Column('connection', sa.Json(), nullable=False),
@@ -124,7 +124,7 @@ def upgrade():
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Uuid(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False, unique=True),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('extra', sa.Json(), nullable=True),
@@ -141,7 +141,7 @@ def upgrade():
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Uuid(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False, unique=True),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.Column('extra', sa.Json(), nullable=True),
sa.Column('endpoint_id', sa.Uuid(length=36), nullable=True),
@@ -160,7 +160,7 @@ def upgrade():
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Uuid(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False, unique=True),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.Column('extra', sa.Json(), nullable=True),
sa.Column('pid', sa.String(length=32), nullable=False),
@@ -173,6 +173,7 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
mysql_engine='InnoDB'
)
# TODO(mrkanag) add oslo_config schema here
def downgrade():

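Every name column in this migration now carries unique=True, which in SQLAlchemy attaches a UNIQUE constraint to the column, so duplicates are rejected by the database rather than filtered in Python (oslo.db then surfaces the failure as DBDuplicateEntry, translated below in db_api). A standalone illustration with plain SQLAlchemy; the table and values here are made up, not the namos schema.

# Standalone illustration (not namos code): unique=True on a Column emits a
# UNIQUE constraint; a duplicate insert fails with IntegrityError.
import sqlalchemy as sa

metadata = sa.MetaData()
service = sa.Table(
    'service', metadata,
    sa.Column('id', sa.String(36), primary_key=True),
    sa.Column('name', sa.String(255), nullable=False, unique=True))

engine = sa.create_engine('sqlite://')
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(service.insert().values(id='1', name='nova'))
try:
    with engine.begin() as conn:
        conn.execute(service.insert().values(id='2', name='nova'))
except sa.exc.IntegrityError:
    print('duplicate name rejected by the UNIQUE constraint')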
View File

@@ -16,6 +16,7 @@
import sys
from oslo_config import cfg
from oslo_db import exception as db_exception
from oslo_db.sqlalchemy import session as db_session
from namos.common import exception
@@ -57,7 +58,11 @@ def _session(context):
def _create(context, resource_ref, values):
resource_ref.update(values)
resource_ref.save(_session(context))
try:
resource_ref.save(_session(context))
except db_exception.DBDuplicateEntry:
raise exception.AlreadyExist(model=resource_ref.__class__.__name__,
name=resource_ref.name)
return resource_ref

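With this, _create() becomes the single translation point: any unique-constraint violation that oslo.db reports as DBDuplicateEntry is re-raised as the namos-level AlreadyExist, carrying the model class name and the conflicting name. A caller-side sketch follows; the db_api module is passed in rather than imported because its exact import path is assumed, not shown in this commit.

# Caller-side sketch: the AlreadyExist raised by _create() propagates through
# db_api.service_create(); the message reads like "Service <name> already exists".
from namos.common import exception

def ensure_service(context, db_api, values):
    try:
        return db_api.service_create(context, values)
    except exception.AlreadyExist:
        return db_api.service_get_by_name(context, values['name'])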
View File

@@ -18,6 +18,7 @@ SQLAlchemy models for namos database
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import UniqueConstraint
import uuid
from namos.db.sqlalchemy.types import Json
@@ -37,7 +38,7 @@ class NamosBase(models.ModelBase,
id = sqlalchemy.Column(Uuid, primary_key=True,
default=lambda: str(uuid.uuid4()))
name = sqlalchemy.Column(sqlalchemy.String(255),
# unique=True,
unique=True,
nullable=False,
default=lambda: str(uuid.uuid4()))
@@ -128,6 +129,9 @@ class DeviceEndpoint(BASE,
Extra):
__tablename__ = 'device_endpoint'
__table_args__ = (
UniqueConstraint("device_id", "type"),
)
device_id = sqlalchemy.Column(
Uuid,
sqlalchemy.ForeignKey('device.id'),
@@ -145,6 +149,12 @@ class DeviceDriver(BASE,
SoftDelete,
Extra):
__tablename__ = 'device_driver'
__table_args__ = (
UniqueConstraint("device_id",
"endpoint_id",
"device_driver_class_id",
"service_worker_id"),
)
endpoint_id = sqlalchemy.Column(
Uuid,
@@ -179,7 +189,8 @@ class DeviceDriverClass(BASE,
# TODO(kanagaraj-manickam) Correct the max length of the python class path here
python_class = sqlalchemy.Column(
sqlalchemy.String(256),
nullable=False
nullable=False,
unique=True
)
# service type like compute, network, volume, etc
type = sqlalchemy.Column(
@@ -225,6 +236,15 @@ class ServiceComponent(BASE,
Extra):
__tablename__ = 'service_component'
__table_args__ = (
UniqueConstraint("name", "node_id", "service_id"),
)
name = sqlalchemy.Column(sqlalchemy.String(255),
# unique=True,
nullable=False,
default=lambda: str(uuid.uuid4()))
node_id = sqlalchemy.Column(
Uuid,
sqlalchemy.ForeignKey('service_node.id'),
@@ -241,6 +261,15 @@ class ServiceWorker(BASE,
Extra):
__tablename__ = 'service_worker'
__table_args__ = (
UniqueConstraint("host", "service_component_id"),
)
name = sqlalchemy.Column(sqlalchemy.String(255),
# unique=True,
nullable=False,
default=lambda: str(uuid.uuid4()))
pid = sqlalchemy.Column(
sqlalchemy.String(32),
nullable=False
@@ -261,9 +290,18 @@ class OsloConfig(BASE,
Extra):
__tablename__ = 'oslo_config'
__table_args__ = (
UniqueConstraint("name", "service_worker_id"),
)
default_value = sqlalchemy.Column(
sqlalchemy.Text
)
name = sqlalchemy.Column(sqlalchemy.String(255),
# unique=True,
nullable=False,
default=lambda: str(uuid.uuid4()))
help = sqlalchemy.Column(
sqlalchemy.Text,
nullable=False,
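Finally, the composite UniqueConstraint entries added to the models (service_component, service_worker, oslo_config, device_endpoint, device_driver) are what make the create-first idiom safe under concurrency: when two workers race, exactly one INSERT succeeds and the other sees the constraint violation. A standalone sketch of one such composite constraint, loosely modeled on ServiceWorker's (host, service_component_id); the table and column names here are illustrative, not the namos schema.

# Standalone sketch (plain SQLAlchemy ORM, not namos models): a composite
# unique constraint rejects a second registration for the same host/component.
import sqlalchemy as sa
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Worker(Base):
    __tablename__ = 'worker'
    __table_args__ = (sa.UniqueConstraint('host', 'service_component_id'),)
    id = sa.Column(sa.Integer, primary_key=True)
    host = sa.Column(sa.String(255), nullable=False)
    service_component_id = sa.Column(sa.String(36), nullable=False)

engine = sa.create_engine('sqlite://')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Worker(host='node-1', service_component_id='sc-1'))
    session.commit()
    session.add(Worker(host='node-1', service_component_id='sc-1'))
    try:
        session.commit()
    except sa.exc.IntegrityError:
        session.rollback()
        print('second registration for the same host/component is rejected')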