namos.cfg RPC server enabled

Change-Id: I5cf339222115ec5d1f4390421a79d42fc12ed322
This commit is contained in:
Kanagaraj Manickam 2016-03-27 12:19:15 +05:30
parent 320de5ff8a
commit 38476e0ab9
2 changed files with 76 additions and 4 deletions

View File

@ -18,6 +18,7 @@ Client side of the OSLO CONFIG NAMOS
import functools
import json
from oslo_context import context
import oslo_messaging
from oslo_messaging import RemoteError
@ -37,12 +38,27 @@ def wrapper_function(func):
return wrapped
def request_context(func):
@functools.wraps(func)
def wrapped(self, ctx, *args, **kwargs):
if ctx is not None and not isinstance(ctx, context.RequestContext):
ctx = context.RequestContext.from_dict(ctx.to_dict())
return func(self, ctx, *args, **kwargs)
return wrapped
class ConductorAPI(object):
RPC_API_VERSION = '1.0'
def __init__(self, project):
def __init__(self, host, project, identification, mgr):
super(ConductorAPI, self).__init__()
self.topic = 'namos.conductor'
self.project = project
self.host = host
self.server_topic = identification
self.mgr = mgr
# Setup the messaging tweaks ! here
rpc._ALIASES.update(
@ -58,6 +74,12 @@ class ConductorAPI(object):
self.client = rpc.get_rpc_client(version=self.RPC_API_VERSION,
topic=self.topic)
self.server = rpc.get_rpc_server(host=self.host,
topic='namos.CONF.%s' %
identification,
endpoint=self,
version=self.RPC_API_VERSION)
@wrapper_function
def register_myself(self, context, registration_info):
# TODO(mrkanag): is to be call instead of cast
@ -71,3 +93,16 @@ class ConductorAPI(object):
'heart_beat',
identification=identification,
dieing=dieing)
def manage_me(self):
self.server.start()
def stop_me(self):
try:
self.server.stop()
except: # noqa
pass
@request_context
def regisgration_ackw(self, context, identification):
self.mgr.regisgration_ackw(identification)

View File

@ -29,6 +29,7 @@ logger = log.getLogger(__name__)
# one for whole service component == PID
IDENTIFICATION = str(uuid.uuid4())
HEART_BEAT_STARTED = False
NAMOS_RPCSERVER_STARTED = False
class RegistrationInfo(object):
@ -137,28 +138,45 @@ def collect_registration_info():
return reg_info
def register_myself(registration_info=None, start_heart_beat=True):
def register_myself(registration_info=None,
start_heart_beat=True,
start_rpc_server=True):
global NAMOS_RPCAPI
if registration_info is None:
registration_info = collect_registration_info()
import sys
current_module = sys.modules[__name__]
if NAMOS_RPCAPI is None:
NAMOS_RPCAPI = rpcapi.ConductorAPI(
project=registration_info.project_name)
project=registration_info.project_name,
host=registration_info.host,
identification=registration_info.identification,
mgr=current_module
)
ctx = context.RequestContext()
NAMOS_RPCAPI.register_myself(ctx, registration_info)
logger.info("*** [%s ]Registered with Namos successfully. ***" %
logger.info("*** [%s ]Registeration with Namos started successfully. ***" %
registration_info.identification)
if start_heart_beat:
heart_beat(registration_info.identification)
if start_rpc_server:
manage_me()
return registration_info.identification
def regisgration_ackw(identification):
# TODO(mrkanag) start the heart beat here
logger.info("*** [%s ]Registeration with Namos completed successfully. ***"
% identification)
def heart_beat(identification):
global HEART_BEAT_STARTED
@ -185,6 +203,25 @@ def i_am_dieing():
True)
logger.info("*** [%s] HEART-BEAT with Namos is stopping. ***" %
IDENTIFICATION)
NAMOS_RPCAPI.stop_me()
logger.info("*** [%s] RPC Server for Namos is stopping. ***" %
IDENTIFICATION)
def manage_me():
global NAMOS_RPCSERVER_STARTED
if NAMOS_RPCSERVER_STARTED:
return
NAMOS_RPCSERVER_STARTED = True
from oslo_service import loopingcall
th = loopingcall.FixedIntervalLoopingCall(NAMOS_RPCAPI.manage_me)
# TODO(mrkanag) make this periods configurable
th.start(60, 0)
logger.info("*** [%s] RPC Server for Namos is started successfully. ***" %
IDENTIFICATION)
def add_config(config):