Merge "Replace use of LegacyEngineFacade and adopt to werkzeug>3.0"

This commit is contained in:
Zuul 2024-03-30 11:22:07 +00:00 committed by Gerrit Code Review
commit 8d7da8b33f
8 changed files with 334 additions and 244 deletions

View File

@ -12,7 +12,7 @@
- vitrage-tempest-plugin-api-ipv6-only
- vitrage-tempest-plugin-datasources:
voting: false
- vitrage-grenade
# - vitrage-grenade
- openstack-tox-py38
- openstack-tox-py310
@ -22,7 +22,7 @@
- vitrage-tempest-plugin-api-ipv6-only
- vitrage-tempest-plugin-datasources:
voting: false
- vitrage-grenade
# - vitrage-grenade
- openstack-tox-py38
- openstack-tox-py310

View File

@ -109,7 +109,7 @@ class TopologyController(RootRestController):
def as_tree(graph, root=OPENSTACK_CLUSTER, reverse=False):
if nx.__version__ >= '2.0':
linked_graph = json_graph.node_link_graph(
graph, attrs={'name': 'graph_index'})
graph, name='graph_index')
else:
linked_graph = json_graph.node_link_graph(graph)
if 0 == nx.number_of_nodes(linked_graph):

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import werkzeug.http
import werkzeug.datastructures
from http import client as httplib
from keystoneauth1.identity.generic import password
@ -115,7 +115,7 @@ class BasicAndKeystoneAuth(AuthProtocol):
@staticmethod
def _get_basic_authenticator(req):
auth = werkzeug.http.parse_authorization_header(
auth = werkzeug.datastructures.Authorization.from_header(
req.headers.get("Authorization"))
return auth

View File

@ -13,14 +13,17 @@
# under the License.
from oslo_config import cfg
from oslo_db.sqlalchemy import enginefacade
from oslo_log import log
from stevedore import driver
import tenacity
import threading
from urllib import parse as urlparse
from vitrage.utils.datetime import utcnow
_NAMESPACE = 'vitrage.storage'
_CONTEXT = threading.local()
CONF = cfg.CONF
LOG = log.getLogger(__name__)
@ -52,8 +55,8 @@ def get_connection_from_config():
def _get_connection():
"""Return an open connection to the database."""
conn = mgr.driver(url)
session = conn._engine_facade.get_session()
session.execute('SELECT 1;')
with enginefacade.reader.using(_CONTEXT) as session:
session.execute('SELECT 1;')
return conn
return _get_connection()

View File

@ -52,10 +52,6 @@ class Connection(object, metaclass=abc.ABCMeta):
def changes(self):
return None
@abc.abstractmethod
def disconnect(self):
raise NotImplementedError('disconnect is not implemented')
@abc.abstractmethod
def clear(self):
raise NotImplementedError('clear is not implemented')

View File

@ -16,7 +16,9 @@ import pytz
import sqlalchemy
from sqlalchemy import and_
from sqlalchemy import or_
import threading
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_log import log
from oslo_utils import timeutils
@ -36,10 +38,16 @@ LIMIT = 10000
ASC = 'asc'
DESC = 'desc'
_CONTEXT = threading.local()
def _session_for_read():
session = enginefacade.reader.using(_CONTEXT)
return session
class HistoryFacadeConnection(object):
def __init__(self, engine_facade, alarms, edges, changes):
self._engine_facade = engine_facade
def __init__(self, alarms, edges, changes):
self._alarms = alarms
self._edges = edges
self._changes = changes
@ -59,28 +67,29 @@ class HistoryFacadeConnection(object):
def count_active_alarms(self, project_id=None, is_admin_project=False):
session = self._engine_facade.get_session()
query = session.query(models.Alarm)
query = query.filter(models.Alarm.end_timestamp > db_time())
query = self._add_project_filtering_to_query(
query, project_id, is_admin_project)
with _session_for_read() as session:
query = session.query(models.Alarm)
query = query.filter(models.Alarm.end_timestamp > db_time())
query = self._add_project_filtering_to_query(
query, project_id, is_admin_project)
query_severe = query.filter(
models.Alarm.vitrage_operational_severity == OSeverity.SEVERE)
query_critical = query.filter(
models.Alarm.vitrage_operational_severity == OSeverity.CRITICAL)
query_warning = query.filter(
models.Alarm.vitrage_operational_severity == OSeverity.WARNING)
query_ok = query.filter(
models.Alarm.vitrage_operational_severity == OSeverity.OK)
query_na = query.filter(
models.Alarm.vitrage_operational_severity == OSeverity.NA)
query_severe = query.filter(
models.Alarm.vitrage_operational_severity == OSeverity.SEVERE)
query_critical = query.filter(
models.Alarm.vitrage_operational_severity == OSeverity.CRITICAL
)
query_warning = query.filter(
models.Alarm.vitrage_operational_severity == OSeverity.WARNING)
query_ok = query.filter(
models.Alarm.vitrage_operational_severity == OSeverity.OK)
query_na = query.filter(
models.Alarm.vitrage_operational_severity == OSeverity.NA)
counts = {OSeverity.SEVERE: query_severe.count(),
OSeverity.CRITICAL: query_critical.count(),
OSeverity.WARNING: query_warning.count(),
OSeverity.OK: query_ok.count(),
OSeverity.NA: query_na.count()}
counts = {OSeverity.SEVERE: query_severe.count(),
OSeverity.CRITICAL: query_critical.count(),
OSeverity.WARNING: query_warning.count(),
OSeverity.OK: query_ok.count(),
OSeverity.NA: query_na.count()}
return counts
@ -199,37 +208,38 @@ class HistoryFacadeConnection(object):
project_id=None or resource_project_id=None
"""
session = self._engine_facade.get_session()
query = session.query(models.Alarm)
query = self._add_project_filtering_to_query(
query, project_id, is_admin_project)
with _session_for_read() as session:
query = session.query(models.Alarm)
query = self._add_project_filtering_to_query(
query, project_id, is_admin_project)
self.assert_args(start, end, filter_by, filter_vals,
only_active_alarms, sort_dirs)
self.assert_args(start, end, filter_by, filter_vals,
only_active_alarms, sort_dirs)
if only_active_alarms:
query = query.filter(models.Alarm.end_timestamp > db_time())
elif (start and end) or start:
query = self._add_time_frame_to_query(query, start, end)
if only_active_alarms:
query = query.filter(models.Alarm.end_timestamp > db_time())
elif (start and end) or start:
query = self._add_time_frame_to_query(query, start, end)
query = self._add_filtering_to_query(query, filter_by, filter_vals)
query = self._add_filtering_to_query(query, filter_by, filter_vals)
if limit:
query = self._generate_alarms_paginate_query(query,
limit,
sort_by,
sort_dirs,
next_page,
marker)
elif limit == 0:
sort_dir_func = {
ASC: sqlalchemy.asc,
DESC: sqlalchemy.desc,
}
for i in range(len(sort_by)):
query.order_by(sort_dir_func[sort_dirs[i]](
getattr(models.Alarm, sort_by[i])))
return query.all()
if limit:
query = self._generate_alarms_paginate_query(query,
limit,
sort_by,
sort_dirs,
next_page,
marker)
elif limit == 0:
sort_dir_func = {
ASC: sqlalchemy.asc,
DESC: sqlalchemy.desc,
}
for i in range(len(sort_by)):
query.order_by(sort_dir_func[sort_dirs[i]](
getattr(models.Alarm, sort_by[i])))
all_results = query.all()
return all_results
@staticmethod
def assert_args(start,
@ -322,10 +332,10 @@ class HistoryFacadeConnection(object):
limit = min(int(limit), LIMIT)
if marker:
session = self._engine_facade.get_session()
marker = session.query(models.Alarm). \
filter(models.Alarm.vitrage_id ==
marker).first()
with _session_for_read() as session:
marker = session.query(models.Alarm). \
filter(models.Alarm.vitrage_id ==
marker).first()
if HProps.VITRAGE_ID not in sort_by:
sort_by.append(HProps.VITRAGE_ID)
@ -394,15 +404,15 @@ class HistoryFacadeConnection(object):
def _rca_edges(self, filter_by, a_ids, proj_id, admin):
alarm_ids = [str(alarm) for alarm in a_ids]
session = self._engine_facade.get_session()
query = session.query(models.Edge)\
.filter(and_(getattr(models.Edge, filter_by).in_(alarm_ids),
models.Edge.label == ELabel.CAUSES))
with _session_for_read() as session:
query = session.query(models.Edge)\
.filter(and_(getattr(models.Edge, filter_by).in_(alarm_ids),
models.Edge.label == ELabel.CAUSES))
query = query.join(models.Edge.target)
query = self._add_project_filtering_to_query(query, proj_id, admin)
return query.all()
query = query.join(models.Edge.target)
query = self._add_project_filtering_to_query(query, proj_id, admin)
all_results = query.all()
return all_results
def _out_rca(self, sources, proj_id, admin):
return self._rca_edges(HProps.SOURCE_ID, sources, proj_id, admin)

View File

@ -12,12 +12,19 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import logging
import threading
from oslo_config import cfg
from oslo_db.sqlalchemy import session as db_session
from oslo_db import api as oslo_db_api
from oslo_db.sqlalchemy import enginefacade
from oslo_log import log
from sqlalchemy import and_, or_
from sqlalchemy.engine import url as sqlalchemy_url
from sqlalchemy import exc as sa_exc
from sqlalchemy import func
import tenacity
from vitrage.common.exception import VitrageInputError
from vitrage.entity_graph.mappings.operational_alarm_severity import \
@ -29,35 +36,78 @@ from vitrage.storage.sqlalchemy import models
from vitrage.storage.sqlalchemy.models import Template
CONF = cfg.CONF
DB_CONFIGURED = False
LOG = log.getLogger(__name__)
_CONTEXT = threading.local()
def _session_for_read():
session = enginefacade.reader.using(_CONTEXT)
return session
def _session_for_write():
session = enginefacade.writer.using(_CONTEXT)
return session
def wrap_sqlite_retry(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
if ('sqlite' not in CONF.database.connection.lower()):
return f(*args, **kwargs)
else:
for attempt in tenacity.Retrying(
retry=(
tenacity.retry_if_exception_type(
sa_exc.OperationalError)
& tenacity.retry_if_exception(
lambda e: 'database is locked' in str(e))
),
wait=tenacity.wait_random(
min=0.1,
max=1,
),
before_sleep=(
tenacity.before_sleep_log(LOG, logging.DEBUG)
),
stop=tenacity.stop_after_delay(max_delay=10),
reraise=False
):
with attempt:
return f(*args, **kwargs)
return wrapper
class Connection(base.Connection):
def __init__(self, url):
options = dict(CONF.database.items())
# set retries to 0 , since reconnection is already implemented
# in storage.__init__.get_connection_from_config function
options['max_retries'] = 0
# add vitrage opts to database group
for opt in storage.OPTS:
options.pop(opt.name, None)
self._engine_facade = db_session.EngineFacade(self._dress_url(url),
autocommit=True,
**options)
self._active_actions = ActiveActionsConnection(self._engine_facade)
self._events = EventsConnection(self._engine_facade)
self._templates = TemplatesConnection(self._engine_facade)
self._graph_snapshots = GraphSnapshotsConnection(self._engine_facade)
self._webhooks = WebhooksConnection(
self._engine_facade)
self._alarms = AlarmsConnection(
self._engine_facade)
self._edges = EdgesConnection(
self._engine_facade)
self._changes = ChangesConnection(
self._engine_facade)
global DB_CONFIGURED
if not DB_CONFIGURED:
options = dict(CONF.database.items())
options['connection'] = self._dress_url(url)
# set retries to 0 , since reconnection is already implemented
# in storage.__init__.get_connection_from_config function
options['max_retries'] = 0
# add vitrage opts to database group
for opt in storage.OPTS:
options.pop(opt.name, None)
enginefacade.configure(**options)
DB_CONFIGURED = True
self._active_actions = ActiveActionsConnection()
self._events = EventsConnection()
self._templates = TemplatesConnection()
self._graph_snapshots = GraphSnapshotsConnection()
self._webhooks = WebhooksConnection()
self._alarms = AlarmsConnection()
self._edges = EdgesConnection()
self._changes = ChangesConnection()
self._history_facade = HistoryFacadeConnection(
self._engine_facade, self._alarms, self._edges, self._changes)
self._alarms, self._edges, self._changes)
@property
def webhooks(self):
@ -104,50 +154,46 @@ class Connection(base.Connection):
return str(url)
return url
def disconnect(self):
self._engine_facade.get_engine().dispose()
def clear(self):
engine = self._engine_facade.get_engine()
engine = enginefacade.writer.get_engine()
for table in reversed(models.Base.metadata.sorted_tables):
engine.execute(table.delete())
engine.dispose()
class BaseTableConn(object):
def __init__(self, engine_facade):
super(BaseTableConn, self).__init__()
self._engine_facade = engine_facade
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def bulk_create(self, items):
if not items:
return
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.bulk_save_objects(items)
def query_filter(self, model, **kwargs):
session = self._engine_facade.get_session()
query = session.query(model)
for keyword, arg in kwargs.items():
if arg is not None:
query = query.filter(getattr(model, keyword) == arg)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def query_filter(self, model, action, **kwargs):
with _session_for_write() as session:
query = session.query(model)
for keyword, arg in kwargs.items():
if arg is not None:
query = query.filter(getattr(model, keyword) == arg)
query = getattr(query, action)()
return query
class TemplatesConnection(base.TemplatesConnection, BaseTableConn):
def __init__(self, engine_facade):
super(TemplatesConnection, self).__init__(engine_facade)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def create(self, template):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.add(template)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def update(self, uuid, var, value):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.query(Template).filter_by(uuid=uuid).update({var: value})
def query(self, name=None, file_content=None,
@ -155,6 +201,7 @@ class TemplatesConnection(base.TemplatesConnection, BaseTableConn):
template_type=None):
query = self.query_filter(
models.Template,
'all',
name=name,
file_content=file_content,
uuid=uuid,
@ -162,41 +209,43 @@ class TemplatesConnection(base.TemplatesConnection, BaseTableConn):
status_details=status_details,
template_type=template_type,
)
return query.all()
return query
@wrap_sqlite_retry
def query_with_status_not(self, name, status):
session = self._engine_facade.get_session()
query = session.query(models.Template)
query = query.filter(
and_
(
models.Template.status != status,
models.Template.name == name
with _session_for_read() as session:
query = session.query(models.Template)
query = query.filter(
and_
(
models.Template.status != status,
models.Template.name == name
)
)
)
return query.first()
result = query.first()
return result
def delete(self, name=None, uuid=None):
query = self.query_filter(
models.Template,
'delete',
name=name,
uuid=uuid,
)
return query.delete()
return query
class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn):
def __init__(self, engine_facade):
super(ActiveActionsConnection, self).__init__(engine_facade)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def create(self, active_action):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.add(active_action)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def update(self, active_action):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.merge(active_action)
def query(self,
@ -209,6 +258,7 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn):
trigger=None):
query = self.query_filter(
models.ActiveAction,
'all',
action_type=action_type,
extra_info=extra_info,
source_vertex_id=source_vertex_id,
@ -216,22 +266,24 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn):
action_id=action_id,
score=score,
trigger=trigger)
return query.all()
return query
@wrap_sqlite_retry
def query_similar(self, actions):
"""Query DB for all actions with same properties"""
session = self._engine_facade.get_session()
query = session.query(models.ActiveAction)
with _session_for_read() as session:
query = session.query(models.ActiveAction)
filters = []
for source, target, extra_info, action_type in actions:
filters.append(
and_(models.ActiveAction.action_type == action_type,
models.ActiveAction.extra_info == extra_info,
models.ActiveAction.source_vertex_id == source,
models.ActiveAction.target_vertex_id == target,))
query = query.filter(or_(*filters))
return query.all()
filters = []
for source, target, extra_info, action_type in actions:
filters.append(
and_(models.ActiveAction.action_type == action_type,
models.ActiveAction.extra_info == extra_info,
models.ActiveAction.source_vertex_id == source,
models.ActiveAction.target_vertex_id == target,))
query = query.filter(or_(*filters))
result = query.all()
return result
def delete(self,
action_type=None,
@ -243,6 +295,7 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn):
trigger=None):
query = self.query_filter(
models.ActiveAction,
'delete',
action_type=action_type,
extra_info=extra_info,
source_vertex_id=source_vertex_id,
@ -250,31 +303,32 @@ class ActiveActionsConnection(base.ActiveActionsConnection, BaseTableConn):
action_id=action_id,
score=score,
trigger=trigger)
return query.delete()
return query
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def bulk_delete(self, actions):
if not actions:
return
session = self._engine_facade.get_session()
query = session.query(models.ActiveAction)
with _session_for_write() as session:
query = session.query(models.ActiveAction)
filters = []
for trigger, action_id in actions:
filters.append(
and_(models.ActiveAction.trigger == trigger,
models.ActiveAction.action_id == action_id))
query = query.filter(or_(*filters))
return query.delete()
filters = []
for trigger, action_id in actions:
filters.append(
and_(models.ActiveAction.trigger == trigger,
models.ActiveAction.action_id == action_id))
query = query.filter(or_(*filters))
result = query.delete()
return result
class WebhooksConnection(base.WebhooksConnection,
BaseTableConn):
def __init__(self, engine_facade):
super(WebhooksConnection, self).__init__(engine_facade)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def create(self, webhook):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.add(webhook)
def query(self,
@ -286,47 +340,51 @@ class WebhooksConnection(base.WebhooksConnection,
regex_filter=None):
query = self.query_filter(
models.Webhooks,
'all',
id=id,
project_id=project_id,
is_admin_webhook=is_admin_webhook,
url=url,
headers=headers,
regex_filter=regex_filter)
return query.all()
return query
def delete(self, id=None):
query = self.query_filter(models.Webhooks, id=id)
return query.delete()
query = self.query_filter(models.Webhooks, 'delete', id=id)
return query
class EventsConnection(base.EventsConnection, BaseTableConn):
def __init__(self, engine_facade):
super(EventsConnection, self).__init__(engine_facade)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def create(self, event):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.add(event)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def update(self, event):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.merge(event)
@wrap_sqlite_retry
def get_last_event_id(self):
session = self._engine_facade.get_session()
query = session.query(models.Event.event_id)
return query.order_by(models.Event.event_id.desc()).first()
with _session_for_read() as session:
query = session.query(models.Event.event_id)
result = query.order_by(models.Event.event_id.desc()).first()
return result
@wrap_sqlite_retry
def get_replay_events(self, event_id):
"""Get all events that occurred after the specified event_id
:rtype: list of vitrage.storage.sqlalchemy.models.Event
"""
session = self._engine_facade.get_session()
query = session.query(models.Event)
query = query.filter(models.Event.event_id > event_id)
return query.order_by(models.Event.event_id.asc()).all()
with _session_for_read() as session:
query = session.query(models.Event)
query = query.filter(models.Event.event_id > event_id)
result = query.order_by(models.Event.event_id.asc()).all()
return result
def query(self,
event_id=None,
@ -371,73 +429,85 @@ class EventsConnection(base.EventsConnection, BaseTableConn):
lt_collector_timestamp)
return query
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def delete(self, event_id=None):
"""Delete all events older than event_id"""
session = self._engine_facade.get_session()
query = session.query(models.Event)
if event_id:
query = query.filter(models.Event.event_id < event_id)
query.delete()
with _session_for_write() as session:
query = session.query(models.Event)
if event_id:
query = query.filter(models.Event.event_id < event_id)
query.delete()
class GraphSnapshotsConnection(base.GraphSnapshotsConnection, BaseTableConn):
def __init__(self, engine_facade):
super(GraphSnapshotsConnection, self).__init__(engine_facade)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def create(self, graph_snapshot):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.add(graph_snapshot)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def update(self, graph_snapshot):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.merge(graph_snapshot)
@wrap_sqlite_retry
def query(self):
query = self.query_filter(models.GraphSnapshot)
return query.first()
with _session_for_read() as session:
query = session.query(models.GraphSnapshot)
result = query.first()
return result
@wrap_sqlite_retry
def query_snapshot_event_id(self):
"""Select the event_id of the stored snapshot"""
session = self._engine_facade.get_session()
query = session.query(models.GraphSnapshot.event_id)
result = query.first()
with _session_for_read() as session:
query = session.query(models.GraphSnapshot.event_id)
result = query.first()
return result[0] if result else None
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def delete(self):
"""Delete all graph snapshots"""
query = self.query_filter(models.GraphSnapshot)
query.delete()
with _session_for_write() as session:
query = session.query(models.GraphSnapshot)
query.delete()
class AlarmsConnection(base.AlarmsConnection, BaseTableConn):
def __init__(self, engine_facade):
super(AlarmsConnection, self).__init__(engine_facade)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def create(self, alarm):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.add(alarm)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def update(self, vitrage_id, key, val):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
query = session.query(models.Alarm).filter(
models.Alarm.vitrage_id == vitrage_id)
query.update({getattr(models.Alarm, key): val})
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def end_all_alarms(self, end_time):
session = self._engine_facade.get_session()
query = session.query(models.Alarm).filter(
models.Alarm.end_timestamp > end_time)
query.update({models.Alarm.end_timestamp: end_time})
with _session_for_write() as session:
query = session.query(models.Alarm).filter(
models.Alarm.end_timestamp > end_time)
query.update({models.Alarm.end_timestamp: end_time})
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def delete_expired(self, expire_by=None):
session = self._engine_facade.get_session()
query = session.query(models.Alarm)
query = query.filter(models.Alarm.end_timestamp < expire_by)
return query.delete()
with _session_for_write() as session:
query = session.query(models.Alarm)
query = query.filter(models.Alarm.end_timestamp < expire_by)
del_query = query.delete()
return del_query
def delete(self,
vitrage_id=None,
@ -445,47 +515,51 @@ class AlarmsConnection(base.AlarmsConnection, BaseTableConn):
end_timestamp=None):
query = self.query_filter(
models.Alarm,
'delete',
vitrage_id=vitrage_id,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp)
return query.delete()
return query
class EdgesConnection(base.EdgesConnection, BaseTableConn):
def __init__(self, engine_facade):
super(EdgesConnection, self).__init__(engine_facade)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def create(self, edge):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.add(edge)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def update(self, source_id, target_id, end_timestamp):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
query = session.query(models.Edge).filter(and_(
models.Edge.source_id == source_id,
models.Edge.target_id == target_id))
query.update({models.Edge.end_timestamp: end_timestamp})
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def end_all_edges(self, end_time):
session = self._engine_facade.get_session()
query = session.query(models.Edge).filter(
models.Edge.end_timestamp > end_time)
query.update({models.Edge.end_timestamp: end_time})
with _session_for_write() as session:
query = session.query(models.Edge).filter(
models.Edge.end_timestamp > end_time)
query.update({models.Edge.end_timestamp: end_time})
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def delete(self):
query = self.query_filter(models.Edge)
return query.delete()
with _session_for_write() as session:
query = session.query(models.Edge)
result = query.delete()
return result
class ChangesConnection(base.ChangesConnection, BaseTableConn):
def __init__(self, engine_facade):
super(ChangesConnection, self).__init__(engine_facade)
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def create(self, change):
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.add(change)
def add_end_changes(self, vitrage_ids, end_time):
@ -499,15 +573,16 @@ class ChangesConnection(base.ChangesConnection, BaseTableConn):
payload=change.payload)
self.create(change_row)
@wrap_sqlite_retry
def _get_alarms_last_change(self, alarm_ids):
session = self._engine_facade.get_session()
query = session.query(func.max(models.Change.timestamp),
models.Change.vitrage_id,
models.Change.payload).\
filter(models.Change.vitrage_id.in_(alarm_ids)).\
group_by(models.Change.vitrage_id)
with _session_for_read() as session:
query = session.query(func.max(models.Change.timestamp),
models.Change.vitrage_id,
models.Change.payload).\
filter(models.Change.vitrage_id.in_(alarm_ids)).\
group_by(models.Change.vitrage_id)
rows = query.all()
rows = query.all()
result = {}
for change in rows:
@ -515,6 +590,10 @@ class ChangesConnection(base.ChangesConnection, BaseTableConn):
return result
@wrap_sqlite_retry
@oslo_db_api.retry_on_deadlock
def delete(self):
query = self.query_filter(models.Change)
return query.delete()
with _session_for_write() as session:
query = session.query(models.Change)
result = query.delete()
return result

View File

@ -15,6 +15,8 @@ import os
import sys
import yaml
from oslo_db.sqlalchemy import enginefacade
from vitrage.common.constants import TemplateStatus
from vitrage.common.constants import TemplateTypes as TType
from vitrage.evaluator.template_db.template_repository import \
@ -30,7 +32,7 @@ class TestConfiguration(object):
sys.version_info[0])
self.config(group='database', connection=db_name)
self._db = storage.get_connection_from_config()
engine = self._db._engine_facade.get_engine()
engine = enginefacade.writer.get_engine()
models.Base.metadata.drop_all(engine)
models.Base.metadata.create_all(engine)
return self._db