use IDLs built-in notify for ovsdb monitor

Change-Id: I13d45fc1e75ee080b48f979ebf8380489de3d8ed
Closes-Bug: 1564766
Closes-Bug: 1563708
This commit is contained in:
Omer Anson 2016-04-04 16:26:23 +03:00
parent f5fcf49fb9
commit d08b5ae03d
3 changed files with 62 additions and 266 deletions

View File

@ -17,3 +17,4 @@ OVS_VM_INTERFACE = "vm"
OVS_BRIDGE_INTERFACE = "bridge"
OVS_PATCH_INTERFACE = "patch"
OVS_TUNNEL_INTERFACE = "tunnel"
OVS_UNKNOWN_INTERFACE = "unknown"

View File

@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from dragonflow.common import constants
class SwitchApi(object):
@ -75,6 +77,46 @@ class LocalInterface(object):
self.remote_ip = ""
self.tunnel_type = ""
@classmethod
def _get_interface_type(cls, row):
interface_type = row.type
interface_name = row.name
if interface_type == "internal" and "br" in interface_name:
return constants.OVS_BRIDGE_INTERFACE
if interface_type == "patch":
return constants.OVS_PATCH_INTERFACE
if 'iface-id' in row.external_ids:
return constants.OVS_VM_INTERFACE
options = row.options
if 'remote_ip' in options:
return constants.OVS_TUNNEL_INTERFACE
return constants.OVS_UNKNOWN_INTERFACE
@classmethod
def from_idl_row(cls, row):
result = cls()
result.uuid = row.uuid
if row.ofport:
result.ofport = int(row.ofport[0])
result.name = row.name
if row.admin_state:
result.admin_state = row.admin_state[0]
result.type = cls._get_interface_type(row)
external_ids = row.external_ids
result.iface_id = external_ids.get('iface-id', "")
result.attached_mac = external_ids.get('attached-mac', "")
if result.type == "patch":
result.peer = row.options['peer']
if result.type == "tunnel":
result.remote_ip = row.options['remote_ip']
result.tunnel_type = row.type
return result
def get_id(self):
return self.uuid

View File

