namos.cfg RPC server enabled
Change-Id: I5cf339222115ec5d1f4390421a79d42fc12ed322
This commit is contained in:
parent
320de5ff8a
commit
38476e0ab9
|
@ -18,6 +18,7 @@ Client side of the OSLO CONFIG NAMOS
|
|||
import functools
|
||||
|
||||
import json
|
||||
from oslo_context import context
|
||||
import oslo_messaging
|
||||
from oslo_messaging import RemoteError
|
||||
|
||||
|
@ -37,12 +38,27 @@ def wrapper_function(func):
|
|||
return wrapped
|
||||
|
||||
|
||||
def request_context(func):
|
||||
@functools.wraps(func)
|
||||
def wrapped(self, ctx, *args, **kwargs):
|
||||
if ctx is not None and not isinstance(ctx, context.RequestContext):
|
||||
ctx = context.RequestContext.from_dict(ctx.to_dict())
|
||||
|
||||
return func(self, ctx, *args, **kwargs)
|
||||
|
||||
return wrapped
|
||||
|
||||
|
||||
class ConductorAPI(object):
|
||||
RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, project):
|
||||
def __init__(self, host, project, identification, mgr):
|
||||
super(ConductorAPI, self).__init__()
|
||||
self.topic = 'namos.conductor'
|
||||
self.project = project
|
||||
self.host = host
|
||||
self.server_topic = identification
|
||||
self.mgr = mgr
|
||||
|
||||
# Setup the messaging tweaks ! here
|
||||
rpc._ALIASES.update(
|
||||
|
@ -58,6 +74,12 @@ class ConductorAPI(object):
|
|||
self.client = rpc.get_rpc_client(version=self.RPC_API_VERSION,
|
||||
topic=self.topic)
|
||||
|
||||
self.server = rpc.get_rpc_server(host=self.host,
|
||||
topic='namos.CONF.%s' %
|
||||
identification,
|
||||
endpoint=self,
|
||||
version=self.RPC_API_VERSION)
|
||||
|
||||
@wrapper_function
|
||||
def register_myself(self, context, registration_info):
|
||||
# TODO(mrkanag): is to be call instead of cast
|
||||
|
@ -71,3 +93,16 @@ class ConductorAPI(object):
|
|||
'heart_beat',
|
||||
identification=identification,
|
||||
dieing=dieing)
|
||||
|
||||
def manage_me(self):
|
||||
self.server.start()
|
||||
|
||||
def stop_me(self):
|
||||
try:
|
||||
self.server.stop()
|
||||
except: # noqa
|
||||
pass
|
||||
|
||||
@request_context
|
||||
def regisgration_ackw(self, context, identification):
|
||||
self.mgr.regisgration_ackw(identification)
|
||||
|
|
|
@ -29,6 +29,7 @@ logger = log.getLogger(__name__)
|
|||
# one for whole service component == PID
|
||||
IDENTIFICATION = str(uuid.uuid4())
|
||||
HEART_BEAT_STARTED = False
|
||||
NAMOS_RPCSERVER_STARTED = False
|
||||
|
||||
|
||||
class RegistrationInfo(object):
|
||||
|
@ -137,28 +138,45 @@ def collect_registration_info():
|
|||
return reg_info
|
||||
|
||||
|
||||
def register_myself(registration_info=None, start_heart_beat=True):
|
||||
def register_myself(registration_info=None,
|
||||
start_heart_beat=True,
|
||||
start_rpc_server=True):
|
||||
global NAMOS_RPCAPI
|
||||
|
||||
if registration_info is None:
|
||||
registration_info = collect_registration_info()
|
||||
|
||||
import sys
|
||||
current_module = sys.modules[__name__]
|
||||
|
||||
if NAMOS_RPCAPI is None:
|
||||
NAMOS_RPCAPI = rpcapi.ConductorAPI(
|
||||
project=registration_info.project_name)
|
||||
project=registration_info.project_name,
|
||||
host=registration_info.host,
|
||||
identification=registration_info.identification,
|
||||
mgr=current_module
|
||||
)
|
||||
|
||||
ctx = context.RequestContext()
|
||||
NAMOS_RPCAPI.register_myself(ctx, registration_info)
|
||||
|
||||
logger.info("*** [%s ]Registered with Namos successfully. ***" %
|
||||
logger.info("*** [%s ]Registeration with Namos started successfully. ***" %
|
||||
registration_info.identification)
|
||||
|
||||
if start_heart_beat:
|
||||
heart_beat(registration_info.identification)
|
||||
if start_rpc_server:
|
||||
manage_me()
|
||||
|
||||
return registration_info.identification
|
||||
|
||||
|
||||
def regisgration_ackw(identification):
|
||||
# TODO(mrkanag) start the heart beat here
|
||||
logger.info("*** [%s ]Registeration with Namos completed successfully. ***"
|
||||
% identification)
|
||||
|
||||
|
||||
def heart_beat(identification):
|
||||
global HEART_BEAT_STARTED
|
||||
|
||||
|
@ -185,6 +203,25 @@ def i_am_dieing():
|
|||
True)
|
||||
logger.info("*** [%s] HEART-BEAT with Namos is stopping. ***" %
|
||||
IDENTIFICATION)
|
||||
NAMOS_RPCAPI.stop_me()
|
||||
logger.info("*** [%s] RPC Server for Namos is stopping. ***" %
|
||||
IDENTIFICATION)
|
||||
|
||||
|
||||
def manage_me():
|
||||
global NAMOS_RPCSERVER_STARTED
|
||||
|
||||
if NAMOS_RPCSERVER_STARTED:
|
||||
return
|
||||
|
||||
NAMOS_RPCSERVER_STARTED = True
|
||||
from oslo_service import loopingcall
|
||||
th = loopingcall.FixedIntervalLoopingCall(NAMOS_RPCAPI.manage_me)
|
||||
# TODO(mrkanag) make this periods configurable
|
||||
th.start(60, 0)
|
||||
|
||||
logger.info("*** [%s] RPC Server for Namos is started successfully. ***" %
|
||||
IDENTIFICATION)
|
||||
|
||||
|
||||
def add_config(config):
|
||||
|
|
Loading…
Reference in New Issue