namos.cfg RPC server enabled

Change-Id: I7f05316a69344cc286339100da3cd90447df7ceb
This commit is contained in:
Kanagaraj Manickam 2016-03-27 12:21:31 +05:30
parent 26c8878d12
commit ce9913a9e5
8 changed files with 77 additions and 30 deletions

View File

@ -1,5 +1,10 @@
[DEFAULT]
rpc_backend = rabbit
debug=True
logging_exception_prefix = %(color)s%(asctime)s.%(msecs)03d TRACE %(name)s %(instance)s
logging_debug_format_suffix = from (pid=%(process)d) %(funcName)s %(pathname)s:%(lineno)d
logging_default_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [-%(color)s] %(instance)s%(color)s%(message)s
logging_context_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [%(request_id)s %(user)s %(tenant)s%(color)s] %(instance)s%(color)s%(message)s
[oslo_messaging_rabbit]
rabbit_userid = stackrabbit

View File

@ -45,7 +45,9 @@ class HeartBeat(object):
def report_status(self):
# TODO(mrkanag) Make like Node: Service: worker: status
for sw in api.service_worker_get_all(None):
print ('[', 'T' if self.find_status(sw) else 'F', ']', sw.name)
msg = '[%s] %s' % ('T' if self.find_status(sw) else 'F',
sw.name)
print (msg)
class OsloConfigSchemaManager(object):
@ -108,8 +110,11 @@ class OsloConfigSchemaManager(object):
except exception.AlreadyExist:
_a = 'F'
print ('[', _a, ']', namespace, ':', grp, ':',
name)
msg = '[%s] %s::%s::%s' % (_a,
namespace,
grp,
name)
print (msg)
class DBCommand(object):

View File

@ -80,7 +80,7 @@ def get_transport(url=None, optional=False, cache=True):
def get_rpc_server(host, exchange, topic, version, endpoint):
"""Return a configured olso.messaging rpc server."""
oslo_messaging.set_transport_defaults(exchange)
# oslo_messaging.set_transport_defaults(exchange)
target = oslo_messaging.Target(server=host,
topic=topic,
version=version)

View File

