Region is added part of os_namos

Change-Id: I54e8f5ca592ae3bc91d35a6997d601e92fd83b04
This commit is contained in:
Kanagaraj Manickam 2016-03-28 14:44:53 +05:30
parent 59a3041e70
commit 97d84d1b45
9 changed files with 160 additions and 65 deletions

View File

@ -1,10 +1,6 @@
[DEFAULT]
rpc_backend = rabbit
debug=True
logging_exception_prefix = %(color)s%(asctime)s.%(msecs)03d TRACE %(name)s %(instance)s
logging_debug_format_suffix = from (pid=%(process)d) %(funcName)s %(pathname)s:%(lineno)d
logging_default_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [-%(color)s] %(instance)s%(color)s%(message)s
logging_context_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [%(request_id)s %(user)s %(tenant)s%(color)s] %(instance)s%(color)s%(message)s
[oslo_messaging_rabbit]
rabbit_userid = stackrabbit
@ -14,5 +10,8 @@ rabbit_hosts = 172.241.0.101
[database]
connection = mysql+pymysql://root:password@172.241.0.101/namos?charset=utf8
[conductor]
[os_manager]
workers=3
[os_namos]
region_name=RegionTwo

View File

@ -1,6 +1,7 @@
# List of config generator conf files for syncing the conf with namos
heat=/opt/stack/heat/config-generator.conf
namos=/home/manickan/workspace/namos/openstack/namos/config-generator.conf
os_namos=/home/manickan/workspace/namos/openstack/os-namos/config-generator.conf
keystone=/opt/stack/keystone/config-generator/keystone.conf
neutron-bgp-dragent=/opt/stack/neutron/etc/oslo-config-generator/bgp_dragent.ini
neutron-dhcp-agent=/opt/stack/neutron/etc/oslo-config-generator/dhcp_agent.ini

View File

@ -43,15 +43,15 @@ def main():
from namos import conductor # noqa
mgr = service.RPCService(
CONF.conductor.name,
CONF.os_manager.name,
config.PROJECT_NAME,
manager.ConductorManager())
launcher = os_service.launch(CONF, mgr, CONF.conductor.workers)
launcher = os_service.launch(CONF, mgr, CONF.os_manager.workers)
# TODO(mrkanag) Namos is not registering the RPC backend, fix it !
import os_namos
os_namos.register_myself()
# import os_namos
# os_namos.register_myself()
launcher.wait()

View File

@ -15,10 +15,10 @@
import sys
from oslo_config import cfg
from oslo_utils import timeutils
from namos.common import config
from namos.common import exception
from namos.common import utils
from namos.db import api
from namos.db import sample
from namos.db.sqlalchemy import migration
@ -29,24 +29,18 @@ MANAGE_COMMAND_NAME = 'namos-manage'
class HeartBeat(object):
def find_status(self, sw, report_interval=60):
status = False
if sw.updated_at is not None:
if ((timeutils.utcnow() - sw.updated_at).total_seconds()
<= report_interval):
status = True
else:
if ((timeutils.utcnow() - sw.created_at).total_seconds()
<= report_interval):
status = True
return status
def report_status(self):
# TODO(mrkanag) Make like Node: Service: worker: status
for sw in api.service_worker_get_all(None):
msg = '[%s] %s' % ('T' if self.find_status(sw) else 'F',
sw.name)
# TODO(mrkanag) Move this to db layer and query non deleted entries
if sw.deleted_at is not None:
continue
msg = '[%s] [%s] %s %s' % (
'T' if sw.is_launcher else 'F',
'T' if utils.find_status(sw) else 'F',
sw.name,
sw.host)
print (msg)

View File

