Merge "Limit spares pool to the spare_amphora_pool_size"

This commit is contained in:
Zuul 2019-03-20 21:31:54 +00:00 committed by Gerrit Code Review
commit a635dd6bc9
6 changed files with 108 additions and 18 deletions

View File

@ -17,6 +17,7 @@ import datetime
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from sqlalchemy.orm import exc as sqlalchemy_exceptions
from octavia.controller.worker import controller_worker as cw
@ -30,6 +31,7 @@ CONF = cfg.CONF
class SpareAmphora(object):
def __init__(self):
self.amp_repo = repo.AmphoraRepository()
self.spares_repo = repo.SparesPoolRepository()
self.cw = cw.ControllerWorker()
def spare_check(self):
@ -37,26 +39,42 @@ class SpareAmphora(object):
If it's less than the requirement, starts new amphora.
"""
lock_session = db_api.get_session(autocommit=False)
session = db_api.get_session()
conf_spare_cnt = CONF.house_keeping.spare_amphora_pool_size
curr_spare_cnt = self.amp_repo.get_spare_amphora_count(session)
LOG.debug("Required Spare Amphora count : %d", conf_spare_cnt)
LOG.debug("Current Spare Amphora count : %d", curr_spare_cnt)
diff_count = conf_spare_cnt - curr_spare_cnt
try:
# Lock the spares_pool record for read and write
spare_amp_row = self.spares_repo.get_for_update(lock_session)
# When the current spare amphora is less than required
if diff_count > 0:
LOG.info("Initiating creation of %d spare amphora.", diff_count)
conf_spare_cnt = CONF.house_keeping.spare_amphora_pool_size
curr_spare_cnt = self.amp_repo.get_spare_amphora_count(session)
LOG.debug("Required Spare Amphora count : %d", conf_spare_cnt)
LOG.debug("Current Spare Amphora count : %d", curr_spare_cnt)
diff_count = conf_spare_cnt - curr_spare_cnt
# Call Amphora Create Flow diff_count times
with futures.ThreadPoolExecutor(
max_workers=CONF.house_keeping.spare_amphora_pool_size
) as executor:
for i in range(1, diff_count + 1):
LOG.debug("Starting amphorae number %d ...", i)
executor.submit(self.cw.create_amphora)
else:
LOG.debug("Current spare amphora count satisfies the requirement")
# When the current spare amphora is less than required
amp_booting = []
if diff_count > 0:
LOG.info("Initiating creation of %d spare amphora.",
diff_count)
# Call Amphora Create Flow diff_count times
with futures.ThreadPoolExecutor(
max_workers=CONF.house_keeping.spare_amphora_pool_size
) as executor:
for i in range(1, diff_count + 1):
LOG.debug("Starting amphorae number %d ...", i)
amp_booting.append(
executor.submit(self.cw.create_amphora))
else:
LOG.debug("Current spare amphora count satisfies the "
"requirement")
# Wait for the amphora boot threads to finish
futures.wait(amp_booting)
spare_amp_row.updated_at = timeutils.utcnow()
lock_session.commit()
except Exception:
lock_session.rollback()
class DatabaseCleanup(object):

View File

@ -0,0 +1,35 @@
# Copyright 2019 Michael Johnson
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Spares pool table
Revision ID: 6ffc710674ef
Revises: 7432f1d4ea83
Create Date: 2019-03-11 10:45:43.296236
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6ffc710674ef'
down_revision = '7432f1d4ea83'
def upgrade():
op.create_table(
u'spares_pool',
sa.Column(u'updated_at', sa.DateTime(), nullable=True,
server_default=sa.func.current_timestamp()))

View File

@ -782,3 +782,10 @@ class ClientAuthenticationMode(base_models.BASE):
__tablename__ = "client_authentication_mode"
name = sa.Column(sa.String(10), primary_key=True, nullable=False)
class SparesPool(base_models.BASE):
__tablename__ = "spares_pool"
updated_at = sa.Column(sa.DateTime, primary_key=True, nullable=True)

View File

@ -226,6 +226,7 @@ class Repositories(object):
self.quotas = QuotasRepository()
self.flavor = FlavorRepository()
self.flavor_profile = FlavorProfileRepository()
self.spares_pool = SparesPoolRepository()
def create_load_balancer_and_vip(self, session, lb_dict, vip_dict):
"""Inserts load balancer and vip entities into the database.
@ -1785,3 +1786,17 @@ class FlavorRepository(BaseRepository):
class FlavorProfileRepository(BaseRepository):
model_class = models.FlavorProfile
class SparesPoolRepository(BaseRepository):
model_class = models.SparesPool
def get_for_update(self, lock_session):
"""Queries and locks the SparesPool record.
This call will query for the SparesPool table record and lock it
so that other processes cannot read or write it.
:returns: expected_spares_count, updated_at
"""
row = lock_session.query(models.SparesPool).with_for_update().one()
return row

View File

@ -119,7 +119,7 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
'listener_stats', 'amphora', 'sni',
'amphorahealth', 'vrrpgroup', 'l7rule', 'l7policy',
'amp_build_slots', 'amp_build_req', 'quotas',
'flavor', 'flavor_profile')
'flavor', 'flavor_profile', 'spares_pool')
for repo_attr in repo_attr_names:
single_repo = getattr(self.repos, repo_attr, None)
message = ("Class Repositories should have %s instance"

View File

@ -83,6 +83,21 @@ class TestSpareCheck(base.TestCase):
self.assertEqual(0, DIFF_CNT)
self.assertEqual(DIFF_CNT, self.cw.create_amphora.call_count)
@mock.patch('octavia.db.repositories.SparesPoolRepository.get_for_update')
@mock.patch('octavia.db.api.get_session')
def test_spare_check_rollback(self, mock_session, mock_update):
"""When spare amphora count meets the requirement."""
lock_session = mock.MagicMock()
session = mock.MagicMock()
mock_session.side_effect = [lock_session, session]
mock_update.side_effect = [Exception('boom')]
# self.CONF.config(group="house_keeping",
# spare_amphora_pool_size=self.FAKE_CNF_SPAR2)
# self.amp_repo.get_spare_amphora_count.return_value = (
# self.FAKE_CUR_SPAR2)
self.spare_amp.spare_check()
lock_session.rollback.assert_called_once()
class TestDatabaseCleanup(base.TestCase):
FAKE_IP = "10.0.0.1"