fuel-plugin-neutron-vpnaas/deployment_scripts/puppet/modules/vpnaas/files/q-agent-cleanup.py

#!/usr/bin/env python
import re
import time
import os
import sys
import random
import string
import json
import argparse
import logging
import logging.handlers
import shlex
import subprocess
import StringIO
import socket
from neutronclient.neutron import client as q_client
from keystoneclient.v2_0 import client as ks_client
from keystoneclient.apiclient.exceptions import NotFound as ks_NotFound
LOG_NAME = 'q-agent-cleanup'
API_VER = '2.0'
PORT_ID_PART_LEN = 11
TMP_USER_NAME = 'tmp_neutron_admin'
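# Parse a shell-style "openrc" file into a dict of environment variables.
# Only lines of the form "export NAME=value" are picked up; quotes and shell
# escapes are stripped via shlex. For example, a (hypothetical) line such as
#   export OS_USERNAME='admin'
# would end up in the returned dict as {'OS_USERNAME': 'admin'}.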
def get_authconfig(cfg_file):
# Read OS auth config file
rv = {}
stripchars=" \'\""
with open(cfg_file) as f:
for line in f:
rg = re.match(r'\s*export\s+(\w+)\s*=\s*(.*)',line)
if rg:
# Use shlex to unescape bash shell escape characters
value = "".join(shlex.split(rg.group(2).strip(stripchars)))
rv[rg.group(1).strip(stripchars)] = value
return rv
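# NeutronCleaner drives the cleanup of a Neutron network node:
#   * removes leftover OVS ports found in this node's DHCP/L3 namespaces,
#   * reschedules networks/routers away from dead agents onto an alive one,
#   * optionally deletes dead agents and this node's own agents,
#   * reports agent health in an OCF-friendly way for cluster resource agents.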
class NeutronCleaner(object):
PORT_NAME_PREFIXES_BY_DEV_OWNER = {
'network:dhcp': 'tap',
'network:router_gateway': 'qg-',
'network:router_interface': 'qr-',
}
PORT_NAME_PREFIXES = {
# contains tuples of prefixes
'dhcp': (PORT_NAME_PREFIXES_BY_DEV_OWNER['network:dhcp'],),
'l3': (
PORT_NAME_PREFIXES_BY_DEV_OWNER['network:router_gateway'],
PORT_NAME_PREFIXES_BY_DEV_OWNER['network:router_interface']
)
}
BRIDGES_FOR_PORTS_BY_AGENT = {
'dhcp': ('br-int',),
'l3': ('br-int', 'br-ex'),
}
PORT_OWNER_PREFIXES = {
'dhcp': ('network:dhcp',),
'l3': ('network:router_gateway', 'network:router_interface')
}
NS_NAME_PREFIXES = {
'dhcp': 'qdhcp',
'l3': 'qrouter',
}
AGENT_BINARY_NAME = {
'dhcp': 'neutron-dhcp-agent',
'l3': 'neutron-vpn-agent',
'ovs': 'neutron-openvswitch-agent'
}
CMD__list_ovs_port = ['ovs-vsctl', 'list-ports']
CMD__remove_ovs_port = ['ovs-vsctl', '--', '--if-exists', 'del-port']
CMD__remove_ip_addr = ['ip', 'address', 'delete']
CMD__ip_netns_list = ['ip', 'netns', 'list']
CMD__ip_netns_exec = ['ip', 'netns', 'exec']
RE__port_in_portlist = re.compile(r"^\s*\d+\:\s+([\w-]+)\:") # 14: tap-xxxyyyzzz:
def __init__(self, openrc, options, log=None):
self.log = log
self.auth_config = openrc
self.options = options
self.agents = {}
self.debug = options.get('debug')
self.RESCHEDULING_CALLS = {
'dhcp': self._reschedule_agent_dhcp,
'l3': self._reschedule_agent_l3,
}
self._token = None
self._keystone = None
self._client = None
self._need_cleanup_tmp_admin = False
def __del__(self):
if self._need_cleanup_tmp_admin and self._keystone and self._keystone.username:
try:
self._keystone.users.delete(self._keystone.users.find(username=self._keystone.username))
except Exception:
# an exception while cleaning up the temporary account does no harm
pass
def generate_random_passwd(self, length=13):
chars = string.ascii_letters + string.digits + '!@#$%^&*()'
random.seed(os.urandom(1024))  # seed the PRNG from urandom
return ''.join(random.choice(chars) for i in range(length))
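# Lazily build a Keystone client. Two authentication paths are supported:
#   1) a predefined service token plus admin auth URL, in which case a
#      temporary admin user (TMP_USER_NAME) with a random password is
#      (re)created in the 'services' tenant and used for the real session;
#   2) plain credentials taken from the openrc file.
# Connection-type failures are retried up to options['retries'] times with
# options['sleep'] seconds between attempts.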
@property
def keystone(self):
if self._keystone is None:
ret_count = self.options.get('retries', 1)
tmp_passwd = self.generate_random_passwd()
while True:
if ret_count <= 0:
self.log.error(">>> Keystone error: no more retries for connect to keystone server.")
sys.exit(1)
try:
a_token = self.options.get('auth-token')
a_url = self.options.get('admin-auth-url')
if a_token and a_url:
self.log.debug("Authentication by predefined token.")
# create keystone instance, authorized by service token
ks = ks_client.Client(
token=a_token,
endpoint=a_url,
)
service_tenant = ks.tenants.find(name='services')
auth_url = ks.endpoints.find(
service_id=ks.services.find(type='identity').id
).internalurl
# find and re-create the temporary rescheduling admin user with a random password
try:
user = ks.users.find(username=TMP_USER_NAME)
ks.users.delete(user)
except ks_NotFound:
# user not found, it's OK
pass
user = ks.users.create(TMP_USER_NAME, tmp_passwd, tenant_id=service_tenant.id)
ks.roles.add_user_role(user, ks.roles.find(name='admin'), service_tenant)
# authenticate newly-created tmp neutron admin
self._keystone = ks_client.Client(
username=user.username,
password=tmp_passwd,
tenant_id=user.tenantId,
auth_url=auth_url,
)
self._need_cleanup_tmp_admin = True
else:
self.log.debug("Authentication by given credentionals.")
self._keystone = ks_client.Client(
username=self.auth_config['OS_USERNAME'],
password=self.auth_config['OS_PASSWORD'],
tenant_name=self.auth_config['OS_TENANT_NAME'],
auth_url=self.auth_config['OS_AUTH_URL'],
)
break
except Exception as e:
errmsg = str(e.message).strip()  # str() is needed because keystone may use an int as the exception message
if re.search(r"Connection\s+refused$", errmsg, re.I) or \
re.search(r"Connection\s+timed\s+out$", errmsg, re.I) or\
re.search(r"Lost\s+connection\s+to\s+MySQL\s+server", errmsg, re.I) or\
re.search(r"Service\s+Unavailable$", errmsg, re.I) or\
re.search(r"'*NoneType'*\s+object\s+has\s+no\s+attribute\s+'*__getitem__'*$", errmsg, re.I) or \
re.search(r"No\s+route\s+to\s+host$", errmsg, re.I):
self.log.info(">>> Can't connect to {0}, wait for server ready...".format(self.auth_config['OS_AUTH_URL']))
time.sleep(self.options.sleep)
else:
self.log.error(">>> Keystone error:\n{0}".format(e.message))
raise e
ret_count -= 1
return self._keystone
@property
def token(self):
if self._token is None:
self._token = self._keystone.auth_token
# self.log.debug("Auth_token: '{0}'".format(self._token))
# TODO: validate the existing token
return self._token
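# Lazily build the Neutron client against the admin endpoint of the
# 'network' service found in the Keystone catalog, reusing the auth token.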
@property
def client(self):
if self._client is None:
self._client = q_client.Client(
API_VER,
endpoint_url=self.keystone.endpoints.find(
service_id=self.keystone.services.find(type='network').id
).adminurl,
token=self.token,
)
return self._client
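# Generic wrapper around Neutron client calls: connection-type errors are
# retried (with a sleep between attempts), anything else is re-raised.
# Returns an empty list once the retry budget is exhausted.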
def _neutron_API_call(self, method, *args):
ret_count = self.options.get('retries')
while True:
if ret_count <= 0:
self.log.error("Q-server error: no more retries for connect to server.")
return []
try:
rv = method(*args)
break
except Exception as e:
errmsg = str(e.message).strip()
if re.search(r"Connection\s+refused", errmsg, re.I) or\
re.search(r"Connection\s+timed\s+out", errmsg, re.I) or\
re.search(r"Lost\s+connection\s+to\s+MySQL\s+server", errmsg, re.I) or\
re.search(r"503\s+Service\s+Unavailable", errmsg, re.I) or\
re.search(r"No\s+route\s+to\s+host", errmsg, re.I):
self.log.info("Can't connect to {0}, wait for server ready...".format(self.keystone.service_catalog.url_for(service_type='network')))
time.sleep(self.options.sleep)
else:
self.log.error("Neutron error:\n{0}".format(e.message))
raise e
ret_count -= 1
return rv
def _get_agents(self,use_cache=True):
return self._neutron_API_call(self.client.list_agents)['agents']
def _get_routers(self, use_cache=True):
return self._neutron_API_call(self.client.list_routers)['routers']
def _get_networks(self, use_cache=True):
return self._neutron_API_call(self.client.list_networks)['networks']
def _list_networks_on_dhcp_agent(self, agent_id):
return self._neutron_API_call(self.client.list_networks_on_dhcp_agent, agent_id)['networks']
def _list_routers_on_l3_agent(self, agent_id):
return self._neutron_API_call(self.client.list_routers_on_l3_agent, agent_id)['routers']
def _list_l3_agents_on_router(self, router_id):
return self._neutron_API_call(self.client.list_l3_agent_hosting_routers, router_id)['agents']
def _list_dhcp_agents_on_network(self, network_id):
return self._neutron_API_call(self.client.list_dhcp_agent_hosting_networks, network_id)['agents']
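# "Orphaned" resources are networks with no hosting DHCP agent and routers
# with no hosting L3 agent; they are later rescheduled onto the first alive agent.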
def _list_orphaned_networks(self):
networks = self._get_networks()
self.log.debug("_list_orphaned_networks:, got list of networks {0}".format(json.dumps(networks,indent=4)))
orphaned_networks = []
for network in networks:
if len(self._list_dhcp_agents_on_network(network['id'])) == 0:
orphaned_networks.append(network['id'])
self.log.debug("_list_orphaned_networks:, got list of orphaned networks {0}".format(orphaned_networks))
return orphaned_networks
def _list_orphaned_routers(self):
routers = self._get_routers()
self.log.debug("_list_orphaned_routers:, got list of routers {0}".format(json.dumps(routers,indent=4)))
orphaned_routers = []
for router in routers:
if len(self._list_l3_agents_on_router(router['id'])) == 0:
orphaned_routers.append(router['id'])
self.log.debug("_list_orphaned_routers:, got list of orphaned routers {0}".format(orphaned_routers))
return orphaned_routers
def _add_network_to_dhcp_agent(self, agent_id, net_id):
return self._neutron_API_call(self.client.add_network_to_dhcp_agent, agent_id, {"network_id": net_id})
def _add_router_to_l3_agent(self, agent_id, router_id):
return self._neutron_API_call(self.client.add_router_to_l3_agent, agent_id, {"router_id": router_id})
def _remove_router_from_l3_agent(self, agent_id, router_id):
return self._neutron_API_call(self.client.remove_router_from_l3_agent, agent_id, router_id)
def _get_agents_by_type(self, agent, use_cache=True):
self.log.debug("_get_agents_by_type: start.")
rv = self.agents.get(agent, []) if use_cache else []
if not rv:
agents = self._get_agents(use_cache=use_cache)
for i in agents:
if i['binary'] == self.AGENT_BINARY_NAME.get(agent):
rv.append(i)
from_cache = ''
else:
from_cache = ' from local cache'
self.log.debug("_get_agents_by_type: end, {0} rv: {1}".format(from_cache, json.dumps(rv, indent=4)))
return rv
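# List network namespaces on this node via 'ip netns list' and keep only the
# ones belonging to the given agent type (names starting with 'qdhcp-' for
# the DHCP agent or 'qrouter-' for the L3 agent).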
def __collect_namespaces_for_agent(self, agent):
cmd = self.CMD__ip_netns_list[:]
self.log.debug("Execute command '{0}'".format(' '.join(cmd)))
process = subprocess.Popen(
cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# read stdout via communicate() before checking the exit code to avoid a pipe deadlock
stdout = process.communicate()[0]
rc = process.returncode
if rc != 0:
self.log.error("ERROR (rc={0}) while executing '{1}'".format(rc, ' '.join(cmd)))
return []
# filter namespaces by given agent type
netns = []
for ns in StringIO.StringIO(stdout):
ns = ns.strip()
self.log.debug("Found network namespace '{0}'".format(ns))
if ns.startswith("{0}-".format(self.NS_NAME_PREFIXES[agent])):
netns.append(ns)
return netns
def __collect_ports_for_namespace(self, ns):
cmd = self.CMD__ip_netns_exec[:]
cmd.extend([ns, 'ip', 'l', 'show'])
self.log.debug("Execute command '{0}'".format(' '.join(cmd)))
process = subprocess.Popen(
cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# read stdout via communicate() before checking the exit code to avoid a pipe deadlock
stdout = process.communicate()[0]
rc = process.returncode
if rc != 0:
self.log.error("ERROR (rc={0}) while executing '{1}'".format(rc, ' '.join(cmd)))
return []
ports = []
for line in StringIO.StringIO(stdout):
pp = self.RE__port_in_portlist.match(line)
if not pp:
continue
port = pp.group(1)
if port != 'lo':
self.log.debug("Found port '{0}'".format(port))
ports.append(port)
return ports
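# Remove leftover ports for the given agent type: collect this node's agent
# namespaces, list the interfaces inside each of them, and delete every
# non-loopback port from OVS with 'ovs-vsctl -- --if-exists del-port'.
# With --noop the commands are only logged, not executed.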
def _cleanup_ports(self, agent):
self.log.debug("_cleanup_ports: start.")
# get namespaces list
netns = self.__collect_namespaces_for_agent(agent)
# collect ports from namespace
ports = []
for ns in netns:
ports.extend(self.__collect_ports_for_namespace(ns))
# iterate by port_list and remove port from OVS
for port in ports:
cmd = self.CMD__remove_ovs_port[:]
cmd.append(port)
if self.options.get('noop'):
self.log.info("NOOP-execution: '{0}'".format(' '.join(cmd)))
else:
self.log.debug("Execute command '{0}'".format(' '.join(cmd)))
process = subprocess.Popen(
cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
process.communicate()  # drain any output so the child can exit cleanly
rc = process.returncode
if rc != 0:
self.log.error("ERROR (rc={0}) while executing '{1}'".format(rc, ' '.join(cmd)))
self.log.debug("_cleanup_ports: end.")
return True
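# DHCP rescheduling: split agents into alive/dead, gather the networks hosted
# by dead agents, attach any that are not already on the first alive agent to
# it, optionally delete the dead agents (--remove-dead), and finally attach
# orphaned networks (hosted by no agent at all) to the alive agent.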
def _reschedule_agent_dhcp(self, agent_type):
self.log.debug("_reschedule_agent_dhcp: start.")
agents = {
'alive': [],
'dead': []
}
# collect the network list from dead DHCP agents
dead_networks = []
for agent in self._get_agents_by_type(agent_type):
if agent['alive']:
self.log.info("found alive DHCP agent: {0}".format(agent['id']))
agents['alive'].append(agent)
else:
# dead agent
self.log.info("found dead DHCP agent: {0}".format(agent['id']))
agents['dead'].append(agent)
for net in self._list_networks_on_dhcp_agent(agent['id']):
dead_networks.append(net)
if dead_networks and agents['alive']:
# collect IDs of networks already attached to the alive agent
lucky_ids = set(net['id'] for net in self._list_networks_on_dhcp_agent(agents['alive'][0]['id']))
# add dead networks to alive agent
for net in dead_networks:
if net['id'] not in lucky_ids:
# attach network to agent
self.log.info("attach network {net} to DHCP agent {agent}".format(
net=net['id'],
agent=agents['alive'][0]['id']
))
if not self.options.get('noop'):
self._add_network_to_dhcp_agent(agents['alive'][0]['id'], net['id'])
#if error:
# return
# remove dead agents if requested (and an alive agent was found)
if self.options.get('remove-dead'):
for agent in agents['dead']:
self.log.info("remove dead DHCP agent: {0}".format(agent['id']))
if not self.options.get('noop'):
self._neutron_API_call(self.client.delete_agent, agent['id'])
orphaned_networks = self._list_orphaned_networks()
self.log.info("_reschedule_agent_dhcp: rescheduling orphaned networks")
if orphaned_networks and agents['alive']:
for network in orphaned_networks:
self.log.info("_reschedule_agent_dhcp: rescheduling {0} to {1}".format(network,agents['alive'][0]['id']))
if not self.options.get('noop'):
self._add_network_to_dhcp_agent(agents['alive'][0]['id'], network)
self.log.info("_reschedule_agent_dhcp: ended rescheduling of orphaned networks")
self.log.debug("_reschedule_agent_dhcp: end.")
def _reschedule_agent_l3(self, agent_type):
self.log.debug("_reschedule_agent_l3: start.")
agents = {
'alive': [],
'dead': []
}
# collect the router list from dead L3 agents
dead_routers = []  # list of (router, agent_id) tuples
for agent in self._get_agents_by_type(agent_type):
if agent['alive']:
self.log.info("found alive L3 agent: {0}".format(agent['id']))
agents['alive'].append(agent)
else:
# dead agent
self.log.info("found dead L3 agent: {0}".format(agent['id']))
agents['dead'].append(agent)
dead_routers.extend((rou, agent['id']) for rou in self._list_routers_on_l3_agent(agent['id']))
self.log.debug("L3 agents in cluster: {ags}".format(ags=json.dumps(agents, indent=4)))
self.log.debug("Routers, attached to dead L3 agents: {rr}".format(rr=json.dumps(dead_routers, indent=4)))
if dead_routers and agents['alive']:
# collect IDs of routers already attached to the alive agent
lucky_ids = set(rou['id'] for rou in self._list_routers_on_l3_agent(agents['alive'][0]['id']))
# remove dead agents before moving their routers to the alive agent
for agent in agents['dead']:
self.log.info("remove dead L3 agent: {0}".format(agent['id']))
if not self.options.get('noop'):
self._neutron_API_call(self.client.delete_agent, agent['id'])
# move routers from dead to alive agent
for rou in filter(lambda rr: not(rr[0]['id'] in lucky_ids), dead_routers):
# self.log.info("unschedule router {rou} from L3 agent {agent}".format(
# rou=rou[0]['id'],
# agent=rou[1]
# ))
# if not self.options.get('noop'):
# self._remove_router_from_l3_agent(rou[1], rou[0]['id'])
# #todo: if error:
# #
self.log.info("schedule router {rou} to L3 agent {agent}".format(
rou=rou[0]['id'],
agent=agents['alive'][0]['id']
))
if not self.options.get('noop'):
self._add_router_to_l3_agent(agents['alive'][0]['id'], rou[0]['id'])
orphaned_routers = self._list_orphaned_routers()
self.log.info("_reschedule_agent_l3: rescheduling orphaned routers")
if orphaned_routers and agents['alive']:
for router in orphaned_routers:
self.log.info("_reschedule_agent_l3: rescheduling {0} to {1}".format(router,agents['alive'][0]['id']))
if not self.options.get('noop'):
self._add_router_to_l3_agent(agents['alive'][0]['id'], router)
self.log.info("_reschedule_agent_l3: ended rescheduling of orphaned routers")
self.log.debug("_reschedule_agent_l3: end.")
def _remove_self(self,agent_type):
self.log.debug("_remove_self: start.")
for agent in self._get_agents_by_type(agent_type):
if agent['host'] == socket.gethostname():
self.log.info("_remove_self: deleting our own agent {0} of type {1}".format(agent['id'],agent_type))
if not self.options.get('noop'):
self._neutron_API_call(self.client.delete_agent, agent['id'])
self.log.debug("_remove_self: end.")
def _reschedule_agent(self, agent):
self.log.debug("_reschedule_agents: start.")
task = self.RESCHEDULING_CALLS.get(agent, None)
if task:
task (agent)
self.log.debug("_reschedule_agents: end.")
def do(self, agent):
if self.options.get('cleanup-ports'):
self._cleanup_ports(agent)
if self.options.get('reschedule'):
self._reschedule_agent(agent)
if self.options.get('remove-self'):
self._remove_self(agent)
# if self.options.get('remove-agent'):
# self._cleanup_agents(agent)
def _test_healthy(self, agent_list, hostname):
rv = False
for agent in agent_list:
if agent['host'] == hostname and agent['alive']:
return True
return rv
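# Health check used by cluster resource agents: returns 0 if at least one of
# the given hostnames has an alive agent of the requested type, otherwise 9
# (OCF_FAILED_MASTER).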
def test_healthy(self, agent_type):
rc = 9 # OCF_FAILED_MASTER, http://www.linux-ha.org/doc/dev-guides/_literal_ocf_failed_master_literal_9.html
agentlist = self._get_agents_by_type(agent_type)
for hostname in self.options.get('test-hostnames'):
if self._test_healthy(agentlist, hostname):
return 0
return rc
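# Command-line entry point. A couple of illustrative (hypothetical) invocations:
#   q-agent-cleanup.py --agent=dhcp --cleanup-ports
#   q-agent-cleanup.py --agent=l3 --reschedule --remove-dead
#   q-agent-cleanup.py --agent=l3 --test-alive-for-hostname=node-1
# The last form only checks agent health and sets the exit code accordingly.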
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Neutron network node cleaning tool.')
parser.add_argument("-c", "--auth-config", dest="authconf", default="/root/openrc",
help="Authenticating config FILE", metavar="FILE")
parser.add_argument("-t", "--auth-token", dest="auth-token", default=None,
help="Authenticating token (instead username/passwd)", metavar="TOKEN")
parser.add_argument("-u", "--admin-auth-url", dest="admin-auth-url", default=None,
help="Authenticating URL (admin)", metavar="URL")
parser.add_argument("--retries", dest="retries", type=int, default=50,
help="try NN retries for API call", metavar="NN")
parser.add_argument("--sleep", dest="sleep", type=int, default=2,
help="sleep seconds between retries", metavar="SEC")
parser.add_argument("-a", "--agent", dest="agent", action="append",
help="specyfy agents for cleaning", required=True)
parser.add_argument("--cleanup-ports", dest="cleanup-ports", action="store_true", default=False,
help="cleanup ports for given agents on this node")
parser.add_argument("--remove-self", dest="remove-self", action="store_true", default=False,
help="remove ourselves from agent list")
parser.add_argument("--activeonly", dest="activeonly", action="store_true", default=False,
help="cleanup only active ports")
parser.add_argument("--reschedule", dest="reschedule", action="store_true", default=False,
help="reschedule given agents")
parser.add_argument("--remove-dead", dest="remove-dead", action="store_true", default=False,
help="remove dead agents while rescheduling")
parser.add_argument("--test-alive-for-hostname", dest="test-hostnames", action="append",
help="testing agent's healthy for given hostname")
parser.add_argument("--external-bridge", dest="external-bridge", default="br-ex",
help="external bridge name", metavar="IFACE")
parser.add_argument("--integration-bridge", dest="integration-bridge", default="br-int",
help="integration bridge name", metavar="IFACE")
parser.add_argument("-l", "--log", dest="log", action="store",
help="log file or logging.conf location")
parser.add_argument("--noop", dest="noop", action="store_true", default=False,
help="do not execute, print to log instead")
parser.add_argument("--debug", dest="debug", action="store_true", default=False,
help="debug")
args = parser.parse_args()
# if len(args) != 1:
# parser.error("incorrect number of arguments")
# parser.print_help()
# set up logging
if args.debug:
_log_level = logging.DEBUG
else:
_log_level = logging.INFO
if not args.log:
# log config or file not given -- log to console
LOG = logging.getLogger(LOG_NAME)  # do not move to the top of the file
_log_handler = logging.StreamHandler(sys.stdout)
_log_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
LOG.addHandler(_log_handler)
LOG.setLevel(_log_level)
elif args.log.split(os.sep)[-1] == 'logging.conf':
# setup logging by external file
import logging.config
logging.config.fileConfig(args.log)
LOG = logging.getLogger(LOG_NAME)  # do not move to the top of the file
else:
# log to given file
LOG = logging.getLogger(LOG_NAME)  # do not move to the top of the file
LOG.addHandler(logging.handlers.WatchedFileHandler(args.log))
LOG.setLevel(_log_level)
LOG.info("Started: {0}".format(' '.join(sys.argv)))
cleaner = NeutronCleaner(get_authconfig(args.authconf), options=vars(args), log=LOG)
rc = 0
if vars(args).get('test-hostnames'):
rc = cleaner.test_healthy(args.agent[0])
else:
for i in args.agent:
cleaner.do(i)
LOG.debug("End.")
sys.exit(rc)
#
###