Merge "Add journal and maintenance skeleton"

This commit is contained in:
Jenkins 2017-06-13 06:59:00 +00:00 committed by Gerrit Code Review
commit 3f6da12f13
10 changed files with 594 additions and 1 deletions

141
networking_ovn/db/db.py Normal file
View File

@ -0,0 +1,141 @@
# Copyright (c) 2015 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from neutron.db import api as db_api
from oslo_db import api as oslo_db_api
from sqlalchemy import asc
from sqlalchemy import func
from networking_ovn.db import models
from networking_ovn.journal import constants as journal_const
#
# Journal functions
#
# Retry deadlock exception for Galera DB.
# If two (or more) different threads call this method at the same time, they
# might both succeed in changing the same row to pending, but at least one
# of them will get a deadlock from Galera and will have to retry the operation.
@db_api.retry_db_errors
def get_oldest_pending_db_row_with_lock(session):
with session.begin():
row = session.query(models.OVNJournal).filter_by(
state=journal_const.PENDING).order_by(
asc(models.OVNJournal.last_retried)).with_for_update(
).first()
if row:
update_db_row_state(session, row, journal_const.PROCESSING)
return row
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
def update_db_row_state(session, row, state):
row.state = state
session.merge(row)
session.flush()
def update_pending_db_row_retry(session, row, retry_count):
if row.retry_count >= retry_count:
update_db_row_state(session, row, journal_const.FAILED)
else:
row.retry_count += 1
update_db_row_state(session, row, journal_const.PENDING)
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
def create_pending_row(session, object_type, object_uuid,
operation, data):
row = models.OVNJournal(object_type=object_type,
object_uuid=object_uuid,
operation=operation, data=data,
created_at=func.now(),
state=journal_const.PENDING)
session.add(row)
session.flush()
#
# Journal maintenance functions
#
@db_api.retry_db_errors
def _update_maintenance_state(session, expected_state, state):
with session.begin():
row = session.query(models.OVNMaintenance).filter_by(
state=expected_state).with_for_update().one_or_none()
if row is None:
return False
row.state = state
return True
def lock_maintenance(session):
return _update_maintenance_state(session, journal_const.PENDING,
journal_const.PROCESSING)
def unlock_maintenance(session):
return _update_maintenance_state(session, journal_const.PROCESSING,
journal_const.PENDING)
def update_maintenance_operation(session, operation=None):
"""Update the current maintenance operation details.
The function assumes the lock is held, so it mustn't be run outside
of a locked context.
"""
op_text = None
if operation:
op_text = operation.__name__
with session.begin():
row = session.query(models.OVNMaintenance).one_or_none()
row.processing_operation = op_text
#
# Journal clean up functions
#
def delete_rows_by_state_and_time(session, state, time_delta):
with session.begin():
now = session.execute(func.now()).scalar()
session.query(models.OVNJournal).filter(
models.OVNJournal.state == state,
models.OVNJournal.last_retried < now - time_delta).delete(
synchronize_session=False)
session.expire_all()
def reset_processing_rows(session, max_timedelta):
with session.begin():
now = session.execute(func.now()).scalar()
max_timedelta = datetime.timedelta(seconds=max_timedelta)
rows = session.query(models.OVNJournal).filter(
models.OVNJournal.last_retried < now - max_timedelta,
models.OVNJournal.state == journal_const.PROCESSING,
).update({'state': journal_const.PENDING})
return rows

View File

@ -1 +1 @@
ac094507b7f4
e229b8aad9f2

View File

