From 38476e0ab9c4fded0b6cb6a55ccff6ad8478acc1 Mon Sep 17 00:00:00 2001 From: Kanagaraj Manickam Date: Sun, 27 Mar 2016 12:19:15 +0530 Subject: [PATCH] namos.cfg RPC server enabled Change-Id: I5cf339222115ec5d1f4390421a79d42fc12ed322 --- os_namos/common/rpcapi.py | 37 ++++++++++++++++++++++++++++++++- os_namos/sync.py | 43 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 76 insertions(+), 4 deletions(-) diff --git a/os_namos/common/rpcapi.py b/os_namos/common/rpcapi.py index 929ed0e..2386401 100644 --- a/os_namos/common/rpcapi.py +++ b/os_namos/common/rpcapi.py @@ -18,6 +18,7 @@ Client side of the OSLO CONFIG NAMOS import functools import json +from oslo_context import context import oslo_messaging from oslo_messaging import RemoteError @@ -37,12 +38,27 @@ def wrapper_function(func): return wrapped +def request_context(func): + @functools.wraps(func) + def wrapped(self, ctx, *args, **kwargs): + if ctx is not None and not isinstance(ctx, context.RequestContext): + ctx = context.RequestContext.from_dict(ctx.to_dict()) + + return func(self, ctx, *args, **kwargs) + + return wrapped + + class ConductorAPI(object): RPC_API_VERSION = '1.0' - def __init__(self, project): + def __init__(self, host, project, identification, mgr): super(ConductorAPI, self).__init__() self.topic = 'namos.conductor' + self.project = project + self.host = host + self.server_topic = identification + self.mgr = mgr # Setup the messaging tweaks ! here rpc._ALIASES.update( @@ -58,6 +74,12 @@ class ConductorAPI(object): self.client = rpc.get_rpc_client(version=self.RPC_API_VERSION, topic=self.topic) + self.server = rpc.get_rpc_server(host=self.host, + topic='namos.CONF.%s' % + identification, + endpoint=self, + version=self.RPC_API_VERSION) + @wrapper_function def register_myself(self, context, registration_info): # TODO(mrkanag): is to be call instead of cast @@ -71,3 +93,16 @@ class ConductorAPI(object): 'heart_beat', identification=identification, dieing=dieing) + + def manage_me(self): + self.server.start() + + def stop_me(self): + try: + self.server.stop() + except: # noqa + pass + + @request_context + def regisgration_ackw(self, context, identification): + self.mgr.regisgration_ackw(identification) diff --git a/os_namos/sync.py b/os_namos/sync.py index 9eb780b..f4f3e87 100644 --- a/os_namos/sync.py +++ b/os_namos/sync.py @@ -29,6 +29,7 @@ logger = log.getLogger(__name__) # one for whole service component == PID IDENTIFICATION = str(uuid.uuid4()) HEART_BEAT_STARTED = False +NAMOS_RPCSERVER_STARTED = False class RegistrationInfo(object): @@ -137,28 +138,45 @@ def collect_registration_info(): return reg_info -def register_myself(registration_info=None, start_heart_beat=True): +def register_myself(registration_info=None, + start_heart_beat=True, + start_rpc_server=True): global NAMOS_RPCAPI if registration_info is None: registration_info = collect_registration_info() + import sys + current_module = sys.modules[__name__] + if NAMOS_RPCAPI is None: NAMOS_RPCAPI = rpcapi.ConductorAPI( - project=registration_info.project_name) + project=registration_info.project_name, + host=registration_info.host, + identification=registration_info.identification, + mgr=current_module + ) ctx = context.RequestContext() NAMOS_RPCAPI.register_myself(ctx, registration_info) - logger.info("*** [%s ]Registered with Namos successfully. ***" % + logger.info("*** [%s ]Registeration with Namos started successfully. ***" % registration_info.identification) if start_heart_beat: heart_beat(registration_info.identification) + if start_rpc_server: + manage_me() return registration_info.identification +def regisgration_ackw(identification): + # TODO(mrkanag) start the heart beat here + logger.info("*** [%s ]Registeration with Namos completed successfully. ***" + % identification) + + def heart_beat(identification): global HEART_BEAT_STARTED @@ -185,6 +203,25 @@ def i_am_dieing(): True) logger.info("*** [%s] HEART-BEAT with Namos is stopping. ***" % IDENTIFICATION) + NAMOS_RPCAPI.stop_me() + logger.info("*** [%s] RPC Server for Namos is stopping. ***" % + IDENTIFICATION) + + +def manage_me(): + global NAMOS_RPCSERVER_STARTED + + if NAMOS_RPCSERVER_STARTED: + return + + NAMOS_RPCSERVER_STARTED = True + from oslo_service import loopingcall + th = loopingcall.FixedIntervalLoopingCall(NAMOS_RPCAPI.manage_me) + # TODO(mrkanag) make this periods configurable + th.start(60, 0) + + logger.info("*** [%s] RPC Server for Namos is started successfully. ***" % + IDENTIFICATION) def add_config(config):