dragonflow/dragonflow/controller/apps/metadata_service.py

541 lines
19 KiB
Python

# Copyright (c) 2016 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import hmac
import httplib2
import netaddr
from oslo_log import log
from oslo_utils import encodeutils
from ryu.lib.packet import arp
from ryu.lib.packet import ethernet
from ryu.lib.packet import ipv4
from ryu.ofproto import nicira_ext
import six
import six.moves.urllib.parse as urlparse
import webob
from dragonflow._i18n import _
from dragonflow.common import exceptions
from dragonflow import conf as cfg
from dragonflow.controller.common import arp_responder
from dragonflow.controller.common import constants as const
from dragonflow.controller import df_base_app
from dragonflow.db.models import constants as model_const
from dragonflow.db.models import l2
from dragonflow.db.models import switch
LOG = log.getLogger(__name__)
FLOW_IDLE_TIMEOUT = 60
# TODO(oanson) The TCP_* flag constants have already made it into ryu
# master, but not to pip. Once that is done, they should be taken from
# there. (ryu.lib.packet.tcp.TCP_SYN and ryu.lib.packet.tcp.TCP_ACK)
TCP_SYN = 0x002
TCP_ACK = 0x010
class MetadataServiceApp(df_base_app.DFlowApp):
def __init__(self, *args, **kwargs):
super(MetadataServiceApp, self).__init__(*args, **kwargs)
self._arp_responder = None
self._ofport = None
self._interface_mac = ""
self._ip = cfg.CONF.df_metadata.ip
self._port = cfg.CONF.df_metadata.port
self._interface = cfg.CONF.df_metadata.metadata_interface
def switch_features_handler(self, ev):
if self._interface_mac and self._ofport and self._ofport > 0:
# For reconnection, if the mac and ofport is set, re-download
# the flows.
self._add_tap_metadata_port(self._ofport, self._interface_mac)
@df_base_app.register_event(switch.SwitchPort, model_const.EVENT_CREATED)
@df_base_app.register_event(switch.SwitchPort, model_const.EVENT_UPDATED)
def ovs_port_updated(self, ovs_port, orig_ovs_port=None):
if ovs_port.name != cfg.CONF.df_metadata.metadata_interface:
return
ofport = ovs_port.ofport
mac = ovs_port.mac_in_use
if not ofport or not mac:
return
if ofport <= 0:
return
if ofport == self._ofport and mac == self._interface_mac:
return
self._add_tap_metadata_port(ofport, mac)
self._ofport = ofport
self._interface_mac = mac
@df_base_app.register_event(switch.SwitchPort, model_const.EVENT_DELETED)
def ovs_port_deleted(self, ovs_port):
if ovs_port.name != cfg.CONF.df_metadata.metadata_interface:
return
self._remove_metadata_interface_flows()
def _remove_metadata_interface_flows(self):
if not self._ofport:
return
parser = self.parser
ofproto = self.ofproto
self.mod_flow(
table_id=const.INGRESS_CLASSIFICATION_DISPATCH_TABLE,
command=ofproto.OFPFC_DELETE,
priority=const.PRIORITY_MEDIUM,
match=parser.OFPMatch(in_port=self._ofport))
self._ofport = None
self._interface_mac = ""
def _add_tap_metadata_port(self, ofport, mac):
"""
Add the flows that can be added with the current available information:
Regular Client->Server packets have IP rewritten, and sent to OVS port
TCP Syn packets are sent to controller, so that response flows can be
added.
Packets from the OVS port are detected and sent for classification.
"""
self._ofport = ofport
ofproto = self.ofproto
parser = self.parser
self._add_incoming_flows()
# Regular packet
match = parser.OFPMatch(eth_type=ethernet.ether.ETH_TYPE_IP)
actions = self._get_rewrite_ip_and_output_actions(ofproto, parser)
inst = [parser.OFPInstructionActions(
ofproto.OFPIT_APPLY_ACTIONS,
actions,
)]
self.mod_flow(
table_id=const.METADATA_SERVICE_TABLE,
command=ofproto.OFPFC_ADD,
priority=const.PRIORITY_MEDIUM,
match=match,
inst=inst,
)
# TCP SYN packet
match = parser.OFPMatch(
eth_type=ethernet.ether.ETH_TYPE_IP,
ip_proto=ipv4.inet.IPPROTO_TCP,
tcp_flags=(TCP_SYN, TCP_SYN | TCP_ACK),
)
learn_actions = self._get_learn_actions(ofproto, parser)
learn_actions.extend(actions)
inst = [parser.OFPInstructionActions(
ofproto.OFPIT_APPLY_ACTIONS,
learn_actions,
)]
self.mod_flow(
table_id=const.METADATA_SERVICE_TABLE,
command=ofproto.OFPFC_ADD,
priority=const.PRIORITY_HIGH,
match=match,
inst=inst,
)
# ARP responder
match = parser.OFPMatch(in_port=ofport,
eth_type=ethernet.ether.ETH_TYPE_ARP)
actions = [
parser.NXActionResubmitTable(
table_id=const.METADATA_SERVICE_REPLY_TABLE),
parser.OFPActionOutput(ofproto.OFPP_IN_PORT, 0)]
inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
actions)]
self.mod_flow(
table_id=const.INGRESS_CLASSIFICATION_DISPATCH_TABLE,
command=ofproto.OFPFC_ADD,
priority=const.PRIORITY_MEDIUM,
match=match,
inst=inst,
)
self._create_arp_responder(mac)
# Response packet
match = parser.OFPMatch(in_port=ofport,
eth_type=ethernet.ether.ETH_TYPE_IP)
actions = [
parser.NXActionResubmitTable(
table_id=const.METADATA_SERVICE_REPLY_TABLE),
parser.NXActionResubmitTable(
table_id=const.INGRESS_DISPATCH_TABLE)
]
inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
actions)]
self.mod_flow(
table_id=const.INGRESS_CLASSIFICATION_DISPATCH_TABLE,
command=ofproto.OFPFC_ADD,
priority=const.PRIORITY_MEDIUM,
match=match,
inst=inst,
)
def _add_incoming_flows(self):
ofproto = self.ofproto
parser = self.parser
match = parser.OFPMatch(
eth_type=ethernet.ether.ETH_TYPE_IP,
ipv4_dst=const.METADATA_SERVICE_IP,
ip_proto=ipv4.inet.IPPROTO_TCP,
tcp_dst=const.METADATA_HTTP_PORT,
)
actions = [
parser.NXActionResubmitTable(
table_id=self.dfdp.apps['portsec'].exitpoints.services),
]
# Bypass the security group check for metadata request.
self.mod_flow(
table_id=self.dfdp.apps['portsec'].states.main,
command=ofproto.OFPFC_ADD,
priority=const.PRIORITY_VERY_HIGH,
match=match,
actions=actions)
inst = self._get_incoming_flow_instructions(ofproto, parser)
self.mod_flow(
table_id=const.SERVICES_CLASSIFICATION_TABLE,
command=ofproto.OFPFC_ADD,
priority=const.PRIORITY_MEDIUM,
match=match,
inst=inst)
def _get_incoming_flow_instructions(self, ofproto, parser):
actions = self._get_incoming_flow_actions(ofproto, parser)
inst = []
if actions:
inst.append(
parser.OFPInstructionActions(
ofproto.OFPIT_APPLY_ACTIONS,
actions
),
)
inst.append(
parser.OFPInstructionGotoTable(const.METADATA_SERVICE_TABLE)
)
return inst
def _get_incoming_flow_actions(self, ofproto, parser):
actions = []
if self._ip != const.METADATA_SERVICE_IP:
actions.append(parser.OFPActionSetField(ipv4_dst=self._ip))
if self._port != const.METADATA_HTTP_PORT:
actions.append(parser.OFPActionSetField(tcp_dst=self._port))
return actions
def _get_rewrite_ip_and_output_actions(self, ofproto, parser):
"""
Retrieve the actions that rewrite the dst IP field with the reg6
(the tunnel key), set the first bit of that field, and output to the
metadata service OVS port.
The IP is set to <reg6> | 0x8000000, so that the transparent proxy
can extract the <reg6> from the source IP address, and be able to
identify the source VM. reg6 holds the local DF id identifying the VM.
"""
return [
parser.NXActionRegMove(
src_field='reg6',
dst_field='ipv4_src',
n_bits=32,
),
parser.NXActionRegLoad(
ofs_nbits=nicira_ext.ofs_nbits(31, 31),
dst="ipv4_src",
value=1,),
parser.OFPActionOutput(
self._ofport,
ofproto.OFPCML_NO_BUFFER,
)
]
def _get_learn_actions(self, ofproto, parser):
return [
# Return flow
parser.NXActionLearn(
table_id=const.METADATA_SERVICE_REPLY_TABLE,
specs=[
# Match
parser.NXFlowSpecMatch(
src=ethernet.ether.ETH_TYPE_IP,
dst=('eth_type', 0),
n_bits=16,
),
parser.NXFlowSpecMatch(
src=ipv4.inet.IPPROTO_TCP,
dst=('ip_proto', 0),
n_bits=8,
),
parser.NXFlowSpecMatch(
src=1,
dst=('ipv4_dst', 31),
n_bits=1,
),
parser.NXFlowSpecMatch(
src=('reg6', 0),
dst=('ipv4_dst', 0),
n_bits=31,
),
parser.NXFlowSpecMatch(
src=('tcp_src', 0),
dst=('tcp_dst', 0),
n_bits=16,
),
# Actions
parser.NXFlowSpecLoad(
src=('ipv4_src', 0),
dst=('ipv4_dst', 0),
n_bits=32,
),
parser.NXFlowSpecLoad(
src=int(netaddr.IPAddress(const.METADATA_SERVICE_IP)),
dst=('ipv4_src', 0),
n_bits=32,
),
parser.NXFlowSpecLoad(
src=const.METADATA_HTTP_PORT,
dst=('tcp_src', 0),
n_bits=16,
),
parser.NXFlowSpecLoad(
src=('reg6', 0),
dst=('reg7', 0),
n_bits=32,
),
],
fin_idle_timeout=1,
fin_hard_timeout=1,
),
# ARP responder
parser.NXActionLearn(
table_id=const.METADATA_SERVICE_REPLY_TABLE,
priority=const.PRIORITY_HIGH,
specs=[
# Match
parser.NXFlowSpecMatch(
src=ethernet.ether.ETH_TYPE_ARP,
dst=('eth_type', 0),
n_bits=16,
),
parser.NXFlowSpecMatch(
src=('reg6', 0),
dst=('arp_tpa', 0),
n_bits=31,
),
parser.NXFlowSpecMatch(
src=arp.ARP_REQUEST,
dst=('arp_op', 0),
n_bits=8,
),
# Actions
parser.NXFlowSpecLoad(
src=0,
dst=('reg6', 0),
n_bits=32,
),
parser.NXFlowSpecLoad(
src=arp.ARP_REPLY,
dst=('arp_op', 0),
n_bits=8,
),
parser.NXFlowSpecLoad(
src=('eth_dst', 0),
dst=('arp_tha', 0),
n_bits=48,
),
parser.NXFlowSpecLoad(
src=int(netaddr.IPAddress(self._ip)),
dst=('arp_tpa', 0),
n_bits=32,
),
parser.NXFlowSpecLoad(
src=('eth_src', 0),
dst=('eth_src', 0),
n_bits=48,
),
parser.NXFlowSpecLoad(
src=('eth_src', 0),
dst=('arp_sha', 0),
n_bits=48,
),
parser.NXFlowSpecLoad(
src=('reg6', 0),
dst=('arp_spa', 0),
n_bits=32,
),
parser.NXFlowSpecLoad(
src=1,
dst=('arp_spa', 31),
n_bits=1,
),
],
idle_timeout=30,
)
]
def _create_arp_responder(self, mac):
self._arp_responder = arp_responder.ArpResponder(
self,
None,
const.METADATA_SERVICE_IP,
mac
)
self._arp_responder.add()
class BaseMetadataProxyHandler(object):
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, req):
try:
LOG.debug("Request: %s", req)
return self.proxy_request(req)
except Exception:
LOG.exception("Unexpected error.")
msg = _('An unknown error has occurred. '
'Please try your request again.')
explanation = six.text_type(msg)
return webob.exc.HTTPInternalServerError(explanation=explanation)
def proxy_request(self, req):
headers = self.get_headers(req)
url = urlparse.urlunsplit((
self.get_scheme(req),
self.get_host(req),
self.get_path_info(req),
self.get_query_string(req),
''))
h = self.create_http_client(req)
resp, content = h.request(
url,
method=req.method,
headers=headers,
body=req.body
)
if resp.status == 200:
LOG.debug(str(resp))
return self.create_response(req, resp, content)
elif resp.status == 403:
LOG.warning(
'The remote metadata server responded with Forbidden. This '
'response usually occurs when shared secrets do not match.')
return webob.exc.HTTPForbidden()
elif resp.status == 400:
return webob.exc.HTTPBadRequest()
elif resp.status == 404:
return webob.exc.HTTPNotFound()
elif resp.status == 409:
return webob.exc.HTTPConflict()
elif resp.status == 500:
msg = (
'Remote metadata server experienced an internal server error.'
)
LOG.warning(msg)
explanation = six.text_type(msg)
return webob.exc.HTTPInternalServerError(explanation=explanation)
else:
raise Exception(_('Unexpected response code: %s') % resp.status)
def get_headers(self, req):
return req.headers
def create_response(self, req, resp, content):
req.response.content_type = resp['content-type']
req.response.body = content
return req.response
def get_scheme(self, req):
return req.scheme
def get_host(self, req):
return req.host
def get_path_info(self, req):
return req.path
def get_query_string(self, req):
return req.query_string
def create_http_client(self, req):
return httplib2.Http()
class DFMetadataProxyHandler(BaseMetadataProxyHandler):
def __init__(self, conf, nb_api):
super(DFMetadataProxyHandler, self).__init__()
self.conf = conf
self.nb_api = nb_api
def get_headers(self, req):
remote_addr = req.remote_addr
if not remote_addr:
raise exceptions.NoRemoteIPProxyException()
tunnel_key = int(netaddr.IPAddress(remote_addr) & ~0x80000000)
lport = self._get_logical_port_by_tunnel_key(tunnel_key)
headers = dict(req.headers)
tenant_id = lport.topic
instance_id = lport.device_id
ip = lport.ip
headers.update({
'X-Forwarded-For': str(ip),
'X-Tenant-ID': tenant_id,
'X-Instance-ID': instance_id,
'X-Instance-ID-Signature': self._sign_instance_id(instance_id),
})
return headers
def get_host(self, req):
return '{}:{}'.format(
self.conf.nova_metadata_host,
self.conf.nova_metadata_port,
)
def get_scheme(self, req):
return self.conf.nova_metadata_protocol
def create_http_client(self, req):
h = httplib2.Http(
ca_certs=self.conf.auth_ca_cert,
disable_ssl_certificate_validation=self.conf.nova_metadata_insecure
)
if self.conf.nova_client_cert and self.conf.nova_client_priv_key:
h.add_certificate(self.conf.nova_client_priv_key,
self.conf.nova_client_cert,
self.get_host(req))
return h
def _get_logical_port_by_tunnel_key(self, tunnel_key):
lports = self.nb_api.get_all(l2.LogicalPort)
for lport in lports:
if lport.unique_key == tunnel_key:
return lport
raise exceptions.LogicalPortNotFoundByTunnelKey(key=tunnel_key)
# Taken from Neurton: neutron/agent/metadata/agent.py
def _sign_instance_id(self, instance_id):
secret = self.conf.metadata_proxy_shared_secret
secret = encodeutils.to_utf8(secret)
instance_id = encodeutils.to_utf8(instance_id)
return hmac.new(secret, instance_id, hashlib.sha256).hexdigest()