namos.cfg RPC server enabled
Change-Id: I7f05316a69344cc286339100da3cd90447df7ceb
This commit is contained in:
parent
26c8878d12
commit
ce9913a9e5
|
@ -1,5 +1,10 @@
|
|||
[DEFAULT]
|
||||
rpc_backend = rabbit
|
||||
debug=True
|
||||
logging_exception_prefix = %(color)s%(asctime)s.%(msecs)03d TRACE %(name)s [01;35m%(instance)s[00m
|
||||
logging_debug_format_suffix = [00;33mfrom (pid=%(process)d) %(funcName)s %(pathname)s:%(lineno)d[00m
|
||||
logging_default_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [[00;36m-%(color)s] [01;35m%(instance)s%(color)s%(message)s[00m
|
||||
logging_context_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [[01;36m%(request_id)s [00;36m%(user)s %(tenant)s%(color)s] [01;35m%(instance)s%(color)s%(message)s[00m
|
||||
|
||||
[oslo_messaging_rabbit]
|
||||
rabbit_userid = stackrabbit
|
||||
|
|
|
@ -45,7 +45,9 @@ class HeartBeat(object):
|
|||
def report_status(self):
|
||||
# TODO(mrkanag) Make like Node: Service: worker: status
|
||||
for sw in api.service_worker_get_all(None):
|
||||
print ('[', 'T' if self.find_status(sw) else 'F', ']', sw.name)
|
||||
msg = '[%s] %s' % ('T' if self.find_status(sw) else 'F',
|
||||
sw.name)
|
||||
print (msg)
|
||||
|
||||
|
||||
class OsloConfigSchemaManager(object):
|
||||
|
@ -108,8 +110,11 @@ class OsloConfigSchemaManager(object):
|
|||
except exception.AlreadyExist:
|
||||
_a = 'F'
|
||||
|
||||
print ('[', _a, ']', namespace, ':', grp, ':',
|
||||
name)
|
||||
msg = '[%s] %s::%s::%s' % (_a,
|
||||
namespace,
|
||||
grp,
|
||||
name)
|
||||
print (msg)
|
||||
|
||||
|
||||
class DBCommand(object):
|
||||
|
|
|
@ -80,7 +80,7 @@ def get_transport(url=None, optional=False, cache=True):
|
|||
|
||||
def get_rpc_server(host, exchange, topic, version, endpoint):
|
||||
"""Return a configured olso.messaging rpc server."""
|
||||
oslo_messaging.set_transport_defaults(exchange)
|
||||
# oslo_messaging.set_transport_defaults(exchange)
|
||||
target = oslo_messaging.Target(server=host,
|
||||
topic=topic,
|
||||
version=version)
|
||||
|
|
|
@ -21,6 +21,7 @@ from oslo_utils import timeutils
|
|||
|
||||
from namos.common import config
|
||||
from namos.common import exception
|
||||
from namos.common import messaging
|
||||
from namos.db import api as db_api
|
||||
from namos.db import openstack_drivers
|
||||
|
||||
|
@ -60,8 +61,11 @@ class ConductorManager(object):
|
|||
|
||||
@request_context
|
||||
def register_myself(self, context, registration_info):
|
||||
LOG.info("REGISTER [%s.%s] START" % (registration_info['project_name'],
|
||||
registration_info['prog_name']))
|
||||
LOG.info("REGISTER [%s.%s.%s] START" % (
|
||||
registration_info['project_name'],
|
||||
registration_info['prog_name'],
|
||||
registration_info['identification']
|
||||
))
|
||||
|
||||
# Service processing
|
||||
sp = ServiceProcessor(registration_info)
|
||||
|
@ -71,11 +75,25 @@ class ConductorManager(object):
|
|||
dp = DriverProcessor(service_worker_id,
|
||||
registration_info['config_dict'])
|
||||
dp.process_drivers(context)
|
||||
LOG.info("REGISTER [%s.%s] DONE" % (registration_info['project_name'],
|
||||
registration_info['prog_name']))
|
||||
|
||||
LOG.info("REGISTER [%s.%s.%s] DONE" % (
|
||||
registration_info['project_name'],
|
||||
registration_info['prog_name'],
|
||||
registration_info['identification']
|
||||
))
|
||||
self._regisgration_ackw(context,
|
||||
registration_info['identification'])
|
||||
return service_worker_id
|
||||
|
||||
def _regisgration_ackw(self, context, identification):
|
||||
client = messaging.get_rpc_client(topic='namos.CONF.%s' %
|
||||
identification,
|
||||
version=self.RPC_API_VERSION,
|
||||
exchange=config.PROJECT_NAME)
|
||||
client.cast(context,
|
||||
'regisgration_ackw',
|
||||
identification=identification)
|
||||
LOG.info("REGISTER [%s] ACK" % identification)
|
||||
|
||||
@request_context
|
||||
def heart_beat(self, context, identification, dieing=False):
|
||||
try:
|
||||
|
@ -215,13 +233,14 @@ class ServiceProcessor(object):
|
|||
dict(name=cfg_f,
|
||||
file=self.registration_info[
|
||||
'config_file_dict'][cfg_f],
|
||||
service_component_id=service_component.id))
|
||||
service_component_id=service_component.id,
|
||||
service_node_id=node.id))
|
||||
LOG.info('Oslo config file %s is created' % config_file)
|
||||
except exception.AlreadyExist:
|
||||
config_files = \
|
||||
db_api.config_file_get_by_name_for_service_component(
|
||||
db_api.config_file_get_by_name_for_service_node(
|
||||
context,
|
||||
service_component_id=service_component.id,
|
||||
service_node_id=node.id,
|
||||
name=cfg_f
|
||||
)
|
||||
if len(config_files) == 1:
|
||||
|
@ -288,8 +307,8 @@ class ServiceProcessor(object):
|
|||
|
||||
if len(cfg_schs) > 1:
|
||||
cfg_sche = cfg_schs[0]
|
||||
LOG.info("Config Schema %s is existing and is updated" %
|
||||
cfg_sche)
|
||||
LOG.debug("Config Schema %s is existing and is updated" %
|
||||
cfg_sche)
|
||||
else:
|
||||
try:
|
||||
cfg_sche = db_api.config_schema_create(
|
||||
|
@ -306,7 +325,7 @@ class ServiceProcessor(object):
|
|||
name=cfg_obj['name']
|
||||
)
|
||||
)
|
||||
LOG.info("Config Schema %s is created" % cfg_sche)
|
||||
LOG.debug("Config Schema %s is created" % cfg_sche)
|
||||
except exception.AlreadyExist:
|
||||
cfg_schs = db_api.config_schema_get_by(
|
||||
context=context,
|
||||
|
@ -316,8 +335,8 @@ class ServiceProcessor(object):
|
|||
)
|
||||
|
||||
cfg_sche = cfg_schs[0]
|
||||
LOG.info("Config Schema %s is existing and is updated" %
|
||||
cfg_sche)
|
||||
LOG.debug("Config Schema %s is existing and is updated" %
|
||||
cfg_sche)
|
||||
|
||||
cfg_obj_ = dict(
|
||||
service_worker_id=service_worker.id,
|
||||
|
@ -328,7 +347,7 @@ class ServiceProcessor(object):
|
|||
|
||||
try:
|
||||
config = db_api.config_create(context, cfg_obj_)
|
||||
LOG.info("Config %s is created" % config)
|
||||
LOG.debug("Config %s is created" % config)
|
||||
except exception.AlreadyExist:
|
||||
configs = db_api.config_get_by_name_for_service_worker(
|
||||
context,
|
||||
|
@ -338,7 +357,7 @@ class ServiceProcessor(object):
|
|||
config = db_api.config_update(context,
|
||||
configs[0].id,
|
||||
cfg_obj_)
|
||||
LOG.info("Config %s is existing and is updated" % config)
|
||||
LOG.debug("Config %s is existing and is updated" % config)
|
||||
|
||||
return service_worker.id
|
||||
|
||||
|
|
|
@ -397,13 +397,13 @@ def config_file_get_by_name(context, name):
|
|||
return IMPL.config_file_get_by_name(context, name)
|
||||
|
||||
|
||||
def config_file_get_by_name_for_service_component(
|
||||
def config_file_get_by_name_for_service_node(
|
||||
context,
|
||||
service_component_id,
|
||||
service_node_id,
|
||||
name=None):
|
||||
return IMPL.config_file_get_by_name_for_service_component(
|
||||
return IMPL.config_file_get_by_name_for_service_node(
|
||||
context,
|
||||
service_component_id,
|
||||
service_node_id,
|
||||
name)
|
||||
|
||||
|
||||
|
|
|
@ -629,12 +629,12 @@ def config_file_get_by_name(context, name):
|
|||
return config
|
||||
|
||||
|
||||
def config_file_get_by_name_for_service_component(
|
||||
def config_file_get_by_name_for_service_node(
|
||||
context,
|
||||
service_component_id,
|
||||
service_node_id,
|
||||
name=None):
|
||||
query = _model_query(context, models.OsloConfigFile). \
|
||||
filter_by(service_component_id=service_component_id)
|
||||
filter_by(service_node_id=service_node_id)
|
||||
if name is not None:
|
||||
query = query.filter_by(name=name)
|
||||
|
||||
|
@ -934,9 +934,9 @@ def view_360(context):
|
|||
'service_component'][srv_cmp.id] = dict()
|
||||
view['region'][rg.id]['service_node'][srv_nd.id][
|
||||
'service_component'][srv_cmp.id]['config_file'] = dict()
|
||||
cfg_fl_lst = config_file_get_by_name_for_service_component(
|
||||
cfg_fl_lst = config_file_get_by_name_for_service_node(
|
||||
context,
|
||||
service_component_id=srv_cmp.id
|
||||
service_node_id=srv_nd.id
|
||||
)
|
||||
for cfg_fl in cfg_fl_lst:
|
||||
# config file
|
||||
|
|
|
@ -22,7 +22,9 @@ from sqlalchemy import UniqueConstraint
|
|||
import uuid
|
||||
|
||||
from namos.db.sqlalchemy.types import Json
|
||||
from namos.db.sqlalchemy.types import LongText
|
||||
from namos.db.sqlalchemy.types import Uuid
|
||||
|
||||
from oslo_db.sqlalchemy import models
|
||||
from oslo_utils import timeutils
|
||||
|
||||
|
@ -376,7 +378,7 @@ class OsloConfigFile(BASE,
|
|||
Extra):
|
||||
__tablename__ = 'oslo_config_file'
|
||||
__table_args__ = (
|
||||
UniqueConstraint("name", "service_component_id"),
|
||||
UniqueConstraint("name", "service_node_id"),
|
||||
)
|
||||
|
||||
name = sqlalchemy.Column(sqlalchemy.String(255),
|
||||
|
@ -385,10 +387,16 @@ class OsloConfigFile(BASE,
|
|||
default=lambda: str(uuid.uuid4()))
|
||||
|
||||
file = sqlalchemy.Column(
|
||||
sqlalchemy.Text
|
||||
LongText
|
||||
)
|
||||
# Always having last one updated the conf file
|
||||
service_component_id = sqlalchemy.Column(
|
||||
Uuid,
|
||||
sqlalchemy.ForeignKey('service_component.id'),
|
||||
nullable=False
|
||||
)
|
||||
service_node_id = sqlalchemy.Column(
|
||||
Uuid,
|
||||
sqlalchemy.ForeignKey('service_node.id'),
|
||||
nullable=False
|
||||
)
|
||||
|
|
|
@ -53,3 +53,13 @@ class Uuid(TypeDecorator):
|
|||
|
||||
def process_result_value(self, value, dialect):
|
||||
return value
|
||||
|
||||
|
||||
class LongText(TypeDecorator):
|
||||
impl = Text
|
||||
|
||||
def load_dialect_impl(self, dialect):
|
||||
if dialect.name == 'mysql':
|
||||
return dialect.type_descriptor(mysql.LONGTEXT())
|
||||
else:
|
||||
return self.impl
|
||||
|
|
Loading…
Reference in New Issue