207 lines
8.6 KiB
Python
207 lines
8.6 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from hamcrest import (assert_that, has_item, has_entry, is_not, empty,
|
|
has_property, contains_inanyorder) # noqa: H301
|
|
import jmespath
|
|
import pycontrail.types as types
|
|
import pytest
|
|
from stepler.third_party import utils
|
|
from stepler.third_party import waiter
|
|
|
|
from vapor.helpers import contrail_status
|
|
from vapor.helpers import asserts
|
|
from vapor.helpers.asserts import superset_of
|
|
from vapor import settings
|
|
|
|
|
|
def test_contrail_node_services_status(os_faults_steps):
|
|
contrail_status.check_services_statuses(os_faults_steps)
|
|
|
|
|
|
@pytest.mark.parametrize('role', settings.CONTRAIL_ROLES_DISTRIBUTION)
|
|
def test_contrail_service_distribution(os_faults_steps, role):
|
|
"""Check that contrail services are running on correct nodes."""
|
|
services_statuses = contrail_status.get_services_statuses(os_faults_steps)
|
|
nodes = settings.CONTRAIL_ROLES_DISTRIBUTION[role]
|
|
expected_services = settings.CONTRAIL_ROLES_SERVICES_MAPPING[role]
|
|
with asserts.AssertsCollector() as collector:
|
|
for node, services in services_statuses.items():
|
|
if node not in nodes:
|
|
continue
|
|
services = [x.service for x in services]
|
|
collector.check(services, superset_of(expected_services))
|
|
|
|
|
|
def test_ifmap_service(os_faults_steps):
|
|
"""Verify ifmap service."""
|
|
fqdns = settings.CONTRAIL_ROLES_DISTRIBUTION[
|
|
settings.ROLE_CONTRAIL_CONTROLLER]
|
|
nodes = os_faults_steps.get_nodes(fqdns)
|
|
os_faults_steps.execute_cmd(nodes, 'ifmap-view visual visual')
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_network_cleanup')
|
|
def test_add_virtual_network(contrail_api_client):
|
|
network_name, = utils.generate_ids()
|
|
net = types.VirtualNetwork(network_name)
|
|
contrail_api_client.virtual_network_create(net)
|
|
networks = contrail_api_client.virtual_networks_list()
|
|
assert_that(networks['virtual-networks'],
|
|
has_item(has_entry('uuid', net.uuid)))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_network_cleanup')
|
|
def test_delete_virtual_network(contrail_api_client, contrail_network):
|
|
contrail_api_client.virtual_network_delete(id=contrail_network.uuid)
|
|
networks = contrail_api_client.virtual_networks_list()
|
|
assert_that(networks['virtual-networks'],
|
|
is_not(has_item(has_entry('uuid', contrail_network.uuid))))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_network_cleanup')
|
|
def test_update_virtual_network(contrail_api_client, contrail_network):
|
|
new_display_name, = utils.generate_ids()
|
|
contrail_network.display_name = new_display_name
|
|
contrail_api_client.virtual_network_update(contrail_network)
|
|
network_data = contrail_api_client.virtual_network_read(
|
|
id=contrail_network.uuid)
|
|
assert_that(network_data, has_property('display_name', new_display_name))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_security_groups_cleanup')
|
|
def test_add_security_group(contrail_api_client):
|
|
security_group_name, = utils.generate_ids()
|
|
security_group = types.SecurityGroup(security_group_name)
|
|
contrail_api_client.security_group_create(security_group)
|
|
groups = contrail_api_client.security_groups_list()
|
|
assert_that(groups['security-groups'],
|
|
has_item(has_entry('uuid', security_group.uuid)))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_security_groups_cleanup')
|
|
def test_delete_security_group(contrail_api_client, contrail_security_group):
|
|
contrail_api_client.security_group_delete(id=contrail_security_group.uuid)
|
|
groups = contrail_api_client.security_groups_list()
|
|
assert_that(
|
|
groups['security-groups'],
|
|
is_not(has_item(has_entry('uuid', contrail_security_group.uuid))))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_security_groups_cleanup')
|
|
def test_update_security_group(contrail_api_client, contrail_security_group):
|
|
new_display_name, = utils.generate_ids()
|
|
contrail_security_group.display_name = new_display_name
|
|
contrail_api_client.security_group_update(contrail_security_group)
|
|
group_data = contrail_api_client.security_group_read(
|
|
id=contrail_security_group.uuid)
|
|
assert_that(group_data, has_property('display_name', new_display_name))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_policies_cleanup')
|
|
def test_add_network_policy(contrail_api_client):
|
|
network_policy_name, = utils.generate_ids()
|
|
network_policy = types.NetworkPolicy(network_policy_name)
|
|
contrail_api_client.network_policy_create(network_policy)
|
|
policies = contrail_api_client.network_policys_list()
|
|
assert_that(policies['network-policys'],
|
|
has_item(has_entry('uuid', network_policy.uuid)))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_policies_cleanup')
|
|
def test_delete_network_policy(contrail_api_client, contrail_network_policy):
|
|
contrail_api_client.network_policy_delete(id=contrail_network_policy.uuid)
|
|
policies = contrail_api_client.network_policys_list()
|
|
assert_that(
|
|
policies['network-policys'],
|
|
is_not(has_item(has_entry('uuid', contrail_network_policy.uuid))))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_policies_cleanup')
|
|
def test_update_network_policy(contrail_api_client, contrail_network_policy):
|
|
new_display_name, = utils.generate_ids()
|
|
contrail_network_policy.display_name = new_display_name
|
|
contrail_api_client.network_policy_update(contrail_network_policy)
|
|
policy_data = contrail_api_client.network_policy_read(
|
|
id=contrail_network_policy.uuid)
|
|
assert_that(policy_data, has_property('display_name', new_display_name))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_ipams_cleanup')
|
|
def test_add_network_ipam(contrail_api_client):
|
|
network_ipam_name, = utils.generate_ids()
|
|
network_ipam = types.NetworkIpam(network_ipam_name)
|
|
contrail_api_client.network_ipam_create(network_ipam)
|
|
ipams = contrail_api_client.network_ipams_list()
|
|
assert_that(ipams['network-ipams'],
|
|
has_item(has_entry('uuid', network_ipam.uuid)))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_ipams_cleanup')
|
|
def test_delete_network_ipam(contrail_api_client, contrail_ipam):
|
|
contrail_api_client.network_ipam_delete(id=contrail_ipam.uuid)
|
|
ipams = contrail_api_client.network_ipams_list()
|
|
assert_that(
|
|
ipams['network-ipams'],
|
|
is_not(has_item(has_entry('uuid', contrail_ipam.uuid))))
|
|
|
|
|
|
@pytest.mark.usefixtures('contrail_ipams_cleanup')
|
|
def test_update_network_ipam(contrail_api_client, contrail_ipam):
|
|
new_display_name, = utils.generate_ids()
|
|
contrail_ipam.display_name = new_display_name
|
|
contrail_api_client.network_ipam_update(contrail_ipam)
|
|
ipam_data = contrail_api_client.network_ipam_read(
|
|
id=contrail_ipam.uuid)
|
|
assert_that(ipam_data, has_property('display_name', new_display_name))
|
|
|
|
|
|
def test_contrail_alarms_is_empty(client_contrail_analytics):
|
|
alarms = client_contrail_analytics.get_alarms()
|
|
query = ('*[?@.value.*.alarms[?ack!=`True`]][].'
|
|
'{Node: @.name, Type: @.value.*.alarms[].type}')
|
|
not_ack_alarms = jmespath.search(query, alarms)
|
|
assert_that(not_ack_alarms, empty())
|
|
|
|
|
|
def test_zookeeper_status(znodes_list):
|
|
expected_znodes_list = settings.ZOOKEEPER_NODES
|
|
assert_that(znodes_list, contains_inanyorder(*expected_znodes_list))
|
|
|
|
|
|
@pytest.mark.requires('contrail_control_nodes_count >= 2')
|
|
def test_contrail_services_status_after_restart_master_node(os_faults_steps):
|
|
"""Verify contrail services status after master node restart.
|
|
|
|
Steps:
|
|
#. Restart node with contrail-schema (active)
|
|
#. Wait some time
|
|
#. Check that contrail services statuses is correct
|
|
"""
|
|
services_statuses = contrail_status.get_services_statuses(os_faults_steps)
|
|
master_node_fqdn = None
|
|
for fqdn, services in services_statuses.items():
|
|
for service in services:
|
|
if (service['name'] == 'contrail-schema' and
|
|
service['status'] == contrail_status.STATUS_ACTIVE):
|
|
master_node_fqdn = fqdn
|
|
break
|
|
assert master_node_fqdn is not None, "Can't find master node"
|
|
master_node = os_faults_steps.get_node(fqdns=[master_node_fqdn])
|
|
os_faults_steps.reset_nodes(master_node)
|
|
|
|
waiter.wait(
|
|
contrail_status.check_services_statuses,
|
|
args=(os_faults_steps),
|
|
expected_exceptions=AssertionError,
|
|
timeout=settings.CONTRAIL_NODE_RESET_TIMEOUT)
|