@ -0,0 +1,73 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""add ovn_journal and ovn_maintenance tables
Revision ID: e229b8aad9f2
Revises: ac094507b7f4
Create Date: 2017-04-28 11:41:47.487584
"""
# revision identifiers, used by Alembic.
revision = 'e229b8aad9f2'
down_revision = 'ac094507b7f4'
from alembic import op
from oslo_utils import uuidutils
import sqlalchemy as sa
from networking_ovn.journal import constants as journal_const
def upgrade():
op.create_table(
'ovn_journal',
sa.Column('seqnum', sa.BigInteger(),
primary_key=True, autoincrement=True),
sa.Column('object_type', sa.String(36), nullable=False),
sa.Column('object_uuid', sa.String(36), nullable=False),
sa.Column('operation', sa.String(36), nullable=False),
sa.Column('data', sa.PickleType, nullable=True),
sa.Column('state', sa.Enum(journal_const.PENDING,
journal_const.PROCESSING,
journal_const.FAILED,
journal_const.COMPLETED,
name='state'),
nullable=False, default='pending'),
sa.Column('retry_count', sa.Integer, default=0),
sa.Column('created_at', sa.DateTime, default=sa.func.now()),
sa.Column('last_retried', sa.TIMESTAMP, server_default=sa.func.now(),
onupdate=sa.func.now()),
)
maint_table = op.create_table(
'ovn_maintenance',
sa.Column('id', sa.String(36), primary_key=True),
sa.Column('state', sa.Enum(journal_const.PENDING,
journal_const.PROCESSING,
name='state'),
nullable=False),
sa.Column('processing_operation', sa.String(70)),
sa.Column('lock_updated', sa.TIMESTAMP, nullable=False,
server_default=sa.func.now(),
onupdate=sa.func.now())
)
# Insert the only row here that is used to synchronize the lock between
# different Neutron processes.
op.bulk_insert(maint_table,
[{'id': uuidutils.generate_uuid(),
'state': journal_const.PENDING}])

View File

@ -0,0 +1,54 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.db import model_base
import sqlalchemy as sa
from sqlalchemy.dialects import sqlite
from networking_ovn.journal import constants as journal_const
class OVNJournal(model_base.BASEV2):
__tablename__ = 'ovn_journal'
seqnum = sa.Column(sa.BigInteger().with_variant(sa.Integer(), 'sqlite'),
primary_key=True, autoincrement=True)
object_type = sa.Column(sa.String(36), nullable=False)
object_uuid = sa.Column(sa.String(36), nullable=False)
operation = sa.Column(sa.String(36), nullable=False)
data = sa.Column(sa.PickleType, nullable=True)
state = sa.Column(sa.Enum(journal_const.PENDING,
journal_const.FAILED,
journal_const.PROCESSING,
journal_const.COMPLETED),
nullable=False, default=journal_const.PENDING)
retry_count = sa.Column(sa.Integer, default=0)
created_at = sa.Column(
sa.DateTime().with_variant(
sqlite.DATETIME(truncate_microseconds=True), 'sqlite'),
server_default=sa.func.now())
last_retried = sa.Column(sa.TIMESTAMP, server_default=sa.func.now(),
onupdate=sa.func.now())
class OVNMaintenance(model_base.BASEV2, model_base.HasId):
__tablename__ = 'ovn_maintenance'
state = sa.Column(sa.Enum(journal_const.PENDING, journal_const.PROCESSING),
nullable=False)
processing_operation = sa.Column(sa.String(70))
lock_updated = sa.Column(sa.TIMESTAMP, nullable=False,
server_default=sa.func.now(),
onupdate=sa.func.now())

View File

View File

@ -0,0 +1,50 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import timedelta
from oslo_log import log as logging
from networking_ovn.db import db
from networking_ovn.journal import constants as journal_const
LOG = logging.getLogger(__name__)
class JournalCleanup(object):
def __init__(self, completed_rows_retention, processing_timeout):
"""Journal maintenance operation for deleting completed rows.
:param completed_rows_retention: Time (in seconds) to keep rows
marked as COMPLETED in the database.
:param processing_timeout: Time (in seconds) to wait before a
row maked as PROCESSING is set back to PENDING.
"""
self._completed_rows_retention = completed_rows_retention
self._processing_timeout = processing_timeout
def delete_completed_rows(self, session):
if self._completed_rows_retention > 0:
LOG.debug('Journal clean up: Deleting completed rows')
db.delete_rows_by_state_and_time(
session, journal_const.COMPLETED,
timedelta(seconds=self._completed_rows_retention))
def cleanup_processing_rows(self, session):
row_count = db.reset_processing_rows(session, self._processing_timeout)
if row_count:
LOG.info('Reset %(num)s orphaned rows back to pending',
{'num': row_count})

View File

@ -0,0 +1,25 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# States
PENDING = 'pending'
PROCESSING = 'processing'
FAILED = 'failed'
COMPLETED = 'completed'
# Operations
CREATE = 'create'
UPDATE = 'update'
DELETE = 'delete'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import exceptions as n_exc
class JournalAlreadyStarted(n_exc.NeutronException):
message = _('Journal thread already started')
class NonRetryableError(n_exc.NeutronException):
"""A generic non-retryable exception."""
pass

View File

@ -0,0 +1,155 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import random
import threading
import time
from neutron.db import api as neutron_db_api
from oslo_log import log
from oslo_utils import uuidutils
import six
from networking_ovn.db import db
from networking_ovn.journal import constants
from networking_ovn.journal import exceptions
LOG = log.getLogger(__name__)
WAKE_UP_EVENTS = {}
def _wake_random_journal_thread():
"""Wake up a random journal sync thread."""
try:
thread_id = random.choice(list(WAKE_UP_EVENTS))
except IndexError:
return
WAKE_UP_EVENTS[thread_id].set()
def wake_journal_thread_on_end(func):
def new_func(obj, *args, **kwargs):
return_value = func(obj, *args, **kwargs)
_wake_random_journal_thread()
return return_value
return new_func
def record(plugin_context, object_type, object_uuid, operation, data):
db.create_pending_row(plugin_context.session, object_type, object_uuid,
operation, data)
@six.add_metaclass(abc.ABCMeta)
class JournalThread(object):
"""Thread worker for the Journal Database."""
def __init__(self, sync_timeout, retry_count):
self._sync_timeout = sync_timeout
self._retry_count = retry_count
self._stop_event = threading.Event()
self._sync_thread = None
self.uuid = uuidutils.generate_uuid()
def start(self):
"""Start the journal sync thread."""
global WAKE_UP_EVENTS
if self._sync_thread is not None:
raise exceptions.JournalAlreadyStarted()
LOG.debug('Starting the journal sync thread')
WAKE_UP_EVENTS[self.uuid] = threading.Event()
self._stop_event.clear()
self._sync_thread = threading.Thread(name='sync', target=self._run)
self._sync_thread.start()
def stop(self):
"""Stop the journal sync thread."""
global WAKE_UP_EVENTS
LOG.debug('Stopping the journal sync thread')
self._stop_event.set()
if self.uuid in WAKE_UP_EVENTS:
WAKE_UP_EVENTS[self.uuid].set()
del WAKE_UP_EVENTS[self.uuid]
@abc.abstractmethod
def validate_dependencies(self, session, entry):
"""Validate resource dependency in journaled operations.
:returns: Boolean value. True if validation suceed, False
otherwise.
"""
@abc.abstractmethod
def sync_entry(self, entry):
"""Performance a synchronization operation on a given entry.
:raises: NonRetryableError
"""
def _run(self):
while not self._stop_event.is_set():
try:
session = neutron_db_api.get_writer_session()
self._sync_pending_entries(session)
except Exception:
LOG.exception('Unknown error while running the journal sync')
WAKE_UP_EVENTS[self.uuid].wait(timeout=self._sync_timeout)
WAKE_UP_EVENTS[self.uuid].clear()
# Clear the _sync_thread after it's fully stopped
self._sync_thread = None
def _sync_pending_entries(self, session):
entry = db.get_oldest_pending_db_row_with_lock(session)
if entry is None:
return
LOG.debug('Start processing journal entries')
while entry is not None:
log_dict = {'op': entry.operation, 'type': entry.object_type,
'id': entry.object_uuid}
if not self.validate_dependencies(session, entry):
db.update_db_row_state(session, entry, constants.PENDING)
LOG.info('Skipping %(op)s %(type)s %(id)s due to '
'unprocessed dependencies', log_dict)
break
LOG.info('Processing - %(op)s %(type)s %(id)s', log_dict)
try:
self.sync_entry(entry)
db.update_db_row_state(session, entry, constants.COMPLETED)
except exceptions.NonRetryableError as e:
log_dict['error'] = e
db.update_db_row_state(session, entry, constants.PENDING)
LOG.error('Non-retryable error while processing %(op)s '
'%(type)s %(id)s, will not process additional '
'entries. Error: %(error)s. ', log_dict)
break
except Exception:
LOG.exception('Error while processing %(op)s %(type)s %(id)s',
log_dict)
db.update_pending_db_row_retry(session, entry,
self._retry_count)
# TODO(lucasagomes): Make this interval configurable ?!
time.sleep(1)
entry = db.get_oldest_pending_db_row_with_lock(session)
LOG.debug('Finished processing journal entries')

View File

@ -0,0 +1,70 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import api as neutron_db_api
from oslo_log import log as logging
from oslo_service import loopingcall
from networking_ovn.db import db
LOG = logging.getLogger(__name__)
class MaintenanceThread(object):
def __init__(self, interval):
self._timer = loopingcall.FixedIntervalLoopingCall(self.execute_ops)
self._interval = interval
self._operations = []
def start(self):
self._timer.start(self._interval, stop_on_exception=False)
def _execute_op(self, operation, session):
op_details = operation.__name__
if operation.__doc__:
op_details += ' (%s)' % operation.func_doc
try:
LOG.info('Starting maintenance operation %s', op_details)
db.update_maintenance_operation(session, operation=operation)
operation(session=session)
LOG.info('Finished maintenance operation %s', op_details)
except Exception:
LOG.exception('Unknown error during maintenance operation %s',
op_details)
def execute_ops(self):
LOG.debug('Starting journal maintenance run')
session = neutron_db_api.get_writer_session()
if not db.lock_maintenance(session):
return
try:
for operation in self._operations:
self._execute_op(operation, session)
finally:
db.update_maintenance_operation(session, operation=None)
db.unlock_maintenance(session)
LOG.debug('Finished journal maintenance run')
def register_operation(self, func):
"""Register a function to be run by the maintenance thread.
:param f: Function to call when the thread runs. The function will
receive a DB session to use for DB operations.
"""
self._operations.append(func)