Multiples
Add support for storing when the account was last polled - used as the starting time for the next poll if it exists. Change id to id_ in _get_id. Move the self.admin_context into the base RecorderEngine
This commit is contained in:
parent
7cb8c8766f
commit
f340ffd6d2
|
@ -168,6 +168,17 @@ def delete_system_account(context, account_id):
|
|||
return RPC.call(context, msg)
|
||||
|
||||
|
||||
def set_polled_at(context, account_id, time):
|
||||
msg = {
|
||||
"method": "set_polled_at",
|
||||
"args": {
|
||||
"account_id": account_id,
|
||||
"time": time
|
||||
}
|
||||
}
|
||||
return RPC.call(context, msg)
|
||||
|
||||
|
||||
def process_record(context, values):
|
||||
msg = {
|
||||
"method": "process_record",
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
# NOTE(zykes): Copied verbatim from Moniker
|
||||
from bufunfa.openstack.common import cfg
|
||||
from bufunfa.openstack.common import log
|
||||
from bufunfa.openstack.common import timeutils
|
||||
from bufunfa.openstack.common.rpc import service as rpc_service
|
||||
from bufunfa import exceptions
|
||||
from bufunfa import storage
|
||||
|
@ -80,14 +81,42 @@ class Service(rpc_service.Service):
|
|||
def delete_system_account(self, context, account_id):
|
||||
return self.storage_conn.delete_rate(context, account_id)
|
||||
|
||||
def set_polled_at(self, context, account_id, time):
|
||||
"""
|
||||
Set when the account was last polled in the system
|
||||
|
||||
:param context: RPC context
|
||||
:param account_id: The Account ID in the System
|
||||
:param time_stamp: Timestamp of when it was polled
|
||||
"""
|
||||
time = timeutils.parse_strtime(time)
|
||||
|
||||
account = self.storage_conn.get_system_account(context, account_id)
|
||||
polled_at = account['polled_at']
|
||||
|
||||
if polled_at and time < polled_at:
|
||||
raise exceptions.TooOld("Timestamp is older then the last poll")
|
||||
|
||||
return self.storage_conn.update_system_account(context, account_id,
|
||||
{'polled_at': time})
|
||||
|
||||
def process_records(self, context, records):
|
||||
"""
|
||||
Process records in a batch
|
||||
|
||||
:param context: RPC context
|
||||
:param records: A list of records
|
||||
"""
|
||||
for record in records:
|
||||
self.process_record(context, record)
|
||||
|
||||
def process_record(self, context, values):
|
||||
"""
|
||||
Process a Record
|
||||
|
||||
:param context: RPC context
|
||||
:param values: Values for the record
|
||||
"""
|
||||
# NOTE: Add the system if it doesn't exist..
|
||||
try:
|
||||
self.storage_conn.get_system_account(
|
||||
|
|
|
@ -41,3 +41,7 @@ class Duplicate(Base):
|
|||
|
||||
class NotFound(Base):
|
||||
pass
|
||||
|
||||
|
||||
class TooOld(Base):
|
||||
pass
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from stevedore import driver
|
||||
from bufunfa.openstack.common import cfg
|
||||
from bufunfa.openstack.common import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
import abc
|
||||
|
||||
from bufunfa.openstack.common import cfg
|
||||
from bufunfa.openstack.common.context import get_admin_context
|
||||
|
||||
|
||||
cfg.CONF.register_opt(
|
||||
|
@ -30,7 +31,7 @@ cfg.CONF.register_opt(
|
|||
)
|
||||
|
||||
|
||||
class RecorderEngine(object):
|
||||
class BaseEngine(object):
|
||||
"""
|
||||
Base Record engine for getting Records from external systems
|
||||
"""
|
||||
|
@ -60,3 +61,8 @@ class RecorderEngine(object):
|
|||
end_timestamp: End of the pulling period
|
||||
system_account_id: The account id in the external system
|
||||
"""
|
||||
|
||||
|
||||
class RecorderEngine(BaseEngine):
|
||||
def __init__(self):
|
||||
self.admin_context = get_admin_context()
|
||||
|
|
|
@ -20,7 +20,9 @@ import ceilometerclient
|
|||
|
||||
from bufunfa.openstack.common import cfg
|
||||
from bufunfa.openstack.common import log
|
||||
from bufunfa.openstack.common.context import get_admin_context
|
||||
from bufunfa.openstack.common import timeutils
|
||||
from bufunfa.openstack.common.rpc.common import RemoteError
|
||||
from bufunfa import exceptions
|
||||
from bufunfa.central import api as central_api
|
||||
from bufunfa.recorder.openstack import OpenstackEngine
|
||||
|
||||
|
@ -48,15 +50,32 @@ class RecordEngine(OpenstackEngine):
|
|||
LOG.exception(e)
|
||||
return
|
||||
|
||||
poll_start = datetime.now() - timedelta(seconds=cfg.CONF.poll_age)
|
||||
|
||||
for project_id in projects:
|
||||
if project_id is None:
|
||||
continue
|
||||
|
||||
started = datetime.now()
|
||||
|
||||
start_timestamp = self.get_poll_start(project_id)
|
||||
|
||||
project_records = self.get_project_records_between(project_id,
|
||||
start_timestamp=poll_start)
|
||||
admin_context = get_admin_context()
|
||||
central_api.process_records(admin_context, project_records)
|
||||
start_timestamp=start_timestamp)
|
||||
central_api.process_records(self.admin_context, project_records)
|
||||
|
||||
central_api.set_polled_at(self.admin_context, project_id, started)
|
||||
|
||||
def get_poll_start(self, project_id):
|
||||
"""
|
||||
Get poll start time
|
||||
|
||||
:param project_id: The project ID
|
||||
"""
|
||||
try:
|
||||
account = central_api.get_system_account(self.admin_context, project_id)
|
||||
except RemoteError:
|
||||
return
|
||||
polled_at = timeutils.parse_strtime(account['polled_at'])
|
||||
return polled_at
|
||||
|
||||
def get_project_records_between(self, project_id, start_timestamp=None,
|
||||
end_timestamp=None):
|
||||
|
@ -100,6 +119,10 @@ class RecordEngine(OpenstackEngine):
|
|||
start_timestamp=start_timestamp, end_timestamp=end_timestamp
|
||||
)
|
||||
|
||||
#if not duration_info['start_timestamp'] and \
|
||||
# not duration_info['end_timestamp']:
|
||||
#return
|
||||
|
||||
volume = volume or duration_info.get('duration')
|
||||
|
||||
# NOTE: Not sure on this but I think we can skip returning events that
|
||||
|
|
|
@ -62,14 +62,14 @@ class Connection(base.Connection):
|
|||
""" Semi-Private Method to reset the database schema """
|
||||
models.Base.metadata.drop_all(self.session.bind)
|
||||
|
||||
def _get_id(self, model, context, id):
|
||||
def _get_id(self, model, context, id_):
|
||||
"""
|
||||
Helper to not write the same code x times
|
||||
"""
|
||||
query = self.session.query(model)
|
||||
obj = query.get(id)
|
||||
obj = query.get(id_)
|
||||
if not obj:
|
||||
raise exceptions.NotFound(id)
|
||||
raise exceptions.NotFound(id_)
|
||||
else:
|
||||
return obj
|
||||
|
||||
|
|
|
@ -142,6 +142,7 @@ class SystemAccount(Base):
|
|||
|
||||
id = Column(Unicode(40), primary_key=True)
|
||||
name = Column(Unicode(100))
|
||||
polled_at = Column(DateTime)
|
||||
|
||||
account = relationship("Account", backref="systems")
|
||||
account_id = Column(UUID, ForeignKey('accounts.id'))
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from datetime import datetime, timedelta
|
||||
import random
|
||||
from bufunfa.openstack.common import log as logging
|
||||
from bufunfa.tests.test_central import CentralTestCase
|
||||
from bufunfa import exceptions
|
||||
|
@ -34,16 +33,78 @@ class ServiceTest(CentralTestCase):
|
|||
'account_id': 'c97027dd880d4c129ae7a4ba7edade05'
|
||||
}
|
||||
|
||||
rates = [
|
||||
{'name': 'cpu', 'value': 1},
|
||||
{'name': 'memory', 'value': 2}
|
||||
]
|
||||
|
||||
accounts = [
|
||||
{'name': 'customer_a'}
|
||||
]
|
||||
|
||||
system_accounts = [
|
||||
{'name': 'system_a', 'id': 'd44f1779-5034-455e-b334-cac2ac3eee33'},
|
||||
{'name': 'system_b', 'id': 'a45e43af-090b-4045-ae78-6a9d507d1418'}
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(ServiceTest, self).setUp()
|
||||
self.config(rpc_backend='bufunfa.openstack.common.rpc.impl_fake')
|
||||
self.service = self.get_central_service()
|
||||
self.admin_context = self.get_admin_context()
|
||||
|
||||
def add_rate(self, fixture=0, context=None, values={}):
|
||||
context = context or self.get_admin_context()
|
||||
values = self.rates[fixture]
|
||||
values.update(values)
|
||||
return self.service.add_rate(context, values)
|
||||
|
||||
def add_account(self, fixture=0, context=None, values={}):
|
||||
context = context or self.get_admin_context()
|
||||
values = self.accounts[fixture]
|
||||
values.update(values)
|
||||
return self.service.add_account(context, values)
|
||||
|
||||
def add_system_account(self, fixture=0, context=None, values={}):
|
||||
context = context or self.get_admin_context()
|
||||
values = self.system_accounts[fixture]
|
||||
values.update(values)
|
||||
return self.service.add_system_account(context, values)
|
||||
|
||||
def test_process_record_unexisting_system(self):
|
||||
"""
|
||||
If the system we we're receiving a record from doesn't have a system
|
||||
account entry we'll create one
|
||||
"""
|
||||
self.service.process_record(
|
||||
self.admin_context, self.record)
|
||||
|
||||
system = self.service.storage_conn.get_system_account(
|
||||
self.admin_context, self.record['account_id'])
|
||||
self.assertEquals(system.id, self.record['account_id'])
|
||||
|
||||
def test_set_polled_at(self):
|
||||
"""
|
||||
Set the last time the SystemAccount was polled
|
||||
"""
|
||||
account_id = str(self.add_system_account()['id'])
|
||||
now = datetime.now()
|
||||
self.service.set_polled_at(self.admin_context, account_id, now)
|
||||
|
||||
account = self.service.get_system_account(self.admin_context, account_id)
|
||||
self.assertEquals(account["polled_at"], now)
|
||||
|
||||
def test_set_polled_at_too_old(self):
|
||||
"""
|
||||
Shouldn't be allowed to set polled_at older then the current one in
|
||||
SystemAccount
|
||||
"""
|
||||
account_id = str(self.add_system_account()['id'])
|
||||
now = datetime.now()
|
||||
self.service.set_polled_at(
|
||||
self.admin_context, account_id, now)
|
||||
|
||||
with self.assertRaises(exceptions.TooOld):
|
||||
self.service.set_polled_at(
|
||||
self.admin_context, account_id,
|
||||
now - timedelta(1))
|
||||
|
|
Loading…
Reference in New Issue