Fix maintence mode

Added MaintenceHook to app configuration
Refactored maintence service
Fixed sql queries
Fix maintence controller

Closes-Bug: #2058009
Signed-off-by: Vadim Zelenevsky <wortellen@gmail.com>
Change-Id: Ica41e8208b29aeba6e781801540843d46375c3d1
This commit is contained in:
Vadim Zelenevsky 2024-03-22 22:11:12 +03:00 committed by Zelenevskii Vadim
parent 9a011cb277
commit cadfe9dca2
10 changed files with 156 additions and 124 deletions

View File

@ -21,6 +21,7 @@ import osprofiler.web
import pecan
from mistral.api import access_control
from mistral.api.hooks import maintenance
from mistral import config as m_config
from mistral import context as ctx
from mistral.db.v2 import api as db_api_v2
@ -65,7 +66,8 @@ def setup_app(config=None):
app = pecan.make_app(
app_conf.pop('root'),
hooks=lambda: [ctx.AuthHook(), ctx.ContextHook()],
hooks=lambda: [ctx.AuthHook(), maintenance.MaintenanceHook(),
ctx.ContextHook()],
logging=getattr(config, 'logging', {}),
**app_conf
)

View File

@ -54,6 +54,8 @@ class MaintenanceController(rest.RestController):
def put(self, new_maintenance_status):
context.set_ctx(None)
maintenance.change_maintenance_mode(new_maintenance_status.status)
new_maintenance_status.status = maintenance.change_maintenance_mode(
new_maintenance_status.status
)
return new_maintenance_status

View File

@ -67,7 +67,8 @@ class ActionsController(rest.RestController, hooks.HookController):
# TODO(nmakhotkin): Have a discussion with pecan/WSME folks in order
# to have requests and response of different content types. Then
# delete ContentTypeHook.
__hooks__ = [ct_hook.ContentTypeHook("application/json", ['POST', 'PUT'])]
__hooks__ = [ct_hook.ContentTypeHook("application/json;charset=utf-8",
['POST', 'PUT'])]
validate = validation.SpecValidationController(
spec_parser.get_action_list_spec_from_yaml)

View File

@ -37,8 +37,8 @@ LOG = logging.getLogger(__name__)
class WorkbooksController(rest.RestController, hooks.HookController):
__hooks__ = [ct_hook.ContentTypeHook("application/json", ['POST', 'PUT'])]
__hooks__ = [ct_hook.ContentTypeHook("application/json;charset=utf-8",
['POST', 'PUT'])]
validate = validation.SpecValidationController(
spec_parser.get_workbook_spec_from_yaml
)

View File