@ -13,13 +13,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ryu.contrib.ovs.json
from ryu.contrib.ovs.jsonrpc import Message
from dragonflow._i18n import _LE, _LI
from dragonflow.common import constants
from dragonflow.common import exceptions as df_exceptions
from dragonflow.common import utils as df_utils
from dragonflow.db import api_vswitch
from neutron.agent.ovsdb import impl_idl
@ -28,11 +23,7 @@ from neutron.agent.ovsdb.native.commands import BaseCommand
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import idlutils
from oslo_config import cfg
from oslo_log import log
import six
import socket
import time
LOG = log.getLogger(__name__)
@ -51,6 +42,7 @@ class OvsdbSwitchApi(api_vswitch.SwitchApi):
self.ovsdb = None
self.idl = None
self.nb_api = nb_api
self.ovsdb_monitor = None
def initialize(self):
db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port))
@ -60,8 +52,8 @@ class OvsdbSwitchApi(api_vswitch.SwitchApi):
self.ovsdb.start()
self.idl = self.ovsdb.idl
ovsdb_monitor = OvsdbMonitor(self.nb_api, self.idl)
ovsdb_monitor.daemonize()
self.ovsdb_monitor = OvsdbMonitor(self.nb_api, self.idl)
self.idl.notify = self.ovsdb_monitor.notify
@property
def _tables(self):
@ -285,269 +277,30 @@ class AddTunnelPort(BaseCommand):
class OvsdbMonitor(object):
MONITOR_TABLE_NAME = "Interface"
MSG_STATUS_NEW = "new"
MSG_STATUS_OLD = "old"
INTERFACE_FIELD_OFPORT = "ofport"
INTERFACE_FIELD_NAME = "name"
INTERFACE_FIELD_ADMIN_STATE = "admin_state"
INTERFACE_FIELD_EXTERNAL_IDS = "external_ids"
INTERFACE_FIELD_OPTIONS = "options"
INTERFACE_FIELD_TYPE = "type"
TYPE_UNKNOW_PORT = 0
TYPE_VM_PORT = 1
TYPE_TUNNEL_PORT = 2
TYPE_BRIDGE_PORT = 3
TYPE_PATCH_PORT = 4
def __init__(self, nb_api, idl):
super(OvsdbMonitor, self).__init__()
self.input = ""
self.output = ""
self.parser = None
self.sock = None
self.monitor_request_id = None
self.nb_api = nb_api
self.idl = idl
self._daemon = df_utils.DFDaemon()
def daemonize(self):
return self._daemon.daemonize(self.run)
def _is_handle_interface_update(self, interface):
if interface.type != constants.OVS_VM_INTERFACE:
return False
if interface.name.startswith('qg'):
return False
return True
def stop(self):
return self._daemon.stop()
def _notify_update_local_interface(self, local_interface, action):
if self._is_handle_interface_update(local_interface):
table = constants.OVS_INTERFACE
key = local_interface.uuid
self.nb_api.db_change_callback(table, key, action, local_interface)
def connect_ovsdb(self):
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
unconnected = True
while unconnected:
try:
self.sock.connect(cfg.CONF.df.ovsdb_local_address)
unconnected = False
except socket.error as e:
LOG.exception(_LE("could not connect to local ovsdb, %s"), e)
time.sleep(5)
def send_msg(self, msg):
self.output += ryu.contrib.ovs.json.to_string(msg.to_json())
while len(self.output):
retval = self.sock.send(self.output)
if retval > 0:
self.output = self.output[retval:]
continue
elif retval == 0:
continue
else:
raise df_exceptions.SocketWriteException()
def send_monitor_request(self):
monitor_request = {}
columns_keys = [OvsdbMonitor.INTERFACE_FIELD_OFPORT,
OvsdbMonitor.INTERFACE_FIELD_NAME,
OvsdbMonitor.INTERFACE_FIELD_ADMIN_STATE,
OvsdbMonitor.INTERFACE_FIELD_EXTERNAL_IDS,
OvsdbMonitor.INTERFACE_FIELD_OPTIONS,
OvsdbMonitor.INTERFACE_FIELD_TYPE]
monitor_request[OvsdbMonitor.MONITOR_TABLE_NAME] = {
"columns": columns_keys}
msg = Message.create_request(
"monitor", ["Open_vSwitch", None, monitor_request])
self.monitor_request_id = msg.id
self.send_msg(msg)
def handle_update(self, table_update):
table_rows = table_update.get(OvsdbMonitor.MONITOR_TABLE_NAME)
if table_rows is None:
def notify(self, event, row, updates=None):
if not row or not hasattr(row, '_table'):
return
for row_uuid, table_row in six.iteritems(table_rows):
if not self._is_handle_interface_update(row_uuid):
LOG.info(_LI("Skipping port id %s"), row_uuid)
continue
new = table_row.get(OvsdbMonitor.MSG_STATUS_NEW)
old = table_row.get(OvsdbMonitor.MSG_STATUS_OLD)
if not old and not new:
return
elif not new:
# delete a old interface
_interface = api_vswitch.LocalInterface()
_interface.uuid = row_uuid
self.parse_interface(_interface, old)
self.notify_update_local_interface(_interface, "delete")
else:
# add a new interface or update a exist interface
_interface = api_vswitch.LocalInterface()
_interface.uuid = row_uuid
self.parse_interface(_interface, new)
self.notify_update_local_interface(_interface, "create")
def _is_handle_interface_update(self, interface_uuid):
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name', 'br-int')
return interface_uuid in br_int.ports
def get_interface_type(self, input_dict):
interface_type = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_TYPE)
interface_name = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_NAME)
if interface_type == "internal" and "br" in interface_name:
return OvsdbMonitor.TYPE_BRIDGE_PORT
if interface_type == "patch":
return OvsdbMonitor.TYPE_PATCH_PORT
external_ids = input_dict.get(
OvsdbMonitor.INTERFACE_FIELD_EXTERNAL_IDS)
external_elements = external_ids[1]
for element in external_elements:
if element[0] == "iface-id":
return OvsdbMonitor.TYPE_VM_PORT
options = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_OPTIONS)
options_elements = options[1]
for element in options_elements:
if element[0] == "remote_ip":
return OvsdbMonitor.TYPE_TUNNEL_PORT
return OvsdbMonitor.TYPE_UNKNOW_PORT
def parse_interface(self, _interface, input_dict):
interface_type = self.get_interface_type(input_dict)
if interface_type == OvsdbMonitor.TYPE_UNKNOW_PORT:
return
interface_ofport = input_dict.get(
OvsdbMonitor.INTERFACE_FIELD_OFPORT)
if isinstance(interface_ofport, list):
_interface.ofport = -1
else:
_interface.ofport = interface_ofport
interface_name = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_NAME)
if isinstance(interface_name, list):
_interface.name = ""
else:
_interface.name = interface_name
interface_admin_state = input_dict.get(
OvsdbMonitor.INTERFACE_FIELD_ADMIN_STATE)
if isinstance(interface_admin_state, list):
_interface.admin_state = ""
else:
_interface.admin_state = interface_admin_state
if interface_type == OvsdbMonitor.TYPE_VM_PORT:
_interface.type = constants.OVS_VM_INTERFACE
external_ids = input_dict.get(
OvsdbMonitor.INTERFACE_FIELD_EXTERNAL_IDS)
external_elements = external_ids[1]
for element in external_elements:
if element[0] == "attached-mac":
_interface.attached_mac = element[1]
elif element[0] == "iface-id":
_interface.iface_id = element[1]
elif interface_type == OvsdbMonitor.TYPE_BRIDGE_PORT:
_interface.type = constants.OVS_BRIDGE_INTERFACE
elif interface_type == OvsdbMonitor.TYPE_PATCH_PORT:
_interface.type = constants.OVS_PATCH_INTERFACE
options = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_OPTIONS)
options_elements = options[1]
for element in options_elements:
if element[0] == "peer":
_interface.peer = element[1]
break
elif interface_type == OvsdbMonitor.TYPE_TUNNEL_PORT:
_interface.type = constants.OVS_TUNNEL_INTERFACE
_interface.tunnel_type = input_dict.get(
OvsdbMonitor.INTERFACE_FIELD_TYPE)
options = input_dict.get(OvsdbMonitor.INTERFACE_FIELD_OPTIONS)
options_elements = options[1]
for element in options_elements:
if element[0] == "remote_ip":
_interface.remote_ip = element[1]
break
else:
pass
def wait_for_parser(self):
while not self.parser.is_done():
if self.input == "":
data = self.sock.recv(4095)
if not data:
raise df_exceptions.SocketReadException()
else:
self.input += data
self.input = self.input[self.parser.feed(self.input):]
def handle_message(self):
while True:
self.parser = ryu.contrib.ovs.json.Parser()
try:
self.wait_for_parser()
except df_exceptions.SocketReadException:
LOG.exception(_LE("exception happened "
"when read from socket"))
return
json_ = self.parser.finish()
self.parser = None
msg = Message.from_json(json_)
if msg is None:
continue
elif msg.id == "echo":
reply = Message.create_reply([], "echo")
try:
self.send_msg(reply)
except df_exceptions.SocketWriteException:
LOG.exception(_LE("exception happened "
"when send msg to socket"))
return
elif (msg.type == Message.T_NOTIFY
and msg.method == "update"
and len(msg.params) == 2
and msg.params[0] is None):
self.handle_update(msg.params[1])
elif (msg.type == Message.T_REPLY
and self.monitor_request_id is not None
and self.monitor_request_id == msg.id):
self.monitor_request_id = None
self.handle_update(msg.result)
self.notify_monitor_reply_finish()
else:
continue
def notify_update_local_interface(self, local_interface, action):
table = constants.OVS_INTERFACE
key = local_interface.uuid
self.nb_api.db_change_callback(table, key, action,
local_interface, None)
def notify_monitor_reply_finish(self):
table = constants.OVS_INTERFACE
action = "sync_finished"
self.nb_api.db_change_callback(table, None, action, None, None)
def run(self):
while True:
self.output = ""
self.input = ""
if self.sock is not None:
self.sock.close()
self.sock = None
self.connect_ovsdb()
try:
self.send_monitor_request()
except Exception as e:
LOG.exception(_LE("exception happened "
"when send monitor request:%s"), e)
continue
self.handle_message()
if row._table.name == 'Interface':
_interface = api_vswitch.LocalInterface.from_idl_row(row)
self._notify_update_local_interface(_interface, event)
class AddPatchPort(BaseCommand):