@ -35,7 +35,7 @@ conductor_opts = [
def register_conductor_opts():
CONF.register_opts(conductor_opts, 'conductor')
CONF.register_opts(conductor_opts, 'os_manager')
def init_conf(prog):
@ -52,4 +52,4 @@ def init_log(project=PROJECT_NAME):
def list_opts():
yield 'conductor', conductor_opts
yield 'os_manager', conductor_opts

29
namos/common/utils.py Normal file
View File

@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import timeutils
def find_status(sw, report_interval=60):
status = False
if sw.updated_at is not None:
if ((timeutils.utcnow() - sw.updated_at).total_seconds()
<= report_interval):
status = True
else:
if ((timeutils.utcnow() - sw.created_at).total_seconds()
<= report_interval):
status = True
return status

View File

@ -22,6 +22,7 @@ from oslo_utils import timeutils
from namos.common import config
from namos.common import exception
from namos.common import messaging
from namos.common import utils
from namos.db import api as db_api
from namos.db import openstack_drivers
@ -68,8 +69,10 @@ class ConductorManager(object):
))
# Service processing
sp = ServiceProcessor(registration_info)
service_worker_id = sp.process_service(context)
sp = ServiceProcessor(context,
self,
registration_info)
service_component_id, service_worker_id = sp.process_service(context)
# Device Driver processing
dp = DriverProcessor(service_worker_id,
@ -82,6 +85,8 @@ class ConductorManager(object):
))
self._regisgration_ackw(context,
registration_info['identification'])
sp.cleanup(service_component_id)
return service_worker_id
def _regisgration_ackw(self, context, identification):
@ -94,6 +99,22 @@ class ConductorManager(object):
identification=identification)
LOG.info("REGISTER [%s] ACK" % identification)
def _ping(self, context, identification):
client = messaging.get_rpc_client(
topic='namos.CONF.%s' % identification,
version=self.RPC_API_VERSION,
exchange=config.PROJECT_NAME)
try:
client.call(context,
'ping_me',
identification=identification)
LOG.debug("PING [%s] SUCCESSFUL" % identification)
return True
except: # noqa
LOG.debug("PING [%s] FAILED" % identification)
return False
@request_context
def heart_beat(self, context, identification, dieing=False):
try:
@ -164,8 +185,13 @@ class ConductorManager(object):
class ServiceProcessor(object):
def __init__(self, registration_info):
def __init__(self,
context,
manager,
registration_info):
self.registration_info = registration_info
self.manager = manager
self.context = context
def file_to_configs(self, file_content):
tmp_file_path = '/tmp/sample-namos-config.conf'
@ -191,15 +217,33 @@ class ServiceProcessor(object):
return conf_dict
def process_service(self, context):
# region
# If region is not provided, make it as belongs to namos's region
if not self.registration_info.get('region_name'):
self.registration_info[
'region_name'] = cfg.CONF.os_namos.region_name
try:
region = db_api.region_create(
context,
dict(name=self.registration_info.get('region_name'))
)
LOG.info('Region %s is created' % region)
except exception.AlreadyExist:
region = db_api.region_get_by_name(
context,
name=self.registration_info.get('region_name')
)
LOG.info('Region %s is existing' % region)
# Service Node
try:
# TODO(mrkanag) region_id is hard-coded, fix it !
# user proper node name instead of fqdn
# TODO(mrkanag) user proper node name instead of fqdn
node = db_api.service_node_create(
context,
dict(name=self.registration_info.get('fqdn'),
fqdn=self.registration_info.get('fqdn'),
region_id='f7dcd175-27ef-46b5-997f-e6e572f320b0'))
region_id=region.id))
LOG.info('Service node %s is created' % node)
except exception.AlreadyExist:
@ -258,35 +302,17 @@ class ServiceProcessor(object):
pid=self.registration_info['identification'],
host=self.registration_info['host'],
service_component_id=service_component.id,
deleted_at=None
deleted_at=None,
is_launcher=self.registration_info['i_am_launcher']
))
LOG.info('Service Worker %s is created' % service_worker)
except exception.AlreadyExist:
# TODO(mrkanag) Find a way to purge the dead service worker
# Once each service is enabled with heart beating namos
# purging can be done once heart beat stopped. this can be
# done from openstack.common.service.py
service_workers = \
db_api.service_worker_get_by_host_for_service_component(
context,
service_component_id=service_component.id,
host=self.registration_info['host']
)
if len(service_workers) == 1:
service_worker = \
db_api.service_worker_update(
context,
service_workers[0].id,
dict(
deleted_at=None,
pid=self.registration_info['identification'],
name='%s@%s' % (self.registration_info['pid'],
service_component.name)
))
LOG.info('Service Worker %s is existing and is updated'
% service_worker)
# TODO(mrkanag) what to do when service_workers size is > 1
LOG.info('Service Worker %s is existing' %
db_api.service_worker_get_all_by(
context,
pid=self.registration_info['identification'],
service_component_id=service_component.id
)[0])
# config file
conf_files = dict()
@ -398,7 +424,40 @@ class ServiceProcessor(object):
cfg_obj_)
LOG.debug("Config %s is existing and is updated" % config)
return service_worker.id
return service_component.id, service_worker.id
def cleanup(self, service_component_id):
# clean up the dead service workers
# TODO(mrkanag) Make this into thread
service_workers = \
db_api.service_worker_get_all_by(
context,
service_component_id=service_component_id
)
for srv_wkr in service_workers:
# TODO(mrkanag) Move this to db layer and query non deleted entries
if srv_wkr.deleted_at is not None:
continue
if utils.find_status(srv_wkr):
LOG.info('Service Worker %s is live'
% srv_wkr.id)
continue
else:
confs = db_api.config_get_by_name_for_service_worker(
self.context,
service_worker_id=srv_wkr.id
)
for conf in confs:
db_api.config_delete(self.context, conf.id)
LOG.debug('Config %s is deleted'
% conf.id)
db_api.service_worker_delete(self.context, srv_wkr.id)
LOG.info('Service Worker %s is deleted'
% srv_wkr.id)
class DriverProcessor(object):

