From 4332f7eb76c28d0d3d6cec1089a060b31bdb938e Mon Sep 17 00:00:00 2001 From: Ofer Ben-Yacov Date: Tue, 10 Jan 2017 15:12:51 +0200 Subject: [PATCH] Add agent heartbeat --- wan_qos/agent/tc_manager.py | 3 +++ wan_qos/common/api.py | 9 +++++++-- wan_qos/db/wan_qos_db.py | 40 ++++++++++++++++++++++++++++++------- wan_qos/services/plugin.py | 12 +++++------ 4 files changed, 48 insertions(+), 16 deletions(-) diff --git a/wan_qos/agent/tc_manager.py b/wan_qos/agent/tc_manager.py index 4e44015..463e3dc 100644 --- a/wan_qos/agent/tc_manager.py +++ b/wan_qos/agent/tc_manager.py @@ -37,6 +37,8 @@ class TcAgentManager: self.conf = conf if not host: self.host = self.conf.host + else: + self.host = host lan_port = self.conf.WANTC.lan_port_name wan_port = self.conf.WANTC.wan_port_name self.agent.set_ports(lan_port, wan_port) @@ -62,3 +64,4 @@ class TcAgentManager: def periodic_tasks(self, context, raise_on_error=False): LOG.info("periodic task") + self.plugin_rpc.device_heartbeat(context, self.host) diff --git a/wan_qos/common/api.py b/wan_qos/common/api.py index 99bd7ed..8e1b5e0 100644 --- a/wan_qos/common/api.py +++ b/wan_qos/common/api.py @@ -33,8 +33,13 @@ class TcPluginApi(object): 'lan_port': ports['lan_port'], 'wan_port': ports['wan_port'] } - return cctxt.cast(context, 'agent_up_notification', - host_info=host_info) + cctxt.cast(context, 'agent_up_notification', + host_info=host_info) + + def device_heartbeat(self, context, host): + cctxt = self.client.prepare() + cctxt.cast(context, 'device_heartbeat', + host=host) def get_configuration_from_db(self, context): cctxt = self.client.prepare() diff --git a/wan_qos/db/wan_qos_db.py b/wan_qos/db/wan_qos_db.py index 1c42c98..c6afbe9 100644 --- a/wan_qos/db/wan_qos_db.py +++ b/wan_qos/db/wan_qos_db.py @@ -33,22 +33,48 @@ class WanTcDb(object): if not device: LOG.debug('New device connected: %s' % host_info) wan_tc_device = models.WanTcDevice( - id = uuidutils.generate_uuid(), - host = host_info['host'], - lan_port = host_info['lan_port'], - wan_port = host_info['wan_port'], - uptime = timeutils.utcnow() + id=uuidutils.generate_uuid(), + host=host_info['host'], + lan_port=host_info['lan_port'], + wan_port=host_info['wan_port'], + uptime=timeutils.utcnow() ) context.session.add(wan_tc_device) else: LOG.debug('updating uptime for device: %s' % host_info['host']) device.uptime = timeutils.utcnow() - def device_heartbeat(self, context, device_info): - pass + def device_heartbeat(self, context, host): + device = context.session.query(models.WanTcDevice).filter_by( + host=host + ).first() + if device: + with context.session.begin(subtransactions=True): + device.heartbeat_timestamp = timeutils.utcnow() + else: + LOG.error('Got heartbeat for non-existing device: %s' % host) + + def get_all_devices(self, context): + device_list = context.session.query(models.WanTcDevice).all() + device_list_dict = [] + for device in device_list: + device_list_dict.append(self._device_to_dict(device)) + + return device_list_dict def create_wan_tc_class(self, context, wan_qos_class): pass def get_all_classes(self, context): return context.session.query(models.WanTcClass).all() + + def _device_to_dict(self, device): + device_dict = { + 'host': device.host, + 'lan_port': device.lan_port, + 'wan_port': device.wan_port, + 'uptime': device.uptime, + 'heartbeat': device.heartbeat + } + + return device_dict diff --git a/wan_qos/services/plugin.py b/wan_qos/services/plugin.py index 70a067a..138e843 100644 --- a/wan_qos/services/plugin.py +++ b/wan_qos/services/plugin.py @@ -42,13 +42,17 @@ class PluginRpcCallback(object): def agent_up_notification(self, context, host_info): LOG.debug('got up notification from %s' % host_info['host']) - self.plugin.agent_up_notification(context, host_info) + self.plugin.db.agent_up_notification(context, host_info) + + def device_heartbeat(self, context, host): + self.plugin.db.device_heartbeat(context, host) class WanQosPlugin(wanqos.WanQosPluginBase): supported_extension_aliases = ["wan-tc"] def __init__(self): + self.db = wan_qos_db.WanTcDb() rpc_callback = importutils.import_object( 'wan_qos.services.plugin.PluginRpcCallback', self) endpoints = ( @@ -60,8 +64,6 @@ class WanQosPlugin(wanqos.WanQosPluginBase): fanout=False) self.conn.consume_in_threads() - self.db = wan_qos_db.WanTcDb() - def get_plugin_type(self): """Get type of the plugin.""" return constants.WANTC @@ -102,7 +104,3 @@ class WanQosPlugin(wanqos.WanQosPluginBase): else: tenant_id = context.tenant_id return tenant_id - - def agent_up_notification(self, context, host_info): - LOG.debug('agent %s is up' % host_info['host']) - self.db.agent_up_notification(context, host_info)