Rename the [conductor] config group to [os_manager], add region handling
and dead service-worker cleanup to service registration.

Summary of what this patch does:
  * etc/namos.conf: drops the logging_* format overrides, renames the
    [conductor] section to [os_manager] and adds [os_namos] region_name.
  * etc/oslo-config-schema.sync: adds the os_namos config-generator entry.
  * namos/cmd/conductor.py, namos/common/config.py: read and register the
    options under the 'os_manager' group instead of 'conductor'.
  * namos/cmd/manage.py, namos/common/utils.py: move find_status() out of
    HeartBeat into a shared helper; the status report skips rows whose
    deleted_at is set and also prints is_launcher and host.
  * namos/conductor/manager.py: ServiceProcessor now receives the request
    context and the manager, creates or looks up the region by name,
    returns (service_component_id, service_worker_id) and gains cleanup(),
    which deletes workers (and their configs) that find_status() reports
    as not live. ConductorManager gains _ping().
  * namos/db/sqlalchemy/api.py: _delete() prefers soft_delete() when the
    model provides it; only_configured now filters on oslo_config_file_id.
  * namos/db/sqlalchemy/models.py: keystone ids become nullable, the
    service_worker unique constraint moves from host to pid, and an
    is_launcher boolean column is added.

Review notes:
  * Line breaks and indentation were reconstructed from a whitespace
    collapsed copy of this patch; verify context lines against the tree
    before applying.
  * The 'index' blob ids below predate the review fixes made to added
    lines and no longer describe the post-image exactly.

diff --git a/etc/namos.conf b/etc/namos.conf
index d18c998..4701427 100644
--- a/etc/namos.conf
+++ b/etc/namos.conf
@@ -1,10 +1,6 @@
 [DEFAULT]
 rpc_backend = rabbit
 debug=True
-logging_exception_prefix = %(color)s%(asctime)s.%(msecs)03d TRACE %(name)s %(instance)s
-logging_debug_format_suffix = from (pid=%(process)d) %(funcName)s %(pathname)s:%(lineno)d
-logging_default_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [-%(color)s] %(instance)s%(color)s%(message)s
-logging_context_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [%(request_id)s %(user)s %(tenant)s%(color)s] %(instance)s%(color)s%(message)s
 
 [oslo_messaging_rabbit]
 rabbit_userid = stackrabbit
@@ -14,5 +10,8 @@ rabbit_hosts = 172.241.0.101
 [database]
 connection = mysql+pymysql://root:password@172.241.0.101/namos?charset=utf8
 
-[conductor]
+[os_manager]
 workers=3
+
+[os_namos]
+region_name=RegionTwo
\ No newline at end of file
diff --git a/etc/oslo-config-schema.sync b/etc/oslo-config-schema.sync
index 9949d41..4da1269 100644
--- a/etc/oslo-config-schema.sync
+++ b/etc/oslo-config-schema.sync
@@ -1,6 +1,7 @@
 # List of config generator conf files for syncing the conf with namos
 heat=/opt/stack/heat/config-generator.conf
 namos=/home/manickan/workspace/namos/openstack/namos/config-generator.conf
+os_namos=/home/manickan/workspace/namos/openstack/os-namos/config-generator.conf
 keystone=/opt/stack/keystone/config-generator/keystone.conf
 neutron-bgp-dragent=/opt/stack/neutron/etc/oslo-config-generator/bgp_dragent.ini
 neutron-dhcp-agent=/opt/stack/neutron/etc/oslo-config-generator/dhcp_agent.ini
diff --git a/namos/cmd/conductor.py b/namos/cmd/conductor.py
index 32ab4a4..fcfcb6b 100644
--- a/namos/cmd/conductor.py
+++ b/namos/cmd/conductor.py
@@ -43,15 +43,15 @@ def main():
     from namos import conductor  # noqa
 
     mgr = service.RPCService(
-        CONF.conductor.name,
+        CONF.os_manager.name,
         config.PROJECT_NAME,
         manager.ConductorManager())
 
-    launcher = os_service.launch(CONF, mgr, CONF.conductor.workers)
+    launcher = os_service.launch(CONF, mgr, CONF.os_manager.workers)
 
     # TODO(mrkanag) Namos is not registering the RPC backend, fix it !
