summaryrefslogtreecommitdiff
path: root/kuryr_tempest_plugin/tests/scenario/base.py
blob: ca13bc088056226d853e0dd6d991cb72288aa91d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

from kubernetes import client as k8s_client
from kubernetes import config as k8s_config

from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
from tempest.scenario import manager

CONF = config.CONF


class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
    """Base class for Kuryr scenario tests.

    Offers helpers to create, delete and inspect pods through the
    kubernetes python client, plus helpers that use the admin OpenStack
    clients to find the port named after a pod and to attach a floating
    IP to it. Floating IPs created through ``assign_fip_to_pod`` are
    recorded and deleted in ``resource_cleanup``.
    """

    @classmethod
    def skip_checks(cls):
        """Skip the test class when Kuryr is not flagged as available."""
        super(BaseKuryrScenarioTest, cls).skip_checks()
        if not CONF.service_available.kuryr:
            raise cls.skipException('Kuryr support is required')

    @classmethod
    def setup_clients(cls):
        """Expose the kubernetes client module as ``cls.k8s_client``."""
        super(BaseKuryrScenarioTest, cls).setup_clients()
        cls.k8s_client = k8s_client

    @classmethod
    def resource_setup(cls):
        """Initialise floating IP tracking and load the kube config."""
        super(BaseKuryrScenarioTest, cls).resource_setup()
        # Floating IPs created by assign_fip_to_pod(); consumed by
        # resource_cleanup().
        cls.pod_fips = []
        # TODO(dmellado): Config k8s client in a cleaner way
        k8s_config.load_kube_config()

    @classmethod
    def resource_cleanup(cls):
        """Delete every floating IP recorded in ``cls.pod_fips``."""
        super(BaseKuryrScenarioTest, cls).resource_cleanup()
        for fip in cls.pod_fips:
            cls.os_admin.floating_ips_client.delete_floatingip(
                fip['floatingip']['id'])

    def create_pod(self, name=None, image='celebdor/kuryr-demo',
                   namespace="default", timeout=None):
        """Create a single-container pod and wait until it is "Running".

        The container runs ``sleep 3600`` and shares the pod's name.

        :param name: name for the pod; when omitted a random name with the
                     ``kuryr-pod`` prefix is generated
        :param image: container image to run
        :param namespace: namespace in which the pod is created
        :param timeout: maximum number of seconds to wait for the pod to
                        reach the "Running" phase; ``None`` (the default)
                        waits indefinitely
        :returns: tuple ``(name, pod)`` where ``pod`` is the ``V1Pod`` body
                  that was submitted
        :raises tempest.lib.exceptions.TimeoutException: when ``timeout``
                  is set and the pod is not "Running" in time
        """
        # Only generate a name when the caller did not supply one.
        if not name:
            name = data_utils.rand_name(prefix='kuryr-pod')

        pod = self.k8s_client.V1Pod()
        pod.metadata = self.k8s_client.V1ObjectMeta(name=name)

        container = self.k8s_client.V1Container()
        container.image = image
        container.args = ["sleep", "3600"]
        container.name = name

        spec = self.k8s_client.V1PodSpec()
        spec.containers = [container]

        pod.spec = spec
        self.k8s_client.CoreV1Api().create_namespaced_pod(namespace=namespace,
                                                          body=pod)

        # TODO(dmellado): add timeout config to tempest plugin
        deadline = None if timeout is None else time.time() + timeout
        while True:
            # Poll once per second; the status is always checked at least
            # once before the deadline is evaluated.
            time.sleep(1)
            if self.get_pod_status(name, namespace) == "Running":
                break
            if deadline is not None and time.time() >= deadline:
                raise lib_exc.TimeoutException(
                    "Pod %s in namespace %s did not reach Running status "
                    "within %s seconds" % (name, namespace, timeout))

        return name, pod

    def delete_pod(self, pod_name, body=None, namespace="default"):
        """Delete a pod.

        :param pod_name: name of the pod to delete
        :param body: delete options passed to the API; defaults to ``{}``
        :param namespace: namespace the pod lives in
        """
        if body is None:
            body = {}
        self.k8s_client.CoreV1Api().delete_namespaced_pod(
            name=pod_name,
            body=body,
            namespace=namespace)

    def _find_pod(self, pod_name, namespace):
        """Return the pod named ``pod_name`` in ``namespace``, or None."""
        pod_list = self.k8s_client.CoreV1Api().list_namespaced_pod(
            namespace=namespace)
        for pod in pod_list.items:
            if pod.metadata.name == pod_name:
                return pod
        return None

    def get_pod_ip(self, pod_name, namespace="default"):
        """Return the pod's ``status.pod_ip``, or None if it is not listed.

        :param pod_name: name of the pod to look up
        :param namespace: namespace to search
        """
        pod = self._find_pod(pod_name, namespace)
        return pod.status.pod_ip if pod is not None else None

    def get_pod_status(self, pod_name, namespace="default"):
        """Return the pod's ``status.phase``, or None if it is not listed.

        :param pod_name: name of the pod to look up
        :param namespace: namespace to search
        """
        pod = self._find_pod(pod_name, namespace)
        return pod.status.phase if pod is not None else None

    def get_pod_port(self, pod_name, namespace="default"):
        """Return the port whose name equals ``pod_name``, or None.

        :param pod_name: pod name to match against port names
        :param namespace: currently unused; ports are matched by name only
        """
        # TODO(gcheresh): get the pod port using the container id; matching
        # by name depends on the port_debug kuryr feature
        port_list = self.os_admin.ports_client.list_ports()
        for port in port_list['ports']:
            if pod_name == port['name']:
                return port
        return None

    def assign_fip_to_pod(self, pod_name, namespace="default"):
        """Create a floating IP on the pod's port and record it for cleanup.

        The floating IP is allocated from ``CONF.network.public_network_id``
        for the project returned by ``get_project_id()``.

        :param pod_name: name of the pod whose port gets the floating IP
        :param namespace: forwarded to ``get_pod_port``
        :returns: the response of the ``create_floatingip`` call
        :raises TypeError: if no port matches ``pod_name`` (the lookup
                           returns None, which cannot be subscripted)
        """
        ext_net_id = CONF.network.public_network_id
        pod_fip = self.os_admin.floating_ips_client.create_floatingip(
            floating_network_id=ext_net_id,
            tenant_id=self.get_project_id(),
            port_id=self.get_pod_port(pod_name, namespace)['id'])
        self.pod_fips.append(pod_fip)
        return pod_fip

    def get_project_id(self, project_name='k8s'):
        """Return the id of the project named ``project_name``, or None.

        :param project_name: project name to look up via the admin
                             projects client
        """
        projects_list = self.os_admin.projects_client.list_projects()
        for project in projects_list['projects']:
            if project_name == project['name']:
                return project['id']
        return None