Merge "Add active status fields in the storage state table"

This commit is contained in:
Zuul 2021-11-29 15:07:30 +00:00 committed by Gerrit Code Review
commit 83e89239a8
13 changed files with 398 additions and 31 deletions

View File

@ -47,6 +47,8 @@ class ScopeState(base.BaseResource):
api_utils.MultiQueryParam(str),
voluptuous.Optional('collector', default=[]):
api_utils.MultiQueryParam(str),
voluptuous.Optional('active', default=[]):
api_utils.MultiQueryParam(int),
})
@api_utils.add_output_schema({'results': [{
voluptuous.Required('scope_id'): vutils.get_string_type(),
@ -57,14 +59,12 @@ class ScopeState(base.BaseResource):
'last_processed_timestamp'): vutils.get_string_type(),
# This "state" property should be removed in the next release.
voluptuous.Optional('state'): vutils.get_string_type(),
voluptuous.Required('active'): bool,
voluptuous.Optional('scope_activation_toggle_date'):
vutils.get_string_type(),
}]})
def get(self,
offset=0,
limit=100,
scope_id=None,
scope_key=None,
fetcher=None,
collector=None):
def get(self, offset=0, limit=100, scope_id=None, scope_key=None,
fetcher=None, collector=None, active=None):
policy.authorize(
flask.request.context,
@ -72,13 +72,9 @@ class ScopeState(base.BaseResource):
{'project_id': scope_id or flask.request.context.project_id}
)
results = self._storage_state.get_all(
identifier=scope_id,
scope_key=scope_key,
fetcher=fetcher,
collector=collector,
offset=offset,
limit=limit,
)
identifier=scope_id, scope_key=scope_key, fetcher=fetcher,
collector=collector, offset=offset, limit=limit, active=active)
if len(results) < 1:
raise http_exceptions.NotFound(
"No resource found for provided filters.")
@ -91,6 +87,10 @@ class ScopeState(base.BaseResource):
'state': r.last_processed_timestamp.isoformat(),
'last_processed_timestamp':
r.last_processed_timestamp.isoformat(),
'active': r.active,
'scope_activation_toggle_date':
r.scope_activation_toggle_date.isoformat() if
r.scope_activation_toggle_date else None
} for r in results]
}
@ -165,3 +165,67 @@ class ScopeState(base.BaseResource):
})
return {}, 202
@api_utils.add_input_schema('body', {
voluptuous.Required('scope_id'):
api_utils.SingleQueryParam(str),
voluptuous.Optional('scope_key'):
api_utils.SingleQueryParam(str),
voluptuous.Optional('fetcher'):
api_utils.SingleQueryParam(str),
voluptuous.Optional('collector'):
api_utils.SingleQueryParam(str),
voluptuous.Optional('active'):
api_utils.SingleQueryParam(bool),
})
@api_utils.add_output_schema({
voluptuous.Required('scope_id'): vutils.get_string_type(),
voluptuous.Required('scope_key'): vutils.get_string_type(),
voluptuous.Required('fetcher'): vutils.get_string_type(),
voluptuous.Required('collector'): vutils.get_string_type(),
voluptuous.Required('state'): vutils.get_string_type(),
voluptuous.Required('active'): bool,
voluptuous.Required('scope_activation_toggle_date'):
vutils.get_string_type()
})
def patch(self, scope_id, scope_key=None, fetcher=None,
collector=None, active=None):
policy.authorize(
flask.request.context,
'scope:patch_state',
{'tenant_id': scope_id or flask.request.context.project_id}
)
results = self._storage_state.get_all(identifier=scope_id)
if len(results) < 1:
raise http_exceptions.NotFound(
"No resource found for provided filters.")
if len(results) > 1:
LOG.debug("Too many resources found with the same scope_id [%s], "
"scopes found: [%s].", scope_id, results)
raise http_exceptions.NotFound("Too many resources found with "
"the same scope_id: %s." % scope_id)
scope_to_update = results[0]
LOG.debug("Executing update of storage scope: [%s].", scope_to_update)
self._storage_state.update_storage_scope(scope_to_update,
scope_key=scope_key,
fetcher=fetcher,
collector=collector,
active=active)
storage_scopes = self._storage_state.get_all(identifier=scope_id)
update_storage_scope = storage_scopes[0]
return {
'scope_id': update_storage_scope.identifier,
'scope_key': update_storage_scope.scope_key,
'fetcher': update_storage_scope.fetcher,
'collector': update_storage_scope.collector,
'state': update_storage_scope.state.isoformat(),
'active': update_storage_scope.active,
'scope_activation_toggle_date':
update_storage_scope.scope_activation_toggle_date.isoformat()
}