-    import os_namos
-    os_namos.register_myself()
+    # import os_namos
+    # os_namos.register_myself()
 
     launcher.wait()
 
diff --git a/namos/cmd/manage.py b/namos/cmd/manage.py
index ae82298..be7619f 100644
--- a/namos/cmd/manage.py
+++ b/namos/cmd/manage.py
@@ -15,10 +15,10 @@
 import sys
 
 from oslo_config import cfg
-from oslo_utils import timeutils
 
 from namos.common import config
 from namos.common import exception
+from namos.common import utils
 from namos.db import api
 from namos.db import sample
 from namos.db.sqlalchemy import migration
@@ -29,24 +29,18 @@ MANAGE_COMMAND_NAME = 'namos-manage'
 
 
 class HeartBeat(object):
-    def find_status(self, sw, report_interval=60):
-        status = False
-        if sw.updated_at is not None:
-            if ((timeutils.utcnow() - sw.updated_at).total_seconds()
-                    <= report_interval):
-                status = True
-        else:
-            if ((timeutils.utcnow() - sw.created_at).total_seconds()
-                    <= report_interval):
-                status = True
-
-        return status
-
     def report_status(self):
         # TODO(mrkanag) Make like Node: Service: worker: status
         for sw in api.service_worker_get_all(None):
-            msg = '[%s] %s' % ('T' if self.find_status(sw) else 'F',
-                               sw.name)
+            # TODO(mrkanag) Move this to db layer and query non deleted entries
+            if sw.deleted_at is not None:
+                continue
+
+            msg = '[%s] [%s] %s %s' % (
+                'T' if sw.is_launcher else 'F',
+                'T' if utils.find_status(sw) else 'F',
+                sw.name,
+                sw.host)
             print (msg)
 
 
diff --git a/namos/common/config.py b/namos/common/config.py
index 12e889c..3254e68 100644
--- a/namos/common/config.py
+++ b/namos/common/config.py
@@ -35,7 +35,7 @@ conductor_opts = [
 
 
 def register_conductor_opts():
-    CONF.register_opts(conductor_opts, 'conductor')
+    CONF.register_opts(conductor_opts, 'os_manager')
 
 
 def init_conf(prog):
@@ -52,4 +52,4 @@ def init_log(project=PROJECT_NAME):
 
 
 def list_opts():
-    yield 'conductor', conductor_opts
+    yield 'os_manager', conductor_opts
diff --git a/namos/common/utils.py b/namos/common/utils.py
new file mode 100644
index 0000000..3da88d6
--- /dev/null
+++ b/namos/common/utils.py
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_utils import timeutils
+
+
+def find_status(sw, report_interval=60):
+    """Return True if ``sw`` was touched within ``report_interval`` seconds.
+
+    ``sw.updated_at`` is used when it is set; ``sw.created_at`` is used
+    only for a row that has never been updated.
+    """
+    last_seen = sw.updated_at if sw.updated_at is not None else sw.created_at
+    elapsed = (timeutils.utcnow() - last_seen).total_seconds()
+    return elapsed <= report_interval
diff --git a/namos/conductor/manager.py b/namos/conductor/manager.py
index 133161a..316f40f 100644
--- a/namos/conductor/manager.py
+++ b/namos/conductor/manager.py
@@ -22,6 +22,7 @@ from oslo_utils import timeutils
 from namos.common import config
 from namos.common import exception
 from namos.common import messaging
+from namos.common import utils
 from namos.db import api as db_api
 from namos.db import openstack_drivers
 
@@ -68,8 +69,10 @@ class ConductorManager(object):
         ))
 
         # Service processing
-        sp = ServiceProcessor(registration_info)
-        service_worker_id = sp.process_service(context)
+        sp = ServiceProcessor(context,
+                              self,
+                              registration_info)
+        service_component_id, service_worker_id = sp.process_service(context)
 
         # Device Driver processing
         dp = DriverProcessor(service_worker_id,
@@ -82,6 +85,8 @@ class ConductorManager(object):
         ))
 
         self._regisgration_ackw(context, registration_info['identification'])
+
+        sp.cleanup(service_component_id)
         return service_worker_id
 
     def _regisgration_ackw(self, context, identification):
