Region is added part of os_namos
Change-Id: I54e8f5ca592ae3bc91d35a6997d601e92fd83b04
This commit is contained in:
parent
59a3041e70
commit
97d84d1b45
|
@ -1,10 +1,6 @@
|
|||
[DEFAULT]
|
||||
rpc_backend = rabbit
|
||||
debug=True
|
||||
logging_exception_prefix = %(color)s%(asctime)s.%(msecs)03d TRACE %(name)s [01;35m%(instance)s[00m
|
||||
logging_debug_format_suffix = [00;33mfrom (pid=%(process)d) %(funcName)s %(pathname)s:%(lineno)d[00m
|
||||
logging_default_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [[00;36m-%(color)s] [01;35m%(instance)s%(color)s%(message)s[00m
|
||||
logging_context_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [[01;36m%(request_id)s [00;36m%(user)s %(tenant)s%(color)s] [01;35m%(instance)s%(color)s%(message)s[00m
|
||||
|
||||
[oslo_messaging_rabbit]
|
||||
rabbit_userid = stackrabbit
|
||||
|
@ -14,5 +10,8 @@ rabbit_hosts = 172.241.0.101
|
|||
[database]
|
||||
connection = mysql+pymysql://root:password@172.241.0.101/namos?charset=utf8
|
||||
|
||||
[conductor]
|
||||
[os_manager]
|
||||
workers=3
|
||||
|
||||
[os_namos]
|
||||
region_name=RegionTwo
|
|
@ -1,6 +1,7 @@
|
|||
# List of config generator conf files for syncing the conf with namos
|
||||
heat=/opt/stack/heat/config-generator.conf
|
||||
namos=/home/manickan/workspace/namos/openstack/namos/config-generator.conf
|
||||
os_namos=/home/manickan/workspace/namos/openstack/os-namos/config-generator.conf
|
||||
keystone=/opt/stack/keystone/config-generator/keystone.conf
|
||||
neutron-bgp-dragent=/opt/stack/neutron/etc/oslo-config-generator/bgp_dragent.ini
|
||||
neutron-dhcp-agent=/opt/stack/neutron/etc/oslo-config-generator/dhcp_agent.ini
|
||||
|
|
|
@ -43,15 +43,15 @@ def main():
|
|||
from namos import conductor # noqa
|
||||
|
||||
mgr = service.RPCService(
|
||||
CONF.conductor.name,
|
||||
CONF.os_manager.name,
|
||||
config.PROJECT_NAME,
|
||||
manager.ConductorManager())
|
||||
|
||||
launcher = os_service.launch(CONF, mgr, CONF.conductor.workers)
|
||||
launcher = os_service.launch(CONF, mgr, CONF.os_manager.workers)
|
||||
|
||||
# TODO(mrkanag) Namos is not registering the RPC backend, fix it !
|
||||
import os_namos
|
||||
os_namos.register_myself()
|
||||
# import os_namos
|
||||
# os_namos.register_myself()
|
||||
|
||||
launcher.wait()
|
||||
|
||||
|
|
|
@ -15,10 +15,10 @@
|
|||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from namos.common import config
|
||||
from namos.common import exception
|
||||
from namos.common import utils
|
||||
from namos.db import api
|
||||
from namos.db import sample
|
||||
from namos.db.sqlalchemy import migration
|
||||
|
@ -29,24 +29,18 @@ MANAGE_COMMAND_NAME = 'namos-manage'
|
|||
|
||||
|
||||
class HeartBeat(object):
|
||||
def find_status(self, sw, report_interval=60):
|
||||
status = False
|
||||
if sw.updated_at is not None:
|
||||
if ((timeutils.utcnow() - sw.updated_at).total_seconds()
|
||||
<= report_interval):
|
||||
status = True
|
||||
else:
|
||||
if ((timeutils.utcnow() - sw.created_at).total_seconds()
|
||||
<= report_interval):
|
||||
status = True
|
||||
|
||||
return status
|
||||
|
||||
def report_status(self):
|
||||
# TODO(mrkanag) Make like Node: Service: worker: status
|
||||
for sw in api.service_worker_get_all(None):
|
||||
msg = '[%s] %s' % ('T' if self.find_status(sw) else 'F',
|
||||
sw.name)
|
||||
# TODO(mrkanag) Move this to db layer and query non deleted entries
|
||||
if sw.deleted_at is not None:
|
||||
continue
|
||||
|
||||
msg = '[%s] [%s] %s %s' % (
|
||||
'T' if sw.is_launcher else 'F',
|
||||
'T' if utils.find_status(sw) else 'F',
|
||||
sw.name,
|
||||
sw.host)
|
||||
print (msg)
|
||||
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ conductor_opts = [
|
|||
|
||||
|
||||
def register_conductor_opts():
|
||||
CONF.register_opts(conductor_opts, 'conductor')
|
||||
CONF.register_opts(conductor_opts, 'os_manager')
|
||||
|
||||
|
||||
def init_conf(prog):
|
||||
|
@ -52,4 +52,4 @@ def init_log(project=PROJECT_NAME):
|
|||
|
||||
|
||||
def list_opts():
|
||||
yield 'conductor', conductor_opts
|
||||
yield 'os_manager', conductor_opts
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
|
||||
def find_status(sw, report_interval=60):
|
||||
status = False
|
||||
if sw.updated_at is not None:
|
||||
if ((timeutils.utcnow() - sw.updated_at).total_seconds()
|
||||
<= report_interval):
|
||||
status = True
|
||||
else:
|
||||
if ((timeutils.utcnow() - sw.created_at).total_seconds()
|
||||
<= report_interval):
|
||||
status = True
|
||||
|
||||
return status
|
|
@ -22,6 +22,7 @@ from oslo_utils import timeutils
|
|||
from namos.common import config
|
||||
from namos.common import exception
|
||||
from namos.common import messaging
|
||||
from namos.common import utils
|
||||
from namos.db import api as db_api
|
||||
from namos.db import openstack_drivers
|
||||
|
||||
|
@ -68,8 +69,10 @@ class ConductorManager(object):
|
|||
))
|
||||
|
||||
# Service processing
|
||||
sp = ServiceProcessor(registration_info)
|
||||
service_worker_id = sp.process_service(context)
|
||||
sp = ServiceProcessor(context,
|
||||
self,
|
||||
registration_info)
|
||||
service_component_id, service_worker_id = sp.process_service(context)
|
||||
|
||||
# Device Driver processing
|
||||
dp = DriverProcessor(service_worker_id,
|
||||
|
@ -82,6 +85,8 @@ class ConductorManager(object):
|
|||
))
|
||||
self._regisgration_ackw(context,
|
||||
registration_info['identification'])
|
||||
|
||||
sp.cleanup(service_component_id)
|
||||
return service_worker_id
|
||||
|
||||
def _regisgration_ackw(self, context, identification):
|
||||
|
@ -94,6 +99,22 @@ class ConductorManager(object):
|
|||
identification=identification)
|
||||
LOG.info("REGISTER [%s] ACK" % identification)
|
||||
|
||||
def _ping(self, context, identification):
|
||||
client = messaging.get_rpc_client(
|
||||
topic='namos.CONF.%s' % identification,
|
||||
version=self.RPC_API_VERSION,
|
||||
exchange=config.PROJECT_NAME)
|
||||
try:
|
||||
client.call(context,
|
||||
'ping_me',
|
||||
identification=identification)
|
||||
|
||||
LOG.debug("PING [%s] SUCCESSFUL" % identification)
|
||||
return True
|
||||
except: # noqa
|
||||
LOG.debug("PING [%s] FAILED" % identification)
|
||||
return False
|
||||
|
||||
@request_context
|
||||
def heart_beat(self, context, identification, dieing=False):
|
||||
try:
|
||||
|
@ -164,8 +185,13 @@ class ConductorManager(object):
|
|||
|
||||
|
||||
class ServiceProcessor(object):
|
||||
def __init__(self, registration_info):
|
||||
def __init__(self,
|
||||
context,
|
||||
manager,
|
||||
registration_info):
|
||||
self.registration_info = registration_info
|
||||
self.manager = manager
|
||||
self.context = context
|
||||
|
||||
def file_to_configs(self, file_content):
|
||||
tmp_file_path = '/tmp/sample-namos-config.conf'
|
||||
|
@ -191,15 +217,33 @@ class ServiceProcessor(object):
|
|||
return conf_dict
|
||||
|
||||
def process_service(self, context):
|
||||
# region
|
||||
# If region is not provided, make it as belongs to namos's region
|
||||
if not self.registration_info.get('region_name'):
|
||||
self.registration_info[
|
||||
'region_name'] = cfg.CONF.os_namos.region_name
|
||||
|
||||
try:
|
||||
region = db_api.region_create(
|
||||
context,
|
||||
dict(name=self.registration_info.get('region_name'))
|
||||
)
|
||||
LOG.info('Region %s is created' % region)
|
||||
except exception.AlreadyExist:
|
||||
region = db_api.region_get_by_name(
|
||||
context,
|
||||
name=self.registration_info.get('region_name')
|
||||
)
|
||||
LOG.info('Region %s is existing' % region)
|
||||
|
||||
# Service Node
|
||||
try:
|
||||
# TODO(mrkanag) region_id is hard-coded, fix it !
|
||||
# user proper node name instead of fqdn
|
||||
# TODO(mrkanag) user proper node name instead of fqdn
|
||||
node = db_api.service_node_create(
|
||||
context,
|
||||
dict(name=self.registration_info.get('fqdn'),
|
||||
fqdn=self.registration_info.get('fqdn'),
|
||||
region_id='f7dcd175-27ef-46b5-997f-e6e572f320b0'))
|
||||
region_id=region.id))
|
||||
|
||||
LOG.info('Service node %s is created' % node)
|
||||
except exception.AlreadyExist:
|
||||
|
@ -258,35 +302,17 @@ class ServiceProcessor(object):
|
|||
pid=self.registration_info['identification'],
|
||||
host=self.registration_info['host'],
|
||||
service_component_id=service_component.id,
|
||||
deleted_at=None
|
||||
deleted_at=None,
|
||||
is_launcher=self.registration_info['i_am_launcher']
|
||||
))
|
||||
LOG.info('Service Worker %s is created' % service_worker)
|
||||
except exception.AlreadyExist:
|
||||
# TODO(mrkanag) Find a way to purge the dead service worker
|
||||
# Once each service is enabled with heart beating namos
|
||||
# purging can be done once heart beat stopped. this can be
|
||||
# done from openstack.common.service.py
|
||||
service_workers = \
|
||||
db_api.service_worker_get_by_host_for_service_component(
|
||||
context,
|
||||
service_component_id=service_component.id,
|
||||
host=self.registration_info['host']
|
||||
)
|
||||
if len(service_workers) == 1:
|
||||
service_worker = \
|
||||
db_api.service_worker_update(
|
||||
context,
|
||||
service_workers[0].id,
|
||||
dict(
|
||||
deleted_at=None,
|
||||
pid=self.registration_info['identification'],
|
||||
name='%s@%s' % (self.registration_info['pid'],
|
||||
service_component.name)
|
||||
))
|
||||
LOG.info('Service Worker %s is existing and is updated'
|
||||
% service_worker)
|
||||
|
||||
# TODO(mrkanag) what to do when service_workers size is > 1
|
||||
LOG.info('Service Worker %s is existing' %
|
||||
db_api.service_worker_get_all_by(
|
||||
context,
|
||||
pid=self.registration_info['identification'],
|
||||
service_component_id=service_component.id
|
||||
)[0])
|
||||
|
||||
# config file
|
||||
conf_files = dict()
|
||||
|
@ -398,7 +424,40 @@ class ServiceProcessor(object):
|
|||
cfg_obj_)
|
||||
LOG.debug("Config %s is existing and is updated" % config)
|
||||
|
||||
return service_worker.id
|
||||
return service_component.id, service_worker.id
|
||||
|
||||
def cleanup(self, service_component_id):
|
||||
# clean up the dead service workers
|
||||
# TODO(mrkanag) Make this into thread
|
||||
service_workers = \
|
||||
db_api.service_worker_get_all_by(
|
||||
context,
|
||||
service_component_id=service_component_id
|
||||
)
|
||||
|
||||
for srv_wkr in service_workers:
|
||||
# TODO(mrkanag) Move this to db layer and query non deleted entries
|
||||
if srv_wkr.deleted_at is not None:
|
||||
continue
|
||||
|
||||
if utils.find_status(srv_wkr):
|
||||
LOG.info('Service Worker %s is live'
|
||||
% srv_wkr.id)
|
||||
continue
|
||||
else:
|
||||
confs = db_api.config_get_by_name_for_service_worker(
|
||||
self.context,
|
||||
service_worker_id=srv_wkr.id
|
||||
)
|
||||
|
||||
for conf in confs:
|
||||
db_api.config_delete(self.context, conf.id)
|
||||
LOG.debug('Config %s is deleted'
|
||||
% conf.id)
|
||||
|
||||
db_api.service_worker_delete(self.context, srv_wkr.id)
|
||||
LOG.info('Service Worker %s is deleted'
|
||||
% srv_wkr.id)
|
||||
|
||||
|
||||
class DriverProcessor(object):
|
||||
|
|
|
@ -48,6 +48,7 @@ def get_backend():
|
|||
def _model_query(context, *args):
|
||||
session = _session(context)
|
||||
query = session.query(*args)
|
||||
|
||||
return query
|
||||
|
||||
|
||||
|
@ -98,9 +99,14 @@ def _get_all_by(context, cls, **kwargs):
|
|||
return results
|
||||
|
||||
|
||||
def _delete(context, cls, _id):
|
||||
def _delete(context, cls, _id, soft=True):
|
||||
result = _get(context, cls, _id)
|
||||
if result is not None:
|
||||
if soft and hasattr(result, 'soft_delete'):
|
||||
result.soft_delete(_session(context))
|
||||
return
|
||||
# TODO(mrkanag) is it ok to hard delete when soft =True and soft_delete
|
||||
# is missing
|
||||
result.delete(_session(context))
|
||||
|
||||
|
||||
|
@ -588,7 +594,7 @@ def config_get_by_name_for_service_worker(context,
|
|||
query = query.filter_by(name=name)
|
||||
elif only_configured:
|
||||
query = query.filter(
|
||||
models.OsloConfig.value != models.OsloConfig.default_value)
|
||||
models.OsloConfig.oslo_config_file_id is not None)
|
||||
return query.all()
|
||||
|
||||
|
||||
|
|
|
@ -102,9 +102,10 @@ class Region(BASE,
|
|||
__tablename__ = 'region'
|
||||
|
||||
# Its of type String to match with keystone region id
|
||||
# TODO(mrkanag) make this as non nullable
|
||||
keystone_region_id = sqlalchemy.Column(
|
||||
sqlalchemy.String(255),
|
||||
nullable=False)
|
||||
nullable=True)
|
||||
|
||||
|
||||
class Device(BASE,
|
||||
|
@ -211,9 +212,10 @@ class Service(BASE,
|
|||
Extra):
|
||||
__tablename__ = 'service'
|
||||
|
||||
# TODO(mrkanag) make this as non nullable
|
||||
keystone_service_id = sqlalchemy.Column(
|
||||
Uuid,
|
||||
nullable=False)
|
||||
nullable=True)
|
||||
|
||||
|
||||
class ServiceNode(BASE,
|
||||
|
@ -264,7 +266,7 @@ class ServiceWorker(BASE,
|
|||
__tablename__ = 'service_worker'
|
||||
|
||||
__table_args__ = (
|
||||
UniqueConstraint("host", "service_component_id"),
|
||||
UniqueConstraint("pid", "service_component_id"),
|
||||
)
|
||||
|
||||
name = sqlalchemy.Column(sqlalchemy.String(255),
|
||||
|
@ -281,6 +283,11 @@ class ServiceWorker(BASE,
|
|||
sqlalchemy.String(248),
|
||||
nullable=False
|
||||
)
|
||||
is_launcher = sqlalchemy.Column(
|
||||
sqlalchemy.Boolean,
|
||||
nullable=False,
|
||||
default=False
|
||||
)
|
||||
service_component_id = sqlalchemy.Column(
|
||||
Uuid,
|
||||
sqlalchemy.ForeignKey('service_component.id'),
|
||||
|
|
Loading…
Reference in New Issue