cloudbase-init/cloudbaseinit/metadata/services/baseopenstackservice.py

315 lines
11 KiB
Python

# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import posixpath
import netaddr
from oslo_log import log as oslo_logging
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base
from cloudbaseinit.models import network as network_model
from cloudbaseinit.utils import debiface
from cloudbaseinit.utils import encoding
from cloudbaseinit.utils import x509constants
# Values of the "type" field of a link entry in network_data.json.
# Only "bond" and "vlan" get special handling by the link parser in this
# module; every other value is treated as a physical link.
NETWORK_LINK_TYPE_PHYSICAL = "phy"
NETWORK_LINK_TYPE_BOND = "bond"
NETWORK_LINK_TYPE_VLAN = "vlan"
# Values of the "type" field of a network entry in network_data.json.
# The network parser in this module only consumes the static "ipv4" and
# "ipv6" types; entries of any other type are skipped.
NETWORK_TYPE_IPV4 = "ipv4"
NETWORK_TYPE_IPV4_DHCP = "ipv4_dhcp"
NETWORK_TYPE_IPV6 = "ipv6"
NETWORK_TYPE_IPV6_DHCP = "ipv6_dhcp"
# Value of the "type" field that marks a service entry as a DNS server.
NETWORK_SERVICE_TYPE_DNS = "dns"
# Configuration object exposed by the cloudbaseinit.conf package.
CONF = cloudbaseinit_conf.CONF
# Logger named after this module.
LOG = oslo_logging.getLogger(__name__)
class BaseOpenStackService(base.BaseMetadataService):
    """Shared parsing logic for metadata laid out in the OpenStack format.

    All data is fetched through ``self._get_cache_data`` (not defined in
    this class; presumably inherited from ``base.BaseMetadataService``)
    using POSIX-style relative paths rooted at ``openstack/``.
    """

    def get_content(self, name):
        """Return the data stored at ``openstack/content/<name>``."""
        path = posixpath.normpath(
            posixpath.join('openstack', 'content', name))
        return self._get_cache_data(path)

    def get_user_data(self):
        """Return the data stored at ``openstack/latest/user_data``."""
        path = posixpath.normpath(
            posixpath.join('openstack', 'latest', 'user_data'))
        return self._get_cache_data(path)

    def _get_openstack_json_data(self, version, file_name):
        """Load and decode ``openstack/<version>/<file_name>`` as JSON.

        :param version: metadata version directory (e.g. ``'latest'``).
        :param file_name: name of the JSON file inside that directory.
        :returns: the decoded JSON document, or ``None`` when the fetched
                  data is empty.
        """
        path = posixpath.normpath(
            posixpath.join('openstack', version, file_name))
        data = self._get_cache_data(path, decode=True)
        if data:
            return json.loads(data)
        return None

    def _get_meta_data(self, version='latest'):
        """Return the decoded ``meta_data.json`` for the given version."""
        return self._get_openstack_json_data(version, 'meta_data.json')

    def _get_network_data(self, version='latest'):
        """Return the decoded ``network_data.json`` for the given version."""
        return self._get_openstack_json_data(version, 'network_data.json')

    def get_instance_id(self):
        """Return the ``uuid`` field of the metadata, if present."""
        return self._get_meta_data().get('uuid')

    def get_host_name(self):
        """Return the ``hostname`` field of the metadata, if present."""
        return self._get_meta_data().get('hostname')

    def get_public_keys(self):
        """Get a list of all unique public keys found among the metadata.

        Keys are collected from the values of the ``public_keys`` mapping
        and from the ``keys`` entries whose ``type`` is ``"ssh"``. Each key
        is stripped of surrounding whitespace before de-duplication.
        """
        public_keys = []
        meta_data = self._get_meta_data()
        public_keys_dict = meta_data.get("public_keys")
        if public_keys_dict:
            public_keys = list(public_keys_dict.values())
        for key_dict in meta_data.get("keys") or []:
            if key_dict["type"] == "ssh":
                public_keys.append(key_dict["data"])
        # dict.fromkeys drops duplicates while keeping the first-seen order
        # (on interpreters with ordered dicts), so the result is
        # deterministic, unlike a round-trip through a set.
        return list(dict.fromkeys(key.strip() for key in public_keys))

    def get_network_details(self):
        """Return the network details referenced by ``network_config``.

        The last path component of ``network_config["content_path"]`` is
        used as the content name; that content is fetched, converted to a
        string and handed to ``debiface.parse``.

        :returns: the result of ``debiface.parse``, or ``None`` when the
                  metadata has no ``network_config`` or it lacks a
                  ``content_path`` entry.
        """
        network_config = self._get_meta_data().get('network_config')
        if not network_config:
            return None
        if "content_path" not in network_config:
            return None
        content_name = network_config["content_path"].rsplit("/", 1)[-1]
        content = encoding.get_as_string(self.get_content(content_name))
        return debiface.parse(content)

    @staticmethod
    def _ip_netmask_to_cidr(ip_address, netmask):
        """Combine an address and a netmask into ``address/prefix_len``.

        :returns: ``ip_address`` unchanged when ``netmask`` is ``None``,
                  otherwise the address followed by the prefix length
                  computed by ``netaddr.IPNetwork``.
        """
        if netmask is None:
            return ip_address
        prefix_len = netaddr.IPNetwork(
            u"%s/%s" % (ip_address, netmask)).prefixlen
        return u"%s/%s" % (ip_address, prefix_len)

    @staticmethod
    def _parse_network_data_links(links_data):
        """Convert ``links`` entries into ``network_model.Link`` objects.

        :param links_data: iterable of link dictionaries.
        :returns: list of ``network_model.Link``, one per input entry.
        :raises exception.CloudbaseInitException: if a bond link declares
            an unsupported bond mode or transmit hash policy.
        """
        links = []
        for link_data in links_data:
            link_id = link_data.get("id")
            mac = link_data.get("ethernet_mac_address")
            mtu = link_data.get("mtu")
            openstack_link_type = link_data.get("type")

            bond = None
            vlan_id = None
            vlan_link = None
            if openstack_link_type == NETWORK_LINK_TYPE_BOND:
                link_type = network_model.LINK_TYPE_BOND
                bond_links = link_data.get("bond_links")
                bond_mode = link_data.get("bond_mode")
                bond_xmit_hash_policy = link_data.get(
                    "bond_xmit_hash_policy")

                if bond_mode not in network_model.AVAILABLE_BOND_TYPES:
                    raise exception.CloudbaseInitException(
                        "Unsupported bond mode: %s" % bond_mode)

                # The hash policy is optional, but when given it has to be
                # one of the supported algorithms.
                if (bond_xmit_hash_policy is not None and
                        bond_xmit_hash_policy not in
                        network_model.AVAILABLE_BOND_LB_ALGORITHMS):
                    raise exception.CloudbaseInitException(
                        "Unsupported bond hash policy: %s" %
                        bond_xmit_hash_policy)

                # The LACP rate is not read from the link data.
                bond = network_model.Bond(
                    members=bond_links,
                    type=bond_mode,
                    lb_algorithm=bond_xmit_hash_policy,
                    lacp_rate=None,
                )
            elif openstack_link_type == NETWORK_LINK_TYPE_VLAN:
                link_type = network_model.LINK_TYPE_VLAN
                vlan_id = link_data.get("vlan_id")
                vlan_link = link_data.get("vlan_link")
                # A VLAN specific MAC address, when present, takes
                # precedence over the generic ethernet MAC address.
                vlan_mac_address = link_data.get("vlan_mac_address")
                if vlan_mac_address is not None:
                    mac = vlan_mac_address
            else:
                # Any other link type is considered physical
                link_type = network_model.LINK_TYPE_PHYSICAL

            links.append(network_model.Link(
                id=link_id,
                name=link_id,
                type=link_type,
                enabled=True,
                mac_address=mac,
                mtu=mtu,
                bond=bond,
                vlan_id=vlan_id,
                vlan_link=vlan_link))
        return links

    @staticmethod
    def _parse_dns_data(services_data):
        """Collect the addresses of the DNS entries in ``services_data``.

        Entries whose ``type`` is not the DNS service type are skipped with
        a warning; DNS entries without an ``address`` are ignored.

        :returns: list of DNS server addresses, in input order.
        """
        dns_nameservers = []
        for service_data in services_data:
            service_type = service_data.get("type")
            if service_type != NETWORK_SERVICE_TYPE_DNS:
                # ``warn`` is a deprecated alias that newer Python versions
                # no longer provide, hence ``warning``.
                LOG.warning("Skipping unsupported service type: %s",
                            service_type)
                continue
            address = service_data.get("address")
            if address is not None:
                dns_nameservers.append(address)
        return dns_nameservers

    @staticmethod
    def _parse_network_data_networks(networks_data):
        """Convert ``networks`` entries into ``network_model.Network``.

        Only entries of the static IPv4 and IPv6 types are converted; all
        other types are skipped.

        :param networks_data: iterable of network dictionaries.
        :returns: list of ``network_model.Network`` objects.
        """
        networks = []
        for network_data in networks_data:
            network_type = network_data.get("type")
            if network_type not in (NETWORK_TYPE_IPV4, NETWORK_TYPE_IPV6):
                continue

            address_cidr = BaseOpenStackService._ip_netmask_to_cidr(
                network_data.get("ip_address"),
                network_data.get("netmask"))

            routes = []
            for route_data in network_data.get("routes", []):
                # Route locals are named distinctly so they cannot be
                # confused with the enclosing network's own fields.
                route_cidr = BaseOpenStackService._ip_netmask_to_cidr(
                    route_data.get("network"),
                    route_data.get("netmask"))
                routes.append(network_model.Route(
                    network_cidr=route_cidr,
                    gateway=route_data.get("gateway")
                ))

            dns_nameservers = BaseOpenStackService._parse_dns_data(
                network_data.get("services", []))

            networks.append(network_model.Network(
                link=network_data.get("link"),
                address_cidr=address_cidr,
                dns_nameservers=dns_nameservers,
                routes=routes
            ))
        return networks

    @staticmethod
    def _parse_network_data_services(services_data):
        """Convert top-level ``services`` entries into service objects.

        :returns: a list holding a single
                  ``network_model.NameServerService`` with all DNS
                  addresses found, or an empty list when there are none.
        """
        services = []
        dns_nameservers = BaseOpenStackService._parse_dns_data(services_data)
        if dns_nameservers:
            services.append(network_model.NameServerService(
                addresses=dns_nameservers,
                search=None
            ))
        return services

    def get_network_details_v2(self):
        """Return the network configuration found in ``network_data.json``.

        :returns: a ``network_model.NetworkDetailsV2`` built from the
                  ``links``, ``networks`` and ``services`` sections, or
                  ``None`` when the network metadata does not exist or is
                  empty.
        """
        try:
            network_data = self._get_network_data()
        except base.NotExistingMetadataException:
            LOG.info("V2 network metadata not found")
            return None

        # The JSON loader yields None for an empty file; without this
        # guard the lookups below would fail with an AttributeError.
        if network_data is None:
            LOG.info("V2 network metadata is empty")
            return None

        links = self._parse_network_data_links(
            network_data.get("links", []))
        networks = self._parse_network_data_networks(
            network_data.get("networks", []))
        services = self._parse_network_data_services(
            network_data.get("services", []))

        return network_model.NetworkDetailsV2(
            links=links,
            networks=networks,
            services=services
        )

    def get_admin_password(self):
        """Return the ``admin_pass`` value from the metadata.

        The value under the ``meta`` section takes precedence over a
        top-level ``admin_pass``; ``None`` is returned when neither exists.
        """
        meta_data = self._get_meta_data()
        meta = meta_data.get('meta')
        if meta and 'admin_pass' in meta:
            return meta['admin_pass']
        return meta_data.get('admin_pass')

    def get_client_auth_certs(self):
        """Gather all unique certificates found among the metadata.

        If there are no certificates under "meta" or "keys" field,
        then try looking into user-data for this kind of information.
        """
        certs = []
        meta_data = self._get_meta_data()

        meta = meta_data.get("meta")
        if meta:
            cert_data_list = []
            idx = 0
            while True:
                # Chunking is necessary as metadata items can be
                # max. 255 chars long.
                cert_chunk = meta.get("admin_cert%d" % idx)
                if not cert_chunk:
                    break
                cert_data_list.append(cert_chunk)
                idx += 1

            if cert_data_list:
                # It's a list of strings for sure.
                certs.append("".join(cert_data_list))

        for key_dict in meta_data.get("keys") or []:
            if key_dict["type"] == "x509":
                certs.append(key_dict["data"])

        if not certs:
            # Look if the user_data contains a PEM certificate
            try:
                user_data = self.get_user_data()
            except base.NotExistingMetadataException:
                LOG.debug("user_data metadata not present")
            else:
                # Empty or missing user data cannot hold a certificate and
                # must not be stripped (None has no strip method).
                if user_data:
                    user_data = user_data.strip()
                    if user_data.startswith(
                            x509constants.PEM_HEADER.encode()):
                        certs.append(encoding.get_as_string(user_data))

        # dict.fromkeys drops duplicates while keeping the first-seen order
        # (on interpreters with ordered dicts), giving a deterministic
        # result, unlike a round-trip through a set.
        return list(dict.fromkeys(cert.strip() for cert in certs))