View File

@ -30,6 +30,13 @@ scope_policies = [
description='Reset the state of one or several scopes',
operations=[{'path': '/v2/scope',
'method': 'PUT'}]),
policy.DocumentedRuleDefault(
name='scope:patch_state',
check_str=base.ROLE_ADMIN,
description='Enables operators to patch a storage scope',
operations=[{'path': '/v2/scope',
'method': 'PATCH'}]),
]

View File

@ -309,18 +309,42 @@ class Worker(BaseWorker):
def run(self):
while True:
timestamp = self._check_state()
LOG.debug("Processing timestamp [%s] for storage scope [%s].",
timestamp, self._tenant_id)
if not timestamp:
break
if self._state.get_state(self._tenant_id):
if not self._state.is_storage_scope_active(self._tenant_id):
LOG.debug("Skipping processing for storage scope [%s] "
"because it is marked as inactive.",
self._tenant_id)
break
else:
LOG.debug("No need to check if [%s] is de-activated. "
"We have never processed it before.")
if not self._state.is_storage_scope_active(self._tenant_id):
LOG.debug("Skipping processing for storage scope [%s] because "
"it is marked as inactive.", self._tenant_id)
break
metrics = list(self._conf['metrics'].keys())
# Collection
usage_data = self._do_collection(metrics, timestamp)
LOG.debug("Usage data [%s] found for storage scope [%s] in "
"timestamp [%s].", usage_data, self._tenant_id,
timestamp)
start_time = timestamp
end_time = tzutils.add_delta(timestamp,
timedelta(seconds=self._period))
frame = dataframe.DataFrame(
start=timestamp,
end=tzutils.add_delta(timestamp,
timedelta(seconds=self._period)),
start=start_time,
end=end_time,
usage=usage_data,
)
# Rating
@ -328,6 +352,10 @@ class Worker(BaseWorker):
frame = processor.obj.process(frame)
# Writing
LOG.debug("Persisting processed frames [%s] for tenant [%s] and "
"time [start=%s,end=%s]", frame, self._tenant_id,
start_time, end_time)
self._storage.push([frame], self._tenant_id)
self._state.set_state(self._tenant_id, timestamp)
@ -400,6 +428,8 @@ class Orchestrator(cotyledon.Service):
w=self._worker_id, lck=lock_name)
)
state = self._check_state(tenant_id)
LOG.debug("Next timestamp [%s] found for processing for "
"storage scope [%s].", state, tenant_id)
if state:
worker = Worker(
self.collector,
@ -411,6 +441,8 @@ class Orchestrator(cotyledon.Service):
lock.release()
LOG.debug("Finished processing all storage scopes.")
# FIXME(sheeprine): We may cause a drift here
time.sleep(CONF.collect.period)

View File

@ -33,6 +33,20 @@ CONF.import_opt('collector', 'cloudkitty.collector', 'collect')
CONF.import_opt('scope_key', 'cloudkitty.collector', 'collect')
def to_list_if_needed(value):
if not isinstance(value, list):
value = [value]
return value
def apply_offset_and_limit(limit, offset, q):
if offset:
q = q.offset(offset)
if limit:
q = q.limit(limit)
return q
class StateManager(object):
"""Class allowing state management in CloudKitty"""
@ -43,7 +57,9 @@ class StateManager(object):
fetcher=None,
collector=None,
scope_key=None,
limit=100, offset=0):
active=1,
limit=100,
offset=0):
"""Returns the state of all scopes.
This function returns the state of all scopes with support for optional
@ -59,20 +75,33 @@ class StateManager(object):
:type fetcher: list
:param scope_key: optional scope_keys to filter on
:type scope_key: list
:param active: optional active to filter scopes by status
(active/deactivated)
:type active: int
:param limit: optional to restrict the projection
:type limit: int
:param offset: optional to shift the projection
:type offset: int
"""
session = db.get_session()
session.begin()
q = utils.model_query(self.model, session)
if identifier:
q = q.filter(self.model.identifier.in_(identifier))
q = q.filter(
self.model.identifier.in_(to_list_if_needed(identifier)))
if fetcher:
q = q.filter(self.model.fetcher.in_(fetcher))
q = q.filter(
self.model.fetcher.in_(to_list_if_needed(fetcher)))
if collector:
q = q.filter(self.model.collector.in_(collector))
q = q.filter(
self.model.collector.in_(to_list_if_needed(collector)))
if scope_key:
q = q.filter(self.model.scope_key.in_(scope_key))
q = q.offset(offset).limit(limit)
q = q.filter(
self.model.scope_key.in_(to_list_if_needed(scope_key)))
if active is not None and active != []:
q = q.filter(self.model.active.in_(to_list_if_needed(active)))
q = apply_offset_and_limit(limit, offset, q)
r = q.all()
session.close()
@ -80,7 +109,8 @@ class StateManager(object):
for item in r:
item.last_processed_timestamp = tzutils.utc_to_local(
item.last_processed_timestamp)
item.scope_activation_toggle_date = tzutils.utc_to_local(
item.scope_activation_toggle_date)
return r
def _get_db_item(self, session, identifier,
@ -211,3 +241,65 @@ class StateManager(object):
q = utils.model_query(self.model, session)
session.close()
return [tenant.identifier for tenant in q]
def update_storage_scope(self, storage_scope_to_update, scope_key=None,
fetcher=None, collector=None, active=None):
"""Update storage scope data.
:param storage_scope_to_update: The storage scope to update in the DB
:type storage_scope_to_update: object
:param fetcher: Fetcher associated to the scope
:type fetcher: str
:param collector: Collector associated to the scope
:type collector: str
:param scope_key: scope_key associated to the scope
:type scope_key: str
:param active: indicates if the storage scope is active for processing
:type active: bool
"""
session = db.get_session()
session.begin()
db_scope = self._get_db_item(session,
storage_scope_to_update.identifier,
storage_scope_to_update.fetcher,
storage_scope_to_update.collector,
storage_scope_to_update.scope_key)
if scope_key:
db_scope.scope_key = scope_key
if fetcher:
db_scope.fetcher = fetcher
if collector:
db_scope.collector = collector
if active is not None and active != db_scope.active:
db_scope.active = active
now = tzutils.localized_now()
db_scope.scope_activation_toggle_date = tzutils.local_to_utc(
now, naive=True)
session.commit()
session.close()
def is_storage_scope_active(self, identifier, fetcher=None,
collector=None, scope_key=None):
"""Checks if a storage scope is active
:param identifier: Identifier of the scope
:type identifier: str
:param fetcher: Fetcher associated to the scope
:type fetcher: str
:param collector: Collector associated to the scope
:type collector: str
:param scope_key: scope_key associated to the scope
:type scope_key: str
:rtype: datetime.datetime
"""
session = db.get_session()
session.begin()
r = self._get_db_item(
session, identifier, fetcher, collector, scope_key)
session.close()
return r.active

View File

@ -0,0 +1,50 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Update storage state constraint
Revision ID: 4d69395f
Revises: 750d3050cf71
Create Date: 2019-05-15 17:02:56.595274
"""
import importlib
import sqlalchemy
from alembic import op
# revision identifiers, used by Alembic.
revision = '4d69395f'
down_revision = '750d3050cf71'
def upgrade():
down_version_module = importlib.import_module(
"cloudkitty.storage_state.alembic.versions."
"750d3050_create_last_processed_timestamp_column")
for name, table in down_version_module.Base.metadata.tables.items():
if name == 'cloudkitty_storage_states':
with op.batch_alter_table(name,
copy_from=table,
recreate='always') as batch_op:
batch_op.alter_column('identifier')
batch_op.add_column(
sqlalchemy.Column('scope_activation_toggle_date',
sqlalchemy.DateTime, nullable=False,
server_default=sqlalchemy.sql.func.now())
)
batch_op.add_column(
sqlalchemy.Column('active', sqlalchemy.Boolean,
nullable=False, default=True))
break

View File

@ -21,6 +21,11 @@ Create Date: 2021-02-08 17:00:00.000
"""
from alembic import op
import sqlalchemy
from sqlalchemy.ext import declarative
from oslo_db.sqlalchemy import models
from cloudkitty.storage_state.alembic.versions import \
c50ed2c19204_update_storage_state_constraint as down_version_module
@ -30,6 +35,8 @@ down_revision = 'c50ed2c19204'
branch_labels = None
depends_on = None
Base = declarative.declarative_base()
def upgrade():
for name, table in down_version_module.Base.metadata.tables.items():
@ -41,3 +48,37 @@ def upgrade():
'state', new_column_name='last_processed_timestamp')
break
class IdentifierTableForThisDataBaseModelChangeSet(Base, models.ModelBase):
"""Represents the state of a given identifier."""
@declarative.declared_attr
def __table_args__(cls):
return (
sqlalchemy.schema.UniqueConstraint(
'identifier',
'scope_key',
'collector',
'fetcher',
name='uq_cloudkitty_storage_states_identifier'),
)
__tablename__ = 'cloudkitty_storage_states'
id = sqlalchemy.Column(sqlalchemy.Integer,
primary_key=True)
identifier = sqlalchemy.Column(sqlalchemy.String(256),
nullable=False,
unique=False)
scope_key = sqlalchemy.Column(sqlalchemy.String(40),
nullable=True,
unique=False)
fetcher = sqlalchemy.Column(sqlalchemy.String(40),
nullable=True,
unique=False)
collector = sqlalchemy.Column(sqlalchemy.String(40),
nullable=True,
unique=False)
last_processed_timestamp = sqlalchemy.Column(
sqlalchemy.DateTime, nullable=False)