@@ -94,6 +99,22 @@ class ConductorManager(object):
                     identification=identification)
         LOG.info("REGISTER [%s] ACK" % identification)
 
+    def _ping(self, context, identification):
+        client = messaging.get_rpc_client(
+            topic='namos.CONF.%s' % identification,
+            version=self.RPC_API_VERSION,
+            exchange=config.PROJECT_NAME)
+        try:
+            client.call(context,
+                        'ping_me',
+                        identification=identification)
+
+            LOG.debug("PING [%s] SUCCESSFUL" % identification)
+            return True
+        except Exception:
+            LOG.debug("PING [%s] FAILED" % identification)
+            return False
+
     @request_context
     def heart_beat(self, context, identification, dieing=False):
         try:
@@ -164,8 +185,13 @@ class ConductorManager(object):
 
 
 class ServiceProcessor(object):
-    def __init__(self, registration_info):
+    def __init__(self,
+                 context,
+                 manager,
+                 registration_info):
         self.registration_info = registration_info
+        self.manager = manager
+        self.context = context
 
     def file_to_configs(self, file_content):
         tmp_file_path = '/tmp/sample-namos-config.conf'
@@ -191,15 +217,33 @@ class ServiceProcessor(object):
         return conf_dict
 
     def process_service(self, context):
+        # region
+        # If no region is provided, default it to namos's own region
+        if not self.registration_info.get('region_name'):
+            self.registration_info[
+                'region_name'] = cfg.CONF.os_namos.region_name
+
+        try:
+            region = db_api.region_create(
+                context,
+                dict(name=self.registration_info.get('region_name'))
+            )
+            LOG.info('Region %s is created' % region)
+        except exception.AlreadyExist:
+            region = db_api.region_get_by_name(
+                context,
+                name=self.registration_info.get('region_name')
+            )
+            LOG.info('Region %s is existing' % region)
+
         # Service Node
         try:
-            # TODO(mrkanag) region_id is hard-coded, fix it !
-            # user proper node name instead of fqdn
+            # TODO(mrkanag) use a proper node name instead of fqdn
             node = db_api.service_node_create(
                 context,
                 dict(name=self.registration_info.get('fqdn'),
                      fqdn=self.registration_info.get('fqdn'),
-                     region_id='f7dcd175-27ef-46b5-997f-e6e572f320b0'))
+                     region_id=region.id))
 
             LOG.info('Service node %s is created' % node)
         except exception.AlreadyExist:
@@ -258,35 +302,17 @@ class ServiceProcessor(object):
                     pid=self.registration_info['identification'],
                     host=self.registration_info['host'],
                     service_component_id=service_component.id,
-                    deleted_at=None
+                    deleted_at=None,
+                    is_launcher=self.registration_info['i_am_launcher']
                 ))
             LOG.info('Service Worker %s is created' % service_worker)
         except exception.AlreadyExist:
-            # TODO(mrkanag) Find a way to purge the dead service worker
-            # Once each service is enabled with heart beating namos
-            # purging can be done once heart beat stopped. this can be
-            # done from openstack.common.service.py
-            service_workers = \
-                db_api.service_worker_get_by_host_for_service_component(
-                    context,
-                    service_component_id=service_component.id,
-                    host=self.registration_info['host']
-                )
-            if len(service_workers) == 1:
-                service_worker = \
-                    db_api.service_worker_update(
-                        context,
-                        service_workers[0].id,
-                        dict(
-                            deleted_at=None,
-                            pid=self.registration_info['identification'],
-                            name='%s@%s' % (self.registration_info['pid'],
-                                            service_component.name)
-                        ))
-                LOG.info('Service Worker %s is existing and is updated'
-                         % service_worker)
-
-            # TODO(mrkanag) what to do when service_workers size is > 1
+            service_worker = db_api.service_worker_get_all_by(
+                context,
+                pid=self.registration_info['identification'],
+                service_component_id=service_component.id
+            )[0]
+            LOG.info('Service Worker %s is existing' % service_worker)
 
         # config file
         conf_files = dict()
@@ -398,7 +424,40 @@ class ServiceProcessor(object):
                                                         cfg_obj_)
                     LOG.debug("Config %s is existing and is updated" % config)
 
