Add an outdated flag for changes

If we detect that a change sync has failed, mark that change as
outdated in the local database so that if it has failed in such a
way that Gertty continues to process its queue and eventually stamp
a project or query as up-to-date, it can go back later and try to
sync the failed change again.

NB: This change contains a schema migration.

Change-Id: Ieb22a510a8096d77c12d30aefbedd36f7f3037b0
This commit is contained in:
James E. Blair 2016-08-09 09:30:45 -07:00
parent 85740c5885
commit eeef69ed5f
3 changed files with 87 additions and 4 deletions

View File

@ -0,0 +1,37 @@
"""add change.outdated
Revision ID: 7ef7dfa2ca3a
Revises: 37a702b7f58e
Create Date: 2016-08-09 08:59:04.441926
"""
# revision identifiers, used by Alembic.
revision = '7ef7dfa2ca3a'
down_revision = '37a702b7f58e'
import warnings
from alembic import op
import sqlalchemy as sa
from gertty.dbsupport import sqlite_alter_columns
def upgrade():
with warnings.catch_warnings():
warnings.simplefilter("ignore")
op.add_column('change', sa.Column('outdated', sa.Boolean()))
connection = op.get_bind()
change = sa.sql.table('change',
sa.sql.column('outdated', sa.Boolean()))
connection.execute(change.update().values({'outdated':False}))
sqlite_alter_columns('change', [
sa.Column('outdated', sa.Boolean(), index=True, nullable=False),
])
def downgrade():
pass

View File

@ -82,6 +82,7 @@ change_table = Table(
Column('pending_status', Boolean, index=True, nullable=False),
Column('pending_status_message', Text),
Column('last_seen', DateTime, index=True),
Column('outdated', Boolean, index=True, nullable=False),
)
change_conflict_table = Table(
'change_conflict', metadata,
@ -257,7 +258,7 @@ class Change(object):
hidden=False, reviewed=False, starred=False, held=False,
pending_rebase=False, pending_topic=False,
pending_starred=False, pending_status=False,
pending_status_message=None):
pending_status_message=None, outdated=False):
self.project_key = project.key
self.account_key = owner.key
self.id = id
@ -278,6 +279,7 @@ class Change(object):
self.pending_starred = pending_starred
self.pending_status = pending_status
self.pending_status_message = pending_status_message
self.outdated = outdated
def getCategories(self):
categories = set([label.category for label in self.labels])
@ -933,6 +935,9 @@ class DatabaseSession(object):
def getHeld(self):
return self.session().query(Change).filter_by(held=True).all()
def getOutdated(self):
return self.session().query(Change).filter_by(outdated=True).all()
def getPendingMessages(self):
return self.session().query(Message).filter_by(pending=True).all()

View File

@ -528,6 +528,24 @@ class SyncChangeByNumberTask(Task):
sync.submitTask(task)
self.log.debug("Sync change %s because it is number %s" % (c['id'], self.number))
class SyncOutdatedChangesTask(Task):
def __init__(self, priority=NORMAL_PRIORITY):
super(SyncOutdatedChangesTask, self).__init__(priority)
def __eq__(self, other):
if other.__class__ == self.__class__:
return True
return False
def __repr__(self):
return '<SyncOutdatedChangesTask>'
def run(self, sync):
with sync.app.db.getSession() as session:
for change in session.getOutdated():
self.log.debug("Sync outdated change %s" % (change.id,))
sync.submitTask(SyncChangeTask(change.id, priority=self.priority))
class SyncChangeTask(Task):
def __init__(self, change_id, force_fetch=False, priority=NORMAL_PRIORITY):
super(SyncChangeTask, self).__init__(priority)
@ -546,6 +564,22 @@ class SyncChangeTask(Task):
def run(self, sync):
start_time = time.time()
try:
self._syncChange(sync)
end_time = time.time()
total_time = end_time - start_time
self.log.info("Synced change %s in %0.5f seconds.", self.change_id, total_time)
except Exception:
try:
self.log.error("Marking change %s outdated" % (self.change_id,))
with sync.app.db.getSession() as session:
change = session.getChangeByID(self.change_id)
change.outdated = True
except Exception:
self.log.exception("Error while marking change %s as outdated" % (self.change_id,))
raise
def _syncChange(self, sync):
app = sync.app
remote_change = sync.get('changes/%s?o=DETAILED_LABELS&o=ALL_REVISIONS&o=ALL_COMMITS&o=MESSAGES&o=DETAILED_ACCOUNTS&o=CURRENT_ACTIONS&o=ALL_FILES' % self.change_id)
# Perform subqueries this task will need outside of the db session
@ -870,6 +904,7 @@ class SyncChangeTask(Task):
change.reviewed = False
result.review_flag_changed = True
app.project_cache.clear(change.project)
change.outdated = False
for url, refs in fetches.items():
self.log.debug("Fetching from %s with refs %s", url, refs)
try:
@ -881,9 +916,6 @@ class SyncChangeTask(Task):
for ref in refs:
self.log.debug("git fetch %s %s" % (url, ref))
repo.fetch(url, ref)
end_time = time.time()
total_time = end_time - start_time
self.log.info("Synced change %s in %0.5f seconds.", self.change_id, total_time)
class CheckReposTask(Task):
# on startup, check all projects
@ -1340,6 +1372,7 @@ class Sync(object):
self.submitTask(SyncProjectListTask(HIGH_PRIORITY))
self.submitTask(SyncSubscribedProjectsTask(NORMAL_PRIORITY))
self.submitTask(SyncSubscribedProjectBranchesTask(LOW_PRIORITY))
self.submitTask(SyncOutdatedChangesTask(LOW_PRIORITY))
self.submitTask(PruneDatabaseTask(self.app.config.expire_age, LOW_PRIORITY))
self.periodic_thread = threading.Thread(target=self.periodicSync)
self.periodic_thread.daemon = True
@ -1355,6 +1388,7 @@ class Sync(object):
if now-hourly > 3600:
hourly = now
self.pruneDatabase()
self.syncOutdatedChanges()
except Exception:
self.log.exception('Exception in periodicSync')
@ -1484,6 +1518,13 @@ class Sync(object):
for subtask in task.tasks:
subtask.wait()
def syncOutdatedChanges(self):
task = SyncOutdatedChangesTask(LOW_PRIORITY)
self.submitTask(task)
if task.wait():
for subtask in task.tasks:
subtask.wait()
def _syncChangeByCommit(self, commit, priority):
# Accumulate sync change by commit tasks because they often
# come in batches. This method assumes it is being called