View File

@ -20,10 +20,11 @@ Create Date: 2019-05-15 17:02:56.595274
"""
from alembic import op
from sqlalchemy.ext import declarative
from oslo_db.sqlalchemy import models
import sqlalchemy
from sqlalchemy.ext import declarative
# revision identifiers, used by Alembic.
revision = 'c50ed2c19204'

View File

@ -16,8 +16,6 @@
from oslo_db.sqlalchemy import models
import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import schema
Base = declarative.declarative_base()
@ -28,7 +26,7 @@ class IdentifierState(Base, models.ModelBase):
@declarative.declared_attr
def __table_args__(cls):
return (
schema.UniqueConstraint(
sqlalchemy.schema.UniqueConstraint(
'identifier',
'scope_key',
'collector',
@ -54,3 +52,8 @@ class IdentifierState(Base, models.ModelBase):
unique=False)
last_processed_timestamp = sqlalchemy.Column(
sqlalchemy.DateTime, nullable=False)
scope_activation_toggle_date = sqlalchemy.Column(
'scope_activation_toggle_date', sqlalchemy.DateTime, nullable=False,
server_default=sqlalchemy.sql.func.now())
active = sqlalchemy.Column('active', sqlalchemy.Boolean, nullable=False,
default=True)

View File

@ -97,6 +97,10 @@
# PUT /v2/scope
#"scope:reset_state": "role:admin"
# Enables operators to patch a storage scope
# PATCH /v2/scope
#"scope:patch_state": "role:admin"
# Get a rating summary
# GET /v2/summary
#"summary:get_summary": "rule:admin_or_owner"

View File

@ -5,21 +5,24 @@
"fetcher": "keystone",
"scope_id": "7a7e5183264644a7a79530eb56e59941",
"scope_key": "project_id",
"last_processed_timestamp": "2019-05-09 10:00:00"
"last_processed_timestamp": "2019-05-09 10:00:00",
"active": true
},
{
"collector": "gnocchi",
"fetcher": "keystone",
"scope_id": "9084fadcbd46481788e0ad7405dcbf12",
"scope_key": "project_id",
"last_processed_timestamp": "2019-05-08 03:00:00"
"last_processed_timestamp": "2019-05-08 03:00:00",
"active": true
},
{
"collector": "gnocchi",
"fetcher": "keystone",
"scope_id": "1f41d183fca5490ebda5c63fbaca026a",
"scope_key": "project_id",
"last_processed_timestamp": "2019-05-06 22:00:00"
"last_processed_timestamp": "2019-05-06 22:00:00",
"active": true
}
]
}

View File

@ -43,6 +43,7 @@ Response
- last_processed_timestamp: last_processed_timestamp
- scope_id: scope_id_resp
- scope_key: scope_key_resp
- active: active_key_resp
Response Example
----------------
@ -81,3 +82,53 @@ Status codes
- 403
- 404
- 405
Patch a scope
================================
Patches/updates a scope.
.. rest_method:: PATCH /v2/scope
.. rest_parameters:: scope/scope_parameters.yml
- collector: collector
- fetcher: fetcher
- limit: limit
- offset: offset
- scope_id: scope_id
- scope_key: scope_key
- active: active_body
Status codes
------------
.. rest_status_code:: success http_status.yml
- 200
.. rest_status_code:: error http_status.yml
- 400
- 403
- 404
- 405
Response
--------
.. rest_parameters:: scope/scope_parameters.yml
- collector: collector_resp
- fetcher: fetcher_resp
- state: state
- scope_id: scope_id_resp
- scope_key: scope_key_resp
- active: active_key_resp
Response Example
----------------
.. literalinclude:: ./api_samples/scope/scope_get.json
:language: javascript

View File

@ -40,6 +40,21 @@ scope_key: &scope_key
type: string
required: false
active_anchor_query: &active_query
in: body
description: |
Defines if a scope should be processed or not; `True` means that
CloudKitty must process the scope.
type: bool
required: true
active_body:
<<: *active_query
required: false
active_key_resp:
<<: *active_query
all_scopes: &all_scopes
in: body
description: |

View File

@ -0,0 +1,4 @@
---
features:
- |
Add active status option in the storage state table and API.