use IDLs built-in notify for ovsdb monitor
Change-Id: I13d45fc1e75ee080b48f979ebf8380489de3d8ed Closes-Bug: 1564766 Closes-Bug: 1563708
This commit is contained in:
parent
f5fcf49fb9
commit
d08b5ae03d
|
@ -17,3 +17,4 @@ OVS_VM_INTERFACE = "vm"
|
|||
OVS_BRIDGE_INTERFACE = "bridge"
|
||||
OVS_PATCH_INTERFACE = "patch"
|
||||
OVS_TUNNEL_INTERFACE = "tunnel"
|
||||
OVS_UNKNOWN_INTERFACE = "unknown"
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from dragonflow.common import constants
|
||||
|
||||
|
||||
class SwitchApi(object):
|
||||
|
||||
|
@ -75,6 +77,46 @@ class LocalInterface(object):
|
|||
self.remote_ip = ""
|
||||
self.tunnel_type = ""
|
||||
|
||||
@classmethod
|
||||
def _get_interface_type(cls, row):
|
||||
interface_type = row.type
|
||||
interface_name = row.name
|
||||
|
||||
if interface_type == "internal" and "br" in interface_name:
|
||||
return constants.OVS_BRIDGE_INTERFACE
|
||||
|
||||
if interface_type == "patch":
|
||||
return constants.OVS_PATCH_INTERFACE
|
||||
|
||||
if 'iface-id' in row.external_ids:
|
||||
return constants.OVS_VM_INTERFACE
|
||||
|
||||
options = row.options
|
||||
if 'remote_ip' in options:
|
||||
return constants.OVS_TUNNEL_INTERFACE
|
||||
|
||||
return constants.OVS_UNKNOWN_INTERFACE
|
||||
|
||||
@classmethod
|
||||
def from_idl_row(cls, row):
|
||||
result = cls()
|
||||
result.uuid = row.uuid
|
||||
if row.ofport:
|
||||
result.ofport = int(row.ofport[0])
|
||||
result.name = row.name
|
||||
if row.admin_state:
|
||||
result.admin_state = row.admin_state[0]
|
||||
result.type = cls._get_interface_type(row)
|
||||
external_ids = row.external_ids
|
||||
result.iface_id = external_ids.get('iface-id', "")
|
||||
result.attached_mac = external_ids.get('attached-mac', "")
|
||||
if result.type == "patch":
|
||||
result.peer = row.options['peer']
|
||||
if result.type == "tunnel":
|
||||
result.remote_ip = row.options['remote_ip']
|
||||
result.tunnel_type = row.type
|
||||
return result
|
||||
|
||||
def get_id(self):
|
||||
return self.uuid
|
||||
|
||||
|
|
|
@ -13,13 +13,8 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import ryu.contrib.ovs.json
|
||||
from ryu.contrib.ovs.jsonrpc import Message
|
||||
|
||||
from dragonflow._i18n import _LE, _LI
|
||||
from dragonflow.common import constants
|
||||
from dragonflow.common import exceptions as df_exceptions
|
||||
from dragonflow.common import utils as df_utils
|
||||
from dragonflow.db import api_vswitch
|
||||
|
||||
from neutron.agent.ovsdb import impl_idl
|
||||
|
@ -28,11 +23,7 @@ from neutron.agent.ovsdb.native.commands import BaseCommand
|
|||
from neutron.agent.ovsdb.native import connection
|
||||
from neutron.agent.ovsdb.native import idlutils
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import six
|
||||
import socket
|
||||
import time
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -51,6 +42,7 @@ class OvsdbSwitchApi(api_vswitch.SwitchApi):
|
|||
self.ovsdb = None
|
||||
self.idl = None
|
||||
self.nb_api = nb_api
|
||||
self.ovsdb_monitor = None
|
||||
|
||||
def initialize(self):
|
||||
db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port))
|
||||
|
@ -60,8 +52,8 @@ class OvsdbSwitchApi(api_vswitch.SwitchApi):
|
|||
self.ovsdb.start()
|
||||
self.idl = self.ovsdb.idl
|
||||
|
||||
ovsdb_monitor = OvsdbMonitor(self.nb_api, self.idl)
|
||||
ovsdb_monitor.daemonize()
|
||||
self.ovsdb_monitor = OvsdbMonitor(self.nb_api, self.idl)
|
||||
self.idl.notify = self.ovsdb_monitor.notify
|
||||
|
||||
@property
|
||||
def _tables(self):
|
||||
|
@ -285,269 +277,30 @@ class AddTunnelPort(BaseCommand):
|
|||
|
||||
|
||||
class OvsdbMonitor(object):
|
||||
|
||||
MONITOR_TABLE_NAME = "Interface"
|
||||
MSG_STATUS_NEW = "new"
|
||||
MSG_STATUS_OLD = "old"
|
||||
INTERFACE_FIELD_OFPORT = "ofport"
|
||||
INTERFACE_FIELD_NAME = "name"
|
||||
INTERFACE_FIELD_ADMIN_STATE = "admin_state"
|
||||
INTERFACE_FIELD_EXTERNAL_IDS = "external_ids"
|
||||
INTERFACE_FIELD_OPTIONS = "options"
|
||||
INTERFACE_FIELD_TYPE = "type"
|
||||
|
||||
TYPE_UNKNOW_PORT = 0
|
||||
TYPE_VM_PORT = 1
|
||||
TYPE_TUNNEL_PORT = 2
|
||||
TYPE_BRIDGE_PORT = 3
|
||||
TYPE_PATCH_PORT = 4
|
||||
|
||||
def __init__(self, nb_api, idl):
|
||||
super(OvsdbMonitor, self).__init__()
|
||||
self.input = ""
|
||||
self.output = ""
|
||||
self.parser = None
|
||||
self.sock = None
|
||||
self.monitor_request_id = None
|
||||
self.nb_api = nb_api
|
||||
self.idl = idl
|
||||
self._daemon = df_utils.DFDaemon()
|
||||
|
||||
def daemonize(self):
|
||||
return self._daemon.daemonize(self.run)
|
||||
def _is_handle_interface_update(self, interface):
|
||||
if interface.type != constants.OVS_VM_INTERFACE:
|
||||
return False
|
||||
if interface.name.startswith('qg'):
|
||||
return False
|
||||
return True
|
||||
|
||||
def stop(self):
|
||||
return self._daemon.stop()
|
||||
def _notify_update_local_interface(self, local_interface, action):
|
||||
if self._is_handle_interface_update(local_interface):
|
||||
table = constants.OVS_INTERFACE
|
||||
key = local_interface.uuid
|
||||
self.nb_api.db_change_callback(table, key, action, local_interface)
|
||||
|
||||
def connect_ovsdb(self):
|
||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
|
||||
|
||||
unconnected = True
|
||||
while unconnected:
|
||||
try:
|
||||
self.sock.connect(cfg.CONF.df.ovsdb_local_address)
|
||||
unconnected = False
|
||||
except socket.error as e:
|
||||
LOG.exception(_LE("could not connect to local ovsdb, %s"), e)
|
||||
time.sleep(5)
|
||||
|
||||
def send_msg(self, msg):
|
||||
self.output += ryu.contrib.ovs.json.to_string(msg.to_json())
|
||||
while len(self.output):
|
||||
retval = self.sock.send(self.output)
|
||||
if retval > 0:
|
||||
self.output = self.output[retval:]
|
||||
continue
|
||||
elif retval == 0:
|
||||
continue
|
||||
else:
|
||||
raise df_exceptions.SocketWriteException()
|
||||
|
||||
def send_monitor_request(self):
|
||||
monitor_request = {}
|
||||
columns_keys = [OvsdbMonitor.INTERFACE_FIELD_OFPORT,
|
||||
OvsdbMonitor.INTERFACE_FIELD_NAME,
|
||||
OvsdbMonitor.INTERFACE_FIELD_ADMIN_STATE,
|
||||
OvsdbMonitor.INTERFACE_FIELD_EXTERNAL_IDS,
|
||||
OvsdbMonitor.INTERFACE_FIELD_OPTIONS,
|
||||
OvsdbMonitor.INTERFACE_FIELD_TYPE]
|
||||
|
||||
monitor_request[OvsdbMonitor.MONITOR_TABLE_NAME] = {
|
||||
"columns": columns_keys}
|
||||
msg = Message.create_request(
|
||||
"monitor", ["Open_vSwitch", None, monitor_request])
|
||||
self.monitor_request_id = msg.id
|
||||
self.send_msg(msg)
|
||||
|
||||
def handle_update(self, table_update):
|
||||
table_rows = table_update.get(OvsdbMonitor.MONITOR_TABLE_NAME)
|
||||
if table_rows is None:
|
||||
def notify(self, event, row, updates=None):
|
||||
if not row or not hasattr(row, '_table'):
|
||||
return
|
||||
|
||||
for row_uuid, table_row in six.iteritems(table_rows):
|
||||
if not self._is_handle_interface_update(row_uuid):
|
||||
LOG.info(_LI("Skipping port id %s"), row_uuid)
|
||||
continue
|
||||
|
||||
new = table_row.get(OvsdbMonitor.MSG_STATUS_NEW)
|
||||
old = table_row.get(OvsdbMonitor.MSG_STATUS_OLD)
|
||||
|
||||
if not old and not new:
|
||||
return
|
||||
elif not new:
|
||||
# delete a old interface
|
||||
_interface = api_vswitch.LocalInterface()
|
||||
_interface.uuid = row_uuid
|
||||
self.parse_interface(_interface, old)
|
||||
self.notify_update_local_interface(_interface, "delete")
|
||||
else:
|
||||
# add a new interface or update a exist interface
|
||||
_interface = api_vswitch.LocalInterface()
|
||||
_interface.uuid = row_uuid
|
||||
self.parse_interface(_interface, new)
|
||||
self.notify_update_local_interface(_interface, "create")
|
||||
|
||||
def _is_handle_interface_update(self, interface_uuid):
|
||||
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
|
||||
return interface_uuid in br_int.ports
|
||||
|
||||
def get_interface_type(self, input_dict):
|
||||
interface_type = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_TYPE)
|
||||
interface_name = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_NAME)
|
||||
|
||||
if interface_type == "internal" and "br" in interface_name:
|
||||
return OvsdbMonitor.TYPE_BRIDGE_PORT
|
||||
|
||||
if interface_type == "patch":
|
||||
return OvsdbMonitor.TYPE_PATCH_PORT
|
||||
|
||||
external_ids = input_dict.get(
|
||||
OvsdbMonitor.INTERFACE_FIELD_EXTERNAL_IDS)
|
||||
external_elements = external_ids[1]
|
||||
for element in external_elements:
|
||||
if element[0] == "iface-id":
|
||||
return OvsdbMonitor.TYPE_VM_PORT
|
||||
|
||||
options = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_OPTIONS)
|
||||
options_elements = options[1]
|
||||
for element in options_elements:
|
||||
if element[0] == "remote_ip":
|
||||
return OvsdbMonitor.TYPE_TUNNEL_PORT
|
||||
|
||||
return OvsdbMonitor.TYPE_UNKNOW_PORT
|
||||
|
||||
def parse_interface(self, _interface, input_dict):
|
||||
interface_type = self.get_interface_type(input_dict)
|
||||
if interface_type == OvsdbMonitor.TYPE_UNKNOW_PORT:
|
||||
return
|
||||
|
||||
interface_ofport = input_dict.get(
|
||||
OvsdbMonitor.INTERFACE_FIELD_OFPORT)
|
||||
if isinstance(interface_ofport, list):
|
||||
_interface.ofport = -1
|
||||
else:
|
||||
_interface.ofport = interface_ofport
|
||||
|
||||
interface_name = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_NAME)
|
||||
if isinstance(interface_name, list):
|
||||
_interface.name = ""
|
||||
else:
|
||||
_interface.name = interface_name
|
||||
|
||||
interface_admin_state = input_dict.get(
|
||||
OvsdbMonitor.INTERFACE_FIELD_ADMIN_STATE)
|
||||
if isinstance(interface_admin_state, list):
|
||||
_interface.admin_state = ""
|
||||
else:
|
||||
_interface.admin_state = interface_admin_state
|
||||
|
||||
if interface_type == OvsdbMonitor.TYPE_VM_PORT:
|
||||
_interface.type = constants.OVS_VM_INTERFACE
|
||||
external_ids = input_dict.get(
|
||||
OvsdbMonitor.INTERFACE_FIELD_EXTERNAL_IDS)
|
||||
external_elements = external_ids[1]
|
||||
for element in external_elements:
|
||||
if element[0] == "attached-mac":
|
||||
_interface.attached_mac = element[1]
|
||||
elif element[0] == "iface-id":
|
||||
_interface.iface_id = element[1]
|
||||
elif interface_type == OvsdbMonitor.TYPE_BRIDGE_PORT:
|
||||
_interface.type = constants.OVS_BRIDGE_INTERFACE
|
||||
elif interface_type == OvsdbMonitor.TYPE_PATCH_PORT:
|
||||
_interface.type = constants.OVS_PATCH_INTERFACE
|
||||
options = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_OPTIONS)
|
||||
options_elements = options[1]
|
||||
for element in options_elements:
|
||||
if element[0] == "peer":
|
||||
_interface.peer = element[1]
|
||||
break
|
||||
elif interface_type == OvsdbMonitor.TYPE_TUNNEL_PORT:
|
||||
_interface.type = constants.OVS_TUNNEL_INTERFACE
|
||||
_interface.tunnel_type = input_dict.get(
|
||||
OvsdbMonitor.INTERFACE_FIELD_TYPE)
|
||||
options = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_OPTIONS)
|
||||
options_elements = options[1]
|
||||
for element in options_elements:
|
||||
if element[0] == "remote_ip":
|
||||
_interface.remote_ip = element[1]
|
||||
break
|
||||
else:
|
||||
pass
|
||||
|
||||
def wait_for_parser(self):
|
||||
while not self.parser.is_done():
|
||||
if self.input == "":
|
||||
data = self.sock.recv(4095)
|
||||
if not data:
|
||||
raise df_exceptions.SocketReadException()
|
||||
else:
|
||||
self.input += data
|
||||
self.input = self.input[self.parser.feed(self.input):]
|
||||
|
||||
def handle_message(self):
|
||||
while True:
|
||||
self.parser = ryu.contrib.ovs.json.Parser()
|
||||
try:
|
||||
self.wait_for_parser()
|
||||
except df_exceptions.SocketReadException:
|
||||
LOG.exception(_LE("exception happened "
|
||||
"when read from socket"))
|
||||
return
|
||||
json_ = self.parser.finish()
|
||||
self.parser = None
|
||||
msg = Message.from_json(json_)
|
||||
if msg is None:
|
||||
continue
|
||||
elif msg.id == "echo":
|
||||
reply = Message.create_reply([], "echo")
|
||||
try:
|
||||
self.send_msg(reply)
|
||||
except df_exceptions.SocketWriteException:
|
||||
LOG.exception(_LE("exception happened "
|
||||
"when send msg to socket"))
|
||||
return
|
||||
elif (msg.type == Message.T_NOTIFY
|
||||
and msg.method == "update"
|
||||
and len(msg.params) == 2
|
||||
and msg.params[0] is None):
|
||||
self.handle_update(msg.params[1])
|
||||
elif (msg.type == Message.T_REPLY
|
||||
and self.monitor_request_id is not None
|
||||
and self.monitor_request_id == msg.id):
|
||||
self.monitor_request_id = None
|
||||
self.handle_update(msg.result)
|
||||
self.notify_monitor_reply_finish()
|
||||
else:
|
||||
continue
|
||||
|
||||
def notify_update_local_interface(self, local_interface, action):
|
||||
table = constants.OVS_INTERFACE
|
||||
key = local_interface.uuid
|
||||
self.nb_api.db_change_callback(table, key, action,
|
||||
local_interface, None)
|
||||
|
||||
def notify_monitor_reply_finish(self):
|
||||
table = constants.OVS_INTERFACE
|
||||
action = "sync_finished"
|
||||
self.nb_api.db_change_callback(table, None, action, None, None)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
self.output = ""
|
||||
self.input = ""
|
||||
if self.sock is not None:
|
||||
self.sock.close()
|
||||
self.sock = None
|
||||
|
||||
self.connect_ovsdb()
|
||||
try:
|
||||
self.send_monitor_request()
|
||||
except Exception as e:
|
||||
LOG.exception(_LE("exception happened "
|
||||
"when send monitor request:%s"), e)
|
||||
continue
|
||||
|
||||
self.handle_message()
|
||||
if row._table.name == 'Interface':
|
||||
_interface = api_vswitch.LocalInterface.from_idl_row(row)
|
||||
self._notify_update_local_interface(_interface, event)
|
||||
|
||||
|
||||
class AddPatchPort(BaseCommand):
|
||||
|
|
Loading…
Reference in New Issue