Merge "Refactoring amphora stats driver interface"
This commit is contained in:
commit
9a732565e9
|
@ -119,12 +119,6 @@
|
||||||
# health_check_interval = 3
|
# health_check_interval = 3
|
||||||
# sock_rlimit = 0
|
# sock_rlimit = 0
|
||||||
|
|
||||||
# Health/StatsUpdate options are
|
|
||||||
# *_db
|
|
||||||
# *_logger
|
|
||||||
# health_update_driver = health_db
|
|
||||||
# stats_update_driver = stats_db
|
|
||||||
|
|
||||||
[keystone_authtoken]
|
[keystone_authtoken]
|
||||||
# This group of config options are imported from keystone middleware. Thus the
|
# This group of config options are imported from keystone middleware. Thus the
|
||||||
# option names should match the names declared in the middleware.
|
# option names should match the names declared in the middleware.
|
||||||
|
@ -341,8 +335,14 @@
|
||||||
#
|
#
|
||||||
# distributor_driver = distributor_noop_driver
|
# distributor_driver = distributor_noop_driver
|
||||||
#
|
#
|
||||||
|
# Statistics update driver options are stats_db
|
||||||
|
# stats_logger
|
||||||
|
# Multiple values may be specified as a comma-separated list.
|
||||||
|
# statistics_drivers = stats_db
|
||||||
|
|
||||||
# Load balancer topology options are SINGLE, ACTIVE_STANDBY
|
# Load balancer topology options are SINGLE, ACTIVE_STANDBY
|
||||||
# loadbalancer_topology = SINGLE
|
# loadbalancer_topology = SINGLE
|
||||||
|
|
||||||
# user_data_config_drive = False
|
# user_data_config_drive = False
|
||||||
|
|
||||||
# amphora_delete_retries = 5
|
# amphora_delete_retries = 5
|
||||||
|
|
|
@ -189,23 +189,6 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
|
||||||
neutron network to utilize.
|
neutron network to utilize.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def start_health_check(self, health_mixin):
|
|
||||||
"""Start health checks.
|
|
||||||
|
|
||||||
:param health_mixin: health mixin object
|
|
||||||
:type health_mixin: HealthMixin
|
|
||||||
|
|
||||||
Starts listener process and calls HealthMixin to update
|
|
||||||
databases information.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def stop_health_check(self):
|
|
||||||
"""Stop health checks.
|
|
||||||
|
|
||||||
Stops listener process and calls HealthMixin to update
|
|
||||||
databases information.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def upload_cert_amp(self, amphora, pem_file):
|
def upload_cert_amp(self, amphora, pem_file):
|
||||||
"""Upload cert info to the amphora.
|
"""Upload cert info to the amphora.
|
||||||
|
|
||||||
|
@ -242,47 +225,6 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
class HealthMixin(object, metaclass=abc.ABCMeta):
|
|
||||||
@abc.abstractmethod
|
|
||||||
def update_health(self, health):
|
|
||||||
"""Return ceilometer ready health
|
|
||||||
|
|
||||||
:param health: health information emitted from the amphora
|
|
||||||
:type health: bool
|
|
||||||
:returns: return health
|
|
||||||
|
|
||||||
At this moment, we just build the basic structure for testing, will
|
|
||||||
add more function along with the development, eventually, we want it
|
|
||||||
return:
|
|
||||||
map: {"amphora-status":HEALTHY, loadbalancers: {"loadbalancer-id":
|
|
||||||
{"loadbalancer-status": HEALTHY,
|
|
||||||
"listeners":{"listener-id":{"listener-status":HEALTHY,
|
|
||||||
"nodes":{"node-id":HEALTHY, ...}}, ...}, ...}}
|
|
||||||
only items whose health has changed need to be submitted
|
|
||||||
awesome update code
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class StatsMixin(object, metaclass=abc.ABCMeta):
|
|
||||||
@abc.abstractmethod
|
|
||||||
def update_stats(self, stats):
|
|
||||||
"""Return ceilometer ready stats
|
|
||||||
|
|
||||||
:param stats: statistic information emitted from the amphora
|
|
||||||
:type stats: string
|
|
||||||
:returns: return stats
|
|
||||||
|
|
||||||
At this moment, we just build the basic structure for testing, will
|
|
||||||
add more function along with the development, eventually, we want it
|
|
||||||
return:
|
|
||||||
uses map {"loadbalancer-id":{"listener-id":
|
|
||||||
{"bytes-in": 123, "bytes_out":123, "active_connections":123,
|
|
||||||
"total_connections", 123}, ...}
|
|
||||||
elements are named to keep it extsnsible for future versions
|
|
||||||
awesome update code and code to send to ceilometer
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
|
class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
|
||||||
"""Abstract mixin class for VRRP support in loadbalancer amphorae
|
"""Abstract mixin class for VRRP support in loadbalancer amphorae
|
||||||
|
|
||||||
|
|
|
@ -13,42 +13,32 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from concurrent import futures
|
from concurrent import futures
|
||||||
|
import datetime
|
||||||
import socket
|
import socket
|
||||||
import time
|
import time
|
||||||
|
import timeit
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
from oslo_utils import excutils
|
||||||
|
import sqlalchemy
|
||||||
from stevedore import driver as stevedore_driver
|
from stevedore import driver as stevedore_driver
|
||||||
|
|
||||||
from octavia.amphorae.backends.health_daemon import status_message
|
from octavia.amphorae.backends.health_daemon import status_message
|
||||||
|
from octavia.common import constants
|
||||||
|
from octavia.common import data_models
|
||||||
from octavia.common import exceptions
|
from octavia.common import exceptions
|
||||||
from octavia.db import repositories
|
from octavia.db import api as db_api
|
||||||
|
from octavia.db import repositories as repo
|
||||||
|
from octavia.statistics import stats_base
|
||||||
|
|
||||||
UDP_MAX_SIZE = 64 * 1024
|
UDP_MAX_SIZE = 64 * 1024
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def update_health(obj, srcaddr):
|
|
||||||
handler = stevedore_driver.DriverManager(
|
|
||||||
namespace='octavia.amphora.health_update_drivers',
|
|
||||||
name=CONF.health_manager.health_update_driver,
|
|
||||||
invoke_on_load=True
|
|
||||||
).driver
|
|
||||||
handler.update_health(obj, srcaddr)
|
|
||||||
|
|
||||||
|
|
||||||
def update_stats(obj, srcaddr):
|
|
||||||
handler = stevedore_driver.DriverManager(
|
|
||||||
namespace='octavia.amphora.stats_update_drivers',
|
|
||||||
name=CONF.health_manager.stats_update_driver,
|
|
||||||
invoke_on_load=True
|
|
||||||
).driver
|
|
||||||
handler.update_stats(obj, srcaddr)
|
|
||||||
|
|
||||||
|
|
||||||
class UDPStatusGetter(object):
|
class UDPStatusGetter(object):
|
||||||
"""This class defines methods that will gather heatbeats
|
"""This class defines methods that will gather heartbeats
|
||||||
|
|
||||||
The heartbeats are transmitted via UDP and this class will bind to a port
|
The heartbeats are transmitted via UDP and this class will bind to a port
|
||||||
and absorb them
|
and absorb them
|
||||||
|
@ -67,7 +57,7 @@ class UDPStatusGetter(object):
|
||||||
max_workers=CONF.health_manager.health_update_threads)
|
max_workers=CONF.health_manager.health_update_threads)
|
||||||
self.stats_executor = futures.ProcessPoolExecutor(
|
self.stats_executor = futures.ProcessPoolExecutor(
|
||||||
max_workers=CONF.health_manager.stats_update_threads)
|
max_workers=CONF.health_manager.stats_update_threads)
|
||||||
self.repo = repositories.Repositories().amphorahealth
|
self.health_updater = UpdateHealthDb()
|
||||||
|
|
||||||
def update(self, key, ip, port):
|
def update(self, key, ip, port):
|
||||||
"""Update the running config for the udp socket server
|
"""Update the running config for the udp socket server
|
||||||
|
@ -99,91 +89,7 @@ class UDPStatusGetter(object):
|
||||||
"""Waits for a UDP heart beat to be sent.
|
"""Waits for a UDP heart beat to be sent.
|
||||||
|
|
||||||
:return: Returns the unwrapped payload and addr that sent the
|
:return: Returns the unwrapped payload and addr that sent the
|
||||||
heartbeat. The format of the obj from the UDP sender
|
heartbeat.
|
||||||
can be seen below. Note that listener_1 has no pools
|
|
||||||
and listener_4 has no members.
|
|
||||||
|
|
||||||
Example::
|
|
||||||
|
|
||||||
{
|
|
||||||
"listeners": {
|
|
||||||
"listener_uuid_1": {
|
|
||||||
"pools": {},
|
|
||||||
"status": "OPEN",
|
|
||||||
"stats": {
|
|
||||||
"conns": 0,
|
|
||||||
"rx": 0,
|
|
||||||
"tx": 0
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"listener_uuid_2": {
|
|
||||||
"pools": {
|
|
||||||
"pool_uuid_1": {
|
|
||||||
"members": [{
|
|
||||||
"member_uuid_1": "DOWN"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"member_uuid_2": "DOWN"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"member_uuid_3": "DOWN"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"member_uuid_4": "DOWN"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"status": "OPEN",
|
|
||||||
"stats": {
|
|
||||||
"conns": 0,
|
|
||||||
"rx": 0,
|
|
||||||
"tx": 0
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"listener_uuid_3": {
|
|
||||||
"pools": {
|
|
||||||
"pool_uuid_2": {
|
|
||||||
"members": [{
|
|
||||||
"member_uuid_5": "DOWN"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"member_uuid_6": "DOWN"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"member_uuid_7": "DOWN"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"member_uuid_8": "DOWN"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"status": "OPEN",
|
|
||||||
"stats": {
|
|
||||||
"conns": 0,
|
|
||||||
"rx": 0,
|
|
||||||
"tx": 0
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"listener_uuid_4": {
|
|
||||||
"pools": {
|
|
||||||
"pool_uuid_3": {
|
|
||||||
"members": []
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"status": "OPEN",
|
|
||||||
"stats": {
|
|
||||||
"conns": 0,
|
|
||||||
"rx": 0,
|
|
||||||
"tx": 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"id": "amphora_uuid",
|
|
||||||
"seq": 1033
|
|
||||||
}
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
(data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE)
|
(data, srcaddr) = self.sock.recvfrom(UDP_MAX_SIZE)
|
||||||
LOG.debug('Received packet from %s', srcaddr)
|
LOG.debug('Received packet from %s', srcaddr)
|
||||||
|
@ -211,5 +117,526 @@ class UDPStatusGetter(object):
|
||||||
'heartbeat packet. Ignoring this packet. '
|
'heartbeat packet. Ignoring this packet. '
|
||||||
'Exception: %s', e)
|
'Exception: %s', e)
|
||||||
else:
|
else:
|
||||||
self.health_executor.submit(update_health, obj, srcaddr)
|
self.health_executor.submit(self.health_updater.update_health,
|
||||||
self.stats_executor.submit(update_stats, obj, srcaddr)
|
obj, srcaddr)
|
||||||
|
self.stats_executor.submit(update_stats, obj)
|
||||||
|
|
||||||
|
|
||||||
|
def update_stats(health_message):
|
||||||
|
"""Parses the health message then passes it to the stats driver(s)
|
||||||
|
|
||||||
|
:param health_message: The health message containing the listener stats
|
||||||
|
:type health_message: dict
|
||||||
|
|
||||||
|
Example V1 message::
|
||||||
|
|
||||||
|
health = {
|
||||||
|
"id": "<amphora_id>",
|
||||||
|
"listeners": {
|
||||||
|
"<listener_id>": {
|
||||||
|
"status": "OPEN",
|
||||||
|
"stats": {
|
||||||
|
"ereq": 0,
|
||||||
|
"conns": 0,
|
||||||
|
"totconns": 0,
|
||||||
|
"rx": 0,
|
||||||
|
"tx": 0,
|
||||||
|
},
|
||||||
|
"pools": {
|
||||||
|
"<pool_id>": {
|
||||||
|
"status": "UP",
|
||||||
|
"members": {"<member_id>": "ONLINE"}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Example V2 message::
|
||||||
|
|
||||||
|
{"id": "<amphora_id>",
|
||||||
|
"seq": 67,
|
||||||
|
"listeners": {
|
||||||
|
"<listener_id>": {
|
||||||
|
"status": "OPEN",
|
||||||
|
"stats": {
|
||||||
|
"tx": 0,
|
||||||
|
"rx": 0,
|
||||||
|
"conns": 0,
|
||||||
|
"totconns": 0,
|
||||||
|
"ereq": 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"pools": {
|
||||||
|
"<pool_id>:<listener_id>": {
|
||||||
|
"status": "UP",
|
||||||
|
"members": {
|
||||||
|
"<member_id>": "no check"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"ver": 2
|
||||||
|
"recv_time": time.time()
|
||||||
|
}
|
||||||
|
|
||||||
|
Example V3 message::
|
||||||
|
|
||||||
|
Same as V2 message, except values are deltas rather than absolutes.
|
||||||
|
"""
|
||||||
|
version = health_message.get("ver", 2)
|
||||||
|
|
||||||
|
deltas = False
|
||||||
|
if version >= 3:
|
||||||
|
deltas = True
|
||||||
|
|
||||||
|
amphora_id = health_message.get('id')
|
||||||
|
listeners = health_message.get('listeners', {})
|
||||||
|
listener_stats = []
|
||||||
|
for listener_id, listener in listeners.items():
|
||||||
|
listener_dict = listener.get('stats')
|
||||||
|
stats_model = data_models.ListenerStatistics(
|
||||||
|
listener_id=listener_id,
|
||||||
|
amphora_id=amphora_id,
|
||||||
|
bytes_in=listener_dict.get('rx'),
|
||||||
|
bytes_out=listener_dict.get('tx'),
|
||||||
|
active_connections=listener_dict.get('conns'),
|
||||||
|
total_connections=listener_dict.get('totconns'),
|
||||||
|
request_errors=listener_dict.get('ereq'),
|
||||||
|
received_time=health_message.get('recv_time')
|
||||||
|
)
|
||||||
|
LOG.debug("Listener %s / Amphora %s stats: %s",
|
||||||
|
listener_id, amphora_id, stats_model.get_stats())
|
||||||
|
listener_stats.append(stats_model)
|
||||||
|
stats_base.update_stats_via_driver(listener_stats, deltas=deltas)
|
||||||
|
|
||||||
|
|
||||||
|
class UpdateHealthDb:
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
# first setup repo for amphora, listener,member(nodes),pool repo
|
||||||
|
self.amphora_repo = repo.AmphoraRepository()
|
||||||
|
self.amphora_health_repo = repo.AmphoraHealthRepository()
|
||||||
|
self.listener_repo = repo.ListenerRepository()
|
||||||
|
self.loadbalancer_repo = repo.LoadBalancerRepository()
|
||||||
|
self.member_repo = repo.MemberRepository()
|
||||||
|
self.pool_repo = repo.PoolRepository()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _update_status(session, repo, entity_type,
|
||||||
|
entity_id, new_op_status, old_op_status):
|
||||||
|
if old_op_status.lower() != new_op_status.lower():
|
||||||
|
LOG.debug("%s %s status has changed from %s to "
|
||||||
|
"%s, updating db.",
|
||||||
|
entity_type, entity_id, old_op_status,
|
||||||
|
new_op_status)
|
||||||
|
repo.update(session, entity_id, operating_status=new_op_status)
|
||||||
|
|
||||||
|
def update_health(self, health, srcaddr):
|
||||||
|
# The executor will eat any exceptions from the update_health code
|
||||||
|
# so we need to wrap it and log the unhandled exception
|
||||||
|
start_time = timeit.default_timer()
|
||||||
|
try:
|
||||||
|
self._update_health(health, srcaddr)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception('Health update for amphora %(amp)s encountered '
|
||||||
|
'error %(err)s. Skipping health update.',
|
||||||
|
{'amp': health['id'], 'err': e})
|
||||||
|
# TODO(johnsom) We need to set a warning threshold here
|
||||||
|
LOG.debug('Health Update finished in: %s seconds',
|
||||||
|
timeit.default_timer() - start_time)
|
||||||
|
|
||||||
|
# Health heartbeat message pre-versioning with UDP listeners
|
||||||
|
# need to adjust the expected listener count
|
||||||
|
# This is for backward compatibility with Rocky pre-versioning
|
||||||
|
# heartbeat amphora.
|
||||||
|
def _update_listener_count_for_UDP(self, session, db_lb,
|
||||||
|
expected_listener_count):
|
||||||
|
# For udp listener, the udp health won't send out by amp agent.
|
||||||
|
# Once the default_pool of udp listener have the first enabled
|
||||||
|
# member, then the health will be sent out. So during this
|
||||||
|
# period, need to figure out the udp listener and ignore them
|
||||||
|
# by changing expected_listener_count.
|
||||||
|
for list_id, list_db in db_lb.get('listeners', {}).items():
|
||||||
|
need_remove = False
|
||||||
|
if list_db['protocol'] == constants.PROTOCOL_UDP:
|
||||||
|
listener = self.listener_repo.get(session, id=list_id)
|
||||||
|
enabled_members = ([member
|
||||||
|
for member in
|
||||||
|
listener.default_pool.members
|
||||||
|
if member.enabled]
|
||||||
|
if listener.default_pool else [])
|
||||||
|
if listener.default_pool:
|
||||||
|
if not listener.default_pool.members:
|
||||||
|
need_remove = True
|
||||||
|
elif not enabled_members:
|
||||||
|
need_remove = True
|
||||||
|
else:
|
||||||
|
need_remove = True
|
||||||
|
|
||||||
|
if need_remove:
|
||||||
|
expected_listener_count = expected_listener_count - 1
|
||||||
|
return expected_listener_count
|
||||||
|
|
||||||
|
def _update_health(self, health, srcaddr):
|
||||||
|
"""This function is to update db info based on amphora status
|
||||||
|
|
||||||
|
:param health: map object that contains amphora, listener, member info
|
||||||
|
:type map: string
|
||||||
|
:returns: null
|
||||||
|
|
||||||
|
The input v1 health data structure is shown as below::
|
||||||
|
|
||||||
|
health = {
|
||||||
|
"id": self.FAKE_UUID_1,
|
||||||
|
"listeners": {
|
||||||
|
"listener-id-1": {"status": constants.OPEN, "pools": {
|
||||||
|
"pool-id-1": {"status": constants.UP,
|
||||||
|
"members": {
|
||||||
|
"member-id-1": constants.ONLINE}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Example V2 message::
|
||||||
|
|
||||||
|
{"id": "<amphora_id>",
|
||||||
|
"seq": 67,
|
||||||
|
"listeners": {
|
||||||
|
"<listener_id>": {
|
||||||
|
"status": "OPEN",
|
||||||
|
"stats": {
|
||||||
|
"tx": 0,
|
||||||
|
"rx": 0,
|
||||||
|
"conns": 0,
|
||||||
|
"totconns": 0,
|
||||||
|
"ereq": 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"pools": {
|
||||||
|
"<pool_id>:<listener_id>": {
|
||||||
|
"status": "UP",
|
||||||
|
"members": {
|
||||||
|
"<member_id>": "no check"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"ver": 2
|
||||||
|
}
|
||||||
|
|
||||||
|
"""
|
||||||
|
session = db_api.get_session()
|
||||||
|
|
||||||
|
# We need to see if all of the listeners are reporting in
|
||||||
|
db_lb = self.amphora_repo.get_lb_for_health_update(session,
|
||||||
|
health['id'])
|
||||||
|
ignore_listener_count = False
|
||||||
|
|
||||||
|
if db_lb:
|
||||||
|
expected_listener_count = 0
|
||||||
|
if ('PENDING' in db_lb['provisioning_status'] or
|
||||||
|
not db_lb['enabled']):
|
||||||
|
ignore_listener_count = True
|
||||||
|
else:
|
||||||
|
for key, listener in db_lb.get('listeners', {}).items():
|
||||||
|
# disabled listeners don't report from the amphora
|
||||||
|
if listener['enabled']:
|
||||||
|
expected_listener_count += 1
|
||||||
|
|
||||||
|
# If this is a heartbeat older than versioning, handle
|
||||||
|
# UDP special for backward compatibility.
|
||||||
|
if 'ver' not in health:
|
||||||
|
udp_listeners = [
|
||||||
|
l for k, l in db_lb.get('listeners', {}).items()
|
||||||
|
if l['protocol'] == constants.PROTOCOL_UDP]
|
||||||
|
if udp_listeners:
|
||||||
|
expected_listener_count = (
|
||||||
|
self._update_listener_count_for_UDP(
|
||||||
|
session, db_lb, expected_listener_count))
|
||||||
|
else:
|
||||||
|
# If this is not a spare amp, log and skip it.
|
||||||
|
amp = self.amphora_repo.get(session, id=health['id'])
|
||||||
|
if not amp or amp.load_balancer_id:
|
||||||
|
# This is debug and not warning because this can happen under
|
||||||
|
# normal deleting operations.
|
||||||
|
LOG.debug('Received a health heartbeat from amphora %s with '
|
||||||
|
'IP %s that should not exist. This amphora may be '
|
||||||
|
'in the process of being deleted, in which case you '
|
||||||
|
'will only see this message a few '
|
||||||
|
'times', health['id'], srcaddr)
|
||||||
|
if not amp:
|
||||||
|
LOG.warning('The amphora %s with IP %s is missing from '
|
||||||
|
'the DB, so it cannot be automatically '
|
||||||
|
'deleted (the compute_id is unknown). An '
|
||||||
|
'operator must manually delete it from the '
|
||||||
|
'compute service.', health['id'], srcaddr)
|
||||||
|
return
|
||||||
|
# delete the amp right there
|
||||||
|
try:
|
||||||
|
compute = stevedore_driver.DriverManager(
|
||||||
|
namespace='octavia.compute.drivers',
|
||||||
|
name=CONF.controller_worker.compute_driver,
|
||||||
|
invoke_on_load=True
|
||||||
|
).driver
|
||||||
|
compute.delete(amp.compute_id)
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
LOG.info("Error deleting amp %s with IP %s Error: %s",
|
||||||
|
health['id'], srcaddr, e)
|
||||||
|
expected_listener_count = 0
|
||||||
|
|
||||||
|
listeners = health['listeners']
|
||||||
|
|
||||||
|
# Do not update amphora health if the reporting listener count
|
||||||
|
# does not match the expected listener count
|
||||||
|
if len(listeners) == expected_listener_count or ignore_listener_count:
|
||||||
|
|
||||||
|
lock_session = db_api.get_session(autocommit=False)
|
||||||
|
|
||||||
|
# if we're running too far behind, warn and bail
|
||||||
|
proc_delay = time.time() - health['recv_time']
|
||||||
|
hb_interval = CONF.health_manager.heartbeat_interval
|
||||||
|
# TODO(johnsom) We need to set a warning threshold here, and
|
||||||
|
# escalate to critical when it reaches the
|
||||||
|
# heartbeat_interval
|
||||||
|
if proc_delay >= hb_interval:
|
||||||
|
LOG.warning('Amphora %(id)s health message was processed too '
|
||||||
|
'slowly: %(delay)ss! The system may be overloaded '
|
||||||
|
'or otherwise malfunctioning. This heartbeat has '
|
||||||
|
'been ignored and no update was made to the '
|
||||||
|
'amphora health entry. THIS IS NOT GOOD.',
|
||||||
|
{'id': health['id'], 'delay': proc_delay})
|
||||||
|
return
|
||||||
|
|
||||||
|
# if the input amphora is healthy, we update its db info
|
||||||
|
try:
|
||||||
|
self.amphora_health_repo.replace(
|
||||||
|
lock_session, health['id'],
|
||||||
|
last_update=(datetime.datetime.utcnow()))
|
||||||
|
lock_session.commit()
|
||||||
|
except Exception:
|
||||||
|
with excutils.save_and_reraise_exception():
|
||||||
|
lock_session.rollback()
|
||||||
|
else:
|
||||||
|
LOG.warning('Amphora %(id)s health message reports %(found)i '
|
||||||
|
'listeners when %(expected)i expected',
|
||||||
|
{'id': health['id'], 'found': len(listeners),
|
||||||
|
'expected': expected_listener_count})
|
||||||
|
|
||||||
|
# Don't try to update status for spares pool amphora
|
||||||
|
if not db_lb:
|
||||||
|
return
|
||||||
|
|
||||||
|
processed_pools = []
|
||||||
|
potential_offline_pools = {}
|
||||||
|
|
||||||
|
# We got a heartbeat so lb is healthy until proven otherwise
|
||||||
|
if db_lb[constants.ENABLED] is False:
|
||||||
|
lb_status = constants.OFFLINE
|
||||||
|
else:
|
||||||
|
lb_status = constants.ONLINE
|
||||||
|
|
||||||
|
health_msg_version = health.get('ver', 0)
|
||||||
|
|
||||||
|
for listener_id in db_lb.get(constants.LISTENERS, {}):
|
||||||
|
db_listener = db_lb[constants.LISTENERS][listener_id]
|
||||||
|
db_op_status = db_listener[constants.OPERATING_STATUS]
|
||||||
|
listener_status = None
|
||||||
|
listener = None
|
||||||
|
|
||||||
|
if listener_id not in listeners:
|
||||||
|
if (db_listener[constants.ENABLED] and
|
||||||
|
db_lb[constants.PROVISIONING_STATUS] ==
|
||||||
|
constants.ACTIVE):
|
||||||
|
listener_status = constants.ERROR
|
||||||
|
else:
|
||||||
|
listener_status = constants.OFFLINE
|
||||||
|
else:
|
||||||
|
listener = listeners[listener_id]
|
||||||
|
|
||||||
|
# OPEN = HAProxy listener status nbconn < maxconn
|
||||||
|
if listener.get('status') == constants.OPEN:
|
||||||
|
listener_status = constants.ONLINE
|
||||||
|
# FULL = HAProxy listener status not nbconn < maxconn
|
||||||
|
elif listener.get('status') == constants.FULL:
|
||||||
|
listener_status = constants.DEGRADED
|
||||||
|
if lb_status == constants.ONLINE:
|
||||||
|
lb_status = constants.DEGRADED
|
||||||
|
else:
|
||||||
|
LOG.warning(('Listener %(list)s reported status of '
|
||||||
|
'%(status)s'),
|
||||||
|
{'list': listener_id,
|
||||||
|
'status': listener.get('status')})
|
||||||
|
|
||||||
|
try:
|
||||||
|
if (listener_status is not None and
|
||||||
|
listener_status != db_op_status):
|
||||||
|
self._update_status(
|
||||||
|
session, self.listener_repo, constants.LISTENER,
|
||||||
|
listener_id, listener_status, db_op_status)
|
||||||
|
except sqlalchemy.orm.exc.NoResultFound:
|
||||||
|
LOG.error("Listener %s is not in DB", listener_id)
|
||||||
|
|
||||||
|
if not listener:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if health_msg_version < 2:
|
||||||
|
raw_pools = listener['pools']
|
||||||
|
|
||||||
|
# normalize the pool IDs. Single process listener pools
|
||||||
|
# have the listener id appended with an ':' seperator.
|
||||||
|
# Old multi-process listener pools only have a pool ID.
|
||||||
|
# This makes sure the keys are only pool IDs.
|
||||||
|
pools = {(k + ' ')[:k.rfind(':')]: v for k, v in
|
||||||
|
raw_pools.items()}
|
||||||
|
|
||||||
|
for db_pool_id in db_lb.get('pools', {}):
|
||||||
|
# If we saw this pool already on another listener, skip it.
|
||||||
|
if db_pool_id in processed_pools:
|
||||||
|
continue
|
||||||
|
db_pool_dict = db_lb['pools'][db_pool_id]
|
||||||
|
lb_status = self._process_pool_status(
|
||||||
|
session, db_pool_id, db_pool_dict, pools,
|
||||||
|
lb_status, processed_pools, potential_offline_pools)
|
||||||
|
|
||||||
|
if health_msg_version >= 2:
|
||||||
|
raw_pools = health['pools']
|
||||||
|
|
||||||
|
# normalize the pool IDs. Single process listener pools
|
||||||
|
# have the listener id appended with an ':' seperator.
|
||||||
|
# Old multi-process listener pools only have a pool ID.
|
||||||
|
# This makes sure the keys are only pool IDs.
|
||||||
|
pools = {(k + ' ')[:k.rfind(':')]: v for k, v in raw_pools.items()}
|
||||||
|
|
||||||
|
for db_pool_id in db_lb.get('pools', {}):
|
||||||
|
# If we saw this pool already, skip it.
|
||||||
|
if db_pool_id in processed_pools:
|
||||||
|
continue
|
||||||
|
db_pool_dict = db_lb['pools'][db_pool_id]
|
||||||
|
lb_status = self._process_pool_status(
|
||||||
|
session, db_pool_id, db_pool_dict, pools,
|
||||||
|
lb_status, processed_pools, potential_offline_pools)
|
||||||
|
|
||||||
|
for pool_id in potential_offline_pools:
|
||||||
|
# Skip if we eventually found a status for this pool
|
||||||
|
if pool_id in processed_pools:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
# If the database doesn't already show the pool offline, update
|
||||||
|
if potential_offline_pools[pool_id] != constants.OFFLINE:
|
||||||
|
self._update_status(
|
||||||
|
session, self.pool_repo, constants.POOL,
|
||||||
|
pool_id, constants.OFFLINE,
|
||||||
|
potential_offline_pools[pool_id])
|
||||||
|
except sqlalchemy.orm.exc.NoResultFound:
|
||||||
|
LOG.error("Pool %s is not in DB", pool_id)
|
||||||
|
|
||||||
|
# Update the load balancer status last
|
||||||
|
try:
|
||||||
|
if lb_status != db_lb['operating_status']:
|
||||||
|
self._update_status(
|
||||||
|
session, self.loadbalancer_repo,
|
||||||
|
constants.LOADBALANCER, db_lb['id'], lb_status,
|
||||||
|
db_lb[constants.OPERATING_STATUS])
|
||||||
|
except sqlalchemy.orm.exc.NoResultFound:
|
||||||
|
LOG.error("Load balancer %s is not in DB", db_lb.id)
|
||||||
|
|
||||||
|
def _process_pool_status(
|
||||||
|
self, session, pool_id, db_pool_dict, pools, lb_status,
|
||||||
|
processed_pools, potential_offline_pools):
|
||||||
|
pool_status = None
|
||||||
|
|
||||||
|
if pool_id not in pools:
|
||||||
|
# If we don't have a status update for this pool_id
|
||||||
|
# add it to the list of potential offline pools and continue.
|
||||||
|
# We will check the potential offline pool list after we
|
||||||
|
# finish processing the status updates from all of the listeners.
|
||||||
|
potential_offline_pools[pool_id] = db_pool_dict['operating_status']
|
||||||
|
return lb_status
|
||||||
|
|
||||||
|
pool = pools[pool_id]
|
||||||
|
|
||||||
|
processed_pools.append(pool_id)
|
||||||
|
|
||||||
|
# UP = HAProxy backend has working or no servers
|
||||||
|
if pool.get('status') == constants.UP:
|
||||||
|
pool_status = constants.ONLINE
|
||||||
|
# DOWN = HAProxy backend has no working servers
|
||||||
|
elif pool.get('status') == constants.DOWN:
|
||||||
|
pool_status = constants.ERROR
|
||||||
|
lb_status = constants.ERROR
|
||||||
|
else:
|
||||||
|
LOG.warning(('Pool %(pool)s reported status of '
|
||||||
|
'%(status)s'),
|
||||||
|
{'pool': pool_id,
|
||||||
|
'status': pool.get('status')})
|
||||||
|
|
||||||
|
# Deal with the members that are reporting from
|
||||||
|
# the Amphora
|
||||||
|
members = pool['members']
|
||||||
|
for member_id in db_pool_dict.get('members', {}):
|
||||||
|
member_status = None
|
||||||
|
member_db_status = (
|
||||||
|
db_pool_dict['members'][member_id]['operating_status'])
|
||||||
|
|
||||||
|
if member_id not in members:
|
||||||
|
if member_db_status != constants.NO_MONITOR:
|
||||||
|
member_status = constants.OFFLINE
|
||||||
|
else:
|
||||||
|
status = members[member_id]
|
||||||
|
|
||||||
|
# Member status can be "UP" or "UP #/#"
|
||||||
|
# (transitional)
|
||||||
|
if status.startswith(constants.UP):
|
||||||
|
member_status = constants.ONLINE
|
||||||
|
# Member status can be "DOWN" or "DOWN #/#"
|
||||||
|
# (transitional)
|
||||||
|
elif status.startswith(constants.DOWN):
|
||||||
|
member_status = constants.ERROR
|
||||||
|
if pool_status == constants.ONLINE:
|
||||||
|
pool_status = constants.DEGRADED
|
||||||
|
if lb_status == constants.ONLINE:
|
||||||
|
lb_status = constants.DEGRADED
|
||||||
|
elif status == constants.DRAIN:
|
||||||
|
member_status = constants.DRAINING
|
||||||
|
elif status == constants.MAINT:
|
||||||
|
member_status = constants.OFFLINE
|
||||||
|
elif status == constants.NO_CHECK:
|
||||||
|
member_status = constants.NO_MONITOR
|
||||||
|
elif status == constants.RESTARTING:
|
||||||
|
# RESTARTING means that keepalived is restarting and a down
|
||||||
|
# member has been detected, the real status of the member
|
||||||
|
# is not clear, it might mean that the checker hasn't run
|
||||||
|
# yet.
|
||||||
|
# In this case, keep previous member_status, and wait for a
|
||||||
|
# non-transitional status.
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
LOG.warning('Member %(mem)s reported '
|
||||||
|
'status of %(status)s',
|
||||||
|
{'mem': member_id,
|
||||||
|
'status': status})
|
||||||
|
|
||||||
|
try:
|
||||||
|
if (member_status is not None and
|
||||||
|
member_status != member_db_status):
|
||||||
|
self._update_status(
|
||||||
|
session, self.member_repo, constants.MEMBER,
|
||||||
|
member_id, member_status, member_db_status)
|
||||||
|
except sqlalchemy.orm.exc.NoResultFound:
|
||||||
|
LOG.error("Member %s is not able to update "
|
||||||
|
"in DB", member_id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if (pool_status is not None and
|
||||||
|
pool_status != db_pool_dict['operating_status']):
|
||||||
|
self._update_status(
|
||||||
|
session, self.pool_repo, constants.POOL,
|
||||||
|
pool_id, pool_status, db_pool_dict['operating_status'])
|
||||||
|
except sqlalchemy.orm.exc.NoResultFound:
|
||||||
|
LOG.error("Pool %s is not in DB", pool_id)
|
||||||
|
|
||||||
|
return lb_status
|
||||||
|
|
|
@ -19,18 +19,6 @@ from octavia.amphorae.drivers import driver_base
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class LoggingUpdate(object):
|
|
||||||
def update_stats(self, stats):
|
|
||||||
LOG.debug("Amphora %s no-op, update stats %s",
|
|
||||||
self.__class__.__name__, stats)
|
|
||||||
self.stats = stats
|
|
||||||
|
|
||||||
def update_health(self, health):
|
|
||||||
LOG.debug("Amphora %s no-op, update health %s",
|
|
||||||
self.__class__.__name__, health)
|
|
||||||
self.health = health
|
|
||||||
|
|
||||||
|
|
||||||
class NoopManager(object):
|
class NoopManager(object):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|
|
@ -12,13 +12,17 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
from octavia_lib.api.drivers import exceptions as driver_exceptions
|
from octavia_lib.api.drivers import exceptions as driver_exceptions
|
||||||
from octavia_lib.common import constants as lib_consts
|
from octavia_lib.common import constants as lib_consts
|
||||||
|
|
||||||
from octavia.common import constants as consts
|
from octavia.common import constants as consts
|
||||||
|
from octavia.common import data_models
|
||||||
from octavia.common import utils
|
from octavia.common import utils
|
||||||
from octavia.db import api as db_apis
|
from octavia.db import api as db_apis
|
||||||
from octavia.db import repositories as repo
|
from octavia.db import repositories as repo
|
||||||
|
from octavia.statistics import stats_base
|
||||||
|
|
||||||
|
|
||||||
class DriverUpdater(object):
|
class DriverUpdater(object):
|
||||||
|
@ -151,24 +155,34 @@ class DriverUpdater(object):
|
||||||
:returns: None
|
:returns: None
|
||||||
"""
|
"""
|
||||||
listener_stats = statistics.get(lib_consts.LISTENERS, [])
|
listener_stats = statistics.get(lib_consts.LISTENERS, [])
|
||||||
|
stats_objects = []
|
||||||
for stat in listener_stats:
|
for stat in listener_stats:
|
||||||
try:
|
try:
|
||||||
listener_id = stat.pop('id')
|
stats_obj = data_models.ListenerStatistics(
|
||||||
|
listener_id=stat['id'],
|
||||||
|
bytes_in=stat['bytes_in'],
|
||||||
|
bytes_out=stat['bytes_out'],
|
||||||
|
active_connections=stat['active_connections'],
|
||||||
|
total_connections=stat['total_connections'],
|
||||||
|
request_errors=stat['request_errors'],
|
||||||
|
received_time=time.time()
|
||||||
|
)
|
||||||
|
stats_objects.append(stats_obj)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {
|
return {
|
||||||
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
|
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
|
||||||
lib_consts.FAULT_STRING: str(e),
|
lib_consts.FAULT_STRING: str(e),
|
||||||
lib_consts.STATS_OBJECT: lib_consts.LISTENERS}
|
lib_consts.STATS_OBJECT: lib_consts.LISTENERS}
|
||||||
# Provider drivers other than the amphora driver do not have
|
|
||||||
# an amphora ID, use the listener ID again here to meet the
|
# Provider drivers other than the amphora driver do not have
|
||||||
# constraint requirement.
|
# an amphora ID, use the listener ID again here to meet the
|
||||||
try:
|
# constraint requirement.
|
||||||
self.listener_stats_repo.replace(self.db_session, listener_id,
|
try:
|
||||||
listener_id, **stat)
|
if stats_objects:
|
||||||
except Exception as e:
|
stats_base.update_stats_via_driver(stats_objects)
|
||||||
return {
|
except Exception as e:
|
||||||
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
|
return {
|
||||||
lib_consts.FAULT_STRING: str(e),
|
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
|
||||||
lib_consts.STATS_OBJECT: lib_consts.LISTENERS,
|
lib_consts.FAULT_STRING: str(e),
|
||||||
lib_consts.STATS_OBJECT_ID: listener_id}
|
lib_consts.STATS_OBJECT: lib_consts.LISTENERS}
|
||||||
return {lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_OK}
|
return {lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_OK}
|
||||||
|
|
|
@ -261,7 +261,7 @@ networking_opts = [
|
||||||
"neutron RBAC policies.")),
|
"neutron RBAC policies.")),
|
||||||
]
|
]
|
||||||
|
|
||||||
healthmanager_opts = [
|
health_manager_opts = [
|
||||||
cfg.IPOpt('bind_ip', default='127.0.0.1',
|
cfg.IPOpt('bind_ip', default='127.0.0.1',
|
||||||
help=_('IP address the controller will listen on for '
|
help=_('IP address the controller will listen on for '
|
||||||
'heart beats')),
|
'heart beats')),
|
||||||
|
@ -303,11 +303,12 @@ healthmanager_opts = [
|
||||||
mutable=True,
|
mutable=True,
|
||||||
help=_('Sleep time between sending heartbeats.')),
|
help=_('Sleep time between sending heartbeats.')),
|
||||||
|
|
||||||
# Used for updating health and stats
|
# Used for updating health
|
||||||
cfg.StrOpt('health_update_driver', default='health_db',
|
cfg.StrOpt('health_update_driver', default='health_db',
|
||||||
help=_('Driver for updating amphora health system.')),
|
help=_('Driver for updating amphora health system.'),
|
||||||
cfg.StrOpt('stats_update_driver', default='stats_db',
|
deprecated_for_removal=True,
|
||||||
help=_('Driver for updating amphora statistics.')),
|
deprecated_reason=_('This driver interface was removed.'),
|
||||||
|
deprecated_since='Victoria'),
|
||||||
]
|
]
|
||||||
|
|
||||||
oslo_messaging_opts = [
|
oslo_messaging_opts = [
|
||||||
|
@ -485,6 +486,11 @@ controller_worker_opts = [
|
||||||
cfg.StrOpt('distributor_driver',
|
cfg.StrOpt('distributor_driver',
|
||||||
default='distributor_noop_driver',
|
default='distributor_noop_driver',
|
||||||
help=_('Name of the distributor driver to use')),
|
help=_('Name of the distributor driver to use')),
|
||||||
|
cfg.ListOpt('statistics_drivers', default=['stats_db'],
|
||||||
|
deprecated_name='stats_update_driver',
|
||||||
|
deprecated_group='health_manager',
|
||||||
|
deprecated_since='Victoria',
|
||||||
|
help=_('List of drivers for updating amphora statistics.')),
|
||||||
cfg.StrOpt('loadbalancer_topology',
|
cfg.StrOpt('loadbalancer_topology',
|
||||||
default=constants.TOPOLOGY_SINGLE,
|
default=constants.TOPOLOGY_SINGLE,
|
||||||
choices=constants.SUPPORTED_LB_TOPOLOGIES,
|
choices=constants.SUPPORTED_LB_TOPOLOGIES,
|
||||||
|
@ -854,7 +860,7 @@ cfg.CONF.register_opts(keepalived_vrrp_opts, group='keepalived_vrrp')
|
||||||
cfg.CONF.register_opts(task_flow_opts, group='task_flow')
|
cfg.CONF.register_opts(task_flow_opts, group='task_flow')
|
||||||
cfg.CONF.register_opts(house_keeping_opts, group='house_keeping')
|
cfg.CONF.register_opts(house_keeping_opts, group='house_keeping')
|
||||||
cfg.CONF.register_opts(certificate_opts, group='certificates')
|
cfg.CONF.register_opts(certificate_opts, group='certificates')
|
||||||
cfg.CONF.register_opts(healthmanager_opts, group='health_manager')
|
cfg.CONF.register_opts(health_manager_opts, group='health_manager')
|
||||||
cfg.CONF.register_opts(nova_opts, group='nova')
|
cfg.CONF.register_opts(nova_opts, group='nova')
|
||||||
cfg.CONF.register_opts(cinder_opts, group='cinder')
|
cfg.CONF.register_opts(cinder_opts, group='cinder')
|
||||||
cfg.CONF.register_opts(glance_opts, group='glance')
|
cfg.CONF.register_opts(glance_opts, group='glance')
|
||||||
|
|
|
@ -182,7 +182,7 @@ class ListenerStatistics(BaseDataModel):
|
||||||
|
|
||||||
def __init__(self, listener_id=None, amphora_id=None, bytes_in=0,
|
def __init__(self, listener_id=None, amphora_id=None, bytes_in=0,
|
||||||
bytes_out=0, active_connections=0,
|
bytes_out=0, active_connections=0,
|
||||||
total_connections=0, request_errors=0):
|
total_connections=0, request_errors=0, received_time=0.0):
|
||||||
self.listener_id = listener_id
|
self.listener_id = listener_id
|
||||||
self.amphora_id = amphora_id
|
self.amphora_id = amphora_id
|
||||||
self.bytes_in = bytes_in
|
self.bytes_in = bytes_in
|
||||||
|
@ -190,6 +190,7 @@ class ListenerStatistics(BaseDataModel):
|
||||||
self.active_connections = active_connections
|
self.active_connections = active_connections
|
||||||
self.total_connections = total_connections
|
self.total_connections = total_connections
|
||||||
self.request_errors = request_errors
|
self.request_errors = request_errors
|
||||||
|
self.received_time = received_time
|
||||||
|
|
||||||
def get_stats(self):
|
def get_stats(self):
|
||||||
stats = {
|
stats = {
|
||||||
|
@ -201,8 +202,12 @@ class ListenerStatistics(BaseDataModel):
|
||||||
}
|
}
|
||||||
return stats
|
return stats
|
||||||
|
|
||||||
def __iadd__(self, other):
|
def db_fields(self):
|
||||||
|
fields = self.to_dict()
|
||||||
|
fields.pop('received_time')
|
||||||
|
return fields
|
||||||
|
|
||||||
|
def __iadd__(self, other):
|
||||||
if isinstance(other, ListenerStatistics):
|
if isinstance(other, ListenerStatistics):
|
||||||
self.bytes_in += other.bytes_in
|
self.bytes_in += other.bytes_in
|
||||||
self.bytes_out += other.bytes_out
|
self.bytes_out += other.bytes_out
|
||||||
|
|
|
@ -43,7 +43,13 @@ class StatsMixin(object):
|
||||||
statistics += db_l
|
statistics += db_l
|
||||||
|
|
||||||
amp = self.repo_amphora.get(session, id=db_l.amphora_id)
|
amp = self.repo_amphora.get(session, id=db_l.amphora_id)
|
||||||
if amp and amp.status == constants.AMPHORA_ALLOCATED:
|
# Amphora ID and Listener ID will be the same in the case that the
|
||||||
|
# stats are coming from a provider driver other than the `amphora`
|
||||||
|
# driver. In that case and when the current amphora is ALLOCATED
|
||||||
|
# are the only times we should include the *active* connections,
|
||||||
|
# because non-active amphora will have incorrect counts.
|
||||||
|
if (amp and amp.status == constants.AMPHORA_ALLOCATED) or (
|
||||||
|
db_l.amphora_id == db_l.listener_id):
|
||||||
statistics.active_connections += db_l.active_connections
|
statistics.active_connections += db_l.active_connections
|
||||||
return statistics
|
return statistics
|
||||||
|
|
||||||
|
|
|
@ -1,27 +0,0 @@
|
||||||
# Copyright 2018 GoDaddy
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import abc
|
|
||||||
|
|
||||||
|
|
||||||
class HealthUpdateBase(object):
|
|
||||||
@abc.abstractmethod
|
|
||||||
def update_health(self, health, srcaddr):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
|
|
||||||
class StatsUpdateBase(object):
|
|
||||||
@abc.abstractmethod
|
|
||||||
def update_stats(self, health_message, srcaddr):
|
|
||||||
raise NotImplementedError()
|
|
|
@ -1,606 +0,0 @@
|
||||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import datetime
|
|
||||||
import time
|
|
||||||
import timeit
|
|
||||||
|
|
||||||
from oslo_config import cfg
|
|
||||||
from oslo_log import log as logging
|
|
||||||
from oslo_utils import excutils
|
|
||||||
import sqlalchemy
|
|
||||||
from stevedore import driver as stevedore_driver
|
|
||||||
|
|
||||||
from octavia.common import constants
|
|
||||||
from octavia.common import data_models
|
|
||||||
from octavia.common import stats
|
|
||||||
from octavia.controller.healthmanager.health_drivers import update_base
|
|
||||||
from octavia.db import api as db_api
|
|
||||||
from octavia.db import repositories as repo
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class UpdateHealthDb(update_base.HealthUpdateBase):
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__()
|
|
||||||
# first setup repo for amphora, listener,member(nodes),pool repo
|
|
||||||
self.amphora_repo = repo.AmphoraRepository()
|
|
||||||
self.amphora_health_repo = repo.AmphoraHealthRepository()
|
|
||||||
self.listener_repo = repo.ListenerRepository()
|
|
||||||
self.loadbalancer_repo = repo.LoadBalancerRepository()
|
|
||||||
self.member_repo = repo.MemberRepository()
|
|
||||||
self.pool_repo = repo.PoolRepository()
|
|
||||||
|
|
||||||
def _update_status(self, session, repo, entity_type,
|
|
||||||
entity_id, new_op_status, old_op_status):
|
|
||||||
message = {}
|
|
||||||
if old_op_status.lower() != new_op_status.lower():
|
|
||||||
LOG.debug("%s %s status has changed from %s to "
|
|
||||||
"%s, updating db.",
|
|
||||||
entity_type, entity_id, old_op_status,
|
|
||||||
new_op_status)
|
|
||||||
repo.update(session, entity_id, operating_status=new_op_status)
|
|
||||||
# Map the status for neutron-lbaas compatibility
|
|
||||||
if new_op_status == constants.DRAINING:
|
|
||||||
new_op_status = constants.ONLINE
|
|
||||||
message.update({constants.OPERATING_STATUS: new_op_status})
|
|
||||||
|
|
||||||
def update_health(self, health, srcaddr):
|
|
||||||
# The executor will eat any exceptions from the update_health code
|
|
||||||
# so we need to wrap it and log the unhandled exception
|
|
||||||
start_time = timeit.default_timer()
|
|
||||||
try:
|
|
||||||
self._update_health(health, srcaddr)
|
|
||||||
except Exception as e:
|
|
||||||
LOG.exception('Health update for amphora %(amp)s encountered '
|
|
||||||
'error %(err)s. Skipping health update.',
|
|
||||||
{'amp': health['id'], 'err': e})
|
|
||||||
# TODO(johnsom) We need to set a warning threshold here
|
|
||||||
LOG.debug('Health Update finished in: %s seconds',
|
|
||||||
timeit.default_timer() - start_time)
|
|
||||||
|
|
||||||
# Health heartbeat messsage pre-versioning with UDP listeners
|
|
||||||
# need to adjust the expected listener count
|
|
||||||
# This is for backward compatibility with Rocky pre-versioning
|
|
||||||
# heartbeat amphora.
|
|
||||||
def _update_listener_count_for_UDP(self, session, db_lb,
|
|
||||||
expected_listener_count):
|
|
||||||
# For udp listener, the udp health won't send out by amp agent.
|
|
||||||
# Once the default_pool of udp listener have the first enabled
|
|
||||||
# member, then the health will be sent out. So during this
|
|
||||||
# period, need to figure out the udp listener and ignore them
|
|
||||||
# by changing expected_listener_count.
|
|
||||||
for list_id, list_db in db_lb.get('listeners', {}).items():
|
|
||||||
need_remove = False
|
|
||||||
if list_db['protocol'] == constants.PROTOCOL_UDP:
|
|
||||||
listener = self.listener_repo.get(session, id=list_id)
|
|
||||||
enabled_members = ([member
|
|
||||||
for member in
|
|
||||||
listener.default_pool.members
|
|
||||||
if member.enabled]
|
|
||||||
if listener.default_pool else [])
|
|
||||||
if listener.default_pool:
|
|
||||||
if not listener.default_pool.members:
|
|
||||||
need_remove = True
|
|
||||||
elif not enabled_members:
|
|
||||||
need_remove = True
|
|
||||||
else:
|
|
||||||
need_remove = True
|
|
||||||
|
|
||||||
if need_remove:
|
|
||||||
expected_listener_count = expected_listener_count - 1
|
|
||||||
return expected_listener_count
|
|
||||||
|
|
||||||
def _update_health(self, health, srcaddr):
|
|
||||||
"""This function is to update db info based on amphora status
|
|
||||||
|
|
||||||
:param health: map object that contains amphora, listener, member info
|
|
||||||
:type map: string
|
|
||||||
:returns: null
|
|
||||||
|
|
||||||
The input v1 health data structure is shown as below::
|
|
||||||
|
|
||||||
health = {
|
|
||||||
"id": self.FAKE_UUID_1,
|
|
||||||
"listeners": {
|
|
||||||
"listener-id-1": {"status": constants.OPEN, "pools": {
|
|
||||||
"pool-id-1": {"status": constants.UP,
|
|
||||||
"members": {
|
|
||||||
"member-id-1": constants.ONLINE}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Example V2 message::
|
|
||||||
|
|
||||||
{"id": "<amphora_id>",
|
|
||||||
"seq": 67,
|
|
||||||
"listeners": {
|
|
||||||
"<listener_id>": {
|
|
||||||
"status": "OPEN",
|
|
||||||
"stats": {
|
|
||||||
"tx": 0,
|
|
||||||
"rx": 0,
|
|
||||||
"conns": 0,
|
|
||||||
"totconns": 0,
|
|
||||||
"ereq": 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"pools": {
|
|
||||||
"<pool_id>:<listener_id>": {
|
|
||||||
"status": "UP",
|
|
||||||
"members": {
|
|
||||||
"<member_id>": "no check"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"ver": 2
|
|
||||||
}
|
|
||||||
|
|
||||||
"""
|
|
||||||
session = db_api.get_session()
|
|
||||||
|
|
||||||
# We need to see if all of the listeners are reporting in
|
|
||||||
db_lb = self.amphora_repo.get_lb_for_health_update(session,
|
|
||||||
health['id'])
|
|
||||||
ignore_listener_count = False
|
|
||||||
|
|
||||||
if db_lb:
|
|
||||||
expected_listener_count = 0
|
|
||||||
if ('PENDING' in db_lb['provisioning_status'] or
|
|
||||||
not db_lb['enabled']):
|
|
||||||
ignore_listener_count = True
|
|
||||||
else:
|
|
||||||
for key, listener in db_lb.get('listeners', {}).items():
|
|
||||||
# disabled listeners don't report from the amphora
|
|
||||||
if listener['enabled']:
|
|
||||||
expected_listener_count += 1
|
|
||||||
|
|
||||||
# If this is a heartbeat older than versioning, handle
|
|
||||||
# UDP special for backward compatibility.
|
|
||||||
if 'ver' not in health:
|
|
||||||
udp_listeners = [
|
|
||||||
l for k, l in db_lb.get('listeners', {}).items()
|
|
||||||
if l['protocol'] == constants.PROTOCOL_UDP]
|
|
||||||
if udp_listeners:
|
|
||||||
expected_listener_count = (
|
|
||||||
self._update_listener_count_for_UDP(
|
|
||||||
session, db_lb, expected_listener_count))
|
|
||||||
else:
|
|
||||||
# If this is not a spare amp, log and skip it.
|
|
||||||
amp = self.amphora_repo.get(session, id=health['id'])
|
|
||||||
if not amp or amp.load_balancer_id:
|
|
||||||
# This is debug and not warning because this can happen under
|
|
||||||
# normal deleting operations.
|
|
||||||
LOG.debug('Received a health heartbeat from amphora %s with '
|
|
||||||
'IP %s that should not exist. This amphora may be '
|
|
||||||
'in the process of being deleted, in which case you '
|
|
||||||
'will only see this message a few '
|
|
||||||
'times', health['id'], srcaddr)
|
|
||||||
if not amp:
|
|
||||||
LOG.warning('The amphora %s with IP %s is missing from '
|
|
||||||
'the DB, so it cannot be automatically '
|
|
||||||
'deleted (the compute_id is unknown). An '
|
|
||||||
'operator must manually delete it from the '
|
|
||||||
'compute service.', health['id'], srcaddr)
|
|
||||||
return
|
|
||||||
# delete the amp right there
|
|
||||||
try:
|
|
||||||
compute = stevedore_driver.DriverManager(
|
|
||||||
namespace='octavia.compute.drivers',
|
|
||||||
name=CONF.controller_worker.compute_driver,
|
|
||||||
invoke_on_load=True
|
|
||||||
).driver
|
|
||||||
compute.delete(amp.compute_id)
|
|
||||||
return
|
|
||||||
except Exception as e:
|
|
||||||
LOG.info("Error deleting amp %s with IP %s Error: %s",
|
|
||||||
health['id'], srcaddr, e)
|
|
||||||
expected_listener_count = 0
|
|
||||||
|
|
||||||
listeners = health['listeners']
|
|
||||||
|
|
||||||
# Do not update amphora health if the reporting listener count
|
|
||||||
# does not match the expected listener count
|
|
||||||
if len(listeners) == expected_listener_count or ignore_listener_count:
|
|
||||||
|
|
||||||
lock_session = db_api.get_session(autocommit=False)
|
|
||||||
|
|
||||||
# if we're running too far behind, warn and bail
|
|
||||||
proc_delay = time.time() - health['recv_time']
|
|
||||||
hb_interval = CONF.health_manager.heartbeat_interval
|
|
||||||
# TODO(johnsom) We need to set a warning threshold here, and
|
|
||||||
# escalate to critical when it reaches the
|
|
||||||
# heartbeat_interval
|
|
||||||
if proc_delay >= hb_interval:
|
|
||||||
LOG.warning('Amphora %(id)s health message was processed too '
|
|
||||||
'slowly: %(delay)ss! The system may be overloaded '
|
|
||||||
'or otherwise malfunctioning. This heartbeat has '
|
|
||||||
'been ignored and no update was made to the '
|
|
||||||
'amphora health entry. THIS IS NOT GOOD.',
|
|
||||||
{'id': health['id'], 'delay': proc_delay})
|
|
||||||
return
|
|
||||||
|
|
||||||
# if the input amphora is healthy, we update its db info
|
|
||||||
try:
|
|
||||||
self.amphora_health_repo.replace(
|
|
||||||
lock_session, health['id'],
|
|
||||||
last_update=(datetime.datetime.utcnow()))
|
|
||||||
lock_session.commit()
|
|
||||||
except Exception:
|
|
||||||
with excutils.save_and_reraise_exception():
|
|
||||||
lock_session.rollback()
|
|
||||||
else:
|
|
||||||
LOG.warning('Amphora %(id)s health message reports %(found)i '
|
|
||||||
'listeners when %(expected)i expected',
|
|
||||||
{'id': health['id'], 'found': len(listeners),
|
|
||||||
'expected': expected_listener_count})
|
|
||||||
|
|
||||||
# Don't try to update status for spares pool amphora
|
|
||||||
if not db_lb:
|
|
||||||
return
|
|
||||||
|
|
||||||
processed_pools = []
|
|
||||||
potential_offline_pools = {}
|
|
||||||
|
|
||||||
# We got a heartbeat so lb is healthy until proven otherwise
|
|
||||||
if db_lb[constants.ENABLED] is False:
|
|
||||||
lb_status = constants.OFFLINE
|
|
||||||
else:
|
|
||||||
lb_status = constants.ONLINE
|
|
||||||
|
|
||||||
health_msg_version = health.get('ver', 0)
|
|
||||||
|
|
||||||
for listener_id in db_lb.get(constants.LISTENERS, {}):
|
|
||||||
db_listener = db_lb[constants.LISTENERS][listener_id]
|
|
||||||
db_op_status = db_listener[constants.OPERATING_STATUS]
|
|
||||||
listener_status = None
|
|
||||||
listener = None
|
|
||||||
|
|
||||||
if listener_id not in listeners:
|
|
||||||
if (db_listener[constants.ENABLED] and
|
|
||||||
db_lb[constants.PROVISIONING_STATUS] ==
|
|
||||||
constants.ACTIVE):
|
|
||||||
listener_status = constants.ERROR
|
|
||||||
else:
|
|
||||||
listener_status = constants.OFFLINE
|
|
||||||
else:
|
|
||||||
listener = listeners[listener_id]
|
|
||||||
|
|
||||||
# OPEN = HAProxy listener status nbconn < maxconn
|
|
||||||
if listener.get('status') == constants.OPEN:
|
|
||||||
listener_status = constants.ONLINE
|
|
||||||
# FULL = HAProxy listener status not nbconn < maxconn
|
|
||||||
elif listener.get('status') == constants.FULL:
|
|
||||||
listener_status = constants.DEGRADED
|
|
||||||
if lb_status == constants.ONLINE:
|
|
||||||
lb_status = constants.DEGRADED
|
|
||||||
else:
|
|
||||||
LOG.warning(('Listener %(list)s reported status of '
|
|
||||||
'%(status)s'),
|
|
||||||
{'list': listener_id,
|
|
||||||
'status': listener.get('status')})
|
|
||||||
|
|
||||||
try:
|
|
||||||
if (listener_status is not None and
|
|
||||||
listener_status != db_op_status):
|
|
||||||
self._update_status(
|
|
||||||
session, self.listener_repo, constants.LISTENER,
|
|
||||||
listener_id, listener_status, db_op_status)
|
|
||||||
except sqlalchemy.orm.exc.NoResultFound:
|
|
||||||
LOG.error("Listener %s is not in DB", listener_id)
|
|
||||||
|
|
||||||
if not listener:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if health_msg_version < 2:
|
|
||||||
raw_pools = listener['pools']
|
|
||||||
|
|
||||||
# normalize the pool IDs. Single process listener pools
|
|
||||||
# have the listener id appended with an ':' seperator.
|
|
||||||
# Old multi-process listener pools only have a pool ID.
|
|
||||||
# This makes sure the keys are only pool IDs.
|
|
||||||
pools = {(k + ' ')[:k.rfind(':')]: v for k, v in
|
|
||||||
raw_pools.items()}
|
|
||||||
|
|
||||||
for db_pool_id in db_lb.get('pools', {}):
|
|
||||||
# If we saw this pool already on another listener, skip it.
|
|
||||||
if db_pool_id in processed_pools:
|
|
||||||
continue
|
|
||||||
db_pool_dict = db_lb['pools'][db_pool_id]
|
|
||||||
lb_status = self._process_pool_status(
|
|
||||||
session, db_pool_id, db_pool_dict, pools,
|
|
||||||
lb_status, processed_pools, potential_offline_pools)
|
|
||||||
|
|
||||||
if health_msg_version >= 2:
|
|
||||||
raw_pools = health['pools']
|
|
||||||
|
|
||||||
# normalize the pool IDs. Single process listener pools
|
|
||||||
# have the listener id appended with an ':' seperator.
|
|
||||||
# Old multi-process listener pools only have a pool ID.
|
|
||||||
# This makes sure the keys are only pool IDs.
|
|
||||||
pools = {(k + ' ')[:k.rfind(':')]: v for k, v in raw_pools.items()}
|
|
||||||
|
|
||||||
for db_pool_id in db_lb.get('pools', {}):
|
|
||||||
# If we saw this pool already, skip it.
|
|
||||||
if db_pool_id in processed_pools:
|
|
||||||
continue
|
|
||||||
db_pool_dict = db_lb['pools'][db_pool_id]
|
|
||||||
lb_status = self._process_pool_status(
|
|
||||||
session, db_pool_id, db_pool_dict, pools,
|
|
||||||
lb_status, processed_pools, potential_offline_pools)
|
|
||||||
|
|
||||||
for pool_id in potential_offline_pools:
|
|
||||||
# Skip if we eventually found a status for this pool
|
|
||||||
if pool_id in processed_pools:
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
# If the database doesn't already show the pool offline, update
|
|
||||||
if potential_offline_pools[pool_id] != constants.OFFLINE:
|
|
||||||
self._update_status(
|
|
||||||
session, self.pool_repo, constants.POOL,
|
|
||||||
pool_id, constants.OFFLINE,
|
|
||||||
potential_offline_pools[pool_id])
|
|
||||||
except sqlalchemy.orm.exc.NoResultFound:
|
|
||||||
LOG.error("Pool %s is not in DB", pool_id)
|
|
||||||
|
|
||||||
# Update the load balancer status last
|
|
||||||
try:
|
|
||||||
if lb_status != db_lb['operating_status']:
|
|
||||||
self._update_status(
|
|
||||||
session, self.loadbalancer_repo,
|
|
||||||
constants.LOADBALANCER, db_lb['id'], lb_status,
|
|
||||||
db_lb[constants.OPERATING_STATUS])
|
|
||||||
except sqlalchemy.orm.exc.NoResultFound:
|
|
||||||
LOG.error("Load balancer %s is not in DB", db_lb.id)
|
|
||||||
|
|
||||||
def _process_pool_status(
|
|
||||||
self, session, pool_id, db_pool_dict, pools, lb_status,
|
|
||||||
processed_pools, potential_offline_pools):
|
|
||||||
pool_status = None
|
|
||||||
|
|
||||||
if pool_id not in pools:
|
|
||||||
# If we don't have a status update for this pool_id
|
|
||||||
# add it to the list of potential offline pools and continue.
|
|
||||||
# We will check the potential offline pool list after we
|
|
||||||
# finish processing the status updates from all of the listeners.
|
|
||||||
potential_offline_pools[pool_id] = db_pool_dict['operating_status']
|
|
||||||
return lb_status
|
|
||||||
|
|
||||||
pool = pools[pool_id]
|
|
||||||
|
|
||||||
processed_pools.append(pool_id)
|
|
||||||
|
|
||||||
# UP = HAProxy backend has working or no servers
|
|
||||||
if pool.get('status') == constants.UP:
|
|
||||||
pool_status = constants.ONLINE
|
|
||||||
# DOWN = HAProxy backend has no working servers
|
|
||||||
elif pool.get('status') == constants.DOWN:
|
|
||||||
pool_status = constants.ERROR
|
|
||||||
lb_status = constants.ERROR
|
|
||||||
else:
|
|
||||||
LOG.warning(('Pool %(pool)s reported status of '
|
|
||||||
'%(status)s'),
|
|
||||||
{'pool': pool_id,
|
|
||||||
'status': pool.get('status')})
|
|
||||||
|
|
||||||
# Deal with the members that are reporting from
|
|
||||||
# the Amphora
|
|
||||||
members = pool['members']
|
|
||||||
for member_id in db_pool_dict.get('members', {}):
|
|
||||||
member_status = None
|
|
||||||
member_db_status = (
|
|
||||||
db_pool_dict['members'][member_id]['operating_status'])
|
|
||||||
|
|
||||||
if member_id not in members:
|
|
||||||
if member_db_status != constants.NO_MONITOR:
|
|
||||||
member_status = constants.OFFLINE
|
|
||||||
else:
|
|
||||||
status = members[member_id]
|
|
||||||
|
|
||||||
# Member status can be "UP" or "UP #/#"
|
|
||||||
# (transitional)
|
|
||||||
if status.startswith(constants.UP):
|
|
||||||
member_status = constants.ONLINE
|
|
||||||
# Member status can be "DOWN" or "DOWN #/#"
|
|
||||||
# (transitional)
|
|
||||||
elif status.startswith(constants.DOWN):
|
|
||||||
member_status = constants.ERROR
|
|
||||||
if pool_status == constants.ONLINE:
|
|
||||||
pool_status = constants.DEGRADED
|
|
||||||
if lb_status == constants.ONLINE:
|
|
||||||
lb_status = constants.DEGRADED
|
|
||||||
elif status == constants.DRAIN:
|
|
||||||
member_status = constants.DRAINING
|
|
||||||
elif status == constants.MAINT:
|
|
||||||
member_status = constants.OFFLINE
|
|
||||||
elif status == constants.NO_CHECK:
|
|
||||||
member_status = constants.NO_MONITOR
|
|
||||||
elif status == constants.RESTARTING:
|
|
||||||
# RESTARTING means that keepalived is restarting and a down
|
|
||||||
# member has been detected, the real status of the member
|
|
||||||
# is not clear, it might mean that the checker hasn't run
|
|
||||||
# yet.
|
|
||||||
# In this case, keep previous member_status, and wait for a
|
|
||||||
# non-transitional status.
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
LOG.warning('Member %(mem)s reported '
|
|
||||||
'status of %(status)s',
|
|
||||||
{'mem': member_id,
|
|
||||||
'status': status})
|
|
||||||
|
|
||||||
try:
|
|
||||||
if (member_status is not None and
|
|
||||||
member_status != member_db_status):
|
|
||||||
self._update_status(
|
|
||||||
session, self.member_repo, constants.MEMBER,
|
|
||||||
member_id, member_status, member_db_status)
|
|
||||||
except sqlalchemy.orm.exc.NoResultFound:
|
|
||||||
LOG.error("Member %s is not able to update "
|
|
||||||
"in DB", member_id)
|
|
||||||
|
|
||||||
try:
|
|
||||||
if (pool_status is not None and
|
|
||||||
pool_status != db_pool_dict['operating_status']):
|
|
||||||
self._update_status(
|
|
||||||
session, self.pool_repo, constants.POOL,
|
|
||||||
pool_id, pool_status, db_pool_dict['operating_status'])
|
|
||||||
except sqlalchemy.orm.exc.NoResultFound:
|
|
||||||
LOG.error("Pool %s is not in DB", pool_id)
|
|
||||||
|
|
||||||
return lb_status
|
|
||||||
|
|
||||||
|
|
||||||
class UpdateStatsDb(update_base.StatsUpdateBase, stats.StatsMixin):
|
|
||||||
|
|
||||||
def update_stats(self, health_message, srcaddr):
|
|
||||||
# The executor will eat any exceptions from the update_stats code
|
|
||||||
# so we need to wrap it and log the unhandled exception
|
|
||||||
try:
|
|
||||||
self._update_stats(health_message, srcaddr)
|
|
||||||
except Exception:
|
|
||||||
LOG.exception('update_stats encountered an unknown error '
|
|
||||||
'processing stats for amphora %s with IP '
|
|
||||||
'%s', health_message['id'], srcaddr)
|
|
||||||
|
|
||||||
def _update_stats(self, health_message, srcaddr):
|
|
||||||
"""This function is to update the db with listener stats
|
|
||||||
|
|
||||||
:param health_message: The health message containing the listener stats
|
|
||||||
:type map: string
|
|
||||||
:returns: null
|
|
||||||
|
|
||||||
Example V1 message::
|
|
||||||
|
|
||||||
health = {
|
|
||||||
"id": self.FAKE_UUID_1,
|
|
||||||
"listeners": {
|
|
||||||
"listener-id-1": {
|
|
||||||
"status": constants.OPEN,
|
|
||||||
"stats": {
|
|
||||||
"ereq":0,
|
|
||||||
"conns": 0,
|
|
||||||
"totconns": 0,
|
|
||||||
"rx": 0,
|
|
||||||
"tx": 0,
|
|
||||||
},
|
|
||||||
"pools": {
|
|
||||||
"pool-id-1": {
|
|
||||||
"status": constants.UP,
|
|
||||||
"members": {"member-id-1": constants.ONLINE}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Example V2 message::
|
|
||||||
|
|
||||||
{"id": "<amphora_id>",
|
|
||||||
"seq": 67,
|
|
||||||
"listeners": {
|
|
||||||
"<listener_id>": {
|
|
||||||
"status": "OPEN",
|
|
||||||
"stats": {
|
|
||||||
"tx": 0,
|
|
||||||
"rx": 0,
|
|
||||||
"conns": 0,
|
|
||||||
"totconns": 0,
|
|
||||||
"ereq": 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"pools": {
|
|
||||||
"<pool_id>:<listener_id>": {
|
|
||||||
"status": "UP",
|
|
||||||
"members": {
|
|
||||||
"<member_id>": "no check"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"ver": 2
|
|
||||||
}
|
|
||||||
|
|
||||||
Example V3 message::
|
|
||||||
|
|
||||||
See V2 message, except values are deltas rather than absolutes.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
version = health_message.get("ver", 1)
|
|
||||||
|
|
||||||
if version <= 2:
|
|
||||||
self.version2(health_message)
|
|
||||||
elif version == 3:
|
|
||||||
self.version3(health_message)
|
|
||||||
else:
|
|
||||||
LOG.warning("Unknown message version: %s, ignoring...", version)
|
|
||||||
|
|
||||||
def version2(self, health_message):
|
|
||||||
"""Parse version 1 and 2 of the health message.
|
|
||||||
|
|
||||||
:param health_message: health message dictionary
|
|
||||||
:type health_message: dict
|
|
||||||
"""
|
|
||||||
|
|
||||||
session = db_api.get_session()
|
|
||||||
|
|
||||||
amphora_id = health_message['id']
|
|
||||||
listeners = health_message['listeners']
|
|
||||||
for listener_id, listener in listeners.items():
|
|
||||||
|
|
||||||
stats = listener.get('stats')
|
|
||||||
stats = {'bytes_in': stats['rx'], 'bytes_out': stats['tx'],
|
|
||||||
'active_connections': stats['conns'],
|
|
||||||
'total_connections': stats['totconns'],
|
|
||||||
'request_errors': stats['ereq']}
|
|
||||||
LOG.debug("Updating listener stats in db."
|
|
||||||
"Listener %s / Amphora %s stats: %s",
|
|
||||||
listener_id, amphora_id, stats)
|
|
||||||
self.listener_stats_repo.replace(
|
|
||||||
session, listener_id, amphora_id, **stats)
|
|
||||||
|
|
||||||
def version3(self, health_message):
|
|
||||||
"""Parse version 3 of the health message.
|
|
||||||
|
|
||||||
:param health_message: health message dictionary
|
|
||||||
:type health_message: dict
|
|
||||||
"""
|
|
||||||
|
|
||||||
session = db_api.get_session()
|
|
||||||
|
|
||||||
amphora_id = health_message['id']
|
|
||||||
listeners = health_message['listeners']
|
|
||||||
for listener_id, listener in listeners.items():
|
|
||||||
|
|
||||||
delta_stats = listener.get('stats')
|
|
||||||
delta_stats_model = data_models.ListenerStatistics(
|
|
||||||
listener_id=listener_id,
|
|
||||||
amphora_id=amphora_id,
|
|
||||||
bytes_in=delta_stats['rx'],
|
|
||||||
bytes_out=delta_stats['tx'],
|
|
||||||
active_connections=delta_stats['conns'],
|
|
||||||
total_connections=delta_stats['totconns'],
|
|
||||||
request_errors=delta_stats['ereq']
|
|
||||||
)
|
|
||||||
LOG.debug("Updating listener stats in db."
|
|
||||||
"Listener %s / Amphora %s stats: %s",
|
|
||||||
listener_id, amphora_id, delta_stats_model.to_dict())
|
|
||||||
self.listener_stats_repo.increment(session, delta_stats_model)
|
|
|
@ -176,7 +176,6 @@ class ListenerStatistics(base_models.BASE):
|
||||||
return value
|
return value
|
||||||
|
|
||||||
def __iadd__(self, other):
|
def __iadd__(self, other):
|
||||||
|
|
||||||
if isinstance(other, (ListenerStatistics,
|
if isinstance(other, (ListenerStatistics,
|
||||||
data_models.ListenerStatistics)):
|
data_models.ListenerStatistics)):
|
||||||
self.bytes_in += other.bytes_in
|
self.bytes_in += other.bytes_in
|
||||||
|
|
|
@ -1206,21 +1206,30 @@ class ListenerRepository(BaseRepository):
|
||||||
class ListenerStatisticsRepository(BaseRepository):
|
class ListenerStatisticsRepository(BaseRepository):
|
||||||
model_class = models.ListenerStatistics
|
model_class = models.ListenerStatistics
|
||||||
|
|
||||||
def replace(self, session, listener_id, amphora_id, **model_kwargs):
|
def replace(self, session, stats_obj):
|
||||||
"""replace or insert listener into database."""
|
"""Create or override a listener's statistics (insert/update)
|
||||||
|
|
||||||
|
:param session: A Sql Alchemy database session
|
||||||
|
:param stats_obj: Listener statistics object to store
|
||||||
|
:type stats_obj: octavia.common.data_models.ListenerStatistics
|
||||||
|
"""
|
||||||
|
if not stats_obj.amphora_id:
|
||||||
|
# amphora_id can't be null, so clone the listener_id
|
||||||
|
stats_obj.amphora_id = stats_obj.listener_id
|
||||||
|
|
||||||
with session.begin(subtransactions=True):
|
with session.begin(subtransactions=True):
|
||||||
|
# TODO(johnsom): This can be simplified/optimized using an "upsert"
|
||||||
count = session.query(self.model_class).filter_by(
|
count = session.query(self.model_class).filter_by(
|
||||||
listener_id=listener_id, amphora_id=amphora_id).count()
|
listener_id=stats_obj.listener_id,
|
||||||
|
amphora_id=stats_obj.amphora_id).count()
|
||||||
if count:
|
if count:
|
||||||
session.query(self.model_class).filter_by(
|
session.query(self.model_class).filter_by(
|
||||||
listener_id=listener_id,
|
listener_id=stats_obj.listener_id,
|
||||||
amphora_id=amphora_id).update(
|
amphora_id=stats_obj.amphora_id).update(
|
||||||
model_kwargs,
|
stats_obj.get_stats(),
|
||||||
synchronize_session=False)
|
synchronize_session=False)
|
||||||
else:
|
else:
|
||||||
model_kwargs['listener_id'] = listener_id
|
self.create(session, **stats_obj.db_fields())
|
||||||
model_kwargs['amphora_id'] = amphora_id
|
|
||||||
self.create(session, **model_kwargs)
|
|
||||||
|
|
||||||
def increment(self, session, delta_stats):
|
def increment(self, session, delta_stats):
|
||||||
"""Updates a listener's statistics, incrementing by the passed deltas.
|
"""Updates a listener's statistics, incrementing by the passed deltas.
|
||||||
|
@ -1228,10 +1237,13 @@ class ListenerStatisticsRepository(BaseRepository):
|
||||||
:param session: A Sql Alchemy database session
|
:param session: A Sql Alchemy database session
|
||||||
:param delta_stats: Listener statistics deltas to add
|
:param delta_stats: Listener statistics deltas to add
|
||||||
:type delta_stats: octavia.common.data_models.ListenerStatistics
|
:type delta_stats: octavia.common.data_models.ListenerStatistics
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
if not delta_stats.amphora_id:
|
||||||
|
# amphora_id can't be null, so clone the listener_id
|
||||||
|
delta_stats.amphora_id = delta_stats.listener_id
|
||||||
|
|
||||||
with session.begin(subtransactions=True):
|
with session.begin(subtransactions=True):
|
||||||
|
# TODO(johnsom): This can be simplified/optimized using an "upsert"
|
||||||
count = session.query(self.model_class).filter_by(
|
count = session.query(self.model_class).filter_by(
|
||||||
listener_id=delta_stats.listener_id,
|
listener_id=delta_stats.listener_id,
|
||||||
amphora_id=delta_stats.amphora_id).count()
|
amphora_id=delta_stats.amphora_id).count()
|
||||||
|
@ -1244,7 +1256,7 @@ class ListenerStatisticsRepository(BaseRepository):
|
||||||
existing_stats.active_connections = (
|
existing_stats.active_connections = (
|
||||||
delta_stats.active_connections)
|
delta_stats.active_connections)
|
||||||
else:
|
else:
|
||||||
self.create(session, **delta_stats.to_dict())
|
self.create(session, **delta_stats.db_fields())
|
||||||
|
|
||||||
def update(self, session, listener_id, **model_kwargs):
|
def update(self, session, listener_id, **model_kwargs):
|
||||||
"""Updates a listener's statistics, overriding with the passed values.
|
"""Updates a listener's statistics, overriding with the passed values.
|
||||||
|
|
|
@ -32,7 +32,7 @@ def list_opts():
|
||||||
('networking', octavia.common.config.networking_opts),
|
('networking', octavia.common.config.networking_opts),
|
||||||
('oslo_messaging', octavia.common.config.oslo_messaging_opts),
|
('oslo_messaging', octavia.common.config.oslo_messaging_opts),
|
||||||
('haproxy_amphora', octavia.common.config.haproxy_amphora_opts),
|
('haproxy_amphora', octavia.common.config.haproxy_amphora_opts),
|
||||||
('health_manager', octavia.common.config.healthmanager_opts),
|
('health_manager', octavia.common.config.health_manager_opts),
|
||||||
('controller_worker', octavia.common.config.controller_worker_opts),
|
('controller_worker', octavia.common.config.controller_worker_opts),
|
||||||
('task_flow', octavia.common.config.task_flow_opts),
|
('task_flow', octavia.common.config.task_flow_opts),
|
||||||
('certificates', itertools.chain(
|
('certificates', itertools.chain(
|
||||||
|
|
|
@ -14,16 +14,16 @@
|
||||||
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
|
||||||
from octavia.controller.healthmanager.health_drivers import update_base
|
from octavia.statistics import stats_base
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class HealthUpdateLogger(update_base.HealthUpdateBase):
|
class StatsLogger(stats_base.StatsDriverMixin):
|
||||||
def update_health(self, health, srcaddr):
|
def update_stats(self, listener_stats, deltas=False):
|
||||||
LOG.info("Health update triggered for: %s", health.get('id'))
|
for stats_object in listener_stats:
|
||||||
|
LOG.info("Logging listener stats%s for listener `%s` / "
|
||||||
|
"amphora `%s`: %s",
|
||||||
class StatsUpdateLogger(update_base.StatsUpdateBase):
|
' deltas' if deltas else '',
|
||||||
def update_stats(self, health_message, srcaddr):
|
stats_object.listener_id, stats_object.amphora_id,
|
||||||
LOG.info("Stats update triggered for: %s", health_message.get('id'))
|
stats_object.get_stats())
|
|
@ -0,0 +1,43 @@
|
||||||
|
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
|
|
||||||
|
from octavia.db import api as db_api
|
||||||
|
from octavia.db import repositories as repo
|
||||||
|
from octavia.statistics import stats_base
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class StatsUpdateDb(stats_base.StatsDriverMixin):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self.listener_stats_repo = repo.ListenerStatisticsRepository()
|
||||||
|
|
||||||
|
def update_stats(self, listener_stats, deltas=False):
|
||||||
|
"""This function is to update the db with listener stats"""
|
||||||
|
session = db_api.get_session()
|
||||||
|
for stats_object in listener_stats:
|
||||||
|
LOG.debug("Updating listener stats in db for listener `%s` / "
|
||||||
|
"amphora `%s`: %s",
|
||||||
|
stats_object.listener_id, stats_object.amphora_id,
|
||||||
|
stats_object.get_stats())
|
||||||
|
if deltas:
|
||||||
|
self.listener_stats_repo.increment(session, stats_object)
|
||||||
|
else:
|
||||||
|
self.listener_stats_repo.replace(session, stats_object)
|
|
@ -0,0 +1,60 @@
|
||||||
|
# Copyright 2011-2014 OpenStack Foundation,author: Min Wang,German Eichberger
|
||||||
|
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import abc
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
|
from stevedore import named as stevedore_named
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
_STATS_HANDLERS = None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_stats_handlers():
|
||||||
|
global _STATS_HANDLERS
|
||||||
|
if _STATS_HANDLERS is None:
|
||||||
|
_STATS_HANDLERS = stevedore_named.NamedExtensionManager(
|
||||||
|
namespace='octavia.statistics.drivers',
|
||||||
|
names=CONF.controller_worker.statistics_drivers,
|
||||||
|
invoke_on_load=True,
|
||||||
|
propagate_map_exceptions=False
|
||||||
|
)
|
||||||
|
return _STATS_HANDLERS
|
||||||
|
|
||||||
|
|
||||||
|
def update_stats_via_driver(listener_stats, deltas=False):
|
||||||
|
"""Send listener stats to the enabled stats driver(s)
|
||||||
|
|
||||||
|
:param listener_stats: A list of ListenerStatistics objects
|
||||||
|
:type listener_stats: list
|
||||||
|
:param deltas: Indicates whether the stats are deltas (false==absolute)
|
||||||
|
:type deltas: bool
|
||||||
|
"""
|
||||||
|
handlers = _get_stats_handlers()
|
||||||
|
handlers.map_method('update_stats', listener_stats, deltas=deltas)
|
||||||
|
|
||||||
|
|
||||||
|
class StatsDriverMixin(object, metaclass=abc.ABCMeta):
|
||||||
|
@abc.abstractmethod
|
||||||
|
def update_stats(self, listener_stats, deltas=False):
|
||||||
|
"""Return a stats object formatted for a generic backend
|
||||||
|
|
||||||
|
:param listener_stats: A list of data_model.ListenerStatistics objects
|
||||||
|
:type listener_stats: list
|
||||||
|
:param deltas: Indicates whether the stats are deltas (false==absolute)
|
||||||
|
:type deltas: bool
|
||||||
|
"""
|
|
@ -3174,17 +3174,21 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest):
|
||||||
request_errors = random.randrange(1000000000)
|
request_errors = random.randrange(1000000000)
|
||||||
self.assertIsNone(self.listener_stats_repo.get(
|
self.assertIsNone(self.listener_stats_repo.get(
|
||||||
self.session, listener_id=self.listener.id))
|
self.session, listener_id=self.listener.id))
|
||||||
self.listener_stats_repo.replace(self.session, self.listener.id,
|
stats_obj = data_models.ListenerStatistics(
|
||||||
self.amphora.id,
|
listener_id=self.listener.id,
|
||||||
bytes_in=bytes_in,
|
amphora_id=self.amphora.id,
|
||||||
bytes_out=bytes_out,
|
bytes_in=bytes_in,
|
||||||
active_connections=active_conns,
|
bytes_out=bytes_out,
|
||||||
total_connections=total_conns,
|
active_connections=active_conns,
|
||||||
request_errors=request_errors)
|
total_connections=total_conns,
|
||||||
|
request_errors=request_errors
|
||||||
|
)
|
||||||
|
self.listener_stats_repo.replace(self.session, stats_obj)
|
||||||
obj = self.listener_stats_repo.get(self.session,
|
obj = self.listener_stats_repo.get(self.session,
|
||||||
listener_id=self.listener.id)
|
listener_id=self.listener.id)
|
||||||
self.assertIsNotNone(obj)
|
self.assertIsNotNone(obj)
|
||||||
self.assertEqual(self.listener.id, obj.listener_id)
|
self.assertEqual(self.listener.id, obj.listener_id)
|
||||||
|
self.assertEqual(self.amphora.id, obj.amphora_id)
|
||||||
self.assertEqual(bytes_in, obj.bytes_in)
|
self.assertEqual(bytes_in, obj.bytes_in)
|
||||||
self.assertEqual(bytes_out, obj.bytes_out)
|
self.assertEqual(bytes_out, obj.bytes_out)
|
||||||
self.assertEqual(active_conns, obj.active_connections)
|
self.assertEqual(active_conns, obj.active_connections)
|
||||||
|
@ -3197,23 +3201,49 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest):
|
||||||
active_conns_2 = random.randrange(1000000000)
|
active_conns_2 = random.randrange(1000000000)
|
||||||
total_conns_2 = random.randrange(1000000000)
|
total_conns_2 = random.randrange(1000000000)
|
||||||
request_errors_2 = random.randrange(1000000000)
|
request_errors_2 = random.randrange(1000000000)
|
||||||
self.listener_stats_repo.replace(self.session, self.listener.id,
|
stats_obj_2 = data_models.ListenerStatistics(
|
||||||
self.amphora.id,
|
listener_id=self.listener.id,
|
||||||
bytes_in=bytes_in_2,
|
amphora_id=self.amphora.id,
|
||||||
bytes_out=bytes_out_2,
|
bytes_in=bytes_in_2,
|
||||||
active_connections=active_conns_2,
|
bytes_out=bytes_out_2,
|
||||||
total_connections=total_conns_2,
|
active_connections=active_conns_2,
|
||||||
request_errors=request_errors_2)
|
total_connections=total_conns_2,
|
||||||
|
request_errors=request_errors_2
|
||||||
|
)
|
||||||
|
self.listener_stats_repo.replace(self.session, stats_obj_2)
|
||||||
obj = self.listener_stats_repo.get(self.session,
|
obj = self.listener_stats_repo.get(self.session,
|
||||||
listener_id=self.listener.id)
|
listener_id=self.listener.id)
|
||||||
self.assertIsNotNone(obj)
|
self.assertIsNotNone(obj)
|
||||||
self.assertEqual(self.listener.id, obj.listener_id)
|
self.assertEqual(self.listener.id, obj.listener_id)
|
||||||
|
self.assertEqual(self.amphora.id, obj.amphora_id)
|
||||||
self.assertEqual(bytes_in_2, obj.bytes_in)
|
self.assertEqual(bytes_in_2, obj.bytes_in)
|
||||||
self.assertEqual(bytes_out_2, obj.bytes_out)
|
self.assertEqual(bytes_out_2, obj.bytes_out)
|
||||||
self.assertEqual(active_conns_2, obj.active_connections)
|
self.assertEqual(active_conns_2, obj.active_connections)
|
||||||
self.assertEqual(total_conns_2, obj.total_connections)
|
self.assertEqual(total_conns_2, obj.total_connections)
|
||||||
self.assertEqual(request_errors_2, obj.request_errors)
|
self.assertEqual(request_errors_2, obj.request_errors)
|
||||||
|
|
||||||
|
# Test uses listener_id as amphora_id if not passed
|
||||||
|
stats_obj = data_models.ListenerStatistics(
|
||||||
|
listener_id=self.listener.id,
|
||||||
|
bytes_in=bytes_in,
|
||||||
|
bytes_out=bytes_out,
|
||||||
|
active_connections=active_conns,
|
||||||
|
total_connections=total_conns,
|
||||||
|
request_errors=request_errors
|
||||||
|
)
|
||||||
|
self.listener_stats_repo.replace(self.session, stats_obj)
|
||||||
|
obj = self.listener_stats_repo.get(self.session,
|
||||||
|
listener_id=self.listener.id,
|
||||||
|
amphora_id=self.listener.id)
|
||||||
|
self.assertIsNotNone(obj)
|
||||||
|
self.assertEqual(self.listener.id, obj.listener_id)
|
||||||
|
self.assertEqual(self.listener.id, obj.amphora_id)
|
||||||
|
self.assertEqual(bytes_in, obj.bytes_in)
|
||||||
|
self.assertEqual(bytes_out, obj.bytes_out)
|
||||||
|
self.assertEqual(active_conns, obj.active_connections)
|
||||||
|
self.assertEqual(total_conns, obj.total_connections)
|
||||||
|
self.assertEqual(request_errors, obj.request_errors)
|
||||||
|
|
||||||
def test_increment(self):
|
def test_increment(self):
|
||||||
# Test the create path
|
# Test the create path
|
||||||
bytes_in = random.randrange(1000000000)
|
bytes_in = random.randrange(1000000000)
|
||||||
|
@ -3237,6 +3267,7 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest):
|
||||||
listener_id=self.listener.id)
|
listener_id=self.listener.id)
|
||||||
self.assertIsNotNone(obj)
|
self.assertIsNotNone(obj)
|
||||||
self.assertEqual(self.listener.id, obj.listener_id)
|
self.assertEqual(self.listener.id, obj.listener_id)
|
||||||
|
self.assertEqual(self.amphora.id, obj.amphora_id)
|
||||||
self.assertEqual(bytes_in, obj.bytes_in)
|
self.assertEqual(bytes_in, obj.bytes_in)
|
||||||
self.assertEqual(bytes_out, obj.bytes_out)
|
self.assertEqual(bytes_out, obj.bytes_out)
|
||||||
self.assertEqual(active_conns, obj.active_connections)
|
self.assertEqual(active_conns, obj.active_connections)
|
||||||
|
@ -3263,12 +3294,35 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest):
|
||||||
listener_id=self.listener.id)
|
listener_id=self.listener.id)
|
||||||
self.assertIsNotNone(obj)
|
self.assertIsNotNone(obj)
|
||||||
self.assertEqual(self.listener.id, obj.listener_id)
|
self.assertEqual(self.listener.id, obj.listener_id)
|
||||||
|
self.assertEqual(self.amphora.id, obj.amphora_id)
|
||||||
self.assertEqual(bytes_in + bytes_in_2, obj.bytes_in)
|
self.assertEqual(bytes_in + bytes_in_2, obj.bytes_in)
|
||||||
self.assertEqual(bytes_out + bytes_out_2, obj.bytes_out)
|
self.assertEqual(bytes_out + bytes_out_2, obj.bytes_out)
|
||||||
self.assertEqual(active_conns_2, obj.active_connections) # not a delta
|
self.assertEqual(active_conns_2, obj.active_connections) # not a delta
|
||||||
self.assertEqual(total_conns + total_conns_2, obj.total_connections)
|
self.assertEqual(total_conns + total_conns_2, obj.total_connections)
|
||||||
self.assertEqual(request_errors + request_errors_2, obj.request_errors)
|
self.assertEqual(request_errors + request_errors_2, obj.request_errors)
|
||||||
|
|
||||||
|
# Test uses listener_id as amphora_id if not passed
|
||||||
|
stats_obj = data_models.ListenerStatistics(
|
||||||
|
listener_id=self.listener.id,
|
||||||
|
bytes_in=bytes_in,
|
||||||
|
bytes_out=bytes_out,
|
||||||
|
active_connections=active_conns,
|
||||||
|
total_connections=total_conns,
|
||||||
|
request_errors=request_errors
|
||||||
|
)
|
||||||
|
self.listener_stats_repo.increment(self.session, stats_obj)
|
||||||
|
obj = self.listener_stats_repo.get(self.session,
|
||||||
|
listener_id=self.listener.id,
|
||||||
|
amphora_id=self.listener.id)
|
||||||
|
self.assertIsNotNone(obj)
|
||||||
|
self.assertEqual(self.listener.id, obj.listener_id)
|
||||||
|
self.assertEqual(self.listener.id, obj.amphora_id)
|
||||||
|
self.assertEqual(bytes_in, obj.bytes_in)
|
||||||
|
self.assertEqual(bytes_out, obj.bytes_out)
|
||||||
|
self.assertEqual(active_conns, obj.active_connections)
|
||||||
|
self.assertEqual(total_conns, obj.total_connections)
|
||||||
|
self.assertEqual(request_errors, obj.request_errors)
|
||||||
|
|
||||||
|
|
||||||
class HealthMonitorRepositoryTest(BaseRepositoryTest):
|
class HealthMonitorRepositoryTest(BaseRepositoryTest):
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -25,20 +25,6 @@ from octavia.tests.unit import base
|
||||||
FAKE_UUID_1 = uuidutils.generate_uuid()
|
FAKE_UUID_1 = uuidutils.generate_uuid()
|
||||||
|
|
||||||
|
|
||||||
class TestLoggingUpdate(base.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
super().setUp()
|
|
||||||
self.mixin = driver.LoggingUpdate()
|
|
||||||
|
|
||||||
def test_update_stats(self):
|
|
||||||
self.mixin.update_stats('test update stats')
|
|
||||||
self.assertEqual('test update stats', self.mixin.stats)
|
|
||||||
|
|
||||||
def test_update_health(self):
|
|
||||||
self.mixin.update_health('test update health')
|
|
||||||
self.assertEqual('test update health', self.mixin.health)
|
|
||||||
|
|
||||||
|
|
||||||
class TestNoopAmphoraLoadBalancerDriver(base.TestCase):
|
class TestNoopAmphoraLoadBalancerDriver(base.TestCase):
|
||||||
FAKE_UUID_1 = uuidutils.generate_uuid()
|
FAKE_UUID_1 = uuidutils.generate_uuid()
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ from octavia_lib.api.drivers import exceptions as driver_exceptions
|
||||||
from octavia_lib.common import constants as lib_consts
|
from octavia_lib.common import constants as lib_consts
|
||||||
|
|
||||||
from octavia.api.drivers.driver_agent import driver_updater
|
from octavia.api.drivers.driver_agent import driver_updater
|
||||||
|
from octavia.common import data_models
|
||||||
import octavia.tests.unit.base as base
|
import octavia.tests.unit.base as base
|
||||||
|
|
||||||
|
|
||||||
|
@ -242,37 +243,52 @@ class TestDriverUpdater(base.TestCase):
|
||||||
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
|
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
|
||||||
lib_consts.FAULT_STRING: 'boom'}, result)
|
lib_consts.FAULT_STRING: 'boom'}, result)
|
||||||
|
|
||||||
@mock.patch('octavia.db.repositories.ListenerStatisticsRepository.replace')
|
@mock.patch('time.time')
|
||||||
def test_update_listener_statistics(self, mock_replace):
|
@mock.patch('octavia.statistics.stats_base.update_stats_via_driver')
|
||||||
listener_stats_list = [{"id": 1, "active_connections": 10,
|
def test_update_listener_statistics(self, mock_stats_base, mock_time):
|
||||||
"bytes_in": 20,
|
mock_time.return_value = 12345.6
|
||||||
"bytes_out": 30,
|
listener_stats_li = [
|
||||||
"request_errors": 40,
|
{"id": 1,
|
||||||
"total_connections": 50},
|
"active_connections": 10,
|
||||||
{"id": 2, "active_connections": 60,
|
"bytes_in": 20,
|
||||||
"bytes_in": 70,
|
"bytes_out": 30,
|
||||||
"bytes_out": 80,
|
"request_errors": 40,
|
||||||
"request_errors": 90,
|
"total_connections": 50},
|
||||||
"total_connections": 100}]
|
{"id": 2,
|
||||||
listener_stats_dict = {"listeners": listener_stats_list}
|
"active_connections": 60,
|
||||||
|
"bytes_in": 70,
|
||||||
|
"bytes_out": 80,
|
||||||
|
"request_errors": 90,
|
||||||
|
"total_connections": 100}]
|
||||||
|
listener_stats_dict = {"listeners": listener_stats_li}
|
||||||
|
|
||||||
mock_replace.side_effect = [mock.DEFAULT, mock.DEFAULT,
|
mock_stats_base.side_effect = [mock.DEFAULT, Exception('boom')]
|
||||||
Exception('boom')]
|
|
||||||
result = self.driver_updater.update_listener_statistics(
|
result = self.driver_updater.update_listener_statistics(
|
||||||
copy.deepcopy(listener_stats_dict))
|
listener_stats_dict)
|
||||||
calls = [call(self.mock_session, 1, 1, active_connections=10,
|
listener_stats_objects = [
|
||||||
bytes_in=20, bytes_out=30, request_errors=40,
|
data_models.ListenerStatistics(
|
||||||
total_connections=50),
|
listener_id=listener_stats_li[0]['id'],
|
||||||
call(self.mock_session, 2, 2, active_connections=60,
|
active_connections=listener_stats_li[0]['active_connections'],
|
||||||
bytes_in=70, bytes_out=80, request_errors=90,
|
bytes_in=listener_stats_li[0]['bytes_in'],
|
||||||
total_connections=100)]
|
bytes_out=listener_stats_li[0]['bytes_out'],
|
||||||
mock_replace.assert_has_calls(calls)
|
request_errors=listener_stats_li[0]['request_errors'],
|
||||||
|
total_connections=listener_stats_li[0]['total_connections'],
|
||||||
|
received_time=mock_time.return_value),
|
||||||
|
data_models.ListenerStatistics(
|
||||||
|
listener_id=listener_stats_li[1]['id'],
|
||||||
|
active_connections=listener_stats_li[1]['active_connections'],
|
||||||
|
bytes_in=listener_stats_li[1]['bytes_in'],
|
||||||
|
bytes_out=listener_stats_li[1]['bytes_out'],
|
||||||
|
request_errors=listener_stats_li[1]['request_errors'],
|
||||||
|
total_connections=listener_stats_li[1]['total_connections'],
|
||||||
|
received_time=mock_time.return_value)]
|
||||||
|
mock_stats_base.assert_called_once_with(listener_stats_objects)
|
||||||
self.assertEqual(self.ref_ok_response, result)
|
self.assertEqual(self.ref_ok_response, result)
|
||||||
|
|
||||||
# Test empty stats updates
|
# Test empty stats updates
|
||||||
mock_replace.reset_mock()
|
mock_stats_base.reset_mock()
|
||||||
result = self.driver_updater.update_listener_statistics({})
|
result = self.driver_updater.update_listener_statistics({})
|
||||||
mock_replace.assert_not_called()
|
mock_stats_base.assert_not_called()
|
||||||
self.assertEqual(self.ref_ok_response, result)
|
self.assertEqual(self.ref_ok_response, result)
|
||||||
|
|
||||||
# Test missing ID
|
# Test missing ID
|
||||||
|
@ -286,9 +302,9 @@ class TestDriverUpdater(base.TestCase):
|
||||||
|
|
||||||
# Test for replace exception
|
# Test for replace exception
|
||||||
result = self.driver_updater.update_listener_statistics(
|
result = self.driver_updater.update_listener_statistics(
|
||||||
copy.deepcopy(listener_stats_dict))
|
listener_stats_dict)
|
||||||
ref_update_listener_stats_error = {
|
ref_update_listener_stats_error = {
|
||||||
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
|
lib_consts.STATUS_CODE: lib_consts.DRVR_STATUS_CODE_FAILED,
|
||||||
lib_consts.STATS_OBJECT: lib_consts.LISTENERS,
|
lib_consts.STATS_OBJECT: lib_consts.LISTENERS,
|
||||||
lib_consts.FAULT_STRING: 'boom', lib_consts.STATS_OBJECT_ID: 1}
|
lib_consts.FAULT_STRING: 'boom'}
|
||||||
self.assertEqual(ref_update_listener_stats_error, result)
|
self.assertEqual(ref_update_listener_stats_error, result)
|
||||||
|
|
|
@ -1,38 +0,0 @@
|
||||||
# Copyright 2018 GoDaddy
|
|
||||||
# Copyright (c) 2015 Rackspace
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
from octavia.controller.healthmanager.health_drivers import update_base
|
|
||||||
from octavia.tests.unit import base
|
|
||||||
|
|
||||||
|
|
||||||
class TestHealthUpdateBase(base.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super().setUp()
|
|
||||||
self.logger = update_base.HealthUpdateBase()
|
|
||||||
|
|
||||||
def test_update_health(self):
|
|
||||||
self.assertRaises(NotImplementedError,
|
|
||||||
self.logger.update_health, {'id': 1}, '192.0.2.1')
|
|
||||||
|
|
||||||
|
|
||||||
class TestStatsUpdateBase(base.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
super().setUp()
|
|
||||||
self.logger = update_base.StatsUpdateBase()
|
|
||||||
|
|
||||||
def test_update_stats(self):
|
|
||||||
self.assertRaises(NotImplementedError,
|
|
||||||
self.logger.update_stats, {'id': 1}, '192.0.2.1')
|
|
File diff suppressed because it is too large
Load Diff
|
@ -15,30 +15,20 @@
|
||||||
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from octavia.controller.healthmanager.health_drivers import update_logging
|
from oslo_utils import uuidutils
|
||||||
|
|
||||||
|
from octavia.common import data_models
|
||||||
|
from octavia.statistics.drivers import logger
|
||||||
from octavia.tests.unit import base
|
from octavia.tests.unit import base
|
||||||
|
|
||||||
|
|
||||||
class TestHealthUpdateLogger(base.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super().setUp()
|
|
||||||
self.logger = update_logging.HealthUpdateLogger()
|
|
||||||
|
|
||||||
@mock.patch('octavia.controller.healthmanager.health_drivers'
|
|
||||||
'.update_logging.LOG')
|
|
||||||
def test_update_health(self, mock_log):
|
|
||||||
self.logger.update_health({'id': 1}, '192.0.2.1')
|
|
||||||
self.assertEqual(1, mock_log.info.call_count)
|
|
||||||
|
|
||||||
|
|
||||||
class TestStatsUpdateLogger(base.TestCase):
|
class TestStatsUpdateLogger(base.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super().setUp()
|
super().setUp()
|
||||||
self.logger = update_logging.StatsUpdateLogger()
|
self.logger = logger.StatsLogger()
|
||||||
|
self.amphora_id = uuidutils.generate_uuid()
|
||||||
|
|
||||||
@mock.patch('octavia.controller.healthmanager.health_drivers'
|
@mock.patch('octavia.statistics.drivers.logger.LOG')
|
||||||
'.update_logging.LOG')
|
|
||||||
def test_update_stats(self, mock_log):
|
def test_update_stats(self, mock_log):
|
||||||
self.logger.update_stats({'id': 1}, '192.0.2.1')
|
self.logger.update_stats([data_models.ListenerStatistics()])
|
||||||
self.assertEqual(1, mock_log.info.call_count)
|
self.assertEqual(1, mock_log.info.call_count)
|
|
@ -0,0 +1,78 @@
|
||||||
|
# Copyright 2018 GoDaddy
|
||||||
|
# Copyright (c) 2015 Rackspace
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import random
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from oslo_utils import uuidutils
|
||||||
|
|
||||||
|
from octavia.common import data_models
|
||||||
|
from octavia.statistics.drivers import update_db
|
||||||
|
from octavia.tests.unit import base
|
||||||
|
|
||||||
|
|
||||||
|
class TestStatsUpdateDb(base.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestStatsUpdateDb, self).setUp()
|
||||||
|
self.amphora_id = uuidutils.generate_uuid()
|
||||||
|
self.listener_id = uuidutils.generate_uuid()
|
||||||
|
|
||||||
|
@mock.patch('octavia.db.repositories.ListenerStatisticsRepository')
|
||||||
|
@mock.patch('octavia.db.api.get_session')
|
||||||
|
def test_update_stats(self, mock_get_session, mock_listener_stats_repo):
|
||||||
|
bytes_in1 = random.randrange(1000000000)
|
||||||
|
bytes_out1 = random.randrange(1000000000)
|
||||||
|
active_conns1 = random.randrange(1000000000)
|
||||||
|
total_conns1 = random.randrange(1000000000)
|
||||||
|
request_errors1 = random.randrange(1000000000)
|
||||||
|
stats_1 = data_models.ListenerStatistics(
|
||||||
|
listener_id=self.listener_id,
|
||||||
|
amphora_id=self.amphora_id,
|
||||||
|
bytes_in=bytes_in1,
|
||||||
|
bytes_out=bytes_out1,
|
||||||
|
active_connections=active_conns1,
|
||||||
|
total_connections=total_conns1,
|
||||||
|
request_errors=request_errors1
|
||||||
|
)
|
||||||
|
bytes_in2 = random.randrange(1000000000)
|
||||||
|
bytes_out2 = random.randrange(1000000000)
|
||||||
|
active_conns2 = random.randrange(1000000000)
|
||||||
|
total_conns2 = random.randrange(1000000000)
|
||||||
|
request_errors2 = random.randrange(1000000000)
|
||||||
|
stats_2 = data_models.ListenerStatistics(
|
||||||
|
listener_id=self.listener_id,
|
||||||
|
amphora_id=self.amphora_id,
|
||||||
|
bytes_in=bytes_in2,
|
||||||
|
bytes_out=bytes_out2,
|
||||||
|
active_connections=active_conns2,
|
||||||
|
total_connections=total_conns2,
|
||||||
|
request_errors=request_errors2
|
||||||
|
)
|
||||||
|
|
||||||
|
update_db.StatsUpdateDb().update_stats(
|
||||||
|
[stats_1, stats_2], deltas=False)
|
||||||
|
|
||||||
|
mock_listener_stats_repo().replace.assert_has_calls([
|
||||||
|
mock.call(mock_get_session(), stats_1),
|
||||||
|
mock.call(mock_get_session(), stats_2)
|
||||||
|
])
|
||||||
|
|
||||||
|
update_db.StatsUpdateDb().update_stats(
|
||||||
|
[stats_1, stats_2], deltas=True)
|
||||||
|
|
||||||
|
mock_listener_stats_repo().increment.assert_has_calls([
|
||||||
|
mock.call(mock_get_session(), stats_1),
|
||||||
|
mock.call(mock_get_session(), stats_2)
|
||||||
|
])
|
|
@ -0,0 +1,97 @@
|
||||||
|
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||||
|
# Copyright (c) 2015 Rackspace
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
import random
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_config import fixture as oslo_fixture
|
||||||
|
from oslo_utils import uuidutils
|
||||||
|
|
||||||
|
from octavia.common import data_models
|
||||||
|
from octavia.statistics import stats_base
|
||||||
|
from octavia.tests.unit import base
|
||||||
|
|
||||||
|
STATS_DRIVERS = ['stats_db', 'stats_logger']
|
||||||
|
|
||||||
|
|
||||||
|
class TestStatsBase(base.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestStatsBase, self).setUp()
|
||||||
|
|
||||||
|
self.conf = oslo_fixture.Config(cfg.CONF)
|
||||||
|
self.conf.config(group="controller_worker",
|
||||||
|
statistics_drivers=STATS_DRIVERS)
|
||||||
|
self.amphora_id = uuidutils.generate_uuid()
|
||||||
|
self.listener_id = uuidutils.generate_uuid()
|
||||||
|
self.listener_stats = data_models.ListenerStatistics(
|
||||||
|
amphora_id=self.amphora_id,
|
||||||
|
listener_id=self.listener_id,
|
||||||
|
bytes_in=random.randrange(1000000000),
|
||||||
|
bytes_out=random.randrange(1000000000),
|
||||||
|
active_connections=random.randrange(1000000000),
|
||||||
|
total_connections=random.randrange(1000000000),
|
||||||
|
request_errors=random.randrange(1000000000))
|
||||||
|
self.listener_stats_dict = {
|
||||||
|
self.listener_id: {
|
||||||
|
"request_errors": self.listener_stats.request_errors,
|
||||||
|
"active_connections":
|
||||||
|
self.listener_stats.active_connections,
|
||||||
|
"total_connections": self.listener_stats.total_connections,
|
||||||
|
"bytes_in": self.listener_stats.bytes_in,
|
||||||
|
"bytes_out": self.listener_stats.bytes_out,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@mock.patch('octavia.statistics.drivers.update_db.StatsUpdateDb')
|
||||||
|
@mock.patch('octavia.statistics.drivers.logger.StatsLogger')
|
||||||
|
def test_update_stats(self, mock_stats_logger, mock_stats_db):
|
||||||
|
stats_base._STATS_HANDLERS = None
|
||||||
|
|
||||||
|
# Test with update success
|
||||||
|
stats_base.update_stats_via_driver([self.listener_stats], deltas=True)
|
||||||
|
|
||||||
|
mock_stats_db().update_stats.assert_called_once_with(
|
||||||
|
[self.listener_stats], deltas=True)
|
||||||
|
mock_stats_logger().update_stats.assert_called_once_with(
|
||||||
|
[self.listener_stats], deltas=True)
|
||||||
|
|
||||||
|
# Test with update failure (should still run both drivers)
|
||||||
|
mock_stats_db.reset_mock()
|
||||||
|
mock_stats_logger.reset_mock()
|
||||||
|
mock_stats_db().update_stats.side_effect = Exception
|
||||||
|
mock_stats_logger().update_stats.side_effect = Exception
|
||||||
|
stats_base.update_stats_via_driver(
|
||||||
|
[self.listener_stats])
|
||||||
|
|
||||||
|
mock_stats_db().update_stats.assert_called_once_with(
|
||||||
|
[self.listener_stats], deltas=False)
|
||||||
|
mock_stats_logger().update_stats.assert_called_once_with(
|
||||||
|
[self.listener_stats], deltas=False)
|
||||||
|
|
||||||
|
@mock.patch('octavia.statistics.drivers.update_db.StatsUpdateDb')
|
||||||
|
@mock.patch('octavia.statistics.drivers.logger.StatsLogger')
|
||||||
|
def test__get_stats_handlers(self, mock_stats_logger, mock_stats_db):
|
||||||
|
stats_base._STATS_HANDLERS = None
|
||||||
|
|
||||||
|
# Test that this function implements a singleton
|
||||||
|
first_call_handlers = stats_base._get_stats_handlers()
|
||||||
|
second_call_handlers = stats_base._get_stats_handlers()
|
||||||
|
|
||||||
|
self.assertEqual(first_call_handlers, second_call_handlers)
|
||||||
|
|
||||||
|
# Drivers should only load once (this is a singleton)
|
||||||
|
mock_stats_db.assert_called_once_with()
|
||||||
|
mock_stats_logger.assert_called_once_with()
|
|
@ -0,0 +1,17 @@
|
||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Loadbalancer statistics can now be reported to multiple backend locations
|
||||||
|
simply by specifying multiple statistics drivers in config.
|
||||||
|
upgrade:
|
||||||
|
- |
|
||||||
|
The internal interface for loadbalancer statistics collection has moved.
|
||||||
|
When upgrading, see deprecation notes for the ``stats_update_driver``
|
||||||
|
config option, as it will need to be moved and renamed.
|
||||||
|
deprecations:
|
||||||
|
- |
|
||||||
|
The option ``health_manager.health_update_driver`` has been deprecated as
|
||||||
|
it was never really used, so the driver layer was removed.
|
||||||
|
The option ``health_manager.stats_update_driver`` was moved and renamed
|
||||||
|
to ``controller_worker.statistics_drivers`` (note it is now plural). It
|
||||||
|
can now contain a list of multiple drivers for handling statistics.
|
|
@ -62,12 +62,9 @@ octavia.api.drivers =
|
||||||
octavia.amphora.drivers =
|
octavia.amphora.drivers =
|
||||||
amphora_noop_driver = octavia.amphorae.drivers.noop_driver.driver:NoopAmphoraLoadBalancerDriver
|
amphora_noop_driver = octavia.amphorae.drivers.noop_driver.driver:NoopAmphoraLoadBalancerDriver
|
||||||
amphora_haproxy_rest_driver = octavia.amphorae.drivers.haproxy.rest_api_driver:HaproxyAmphoraLoadBalancerDriver
|
amphora_haproxy_rest_driver = octavia.amphorae.drivers.haproxy.rest_api_driver:HaproxyAmphoraLoadBalancerDriver
|
||||||
octavia.amphora.health_update_drivers =
|
octavia.statistics.drivers =
|
||||||
health_logger = octavia.controller.healthmanager.health_drivers.update_logging:HealthUpdateLogger
|
stats_logger = octavia.statistics.drivers.logger:StatsLogger
|
||||||
health_db = octavia.controller.healthmanager.health_drivers.update_db:UpdateHealthDb
|
stats_db = octavia.statistics.drivers.update_db:StatsUpdateDb
|
||||||
octavia.amphora.stats_update_drivers =
|
|
||||||
stats_logger = octavia.controller.healthmanager.health_drivers.update_logging:StatsUpdateLogger
|
|
||||||
stats_db = octavia.controller.healthmanager.health_drivers.update_db:UpdateStatsDb
|
|
||||||
octavia.amphora.udp_api_server =
|
octavia.amphora.udp_api_server =
|
||||||
keepalived_lvs = octavia.amphorae.backends.agent.api_server.keepalivedlvs:KeepalivedLvs
|
keepalived_lvs = octavia.amphorae.backends.agent.api_server.keepalivedlvs:KeepalivedLvs
|
||||||
octavia.compute.drivers =
|
octavia.compute.drivers =
|
||||||
|
|
Loading…
Reference in New Issue