@ -44,7 +44,8 @@ class WorkflowsController(rest.RestController, hooks.HookController):
# TODO(nmakhotkin): Have a discussion with pecan/WSME folks in order
# to have requests and response of different content types. Then
# delete ContentTypeHook.
__hooks__ = [ct_hook.ContentTypeHook("application/json", ['POST', 'PUT'])]
__hooks__ = [ct_hook.ContentTypeHook("application/json;charset=utf-8",
['POST', 'PUT'])]
validate = validation.SpecValidationController(
spec_parser.get_workflow_list_spec_from_yaml

View File

@ -10,6 +10,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import pecan
from pecan import hooks
@ -19,6 +20,25 @@ from mistral.services import maintenance as maintenance_service
ALLOWED_WITHOUT_AUTH = ['/', '/v2/', '/health', '/maintenance']
def _complete_async_action(maintenance_mode, state):
return (state.request.method == 'PUT' and
'/v2/action_executions' in state.request.path and
maintenance_mode == maintenance_service.PAUSING
)
def _put_method_executions(maintenance_mode, state):
if not (state.request.method == 'PUT' and
'/v2/executions' in state.request.path):
return False
body = json.loads(state.request.body.decode('utf8').replace("'", '"'))
return ('state' in body and body['state'] == 'CANCELLED' and
(maintenance_mode == maintenance_service.PAUSING or
maintenance_mode == maintenance_service.PAUSED
))
class MaintenanceHook(hooks.PecanHook):
def before(self, state):
@ -27,14 +47,10 @@ class MaintenanceHook(hooks.PecanHook):
return
cluster_state = db_api.get_maintenance_status()
is_complete_async_actions = (
state.request.method == 'PUT' and
'/v2/action_executions' in state.request.path and
cluster_state == maintenance_service.PAUSING
)
if is_complete_async_actions or \
cluster_state == maintenance_service.RUNNING:
if (cluster_state == maintenance_service.RUNNING or
_complete_async_action(cluster_state, state) or
_put_method_executions(cluster_state, state)):
return
msg = "Current Mistral state is {}. Method is not allowed".format(

View File

@ -0,0 +1,59 @@
# Copyright 2023 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""recreate mistral metrics table
Revision ID: 044
Revises: 043
Create Date: 2024-03-22 12:02:23.374515
"""
# revision identifiers, used by Alembic.
revision = '044'
down_revision = '043'
from alembic import op
import sqlalchemy as sa
from mistral.services import maintenance
from mistral_lib import utils
def upgrade():
op.drop_table('mistral_metrics')
mistral_metrics_table = op.create_table(
'mistral_metrics',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('value', sa.String(length=255), nullable=False),
sa.UniqueConstraint('name'),
)
op.bulk_insert(mistral_metrics_table,
[
{
'created_at': utils.utc_now_sec(),
'updated_at': utils.utc_now_sec(),
'id': 1,
'name': 'maintenance_status',
'value': maintenance.RUNNING
}
]
)

View File

@ -37,6 +37,7 @@ from mistral.db import utils as m_dbutils
from mistral.db.v2.sqlalchemy import filters as db_filters
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import maintenance as m
from mistral.services import security
from mistral.workflow import states
from mistral_lib import utils
@ -66,7 +67,10 @@ def setup_db():
try:
models.Workbook.metadata.create_all(b.get_engine())
# NOTE(wortellen): We need this in case if we use sqlite3.
# We have to fill this table, but I haven't found any other
# appropriate place where this can be done.
fill_metrics_table()
_initialized = True
except sa.exc.OperationalError as e:
raise exc.DBError("Failed to setup database: %s" % e)
@ -2130,21 +2134,36 @@ def named_lock(name):
delete_named_lock(lock_id)
def get_maintenance_status():
query = b.get_engine().execute('SELECT status FROM mistral_metrics '
'WHERE name = %s', ("maintenance_status",))
rows = query.fetchall()
return rows[0][0] if rows else None
@b.session_aware()
def get_maintenance_status(session=None):
maintenance_entry = (
session.query(models.MistralMetrics)
.filter_by(name="maintenance_status")
.first()
)
return maintenance_entry.value if maintenance_entry else None
def update_maintenance_status(status):
b.get_engine().execute(
"INSERT into mistral_metrics (id, name, value)"
"VALUES (1, %s, %s)"
"ON CONFLICT (name) DO UPDATE"
"SET value = EXCLUDED.value",
("maintenance_status", status,))
@b.session_aware()
def update_maintenance_status(status, session=None):
maintenance_entry = session.query(models.MistralMetrics).get(1)
maintenance_entry.value = status
session.commit()
@b.session_aware()
def fill_metrics_table(session=None):
maintenance_entry = session.query(models.MistralMetrics).get(1)
if not maintenance_entry:
maintenance = models.MistralMetrics()
maintenance.update(
{
'id': 1,
'name': 'maintenance_status',
'value': m.RUNNING
}
)
maintenance.save(session=session)
@b.session_aware()

View File

@ -220,6 +220,18 @@ class CodeSource(mb.MistralSecureModelBase):
tags = sa.Column(st.JsonListType())
class MistralMetrics(mb.MistralModelBase):
"""Table for maintenance mode."""
__tablename__ = 'mistral_metrics'
__table_args__ = (
sa.UniqueConstraint('name'),
)
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.String(255), nullable=False)
value = sa.Column(sa.String(255), nullable=False)
class DynamicActionDefinition(mb.MistralSecureModelBase):
"""Contains info about registered Dynamic Actions."""

View File

@ -40,41 +40,23 @@ _ALL_STATES = [
LOG = logging.getLogger(__name__)
_PAUSE_EXECUTIONS_PATH = 'mistral.services.maintenance._pause_executions'
_PAUSE_EXECUTION_PATH = 'mistral.services.maintenance._pause_execution'
_RESUME_EXECUTIONS_PATH = 'mistral.services.maintenance._resume_executions'
_RESUME_EXECUTION_PATH = 'mistral.services.maintenance._resume_execution'
_AWAIT_PAUSE_EXECUTION_PATH = \
'mistral.services.maintenance.await_pause_executions'
def is_valid_transition(old_state, new_state):
return new_state in _VALID_TRANSITIONS.get(old_state, [])
def pause_running_executions(skip_tx=False):
def pause_running_executions():
execution_ids = [(ex.id, ex.project_id) for ex in
db_api.get_workflow_executions(state=states.RUNNING,
insecure=True)]
LOG.info("Number of find workflow executions is {}",
len(execution_ids))
if skip_tx:
sched = sched_base.get_system_scheduler()
for wf_ex_id, project_id in execution_ids:
job = sched_base.SchedulerJob(
func_name=_PAUSE_EXECUTION_PATH,
func_args={
'wf_ex_id': wf_ex_id,
'project_id': project_id
}
)
sched.schedule(job)
return
LOG.info("Number of find workflow executions is %s", len(execution_ids))
for wf_ex_id, project_id in execution_ids:
try:
with db_api.transaction(skip=skip_tx):
with db_api.transaction():
_pause_execution(wf_ex_id, project_id)
except BaseException as e:
LOG.error(str(e))
@ -82,11 +64,12 @@ def pause_running_executions(skip_tx=False):
return True
def _pause_execution(wf_ex_id, project_id, skip_tx=False):
def _pause_execution(wf_ex_id, project_id):
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
auth_token=None,
project_id=project_id,
is_admin=True
)
)
@ -98,24 +81,12 @@ def _pause_execution(wf_ex_id, project_id, skip_tx=False):
wf_ex = db_api.get_workflow_execution(wf_ex_id)
if wf_ex.root_execution_id:
trace_uuid = wf_ex.root_execution_id
else:
trace_uuid = wf_ex.id
auth_ctx.set_ctx(
auth_ctx.MistralContext(
tenant=project_id,
trace_uuid=trace_uuid
)
)
if states.is_running(wf_ex.state):
workflow_handler.pause_workflow(wf_ex)
LOG.info('Execution {} was paused', wf_ex_id)
LOG.info('Execution %s was paused', wf_ex_id)
def await_pause_executions(skip_tx=False):
def await_pause_executions():
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
@ -124,36 +95,8 @@ def await_pause_executions(skip_tx=False):
)
)
if skip_tx:
current_state = db_api.get_maintenance_status()
if current_state != PAUSING:
return False
tasks = db_api.get_task_executions(
state=states.RUNNING, insecure=True
)
if not tasks:
if db_api.get_maintenance_status() == PAUSING:
db_api.update_maintenance_status(PAUSED)
return
LOG.info('The following tasks have RUNNING state: {}', [
task.id for task in tasks
])
sched = sched_base.get_system_scheduler()
job = sched_base.SchedulerJob(
run_after=1,
func_name=_AWAIT_PAUSE_EXECUTION_PATH,
func_args={'skip_tx': True}
)
sched.schedule(job)
return
while True:
with db_api.transaction(skip=skip_tx):
with db_api.transaction():
current_state = db_api.get_maintenance_status()
if current_state != PAUSING:
@ -166,10 +109,8 @@ def await_pause_executions(skip_tx=False):
if not tasks:
return True
LOG.info('The following tasks have RUNNING state: {}', [
task.id for task in tasks
])
LOG.info('The following tasks have RUNNING state: %s',
[task.id for task in tasks])
eventlet.sleep(1)
@ -210,7 +151,7 @@ def change_maintenance_mode(new_state):
@post_tx_queue.run
def _pause_executions(skip_tx=False):
def _pause_executions():
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
@ -219,11 +160,6 @@ def _pause_executions(skip_tx=False):
)
)
if skip_tx:
pause_running_executions(skip_tx)
await_pause_executions(skip_tx)
return
if pause_running_executions() and await_pause_executions():
with db_api.transaction():
if db_api.get_maintenance_status() == PAUSING:
@ -231,7 +167,7 @@ def _pause_executions(skip_tx=False):
@post_tx_queue.run
def _resume_executions(skip_tx=False):
def _resume_executions():
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
@ -239,9 +175,8 @@ def _resume_executions(skip_tx=False):
is_admin=True
)
)
sched = sched_base.get_system_scheduler()
with db_api.transaction(skip=skip_tx):
with db_api.transaction():
current_state = db_api.get_maintenance_status()
if current_state != RUNNING:
@ -255,33 +190,18 @@ def _resume_executions(skip_tx=False):
return
for ex in paused_executions:
if skip_tx:
job = sched_base.SchedulerJob(
func_name=_RESUME_EXECUTION_PATH,
func_args={
'wf_ex_id': ex.id
}
)
sched.schedule(job)
else:
_resume_execution(wf_ex_id=ex.id)
_resume_execution(wf_ex_id=ex.id)
def _resume_execution(wf_ex_id, skip_tx=False):
def _resume_execution(wf_ex_id):
wf_ex = db_api.get_workflow_execution(wf_ex_id)
if wf_ex.root_execution_id:
trace_uuid = wf_ex.root_execution_id
else:
trace_uuid = wf_ex.id
auth_ctx.set_ctx(
auth_ctx.MistralContext(
tenant=wf_ex.project_id,
trace_uuid=trace_uuid
project_id=wf_ex.project_id
)
)
workflow_handler.resume_workflow(wf_ex)
LOG.info('The following execution was resumed: {}', [wf_ex.id])
LOG.info('The following execution was resumed: %s', [wf_ex.id])