Add periodic task for cleaning up dead ports.
Sometimes it happen, that during ports creation, there could be quota violations, so that port could be created without tags, and would just hanging there. This periodic task would remove all the dead ports. Change-Id: If646cb3bf00aca387c769fafe2f73a8194642f69
This commit is contained in:
parent
d49add94df
commit
bc8ba2bc17
|
@ -172,6 +172,10 @@ class KuryrK8sService(service.Service, periodic_task.PeriodicTasks,
|
|||
for handler in self.handlers:
|
||||
handler.reconcile()
|
||||
|
||||
@periodic_task.periodic_task(spacing=90, run_immediately=False)
|
||||
def cleanup_dead_ports(self, context):
|
||||
utils.cleanup_dead_ports()
|
||||
|
||||
|
||||
def start():
|
||||
urllib3.disable_warnings()
|
||||
|
|
|
@ -17,6 +17,7 @@ import munch
|
|||
from openstack import exceptions as os_exc
|
||||
from os_vif import objects
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from kuryr_kubernetes import constants as k_const
|
||||
from kuryr_kubernetes import exceptions as k_exc
|
||||
|
@ -30,6 +31,10 @@ CONF = cfg.CONF
|
|||
|
||||
class TestUtils(test_base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
cfg.CONF.set_override('resource_tags', [], group='neutron_defaults')
|
||||
|
||||
@mock.patch('socket.gethostname')
|
||||
def test_get_node_name(self, m_gethostname):
|
||||
m_gethostname.return_value = 'foo'
|
||||
|
@ -500,3 +505,72 @@ class TestUtils(test_base.TestCase):
|
|||
def test_is_pod_completed_failed(self):
|
||||
self.assertTrue(utils.is_pod_completed({'status': {'phase':
|
||||
k_const.K8S_POD_STATUS_FAILED}}))
|
||||
|
||||
@mock.patch('kuryr_kubernetes.clients.get_network_client')
|
||||
def test_cleanup_dead_ports_no_tags(self, m_get_net):
|
||||
utils.cleanup_dead_ports()
|
||||
m_get_net.assert_not_called()
|
||||
|
||||
@mock.patch('oslo_utils.timeutils.utcnow')
|
||||
@mock.patch('kuryr_kubernetes.clients.get_network_client')
|
||||
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
||||
def test_cleanup_dead_ports(self, m_get_k8s, m_get_net, m_utcnow):
|
||||
cfg.CONF.set_override('resource_tags', ['foo'],
|
||||
group='neutron_defaults')
|
||||
m_net = mock.Mock()
|
||||
time1 = '2022-04-14T09:00:00Z'
|
||||
now = '2022-04-14T09:00:00Z'
|
||||
m_utcnow.return_value = timeutils.parse_isotime(now)
|
||||
port = munch.Munch({'updated_at': time1, 'tags': ['foo']})
|
||||
m_net.ports.return_value = iter((port,))
|
||||
m_get_net.return_value = m_net
|
||||
|
||||
m_k8s = mock.Mock()
|
||||
m_k8s.get.return_value = {'items': [{'status': {'netId': 'netid'}}]}
|
||||
m_get_k8s.return_value = m_k8s
|
||||
|
||||
utils.cleanup_dead_ports()
|
||||
|
||||
m_get_net.assert_called_once()
|
||||
|
||||
@mock.patch('oslo_utils.timeutils.utcnow')
|
||||
@mock.patch('kuryr_kubernetes.clients.get_network_client')
|
||||
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
||||
def test_cleanup_dead_no_tagged_ports(self, m_get_k8s, m_get_net,
|
||||
m_utcnow):
|
||||
cfg.CONF.set_override('resource_tags', ['foo'],
|
||||
group='neutron_defaults')
|
||||
m_net = mock.Mock()
|
||||
time1 = '2022-04-14T09:00:00Z'
|
||||
now = '2022-04-14T09:16:00Z'
|
||||
m_utcnow.return_value = timeutils.parse_isotime(now)
|
||||
port = munch.Munch({'updated_at': time1, 'tags': []})
|
||||
m_net.ports.return_value = iter((port,))
|
||||
m_get_net.return_value = m_net
|
||||
|
||||
m_k8s = mock.Mock()
|
||||
m_k8s.get.return_value = {'items': [{'status': {'netId': 'netid'}}]}
|
||||
m_get_k8s.return_value = m_k8s
|
||||
|
||||
utils.cleanup_dead_ports()
|
||||
|
||||
m_get_net.assert_called_once()
|
||||
m_net.delete_port.assert_called_once_with(port)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.clients.get_network_client')
|
||||
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
||||
def test_cleanup_dead_no_networks(self, m_get_k8s, m_get_net):
|
||||
cfg.CONF.set_override('resource_tags', ['foo'],
|
||||
group='neutron_defaults')
|
||||
m_net = mock.Mock()
|
||||
m_net.ports.return_value = iter([])
|
||||
m_get_net.return_value = m_net
|
||||
|
||||
m_k8s = mock.Mock()
|
||||
m_k8s.get.return_value = {'items': []}
|
||||
m_get_k8s.return_value = m_k8s
|
||||
|
||||
utils.cleanup_dead_ports()
|
||||
|
||||
m_get_net.assert_called_once()
|
||||
m_net.delete_port.assert_not_called()
|
||||
|
|
|
@ -20,12 +20,14 @@ import time
|
|||
import requests
|
||||
|
||||
from kuryr.lib._i18n import _
|
||||
from kuryr.lib import constants as kl_const
|
||||
from openstack import exceptions as os_exc
|
||||
from os_vif import objects
|
||||
from oslo_cache import core as cache
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
from kuryr_kubernetes import constants
|
||||
|
@ -50,6 +52,8 @@ DEFAULT_INTERVAL = 1
|
|||
DEFAULT_JITTER = 3
|
||||
MAX_BACKOFF = 60
|
||||
MAX_ATTEMPTS = 10
|
||||
ZOMBIE_AGE = 600
|
||||
|
||||
|
||||
subnet_caching_opts = [
|
||||
cfg.BoolOpt('caching', default=True,
|
||||
|
@ -727,3 +731,43 @@ def get_referenced_object(obj, kind):
|
|||
except exceptions.K8sClientException:
|
||||
LOG.debug('Error when fetching %s to add an event %s, ignoring',
|
||||
kind, get_res_unique_name(obj))
|
||||
|
||||
|
||||
def cleanup_dead_ports():
|
||||
tags = set(CONF.neutron_defaults.resource_tags)
|
||||
if not tags:
|
||||
# NOTE(gryf): there is no reliable way for removing kuryr-related
|
||||
# ports if there are no tags enabled - without tags there is a chance,
|
||||
# that ports are down, created by someone/something else and would
|
||||
# be deleted.
|
||||
# Perhaps a be better idea to would be to have some mark in other
|
||||
# field during port creation to identify "our" ports.
|
||||
return
|
||||
|
||||
os_net = clients.get_network_client()
|
||||
k8s = clients.get_kubernetes_client()
|
||||
|
||||
try:
|
||||
crds = k8s.get(constants.K8S_API_CRD_KURYRNETWORKS)
|
||||
except exceptions.K8sClientException as ex:
|
||||
LOG.exception('Error fetching KuryrNetworks: %s', ex)
|
||||
return
|
||||
|
||||
for item in crds['items']:
|
||||
network_id = item.get('status', {}).get('netId')
|
||||
if not network_id:
|
||||
continue
|
||||
|
||||
for port in os_net.ports(status='DOWN', network_id=network_id,
|
||||
device_owner=kl_const.DEVICE_OWNER,
|
||||
not_tags=list(tags)):
|
||||
now = timeutils.utcnow(True)
|
||||
port_time = timeutils.parse_isotime(port.updated_at)
|
||||
# NOTE(gryf): if port hanging more than 10 minutes already in DOWN
|
||||
# state, consider it as a dead one.
|
||||
if (now - port_time).seconds > ZOMBIE_AGE:
|
||||
try:
|
||||
os_net.delete_port(port)
|
||||
except os_exc.SDKException as ex:
|
||||
LOG.warning('There was an issue with port "%s" '
|
||||
'removal: %s', port, ex)
|
||||
|
|
Loading…
Reference in New Issue