Merge pull request #8 from hpcloud-mon/limitfixes
Added a GET endpoint for retrieving a specific transform, along with bug fixes
This commit is contained in:
commit
1aa41b65dd
|
@ -21,7 +21,7 @@ import yaml
|
|||
|
||||
from monascaclient import ksclient
|
||||
|
||||
events_url = "http://192.168.10.4:8082"
|
||||
events_url = "http://127.0.0.1:8082"
|
||||
|
||||
|
||||
def token():
|
||||
|
@ -133,7 +133,7 @@ def test_stream_definition_delete():
|
|||
data=json.dumps(body),
|
||||
headers=headers)
|
||||
stream_dict = json.loads(stream_resp.text)
|
||||
stream_id = str(stream_dict[0]['id'])
|
||||
stream_id = str(stream_dict[0]['elements']['id'])
|
||||
response = requests.delete(
|
||||
url=events_url + "/v2.0/stream-definitions/{}".format(
|
||||
stream_id),
|
||||
|
@ -181,7 +181,7 @@ def test_transforms():
|
|||
data=json.dumps(body),
|
||||
headers=headers)
|
||||
transform_dict = json.loads(response.text)
|
||||
transform_dict_id = transform_dict[0]['id']
|
||||
transform_dict_id = transform_dict['elements'][0]['id']
|
||||
response = requests.delete(
|
||||
url=events_url + "/v2.0/transforms/{}".format(transform_dict_id),
|
||||
data=json.dumps(body),
|
||||
|
@ -189,8 +189,8 @@ def test_transforms():
|
|||
assert response.status_code == 204
|
||||
print("DELETE /transforms success")
|
||||
|
||||
test_stream_definition_post()
|
||||
test_stream_definition_get()
|
||||
test_stream_definition_delete()
|
||||
test_events_get_all()
|
||||
# test_stream_definition_post()
|
||||
# test_stream_definition_get()
|
||||
# test_stream_definition_delete()
|
||||
# test_events_get_all()
|
||||
test_transforms()
|
||||
|
|
|
@ -95,6 +95,9 @@ class StreamsRepository(mysql_repository.MySQLRepository,
|
|||
else:
|
||||
order_by_clause = " order by sd.created_at "
|
||||
limit_clause = ""
|
||||
if limit:
|
||||
limit_clause = " limit %s"
|
||||
parms.append(int(limit))
|
||||
|
||||
query = select_clause + where_clause + order_by_clause + limit_clause
|
||||
|
||||
|
|
|
@ -18,14 +18,17 @@ from oslo_utils import timeutils
|
|||
|
||||
from monasca_events_api.common.repositories.mysql import mysql_repository
|
||||
from monasca_events_api.common.repositories import transforms_repository
|
||||
from monasca_events_api.common.repositories import constants
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class TransformsRepository(mysql_repository.MySQLRepository,
|
||||
transforms_repository.TransformsRepository):
|
||||
|
||||
def create_transforms(self, id, tenant_id, name, description,
|
||||
specification, enabled):
|
||||
|
||||
cnxn, cursor = self._get_cnxn_cursor_tuple()
|
||||
with cnxn:
|
||||
now = timeutils.utcnow()
|
||||
|
@ -51,15 +54,48 @@ class TransformsRepository(mysql_repository.MySQLRepository,
|
|||
else:
|
||||
raise e
|
||||
|
||||
def list_transforms(self, tenant_id, limit=None, offset=None):
    """List a tenant's non-deleted transforms, ordered by id, with paging.

    :param tenant_id: tenant whose transforms are returned.
    :param limit: maximum number of rows; defaults to constants.PAGE_LIMIT.
    :param offset: if given, only rows with id greater than this value are
        returned (keyset pagination on id).
    :returns: list of matching event_transform rows.
    """
    # Build the statement with %s placeholders and a parms list instead of
    # str.format(): interpolating tenant_id/offset into the SQL text is a
    # SQL injection vector. This also matches StreamsRepository's style
    # (limit_clause = " limit %s"; parms.append(int(limit))).
    query = "select * from event_transform where deleted_at IS NULL"
    query += " and tenant_id = %s"
    parms = [tenant_id]

    if offset:
        # Resume listing after the last id the caller has already seen.
        query += " and id > %s"
        parms.append(offset)

    query += " order by id"

    if not limit:
        limit = constants.PAGE_LIMIT
    query += " limit %s"
    parms.append(int(limit))

    return self._execute_query(query, parms)
|
||||
|
||||
def list_transform(self, tenant_id, transform_id):
    """Return the non-deleted transform with transform_id for tenant_id.

    Uses parameterized SQL (%s placeholders) rather than str.format(), so
    the caller-supplied ids are never interpolated into the statement
    (fixes a SQL injection vector).

    :returns: list of matching rows (empty list when not found).
    """
    query = ("select * from event_transform"
             " where deleted_at IS NULL"
             " and tenant_id = %s"
             " and id = %s")

    return self._execute_query(query, [tenant_id, transform_id])
|
||||
|
||||
def delete_transform(self, tenant_id, transform_id):
    """Soft-delete a tenant's transform by stamping its deleted_at column.

    The row is retained; list/get queries filter on "deleted_at IS NULL",
    so a stamped row stops being returned.
    """
    now = timeutils.utcnow()
    # Parameterized update: never interpolate the timestamp or the
    # caller-supplied ids into the SQL text (SQL injection fix).
    query = ("update event_transform set deleted_at = %s"
             " where tenant_id = %s and id = %s")

    self._execute_query(query, [now, tenant_id, transform_id])
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
import datetime
|
||||
import json
|
||||
import urlparse
|
||||
import urllib
|
||||
|
||||
import falcon
|
||||
from oslo_log import log
|
||||
|
@ -244,46 +245,46 @@ def get_query_period(req):
|
|||
raise falcon.HTTPBadRequest('Bad request', ex.message)
|
||||
|
||||
|
||||
def paginate(resource, uri, offset):
|
||||
def paginate(resource, uri):
|
||||
|
||||
if offset is not None:
|
||||
limit = constants.PAGE_LIMIT
|
||||
|
||||
if resource:
|
||||
parsed_uri = urlparse.urlparse(uri)
|
||||
|
||||
if len(resource) >= constants.PAGE_LIMIT:
|
||||
self_link = build_base_uri(parsed_uri)
|
||||
|
||||
new_offset = resource[-1]['id']
|
||||
if resource and len(resource) >= limit:
|
||||
|
||||
parsed_uri = urlparse.urlparse(uri)
|
||||
if 'timestamp' in resource[limit - 1]:
|
||||
new_offset = resource[limit - 1]['timestamp']
|
||||
|
||||
next_link = build_base_uri(parsed_uri)
|
||||
if 'id' in resource[limit - 1]:
|
||||
new_offset = resource[limit - 1]['id']
|
||||
|
||||
new_query_params = [u'offset' + '=' + str(new_offset).decode(
|
||||
'utf8')]
|
||||
next_link = build_base_uri(parsed_uri)
|
||||
|
||||
for query_param in parsed_uri.query.split('&'):
|
||||
query_param_name, query_param_val = query_param.split('=')
|
||||
if query_param_name.lower() != 'offset':
|
||||
new_query_params.append(query_param)
|
||||
new_query_params = [u'offset' + '=' + urllib.quote(
|
||||
new_offset.encode('utf8'), safe='')]
|
||||
|
||||
next_link += '?' + '&'.join(new_query_params)
|
||||
if new_query_params:
|
||||
next_link += '?' + '&'.join(new_query_params)
|
||||
|
||||
resource = {u'links':
|
||||
[{u'rel': u'self', u'href': uri.decode('utf8')},
|
||||
{u'rel': u'next',
|
||||
u'href': next_link.decode('utf8')}],
|
||||
u'elements': resource}
|
||||
resource = {u'links': ([{u'rel': u'self',
|
||||
u'href': self_link.decode('utf8')},
|
||||
{u'rel': u'next',
|
||||
u'href': next_link.decode('utf8')}]),
|
||||
u'elements': resource[:limit]}
|
||||
|
||||
else:
|
||||
else:
|
||||
|
||||
resource = {u'links':
|
||||
[{u'rel': u'self', u'href': uri.decode('utf8')}],
|
||||
u'elements': resource}
|
||||
resource = {u'links': ([{u'rel': u'self',
|
||||
u'href': self_link.decode('utf8')}]),
|
||||
u'elements': resource}
|
||||
|
||||
return resource
|
||||
|
||||
|
||||
def paginate_measurement(measurement, uri, offset):
|
||||
def paginate_measurement(measurement, uri):
|
||||
|
||||
if offset is not None:
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
import collections
|
||||
import re
|
||||
|
||||
import falcon
|
||||
from oslo_config import cfg
|
||||
|
@ -56,7 +57,11 @@ class Events(events_api_v2.EventsV2API):
|
|||
tenant_id = helpers.get_tenant_id(req)
|
||||
|
||||
if event_id:
|
||||
result = self._list_event(tenant_id, event_id, req.uri)
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
tenant_id = helpers.get_tenant_id(req)
|
||||
result = self._list_event(tenant_id, event_id)
|
||||
helpers.add_links_to_resource(
|
||||
result[0], re.sub('/' + event_id, '', req.uri))
|
||||
res.body = helpers.dumpit_utf8(result)
|
||||
res.status = falcon.HTTP_200
|
||||
else:
|
||||
|
@ -111,10 +116,10 @@ class Events(events_api_v2.EventsV2API):
|
|||
@resource.resource_try_catch_block
def _list_events(self, tenant_id, uri, offset, limit):
    """Return the tenant's events as a paginated resource structure.

    Fetches rows from the events repository (honoring offset/limit),
    converts them to event dicts, and wraps them with pagination links.
    """
    rows = self._events_repo.list_events(tenant_id, offset, limit)
    events = self._build_events(rows)
    return helpers.paginate(events, uri)
|
||||
|
||||
@resource.resource_try_catch_block
def _list_event(self, tenant_id, event_id):
    """Return the single event identified by event_id for this tenant."""
    rows = self._events_repo.list_event(tenant_id, event_id)
    return self._build_events(rows)
|
||||
|
||||
|
|
|
@ -229,7 +229,7 @@ class StreamDefinitions(stream_definitions_api_v2.StreamDefinitionsV2API):
|
|||
helpers.add_links_to_resource(sd, req_uri)
|
||||
result.append(sd)
|
||||
|
||||
result = helpers.paginate(result, req_uri, offset)
|
||||
result = helpers.paginate(result, req_uri)
|
||||
|
||||
return result
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import ast
|
||||
import re
|
||||
import datetime
|
||||
import json
|
||||
from time import mktime
|
||||
|
@ -72,11 +72,25 @@ class Transforms(transforms_api_v2.TransformsV2API):
|
|||
res.body = self._create_transform_response(transform_id, transform)
|
||||
res.status = falcon.HTTP_200
|
||||
|
||||
def on_get(self, req, res, transform_id=None):
    """Handle GET /transforms and GET /transforms/{transform_id}.

    :param transform_id: when present, respond with that single transform
        (with its resource links); otherwise respond with a paginated list
        honoring the 'limit' and 'offset' query parameters.
    """
    # Authorization and tenant resolution are identical on both paths,
    # so perform them once instead of duplicating them per branch.
    helpers.validate_authorization(req, self._default_authorized_roles)
    tenant_id = helpers.get_tenant_id(req)

    if transform_id:
        result = self._list_transform(tenant_id, transform_id, req.uri)
        # Links point at the collection URI (strip the trailing id).
        helpers.add_links_to_resource(
            result, re.sub('/' + transform_id, '', req.uri))
    else:
        limit = helpers.get_query_param(req, 'limit')
        offset = helpers.normalize_offset(helpers.get_query_param(
            req,
            'offset'))
        result = self._list_transforms(tenant_id, limit, offset, req.uri)

    res.body = json.dumps(result, cls=MyEncoder)
    res.status = falcon.HTTP_200
|
||||
|
||||
def on_delete(self, req, res, transform_id):
|
||||
helpers.validate_authorization(req, self._default_authorized_roles)
|
||||
|
@ -149,13 +163,24 @@ class Transforms(transforms_api_v2.TransformsV2API):
|
|||
'specification': specification, 'enabled': enabled}
|
||||
return json.dumps(response)
|
||||
|
||||
def _list_transforms(self, tenant_id, limit, offset, uri):
    """Fetch one page of the tenant's transforms wrapped in paging links.

    Repository errors are logged and surfaced as a 500 to the client.
    """
    try:
        rows = self._transforms_repo.list_transforms(tenant_id,
                                                     limit, offset)
        return helpers.paginate(rows, uri)
    except repository_exceptions.RepositoryException as ex:
        LOG.error(ex)
        raise falcon.HTTPInternalServerError('Service unavailable',
                                             ex.message)
|
||||
|
||||
def _list_transform(self, tenant_id, transform_id, uri):
|
||||
try:
|
||||
transform = self._transforms_repo.list_transform(tenant_id,
|
||||
transform_id)[0]
|
||||
transform['specification'] = yaml.safe_dump(
|
||||
transform['specification'])
|
||||
return transform
|
||||
except repository_exceptions.RepositoryException as ex:
|
||||
LOG.error(ex)
|
||||
raise falcon.HTTPInternalServerError('Service unavailable',
|
||||
|
|
Loading…
Reference in New Issue