Deprecate `state` field and propose `last_processed_timestamp` field
Changes the `state` field in `cloudkitty_storage_states` table. The goal is to use a more descriptive and meaningful name. The proposed new column name is `last_processed_timestamp`. Depends-on: https://review.opendev.org/c/openstack/cloudkitty/+/793973 Depends-on: https://review.opendev.org/c/openstack/cloudkitty/+/797443 Change-Id: I9e29808dc5771a6455d3c564f080e8c2ec28dc72 Implements: https://review.opendev.org/c/openstack/cloudkitty-specs/+/771513
This commit is contained in:
parent
35ce237b76
commit
70114c829e
|
@ -24,6 +24,10 @@ from cloudkitty import storage_state
|
|||
from cloudkitty.utils import tz as tzutils
|
||||
from cloudkitty.utils import validation as vutils
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ScopeState(base.BaseResource):
|
||||
|
||||
|
@ -49,7 +53,10 @@ class ScopeState(base.BaseResource):
|
|||
voluptuous.Required('scope_key'): vutils.get_string_type(),
|
||||
voluptuous.Required('fetcher'): vutils.get_string_type(),
|
||||
voluptuous.Required('collector'): vutils.get_string_type(),
|
||||
voluptuous.Required('state'): vutils.get_string_type(),
|
||||
voluptuous.Optional(
|
||||
'last_processed_timestamp'): vutils.get_string_type(),
|
||||
# This "state" property should be removed in the next release.
|
||||
voluptuous.Optional('state'): vutils.get_string_type(),
|
||||
}]})
|
||||
def get(self,
|
||||
offset=0,
|
||||
|
@ -81,7 +88,9 @@ class ScopeState(base.BaseResource):
|
|||
'scope_key': r.scope_key,
|
||||
'fetcher': r.fetcher,
|
||||
'collector': r.collector,
|
||||
'state': r.state.isoformat(),
|
||||
'state': r.last_processed_timestamp.isoformat(),
|
||||
'last_processed_timestamp':
|
||||
r.last_processed_timestamp.isoformat(),
|
||||
} for r in results]
|
||||
}
|
||||
|
||||
|
@ -96,7 +105,10 @@ class ScopeState(base.BaseResource):
|
|||
api_utils.MultiQueryParam(str),
|
||||
voluptuous.Optional('collector', default=[]):
|
||||
api_utils.MultiQueryParam(str),
|
||||
voluptuous.Required('state'):
|
||||
voluptuous.Optional('last_processed_timestamp'):
|
||||
voluptuous.Coerce(tzutils.dt_from_iso),
|
||||
# This "state" property should be removed in the next release.
|
||||
voluptuous.Optional('state'):
|
||||
voluptuous.Coerce(tzutils.dt_from_iso),
|
||||
})
|
||||
def put(self,
|
||||
|
@ -105,6 +117,7 @@ class ScopeState(base.BaseResource):
|
|||
scope_key=None,
|
||||
fetcher=None,
|
||||
collector=None,
|
||||
last_processed_timestamp=None,
|
||||
state=None):
|
||||
|
||||
policy.authorize(
|
||||
|
@ -117,6 +130,15 @@ class ScopeState(base.BaseResource):
|
|||
raise http_exceptions.BadRequest(
|
||||
"Either all_scopes or a scope_id should be specified.")
|
||||
|
||||
if not state and not last_processed_timestamp:
|
||||
raise http_exceptions.BadRequest(
|
||||
"Variables 'state' and 'last_processed_timestamp' cannot be "
|
||||
"empty/None. We expect at least one of them.")
|
||||
if state:
|
||||
LOG.warning("The use of 'state' variable is deprecated, and will "
|
||||
"be removed in the next upcomming release. You should "
|
||||
"consider using 'last_processed_timestamp' variable.")
|
||||
|
||||
results = self._storage_state.get_all(
|
||||
identifier=scope_id,
|
||||
scope_key=scope_key,
|
||||
|
@ -135,8 +157,11 @@ class ScopeState(base.BaseResource):
|
|||
'collector': r.collector,
|
||||
} for r in results]
|
||||
|
||||
if not last_processed_timestamp:
|
||||
last_processed_timestamp = state
|
||||
self._client.cast({}, 'reset_state', res_data={
|
||||
'scopes': serialized_results, 'state': state.isoformat(),
|
||||
'scopes': serialized_results,
|
||||
'last_processed_timestamp': last_processed_timestamp.isoformat()
|
||||
})
|
||||
|
||||
return {}, 202
|
||||
|
|
|
@ -174,14 +174,15 @@ class ScopeEndpoint(object):
|
|||
lock_name,
|
||||
)
|
||||
)
|
||||
state_dt = tzutils.dt_from_iso(res_data['state'])
|
||||
last_processed_timestamp = tzutils.dt_from_iso(
|
||||
res_data['last_processed_timestamp'])
|
||||
try:
|
||||
self._storage.delete(begin=state_dt, end=None, filters={
|
||||
scope['scope_key']: scope['scope_id'],
|
||||
})
|
||||
self._state.set_state(
|
||||
self._storage.delete(
|
||||
begin=last_processed_timestamp, end=None, filters={
|
||||
scope['scope_key']: scope['scope_id']})
|
||||
self._state.set_last_processed_timestamp(
|
||||
scope['scope_id'],
|
||||
state_dt,
|
||||
last_processed_timestamp,
|
||||
fetcher=scope['fetcher'],
|
||||
collector=scope['collector'],
|
||||
scope_key=scope['scope_key'],
|
||||
|
|
|
@ -78,7 +78,8 @@ class StateManager(object):
|
|||
session.close()
|
||||
|
||||
for item in r:
|
||||
item.state = tzutils.utc_to_local(item.state)
|
||||
item.last_processed_timestamp = tzutils.utc_to_local(
|
||||
item.last_processed_timestamp)
|
||||
|
||||
return r
|
||||
|
||||
|
@ -119,12 +120,26 @@ class StateManager(object):
|
|||
|
||||
def set_state(self, identifier, state,
|
||||
fetcher=None, collector=None, scope_key=None):
|
||||
"""Set the state of a scope.
|
||||
"""Set the last processed timestamp of a scope.
|
||||
|
||||
This method is deprecated, consider using
|
||||
"set_last_processed_timestamp".
|
||||
"""
|
||||
LOG.warning("The method 'set_state' is deprecated."
|
||||
"Consider using the new method "
|
||||
"'set_last_processed_timestamp'.")
|
||||
self.set_last_processed_timestamp(
|
||||
identifier, state, fetcher, collector, scope_key)
|
||||
|
||||
def set_last_processed_timestamp(
|
||||
self, identifier, last_processed_timestamp, fetcher=None,
|
||||
collector=None, scope_key=None):
|
||||
"""Set the last processed timestamp of a scope.
|
||||
|
||||
:param identifier: Identifier of the scope
|
||||
:type identifier: str
|
||||
:param state: state of the scope
|
||||
:type state: datetime.datetime
|
||||
:param last_processed_timestamp: last processed timestamp of the scope
|
||||
:type last_processed_timestamp: datetime.datetime
|
||||
:param fetcher: Fetcher associated to the scope
|
||||
:type fetcher: str
|
||||
:param collector: Collector associated to the scope
|
||||
|
@ -132,20 +147,21 @@ class StateManager(object):
|
|||
:param scope_key: scope_key associated to the scope
|
||||
:type scope_key: str
|
||||
"""
|
||||
state = tzutils.local_to_utc(state, naive=True)
|
||||
last_processed_timestamp = tzutils.local_to_utc(
|
||||
last_processed_timestamp, naive=True)
|
||||
session = db.get_session()
|
||||
session.begin()
|
||||
r = self._get_db_item(
|
||||
session, identifier, fetcher, collector, scope_key)
|
||||
|
||||
if r:
|
||||
if r.state != state:
|
||||
r.state = state
|
||||
if r.last_processed_timestamp != last_processed_timestamp:
|
||||
r.last_processed_timestamp = last_processed_timestamp
|
||||
session.commit()
|
||||
else:
|
||||
state_object = self.model(
|
||||
identifier=identifier,
|
||||
state=state,
|
||||
last_processed_timestamp=last_processed_timestamp,
|
||||
fetcher=fetcher,
|
||||
collector=collector,
|
||||
scope_key=scope_key,
|
||||
|
@ -157,7 +173,15 @@ class StateManager(object):
|
|||
|
||||
def get_state(self, identifier,
|
||||
fetcher=None, collector=None, scope_key=None):
|
||||
"""Get the state of a scope.
|
||||
LOG.warning("The method 'get_state' is deprecated."
|
||||
"Consider using the new method"
|
||||
"'get_last_processed_timestamp'.")
|
||||
return self.get_last_processed_timestamp(
|
||||
identifier, fetcher, collector, scope_key)
|
||||
|
||||
def get_last_processed_timestamp(self, identifier, fetcher=None,
|
||||
collector=None, scope_key=None):
|
||||
"""Get the last processed timestamp of a scope.
|
||||
|
||||
:param identifier: Identifier of the scope
|
||||
:type identifier: str
|
||||
|
@ -174,7 +198,7 @@ class StateManager(object):
|
|||
r = self._get_db_item(
|
||||
session, identifier, fetcher, collector, scope_key)
|
||||
session.close()
|
||||
return tzutils.utc_to_local(r.state) if r else None
|
||||
return tzutils.utc_to_local(r.last_processed_timestamp) if r else None
|
||||
|
||||
def init(self):
|
||||
migration.upgrade('head')
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
# Copyright 2019 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Create last processed timestamp column
|
||||
|
||||
Revision ID: 750d3050cf71
|
||||
Revises: d9d103dd4dcf
|
||||
Create Date: 2021-02-08 17:00:00.000
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
|
||||
from cloudkitty.storage_state.alembic.versions import \
|
||||
c50ed2c19204_update_storage_state_constraint as down_version_module
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '750d3050cf71'
|
||||
down_revision = 'c50ed2c19204'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
for name, table in down_version_module.Base.metadata.tables.items():
|
||||
if name == 'cloudkitty_storage_states':
|
||||
with op.batch_alter_table(name,
|
||||
copy_from=table,
|
||||
recreate='always') as batch_op:
|
||||
batch_op.alter_column(
|
||||
'state', new_column_name='last_processed_timestamp')
|
||||
|
||||
break
|
|
@ -20,8 +20,10 @@ Create Date: 2019-05-15 17:02:56.595274
|
|||
|
||||
"""
|
||||
from alembic import op
|
||||
from sqlalchemy.ext import declarative
|
||||
|
||||
from cloudkitty.storage_state import models
|
||||
from oslo_db.sqlalchemy import models
|
||||
import sqlalchemy
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'c50ed2c19204'
|
||||
|
@ -29,9 +31,11 @@ down_revision = 'd9d103dd4dcf'
|
|||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
Base = declarative.declarative_base()
|
||||
|
||||
|
||||
def upgrade():
|
||||
for name, table in models.Base.metadata.tables.items():
|
||||
for name, table in Base.metadata.tables.items():
|
||||
if name == 'cloudkitty_storage_states':
|
||||
|
||||
with op.batch_alter_table(name,
|
||||
|
@ -43,3 +47,36 @@ def upgrade():
|
|||
['identifier', 'scope_key', 'collector', 'fetcher'])
|
||||
|
||||
break
|
||||
|
||||
|
||||
class IdentifierTableForThisDataBaseModelChangeSet(Base, models.ModelBase):
|
||||
"""Represents the state of a given identifier."""
|
||||
|
||||
@declarative.declared_attr
|
||||
def __table_args__(cls):
|
||||
return (
|
||||
sqlalchemy.schema.UniqueConstraint(
|
||||
'identifier',
|
||||
'scope_key',
|
||||
'collector',
|
||||
'fetcher',
|
||||
name='uq_cloudkitty_storage_states_identifier'),
|
||||
)
|
||||
|
||||
__tablename__ = 'cloudkitty_storage_states'
|
||||
|
||||
id = sqlalchemy.Column(sqlalchemy.Integer,
|
||||
primary_key=True)
|
||||
identifier = sqlalchemy.Column(sqlalchemy.String(256),
|
||||
nullable=False,
|
||||
unique=False)
|
||||
scope_key = sqlalchemy.Column(sqlalchemy.String(40),
|
||||
nullable=True,
|
||||
unique=False)
|
||||
fetcher = sqlalchemy.Column(sqlalchemy.String(40),
|
||||
nullable=True,
|
||||
unique=False)
|
||||
collector = sqlalchemy.Column(sqlalchemy.String(40),
|
||||
nullable=True,
|
||||
unique=False)
|
||||
state = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False)
|
||||
|
|
|
@ -52,5 +52,5 @@ class IdentifierState(Base, models.ModelBase):
|
|||
collector = sqlalchemy.Column(sqlalchemy.String(40),
|
||||
nullable=True,
|
||||
unique=False)
|
||||
state = sqlalchemy.Column(sqlalchemy.DateTime,
|
||||
nullable=False)
|
||||
last_processed_timestamp = sqlalchemy.Column(
|
||||
sqlalchemy.DateTime, nullable=False)
|
||||
|
|
|
@ -156,8 +156,6 @@ class StorageUnitTest(TestCase):
|
|||
def test_get_total_one_scope_one_period(self):
|
||||
expected_total, expected_qty, _ = self._expected_total_qty_len(
|
||||
[self.data[0]], self._project_id)
|
||||
expected_total, expected_qty, _ = self._expected_total_qty_len(
|
||||
[self.data[0]], self._project_id)
|
||||
|
||||
begin = datetime.datetime(2018, 1, 1)
|
||||
end = datetime.datetime(2018, 1, 1, 1)
|
||||
|
|
|
@ -54,8 +54,9 @@ class ScopeEndpointTest(tests.TestCase):
|
|||
|
||||
storage_delete_patch = mock.patch.object(
|
||||
influx.InfluxStorage, 'delete')
|
||||
|
||||
state_set_patch = mock.patch.object(
|
||||
storage_state.StateManager, 'set_state')
|
||||
storage_state.StateManager, 'set_last_processed_timestamp')
|
||||
|
||||
with coord_start_patch, lock_acquire_patch, \
|
||||
storage_delete_patch as sd, state_set_patch as ss:
|
||||
|
@ -76,7 +77,7 @@ class ScopeEndpointTest(tests.TestCase):
|
|||
'fetcher': 'gnocchi',
|
||||
},
|
||||
],
|
||||
'state': '20190716T085501Z',
|
||||
'last_processed_timestamp': '20190716T085501Z',
|
||||
})
|
||||
|
||||
sd.assert_has_calls([
|
||||
|
|
|
@ -56,12 +56,12 @@ class StateManagerTest(tests.TestCase):
|
|||
return output, mock.Mock(return_value=output)
|
||||
|
||||
@staticmethod
|
||||
def _get_r_mock(scope_key, collector, fetcher, state):
|
||||
def _get_r_mock(scope_key, collector, fetcher, last_processed_timestamp):
|
||||
r_mock = mock.Mock()
|
||||
r_mock.scope_key = scope_key
|
||||
r_mock.collector = collector
|
||||
r_mock.fetcher = fetcher
|
||||
r_mock.state = state
|
||||
r_mock.last_processed_timestamp = last_processed_timestamp
|
||||
return r_mock
|
||||
|
||||
def _test_x_state_does_update_columns(self, func):
|
||||
|
@ -127,6 +127,6 @@ class StateManagerTest(tests.TestCase):
|
|||
sm.return_value = session_mock = mock.MagicMock()
|
||||
self.assertNotEqual(r_mock.state, new_state)
|
||||
self._state.set_state('fake_identifier', new_state)
|
||||
self.assertEqual(r_mock.state, new_state)
|
||||
self.assertEqual(r_mock.last_processed_timestamp, new_state)
|
||||
session_mock.commit.assert_called_once()
|
||||
session_mock.add.assert_not_called()
|
||||
|
|
|
@ -5,21 +5,21 @@
|
|||
"fetcher": "keystone",
|
||||
"scope_id": "7a7e5183264644a7a79530eb56e59941",
|
||||
"scope_key": "project_id",
|
||||
"state": "2019-05-09 10:00:00"
|
||||
"last_processed_timestamp": "2019-05-09 10:00:00"
|
||||
},
|
||||
{
|
||||
"collector": "gnocchi",
|
||||
"fetcher": "keystone",
|
||||
"scope_id": "9084fadcbd46481788e0ad7405dcbf12",
|
||||
"scope_key": "project_id",
|
||||
"state": "2019-05-08 03:00:00"
|
||||
"last_processed_timestamp": "2019-05-08 03:00:00"
|
||||
},
|
||||
{
|
||||
"collector": "gnocchi",
|
||||
"fetcher": "keystone",
|
||||
"scope_id": "1f41d183fca5490ebda5c63fbaca026a",
|
||||
"scope_key": "project_id",
|
||||
"state": "2019-05-06 22:00:00"
|
||||
"last_processed_timestamp": "2019-05-06 22:00:00"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ Response
|
|||
- collector: collector_resp
|
||||
- fetcher: fetcher_resp
|
||||
- state: state
|
||||
- last_processed_timestamp: last_processed_timestamp
|
||||
- scope_id: scope_id_resp
|
||||
- scope_key: scope_key_resp
|
||||
|
||||
|
@ -60,6 +61,7 @@ Reset the status of several scopes.
|
|||
.. rest_parameters:: scope/scope_parameters.yml
|
||||
|
||||
- state: state
|
||||
- last_processed_timestamp: last_processed_timestamp
|
||||
- collector: collector_body
|
||||
- fetcher: fetcher_body
|
||||
- scope_id: scope_id_body
|
||||
|
|
|
@ -66,6 +66,13 @@ fetcher_resp:
|
|||
description: Fetcher for the given scope
|
||||
in: body
|
||||
|
||||
last_processed_timestamp:
|
||||
in: body
|
||||
description: |
|
||||
It represents the last processed timestamp for the storage state element.
|
||||
type: iso8601 timestamp
|
||||
required: true
|
||||
|
||||
scope_id_body:
|
||||
<<: *scope_id
|
||||
in: body
|
||||
|
@ -89,6 +96,9 @@ scope_key_resp:
|
|||
state:
|
||||
in: body
|
||||
description: |
|
||||
State of the scope.
|
||||
State of the scope. This variable represents the last processed
|
||||
timestamp for the storage state element. It is DEPRECATED, and it will
|
||||
be removed in upcoming releases. The alternative is
|
||||
`last_processed_timestamp`.
|
||||
type: iso8601 timestamp
|
||||
required: true
|
||||
|
|
Loading…
Reference in New Issue