@ -21,6 +21,7 @@ from oslo_utils import timeutils
from namos.common import config
from namos.common import exception
from namos.common import messaging
from namos.db import api as db_api
from namos.db import openstack_drivers
@ -60,8 +61,11 @@ class ConductorManager(object):
@request_context
def register_myself(self, context, registration_info):
LOG.info("REGISTER [%s.%s] START" % (registration_info['project_name'],
registration_info['prog_name']))
LOG.info("REGISTER [%s.%s.%s] START" % (
registration_info['project_name'],
registration_info['prog_name'],
registration_info['identification']
))
# Service processing
sp = ServiceProcessor(registration_info)
@ -71,11 +75,25 @@ class ConductorManager(object):
dp = DriverProcessor(service_worker_id,
registration_info['config_dict'])
dp.process_drivers(context)
LOG.info("REGISTER [%s.%s] DONE" % (registration_info['project_name'],
registration_info['prog_name']))
LOG.info("REGISTER [%s.%s.%s] DONE" % (
registration_info['project_name'],
registration_info['prog_name'],
registration_info['identification']
))
self._regisgration_ackw(context,
registration_info['identification'])
return service_worker_id
def _regisgration_ackw(self, context, identification):
client = messaging.get_rpc_client(topic='namos.CONF.%s' %
identification,
version=self.RPC_API_VERSION,
exchange=config.PROJECT_NAME)
client.cast(context,
'regisgration_ackw',
identification=identification)
LOG.info("REGISTER [%s] ACK" % identification)
@request_context
def heart_beat(self, context, identification, dieing=False):
try:
@ -215,13 +233,14 @@ class ServiceProcessor(object):
dict(name=cfg_f,
file=self.registration_info[
'config_file_dict'][cfg_f],
service_component_id=service_component.id))
service_component_id=service_component.id,
service_node_id=node.id))
LOG.info('Oslo config file %s is created' % config_file)
except exception.AlreadyExist:
config_files = \
db_api.config_file_get_by_name_for_service_component(
db_api.config_file_get_by_name_for_service_node(
context,
service_component_id=service_component.id,
service_node_id=node.id,
name=cfg_f
)
if len(config_files) == 1:
@ -288,8 +307,8 @@ class ServiceProcessor(object):
if len(cfg_schs) > 1:
cfg_sche = cfg_schs[0]
LOG.info("Config Schema %s is existing and is updated" %
cfg_sche)
LOG.debug("Config Schema %s is existing and is updated" %
cfg_sche)
else:
try:
cfg_sche = db_api.config_schema_create(
@ -306,7 +325,7 @@ class ServiceProcessor(object):
name=cfg_obj['name']
)
)
LOG.info("Config Schema %s is created" % cfg_sche)
LOG.debug("Config Schema %s is created" % cfg_sche)
except exception.AlreadyExist:
cfg_schs = db_api.config_schema_get_by(
context=context,
@ -316,8 +335,8 @@ class ServiceProcessor(object):
)
cfg_sche = cfg_schs[0]
LOG.info("Config Schema %s is existing and is updated" %
cfg_sche)
LOG.debug("Config Schema %s is existing and is updated" %
cfg_sche)
cfg_obj_ = dict(
service_worker_id=service_worker.id,
@ -328,7 +347,7 @@ class ServiceProcessor(object):
try:
config = db_api.config_create(context, cfg_obj_)
LOG.info("Config %s is created" % config)
LOG.debug("Config %s is created" % config)
except exception.AlreadyExist:
configs = db_api.config_get_by_name_for_service_worker(
context,
@ -338,7 +357,7 @@ class ServiceProcessor(object):
config = db_api.config_update(context,
configs[0].id,
cfg_obj_)
LOG.info("Config %s is existing and is updated" % config)
LOG.debug("Config %s is existing and is updated" % config)
return service_worker.id

View File

@ -397,13 +397,13 @@ def config_file_get_by_name(context, name):
return IMPL.config_file_get_by_name(context, name)
def config_file_get_by_name_for_service_component(
def config_file_get_by_name_for_service_node(
context,
service_component_id,
service_node_id,
name=None):
return IMPL.config_file_get_by_name_for_service_component(
return IMPL.config_file_get_by_name_for_service_node(
context,
service_component_id,
service_node_id,
name)

View File

@ -629,12 +629,12 @@ def config_file_get_by_name(context, name):
return config
def config_file_get_by_name_for_service_component(
def config_file_get_by_name_for_service_node(
context,
service_component_id,
service_node_id,
name=None):
query = _model_query(context, models.OsloConfigFile). \
filter_by(service_component_id=service_component_id)
filter_by(service_node_id=service_node_id)
if name is not None:
query = query.filter_by(name=name)
@ -934,9 +934,9 @@ def view_360(context):
'service_component'][srv_cmp.id] = dict()
view['region'][rg.id]['service_node'][srv_nd.id][
'service_component'][srv_cmp.id]['config_file'] = dict()
cfg_fl_lst = config_file_get_by_name_for_service_component(
cfg_fl_lst = config_file_get_by_name_for_service_node(
context,
service_component_id=srv_cmp.id
service_node_id=srv_nd.id
)
for cfg_fl in cfg_fl_lst:
# config file

View File

@ -22,7 +22,9 @@ from sqlalchemy import UniqueConstraint
import uuid
from namos.db.sqlalchemy.types import Json
from namos.db.sqlalchemy.types import LongText
from namos.db.sqlalchemy.types import Uuid
from oslo_db.sqlalchemy import models
from oslo_utils import timeutils
@ -376,7 +378,7 @@ class OsloConfigFile(BASE,
Extra):
__tablename__ = 'oslo_config_file'
__table_args__ = (
UniqueConstraint("name", "service_component_id"),
UniqueConstraint("name", "service_node_id"),
)
name = sqlalchemy.Column(sqlalchemy.String(255),
@ -385,10 +387,16 @@ class OsloConfigFile(BASE,
default=lambda: str(uuid.uuid4()))
file = sqlalchemy.Column(
sqlalchemy.Text
LongText
)
# Always having last one updated the conf file
service_component_id = sqlalchemy.Column(
Uuid,
sqlalchemy.ForeignKey('service_component.id'),
nullable=False
)
service_node_id = sqlalchemy.Column(
Uuid,
sqlalchemy.ForeignKey('service_node.id'),
nullable=False
)

View File

@ -53,3 +53,13 @@ class Uuid(TypeDecorator):
def process_result_value(self, value, dialect):
return value
class LongText(TypeDecorator):
impl = Text
def load_dialect_impl(self, dialect):
if dialect.name == 'mysql':
return dialect.type_descriptor(mysql.LONGTEXT())
else:
return self.impl