View File

@ -48,6 +48,7 @@ def get_backend():
def _model_query(context, *args):
session = _session(context)
query = session.query(*args)
return query
@ -98,9 +99,14 @@ def _get_all_by(context, cls, **kwargs):
return results
def _delete(context, cls, _id):
def _delete(context, cls, _id, soft=True):
result = _get(context, cls, _id)
if result is not None:
if soft and hasattr(result, 'soft_delete'):
result.soft_delete(_session(context))
return
# TODO(mrkanag) is it ok to hard delete when soft =True and soft_delete
# is missing
result.delete(_session(context))
@ -588,7 +594,7 @@ def config_get_by_name_for_service_worker(context,
query = query.filter_by(name=name)
elif only_configured:
query = query.filter(
models.OsloConfig.value != models.OsloConfig.default_value)
models.OsloConfig.oslo_config_file_id is not None)
return query.all()

View File

@ -102,9 +102,10 @@ class Region(BASE,
__tablename__ = 'region'
# Its of type String to match with keystone region id
# TODO(mrkanag) make this as non nullable
keystone_region_id = sqlalchemy.Column(
sqlalchemy.String(255),
nullable=False)
nullable=True)
class Device(BASE,
@ -211,9 +212,10 @@ class Service(BASE,
Extra):
__tablename__ = 'service'
# TODO(mrkanag) make this as non nullable
keystone_service_id = sqlalchemy.Column(
Uuid,
nullable=False)
nullable=True)
class ServiceNode(BASE,
@ -264,7 +266,7 @@ class ServiceWorker(BASE,
__tablename__ = 'service_worker'
__table_args__ = (
UniqueConstraint("host", "service_component_id"),
UniqueConstraint("pid", "service_component_id"),
)
name = sqlalchemy.Column(sqlalchemy.String(255),
@ -281,6 +283,11 @@ class ServiceWorker(BASE,
sqlalchemy.String(248),
nullable=False
)
is_launcher = sqlalchemy.Column(
sqlalchemy.Boolean,
nullable=False,
default=False
)
service_component_id = sqlalchemy.Column(
Uuid,
sqlalchemy.ForeignKey('service_component.id'),