-        return service_worker.id
+        return service_component.id, service_worker.id
+
+    def cleanup(self, service_component_id):
+        # clean up the dead service workers
+        # TODO(mrkanag) Make this into thread
+        service_workers = \
+            db_api.service_worker_get_all_by(
+                self.context,
+                service_component_id=service_component_id
+            )
+
+        for srv_wkr in service_workers:
+            # TODO(mrkanag) Move this to db layer and query non deleted entries
+            if srv_wkr.deleted_at is not None:
+                continue
+
+            if utils.find_status(srv_wkr):
+                LOG.info('Service Worker %s is live'
+                         % srv_wkr.id)
+                continue
+            else:
+                confs = db_api.config_get_by_name_for_service_worker(
+                    self.context,
+                    service_worker_id=srv_wkr.id
+                )
+
+                for conf in confs:
+                    db_api.config_delete(self.context, conf.id)
+                    LOG.debug('Config %s is deleted'
+                              % conf.id)
+
+                db_api.service_worker_delete(self.context, srv_wkr.id)
+                LOG.info('Service Worker %s is deleted'
+                         % srv_wkr.id)
 
 
 class DriverProcessor(object):
diff --git a/namos/db/sqlalchemy/api.py b/namos/db/sqlalchemy/api.py
index a4409a0..1370d83 100644
--- a/namos/db/sqlalchemy/api.py
+++ b/namos/db/sqlalchemy/api.py
@@ -48,6 +48,7 @@ def get_backend():
 def _model_query(context, *args):
     session = _session(context)
     query = session.query(*args)
+
     return query
 
 
@@ -98,9 +99,14 @@ def _get_all_by(context, cls, **kwargs):
     return results
 
 
-def _delete(context, cls, _id):
+def _delete(context, cls, _id, soft=True):
     result = _get(context, cls, _id)
     if result is not None:
+        if soft and hasattr(result, 'soft_delete'):
+            result.soft_delete(_session(context))
+            return
+        # TODO(mrkanag) is it ok to hard delete when soft=True and
+        # soft_delete is missing
         result.delete(_session(context))
 
 
@@ -588,7 +594,7 @@ def config_get_by_name_for_service_worker(context,
         query = query.filter_by(name=name)
     elif only_configured:
         query = query.filter(
-            models.OsloConfig.value != models.OsloConfig.default_value)
+            models.OsloConfig.oslo_config_file_id.isnot(None))
     return query.all()
 
 
diff --git a/namos/db/sqlalchemy/models.py b/namos/db/sqlalchemy/models.py
index 84d049a..431cdea 100644
--- a/namos/db/sqlalchemy/models.py
+++ b/namos/db/sqlalchemy/models.py
@@ -102,9 +102,10 @@ class Region(BASE,
     __tablename__ = 'region'
 
     # Its of type String to match with keystone region id
+    # TODO(mrkanag) make this non-nullable
     keystone_region_id = sqlalchemy.Column(
         sqlalchemy.String(255),
-        nullable=False)
+        nullable=True)
 
 
 class Device(BASE,
@@ -211,9 +212,10 @@ class Service(BASE,
             Extra):
     __tablename__ = 'service'
 
+    # TODO(mrkanag) make this non-nullable
     keystone_service_id = sqlalchemy.Column(
         Uuid,
-        nullable=False)
+        nullable=True)
 
 
 class ServiceNode(BASE,
@@ -264,7 +266,7 @@ class ServiceWorker(BASE,
     __tablename__ = 'service_worker'
 
     __table_args__ = (
-        UniqueConstraint("host", "service_component_id"),
+        UniqueConstraint("pid", "service_component_id"),
     )
 
     name = sqlalchemy.Column(sqlalchemy.String(255),
@@ -281,6 +283,11 @@ class ServiceWorker(BASE,
         sqlalchemy.String(248),
         nullable=False
     )
+    is_launcher = sqlalchemy.Column(
+        sqlalchemy.Boolean,
+        nullable=False,
+        default=False
+    )
     service_component_id = sqlalchemy.Column(
         Uuid,
         sqlalchemy.ForeignKey('service_component.id'),