Add test for k8s network checker

Created new system test for the mcp-netchecker-server
and mcp-netchecker-agent projects.

Change-Id: I9ba91e78ae1f39d874560ec43374b4354adeb563
This commit is contained in:
Artem Panchenko 2016-08-26 16:48:46 +03:00
parent 3b20f63587
commit fe0518443e
4 changed files with 430 additions and 6 deletions

View File

@ -31,7 +31,7 @@ class K8sPod(K8sBaseResource):
def phase(self):
return self.status.phase
def wait_phase(self, phase, timeout=60):
def wait_phase(self, phase, timeout=60, interval=5):
"""Wait phase of pod_name from namespace while timeout
:param list or str: phase
@ -46,15 +46,15 @@ class K8sPod(K8sBaseResource):
self._add_details(self._manager.get(name=self.name))
return self.phase in phase
helpers.wait(check, timeout=timeout,
helpers.wait(check, timeout=timeout, interval=interval,
timeout_msg='Timeout waiting({timeout}s), pod {pod_name} '
'is not in "{phase}" phase'.format(
timeout=timeout,
pod_name=self.name,
phase=phase))
def wait_running(self, timeout=60):
self.wait_phase(['Running'], timeout=timeout)
def wait_running(self, timeout=60, interval=5):
self.wait_phase(['Running'], timeout=timeout, interval=interval)
class K8sPodManager(K8sBaseManager):

View File

@ -121,3 +121,10 @@ CCP_DEFAULT_GLOBALS = {
"neutron_external_interface": "eth2"
}
}
NETCHECKER_SERVER_DIR = os.environ.get(
'NETCHECKER_SERVER_DIR', os.path.join(os.getcwd(), 'mcp-netchecker-server')
)
NETCHECKER_AGENT_DIR = os.environ.get(
'NETCHECKER_AGENT_DIR', os.path.join(os.getcwd(), 'mcp-netchecker-agent')
)

View File

@ -14,6 +14,8 @@
import yaml
from devops.helpers.helpers import wait
from fuel_ccp_tests import logger
LOG = logger.logger
@ -70,11 +72,21 @@ class SystemBaseTest(object):
LOG.debug("Timeout for creation is set to {}".format(timeout))
LOG.debug("Checking interval is set to {}".format(interval))
pod = k8sclient.pods.create(body=body)
pod.wait_running()
pod.wait_running(timeout=300, interval=5)
LOG.info("Pod '{}' is created".format(pod.metadata.name))
return k8sclient.pods.get(name=pod.metadata.name)
def check_pod_delete(self, k8s_pod, k8sclient):
@staticmethod
def wait_pod_deleted(k8sclient, podname, timeout=60, interval=5):
wait(
lambda: podname not in [pod.name for pod in k8sclient.pods.list()],
timeout=timeout,
interval=interval,
timeout_msg="Pod deletion timeout reached!"
)
@staticmethod
def check_pod_delete(k8s_pod, k8sclient, timeout=300, interval=5):
"""Deleting pod from k8s
:param k8s_pod: fuel_ccp_tests.managers.k8s.nodes.K8sNode
@ -82,9 +94,47 @@ class SystemBaseTest(object):
"""
LOG.info("Deleting pod '{}'".format(k8s_pod.name))
LOG.debug("Pod status:\n{}".format(k8s_pod.status))
LOG.debug("Timeout for deletion is set to {}".format(timeout))
LOG.debug("Checking interval is set to {}".format(interval))
k8sclient.pods.delete(body=k8s_pod, name=k8s_pod.name)
SystemBaseTest.wait_pod_deleted(k8sclient, k8s_pod.name, timeout,
interval)
LOG.debug("Pod '{}' is deleted".format(k8s_pod.name))
@staticmethod
def check_service_create(body, k8sclient):
"""Check creating k8s service
:param body: dict, service spec
:param k8sclient: K8sCluster object
:rtype: K8sService object
"""
LOG.info("Creating service in k8s cluster")
LOG.debug(
"Service spec to create:\n{}".format(
yaml.dump(body, default_flow_style=False))
)
service = k8sclient.services.create(body=body)
LOG.info("Service '{}' is created".format(service.metadata.name))
return k8sclient.services.get(name=service.metadata.name)
@staticmethod
def check_ds_create(body, k8sclient):
"""Check creating k8s DaemonSet
:param body: dict, DaemonSet spec
:param k8sclient: K8sCluster object
:rtype: K8sDaemonSet object
"""
LOG.info("Creating DaemonSet in k8s cluster")
LOG.debug(
"DaemonSet spec to create:\n{}".format(
yaml.dump(body, default_flow_style=False))
)
ds = k8sclient.daemonsets.create(body=body)
LOG.info("DaemonSet '{}' is created".format(ds.metadata.name))
return k8sclient.daemonsets.get(name=ds.metadata.name)
def check_number_kube_nodes(self, underlay, k8sclient):
"""Check number of slaves"""
LOG.info("Check number of nodes")

View File

@ -0,0 +1,367 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import pytest
import random
import time
import os
import re
import yaml
from devops.helpers.helpers import wait, wait_pass
from k8sclient.client.rest import ApiException
from base_test import SystemBaseTest
from fuel_ccp_tests import logger
from fuel_ccp_tests import settings
from fuel_ccp_tests.helpers import ext
LOG = logger.logger
@pytest.fixture(scope='class')
def check_netchecker_files(request):
files_missing = []
for arg in request.cls.netchecker_files:
if not os.path.isfile(arg):
files_missing.append(arg)
assert len(files_missing) == 0, \
("The following netchecker files not found: "
"{0}!".format(', '.join(files_missing)))
class TestFuelCCPNetCheckerMixin:
pod_yaml_file = os.path.join(
settings.NETCHECKER_SERVER_DIR,
'k8s_resources/netchecker-server_pod.yaml')
svc_yaml_file = os.path.join(
settings.NETCHECKER_SERVER_DIR,
'k8s_resources/netchecker-server_svc.yaml')
ds_yaml_file = os.path.join(
settings.NETCHECKER_AGENT_DIR, 'netchecker-agent.yaml')
netchecker_files = (pod_yaml_file, svc_yaml_file, ds_yaml_file)
@pytest.mark.usefixtures("check_netchecker_files")
class TestFuelCCPNetChecker(SystemBaseTest, TestFuelCCPNetCheckerMixin):
"""Test class for network connectivity verification in k8s"""
@staticmethod
def dir_upload(underlay, host, source, destination):
with underlay.remote(node_name=host) as remote:
remote.upload(source, destination)
@staticmethod
def get_ds_status(k8sclient, dsname):
ds = k8sclient.daemonsets.get(name=dsname)
return (ds.status.current_number_scheduled ==
ds.status.desired_number_scheduled)
@staticmethod
def wait_ds_running(k8sclient, dsname, timeout=60, interval=5):
wait(
lambda: TestFuelCCPNetChecker.get_ds_status(k8sclient, dsname),
timeout=timeout, interval=interval)
@staticmethod
def build_netchecker(underlay, stype, source_dir):
if stype == 'agent':
source_dir = '/'.join((source_dir, 'docker'))
underlay.sudo_check_call(
'cd {0} && docker build -t 127.0.0.1:31500/netchecker/'
'{1}:latest .'.format(source_dir, stype),
node_name='master')
@staticmethod
def push_netchecker(underlay, stype, registry='127.0.0.1:31500'):
underlay.sudo_check_call(
'docker push {0}/netchecker/{1}:latest'.format(registry, stype),
node_name='master')
def start_netchecker_server(self, k8sclient):
with open(self.pod_yaml_file) as pod_conf:
for pod_spec in yaml.load_all(pod_conf):
try:
if k8sclient.pods.get(name=pod_spec['metadata']['name']):
LOG.debug('Network checker server pod {} is '
'already running! Skipping resource creation'
'.'.format(pod_spec['metadata']['name']))
continue
except ApiException as e:
if e.status == 404:
self.check_pod_create(body=pod_spec,
k8sclient=k8sclient)
else:
raise e
with open(self.svc_yaml_file) as svc_conf:
for svc_spec in yaml.load_all(svc_conf):
try:
if k8sclient.services.get(
name=svc_spec['metadata']['name']):
LOG.debug('Network checker server pod {} is '
'already running! Skipping resource creation'
'.'.format(svc_spec['metadata']['name']))
continue
except ApiException as e:
if e.status == 404:
self.check_service_create(body=svc_spec,
k8sclient=k8sclient)
else:
raise e
def start_netchecker_agent(self, underlay, k8sclient):
# TODO(apanchenko): use python API client here when it will have
# TODO(apanchenko): needed functionality (able work with labels)
underlay.sudo_check_call(
"kubectl get nodes | awk '/Ready/{print $1}' | "
"xargs -I {} kubectl label nodes {} netchecker=agent --overwrite",
node_name='master')
with open(self.ds_yaml_file) as ds_conf:
for daemon_set_spec in yaml.load_all(ds_conf):
self.check_ds_create(body=daemon_set_spec,
k8sclient=k8sclient)
self.wait_ds_running(
k8sclient,
dsname=daemon_set_spec['metadata']['name'])
@staticmethod
def get_netchecker_status(underlay):
raw_status = underlay.sudo_check_call(
'curl -m 5 -s localhost:31081/api/v1/agents/',
node_name='master').stdout
return json.loads(''.join(raw_status))
@staticmethod
def wait_netchecker_running(underlay, timeout=60, interval=5):
wait_pass(
lambda: TestFuelCCPNetChecker.get_netchecker_status(underlay),
timeout=timeout, interval=interval)
@staticmethod
def get_netchecker_pods(k8sclient):
all_pods = k8sclient.pods.list()
return len([p for p in all_pods
if 'netchecker-agent' in p.metadata.name])
def check_network(self, underlay, k8sclient, works=True):
assert (self.get_netchecker_pods(k8sclient) ==
len(self.get_netchecker_status(underlay))) == works
@staticmethod
def get_random_slave(underlay):
slave_nodes = [n for n in underlay.node_names() if n != 'master']
if not slave_nodes:
return None
random.shuffle(slave_nodes)
return slave_nodes.pop()
@staticmethod
def block_traffic_on_slave(underlay, slave_node):
LOG.info('Blocked traffic to the network checker service from '
'containers on node "{}".'.format(slave_node))
underlay.sudo_check_call(
'iptables -A FORWARD -p tcp --dport 8081 -j DROP',
node_name=slave_node)
@staticmethod
def unblock_traffic_on_slave(underlay, slave_node):
LOG.info('Unblocked traffic to the network checker service from '
'containers on node "{}".'.format(slave_node))
underlay.sudo_check_call(
'iptables -D FORWARD -p tcp --dport 8081 -j DROP',
node_name=slave_node)
@staticmethod
def parse_test_doc(docstring):
test_case = {}
parse_regex = re.compile(r'(?P<title>^(.*\S.*\n)+)+'
r'(?P<empty_line1>\s*\n)'
r'\s*Scenario:\s*\n(?P<scenario>(.+\n)+)'
r'(?P<empty_line2>\s*(\n|$))?'
r'(\s*Duration:\s+(?P<duration>\d+).*\n)?')
doc_match = re.match(parse_regex, docstring)
if not doc_match:
LOG.error("Can't parse test docstring, unknown format!")
return test_case
test_case['title'] = re.sub(r'[\n\s]+', # replace multiple spaces and
' ', # line breaks by single space
doc_match.group('title')
).strip()
test_case['steps'] = []
for raw_step in re.split(r'\s+\d+\.\s*', doc_match.group('scenario')):
if not raw_step:
# start or end of the string
continue
test_case['steps'].append(
re.sub(r'[\n\s]+', # replace multiple spaces and
' ', # line breaks by single space
raw_step
).strip()
)
# TODO(apanchenko): now it works only with 'seconds'
duration = doc_match.group('duration') or 1000
test_case['duration'] = int(duration)
return test_case
@staticmethod
def show_step(func, step_num):
if not func.__doc__:
LOG.error("Can't show step #{0}: docstring for method {1} not "
"found!".format(step_num, func.__name__))
test_case_steps = TestFuelCCPNetChecker.parse_test_doc(
func.__doc__)['steps']
try:
LOG.info(" *** [STEP#{0}] {1} ***".format(
step_num,
test_case_steps[step_num - 1]))
except IndexError:
LOG.error("Can't show step #{0}: docstring for method {1} does't "
"contain it!".format(step_num, func.__name__))
@pytest.mark.fail_snapshot
@pytest.mark.snapshot_needed
@pytest.mark.revert_snapshot(ext.SNAPSHOT.k8s_deployed)
def test_k8s_netchecker_calico(self, underlay, k8scluster):
"""Test for deploying an k8s environment with Calico and check
connectivity between its networks
Scenario:
1. Install k8s.
2. Create docker registry service
3. Upload local copy of the 'mcp-netchecker-server' repository
to the kubernetes master node via SSH(SFTP)
4. Build docker image with netchecker-server
5. Push the image with netchecker-server to the registry
6. Go to kubernetes master node via SSH and upload local copy of
the 'mcp-netchecker-agent' repository to the remote directory
7. Build docker image with netchecker-agent
8. Push the image with netchecker-agent to the registry
9. Run netchecker-server service
10. Run netchecker-agent replication cluster
11. Get network verification status. Check status is 'OK'
12. Randomly choose some slave, login to it via SSH, add blocking
iptables rule. Restart network checker server
13. Get network verification status, Check status is 'FAIL'
14. Recover iptables state on the slave
15. Get network verification status. Check status is 'OK'
Duration: 600 seconds
"""
me = self.test_k8s_netchecker_calico
# STEP #1
self.show_step(me, 1)
k8sclient = k8scluster.api
# STEP #2
self.show_step(me, 2)
k8scluster.create_registry()
# STEP #3
self.show_step(me, 3)
self.dir_upload(underlay,
host='master',
source=settings.NETCHECKER_SERVER_DIR,
destination='/tmp/mcp-netchecker-server')
# STEP #4
self.show_step(me, 4)
self.build_netchecker(underlay,
stype='server',
source_dir='/tmp/mcp-netchecker-server')
# STEP #5
self.show_step(me, 5)
self.push_netchecker(underlay, stype='server')
# STEP #6
self.show_step(me, 6)
self.dir_upload(underlay,
host='master',
source=settings.NETCHECKER_AGENT_DIR,
destination='/tmp/mcp-netchecker-agent')
# STEP #7
self.show_step(me, 7)
self.build_netchecker(underlay,
stype='agent',
source_dir='/tmp/mcp-netchecker-agent')
# STEP #8
self.show_step(me, 8)
self.push_netchecker(underlay, stype='agent')
# STEP #9
self.show_step(me, 9)
self.start_netchecker_server(k8sclient=k8sclient)
self.wait_netchecker_running(underlay, timeout=240)
# STEP #10
self.show_step(me, 10)
self.start_netchecker_agent(underlay, k8sclient)
# STEP #11
# currently agents need some time to start reporting to the server
self.show_step(me, 11)
time.sleep(120)
self.check_network(underlay, k8sclient, works=True)
# STEP #12
self.show_step(me, 12)
target_slave = self.get_random_slave(underlay)
# stop netchecker-server
# FIXME(apanchenko): uncomment and remove deletion via CLI below
# currently it fails due to labels:
# AttributeError: 'object' object has no attribute 'swagger_types'
# need a new version of k8sclient released with the following patch
# https://review.openstack.org/#/c/366908/
# self.check_pod_delete(
# k8s_pod=k8sclient.pods.get(name='netchecker-server'),
# k8sclient=k8sclient)
underlay.sudo_check_call(
'kubectl delete pod/netchecker-server',
node_name='master')
self.wait_pod_deleted(k8sclient, 'netchecker-server')
self.block_traffic_on_slave(underlay, target_slave)
# start netchecker-server
self.start_netchecker_server(k8sclient=k8sclient)
self.wait_netchecker_running(underlay, timeout=240)
# STEP #13
self.show_step(me, 13)
# currently agents need some time to start reporting to the server
time.sleep(120)
self.check_network(underlay, k8sclient, works=False)
# STEP #14
self.show_step(me, 14)
self.unblock_traffic_on_slave(underlay, target_slave)
# STEP #15
self.show_step(me, 15)
# currently agents need some time to start reporting to the server
time.sleep(240)
self.check_network(underlay, k8sclient, works=True)