Validating message size

Received request's content should be validated if
its size does not exceed allowed value. Bytes refers
to byte size of the object instead of amount
of characters. This change is required in order
to reject those meesages that couldn't be processed
by Kafka queue.

Changed:
- added payload size validation to Python
- added validation of message size that it sent to Kafka
- reworked validation of message size in Java

Change-Id: I2acc647550d7c851a5715a7cf44f749db1f54d7b
This commit is contained in:
Tomasz Trębski 2015-12-04 07:37:14 +01:00
parent d2d50f42d4
commit 1feaa74013
18 changed files with 705 additions and 53 deletions

View File

@ -11,6 +11,7 @@ logs = monasca_log_api.v2.reference.logs:Logs
versions = monasca_log_api.v2.reference.versions:Versions
[service]
max_log_size = 1048576
region = 'pl'
[log_publisher]

View File

@ -15,4 +15,4 @@ use = egg:gunicorn#main
host = 127.0.0.1
port = 8082
workers = 1
proc_name = monasca_log_api
proc_name = monasca_log_api

View File

@ -21,14 +21,24 @@ import javax.validation.constraints.NotNull;
import org.hibernate.validator.constraints.NotEmpty;
import monasca.common.messaging.kafka.KafkaConfiguration;
import monasca.log.api.common.LogApiConstants;
import monasca.log.api.infrastructure.middleware.MiddlewareConfiguration;
public class ApiConfig extends Configuration {
/**
* Refers to payload/envelope size.
* If either is exceeded API will throw an error.
*/
private static final Integer DEFAULT_LOG_SIZE = LogApiConstants.MAX_LOG_LENGTH;
@NotEmpty
public String region;
@NotEmpty
public String logTopic = "log";
@Valid
public Integer logSize = DEFAULT_LOG_SIZE;
@Valid
@NotNull
public KafkaConfiguration kafka;
@Valid

View File

@ -16,8 +16,8 @@ package monasca.log.api.app;
import static monasca.log.api.common.LogApiConstants.LOG_MARKER;
import static monasca.log.api.common.LogApiConstants.LOG_MARKER_KAFKA;
import static monasca.log.api.common.LogApiConstants.LOG_MARKER_WARN;
import static monasca.log.api.common.LogApiConstants.MAX_LOG_LENGTH;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Map.Entry;
import javax.inject.Inject;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import com.google.common.base.Preconditions;
@ -134,9 +135,6 @@ public class LogService {
if (log.getDimensions() != null) {
DimensionValidation.validate(log.getDimensions(), null);
}
if (log.getMessage().length() > MAX_LOG_LENGTH) {
throw Exceptions.unprocessableEntity("Log must be %d characters or less", MAX_LOG_LENGTH);
}
} catch (Exception exp) {
LOGGER.warn(LOG_MARKER_WARN, "Log {} not valid, error is {}", log, exp);
throw exp;
@ -147,10 +145,15 @@ public class LogService {
}
public void sendToKafka(Log log, String tenantId) {
final String envelope = this.serializer
.logEnvelopeToJson(this.newLogEnvelope(log, tenantId));
this.validateEnvelopeSize(envelope);
final KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
this.config.logTopic,
this.buildKey(tenantId, log),
this.serializer.logEnvelopeToJson(this.newLogEnvelope(log, tenantId))
envelope
);
LOGGER.debug(LOG_MARKER_KAFKA, "Shipping kafka message {}", keyedMessage);
@ -158,6 +161,47 @@ public class LogService {
this.producer.send(keyedMessage);
}
public void validateContentLength(final Integer contentLength) {
LOGGER.debug("validateContentLength(length=%d)", contentLength);
if (contentLength == null) {
throw Exceptions.lengthRequired(
"Content length header is missing",
"Content length is required to estimate if payload can be processed"
);
}
if (contentLength >= this.config.logSize) {
throw Exceptions.payloadTooLarge(
"Log payload size exceeded",
String.format("Maximum allowed size is %d bytes", this.config.logSize)
);
}
}
public void validateContentType(final MediaType contentType) {
if(contentType == null){
throw Exceptions.headerMissing(HttpHeaders.CONTENT_TYPE);
}
}
public void validateEnvelopeSize(final String envelope) {
if (!StringUtils.isEmpty(envelope)) {
// that must be length in bytes in common encoding
final int size = envelope.getBytes(Charset.forName("UTF-8")).length;
if (size >= this.config.logSize) {
throw Exceptions.internalServerError(
"Envelope size exceeded",
String.format("Maximum allowed size is %d bytes", this.config.logSize),
null
);
}
}
}
protected LogEnvelope newLogEnvelope(final Log log, final String tenantId) {
return new LogEnvelope(
log,

View File

@ -20,7 +20,6 @@ import org.slf4j.MarkerFactory;
public final class LogApiConstants {
public static final Marker LOG_MARKER = MarkerFactory.getMarker("log-api");
public static final Marker LOG_MARKER_ERROR = MarkerFactory.getMarker("log-api-error");
public static final Marker LOG_MARKER_WARN = MarkerFactory.getMarker("log-api-warn");
public static final Marker LOG_MARKER_KAFKA = MarkerFactory.getMarker("log-api-kafka");

View File

@ -22,6 +22,7 @@ import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Request;
@ -29,6 +30,7 @@ import com.codahale.metrics.annotation.Timed;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.sun.jersey.api.core.HttpRequestContext;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -69,6 +71,11 @@ public class LogResource {
LOGGER.debug("/single/{}", tenantId);
final MediaType contentType = this.getContentType(request);
this.service.validateContentLength(this.getContentLength(request));
this.service.validateContentType(contentType);
if (!this.isDelegate(roles)) {
LOGGER.trace(String.format("/single/%s is not delegated request, checking for crossTenantId",
tenantId));
@ -81,7 +88,7 @@ public class LogResource {
new LogRequestBean()
.setApplicationType(applicationType)
.setDimensions(this.getDimensions(dimensionsStr))
.setContentType(this.getContentType(request))
.setContentType(contentType)
.setPayload(payload),
VALIDATE_LOG
);
@ -96,6 +103,11 @@ public class LogResource {
return ((HttpRequestContext) request).getMediaType();
}
private Integer getContentLength(final Request request){
final String value = ((HttpRequestContext) request).getHeaderValue(HttpHeaders.CONTENT_LENGTH);
return StringUtils.isNotEmpty(value) ? Integer.valueOf(value) : null;
}
private String getTenantId(final String tenantId, final String crossTenantId) {
return Strings.isNullOrEmpty(crossTenantId) ? tenantId : crossTenantId;
}

View File

@ -21,13 +21,12 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.google.common.base.Splitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Exception factory methods.
@ -52,11 +51,14 @@ public final class Exceptions {
NOT_FOUND(Status.NOT_FOUND, true),
CONFLICT(Status.CONFLICT, true),
UNPROCESSABLE_ENTITY(422, true),
FORBIDDEN(Status.FORBIDDEN, true);
FORBIDDEN(Status.FORBIDDEN, true),
LENGTH_REQUIRED(411, true),
PAYLOAD_TOO_LARGE(413, true),
MISSING_HEADER(Status.BAD_REQUEST, true);
public final int statusCode;
public final boolean loggable;
public final boolean loggable;
FaultType(int statusCode, boolean loggable) {
this.statusCode = statusCode;
this.loggable = loggable;
@ -71,17 +73,17 @@ public final class Exceptions {
public String toString() {
return name().toLowerCase();
}
}
private static class WebAppException extends WebApplicationException {
private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 1L;
public WebAppException(FaultType faultType, String message) {
super(Response.status(faultType.statusCode).entity(message).type(MediaType.APPLICATION_JSON)
.build());
}
}
}
private Exceptions() {}
public static WebApplicationException badRequest(String msg, Object... args) {
@ -166,4 +168,27 @@ public final class Exceptions {
return new WebAppException(FaultType.UNPROCESSABLE_ENTITY, buildLoggedErrorMessage(
FaultType.UNPROCESSABLE_ENTITY, msg, details, exception));
}
public static WebApplicationException lengthRequired(final String msg, final String details) {
final FaultType faultType = FaultType.LENGTH_REQUIRED;
return new WebAppException(faultType, buildLoggedErrorMessage(faultType, msg, details));
}
public static WebApplicationException payloadTooLarge(final String msg, final String details) {
final FaultType faultType = FaultType.PAYLOAD_TOO_LARGE;
return new WebAppException(faultType, buildLoggedErrorMessage(faultType, msg, details));
}
public static WebApplicationException internalServerError(final String msg, final String
details, @Nullable final Throwable error) {
final FaultType faultType = FaultType.SERVER_ERROR;
return new WebAppException(faultType, buildLoggedErrorMessage(faultType, msg, details, error));
}
public static WebApplicationException headerMissing(final String headerName) {
final FaultType faultType = FaultType.MISSING_HEADER;
final String details = String.format("The %s header is required", headerName);
return new WebAppException(faultType, buildLoggedErrorMessage(faultType, "Missing header value", details));
}
}

View File

@ -13,11 +13,14 @@
*/
package monasca.log.api.app;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -34,6 +37,7 @@ import monasca.log.api.ApiConfig;
import monasca.log.api.common.LogApiConstants;
import monasca.log.api.model.Log;
import monasca.log.api.model.LogEnvelope;
import monasca.log.api.utils.TestUtils;
@Test
public class LogServiceTest {
@ -48,23 +52,27 @@ public class LogServiceTest {
private Producer<String, String> producer;
private LogService logService;
private LogSerializer serializer;
private ObjectMapper mapper;
@BeforeTest
@SuppressWarnings("unchecked")
protected void beforeMethod() {
dimensions.clear();
dimensions.put("a", "b");
config = new ApiConfig();
config.region = REGION;
config.logTopic = TOPIC;
this.producer = Mockito.mock(Producer.class);
this.serializer = Mockito.spy(new LogSerializer(new ApplicationModule().objectMapper()));
this.mapper = new ApplicationModule().objectMapper();
this.serializer = Mockito.spy(new LogSerializer(this.mapper));
this.logService = Mockito.spy(new LogService(this.config, this.producer, this.serializer));
}
public void testValidate_shouldFail_ApplicationType_TooLarge() {
final String str = this.generateRandomStr(LogApiConstants.MAX_NAME_LENGTH + new Random().nextInt(10));
final String str = TestUtils.generateRandomStr(LogApiConstants.MAX_NAME_LENGTH + new Random()
.nextInt(10));
try {
final Log log = new Log(str, null, null);
this.logService.validate(log);
@ -98,11 +106,71 @@ public class LogServiceTest {
Mockito.verifyZeroInteractions(this.producer, this.serializer);
}
private String generateRandomStr(final int length) {
final StringBuilder builder = new StringBuilder();
for (int i = 0; i < length; i++) {
builder.append(i);
}
return builder.toString();
public void testValidateContentLength_OK() {
final Integer contentLength = this.config.logSize / 2;
this.logService.validateContentLength(contentLength);
}
public void testValidateContentLength_PayloadTooLarge() throws IOException {
final Integer contentLength = this.config.logSize * 2;
try {
this.logService.validateContentLength(contentLength);
} catch (WebApplicationException exp) {
final Map map = this.mapper.readValue((String) exp.getResponse().getEntity(), Map.class);
Assert.assertTrue(map.containsKey("payload_too_large"));
}
}
public void testValidateContentLength_MissingHeader() throws IOException {
final Integer contentLength = null;
try {
this.logService.validateContentLength(contentLength);
} catch (WebApplicationException exp) {
final Map map = this.mapper.readValue((String) exp.getResponse().getEntity(), Map.class);
Assert.assertTrue(map.containsKey("length_required"));
}
}
public void testValidateEnvelopeSize_OK() {
this.config.logSize = 100;
final int length = this.config.logSize / 2;
final String msg = TestUtils.generateByteLengthString(length);
this.logService.validateEnvelopeSize(msg);
}
public void testValidateEnvelopeSize_Exceeded() throws IOException {
this.config.logSize = 100;
final int length = this.config.logSize * 2;
final String msg = TestUtils.generateByteLengthString(length);
try {
this.logService.validateEnvelopeSize(msg);
} catch (WebApplicationException exp) {
final Map map = this.mapper.readValue((String) exp.getResponse().getEntity(), Map.class);
Assert.assertTrue(map.containsKey("server_error"));
return;
}
Assert.assertFalse(true, "Should not happen");
}
public void testShouldThrowExceptionIfContentTypeNull() throws IOException {
try {
this.logService.validateContentType(null);
} catch (WebApplicationException exp){
final Map map = this.mapper.readValue((String) exp.getResponse().getEntity(), Map.class);
Assert.assertTrue(map.containsKey("missing_header"));
return;
}
Assert.assertFalse(true, "Should not happen");
}
public void testShouldNotThrowExceptionIfContentTypeSet() throws IOException {
this.logService.validateContentType(MediaType.APPLICATION_ATOM_XML_TYPE);
Assert.assertTrue(true);
}
}

View File

@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import com.google.common.collect.Maps;
@ -34,6 +35,7 @@ import com.sun.jersey.api.client.WebResource;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import monasca.log.api.ApiConfig;
import monasca.log.api.app.ApplicationModule;
import monasca.log.api.app.LogSerializer;
import monasca.log.api.app.LogService;
@ -42,6 +44,8 @@ import monasca.log.api.common.LogApiConstants;
import monasca.log.api.common.LogRequestBean;
import monasca.log.api.model.Log;
import monasca.log.api.resource.exception.ErrorMessages;
import monasca.log.api.resource.exception.Exceptions;
import monasca.log.api.utils.TestUtils;
@Test
public class LogResourceTest
@ -54,6 +58,7 @@ public class LogResourceTest
private String longString;
private LogService service;
private String jsonMessage;
private ApiConfig config;
@Override
@SuppressWarnings("unchecked")
@ -74,7 +79,9 @@ public class LogResourceTest
final LogSerializer serializer = new LogSerializer(new ApplicationModule().objectMapper());
service = Mockito.spy(new LogService(null, null, serializer));
config = new ApiConfig();
service = Mockito.spy(new LogService(config, null, serializer));
service.setJsonPayloadTransformer(new JsonPayloadTransformer(serializer));
doNothing().when(service).sendToKafka(any(Log.class), anyString());
@ -351,13 +358,21 @@ public class LogResourceTest
String tooLongMessage = buf.toString();
ClientResponse response = createResponseForJson(null, null, tooLongMessage);
ErrorMessages.assertThat(response.getEntity(String.class)).matches("unprocessable_entity", 422, "Log must be " + LogApiConstants.MAX_LOG_LENGTH + " characters or less");
ErrorMessages
.assertThat(response.getEntity(String.class))
.matches(
"payload_too_large",
Exceptions.FaultType.PAYLOAD_TOO_LARGE.statusCode,
"Log payload size exceeded"
);
}
public void shouldErrorOnCreateJsonMessageWithCrossTenant() {
ClientResponse response = createResponseForJsonWithCrossTenant(null, null, jsonPayload, "illegal-role", "def");
ErrorMessages.assertThat(response.getEntity(String.class)).matches("forbidden", 403, "Project abc cannot POST cross tenant");
ErrorMessages
.assertThat(response.getEntity(String.class))
.matches("forbidden", 403, "Project abc cannot POST cross tenant");
}
public void shouldCreateJsonMessageWithCrossTenant() {
@ -370,7 +385,11 @@ public class LogResourceTest
private ClientResponse createResponseForJson(String applicationType, String dimensions, String message) {
WebResource.Builder builder =
client().resource("/v2.0/log/single").header("X-Tenant-Id", tenantId).header("Content-Type", MediaType.APPLICATION_JSON);
client()
.resource("/v2.0/log/single")
.header("X-Tenant-Id", tenantId)
.header(HttpHeaders.CONTENT_LENGTH, message != null ? message.length() : null)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON);
if (applicationType != null)
builder = builder.header("X-Application-Type", applicationType);
if (dimensions != null)
@ -380,9 +399,12 @@ public class LogResourceTest
private ClientResponse createResponseForJsonWithCrossTenant(String applicationType, String dimensions, String message, String roles,
String crossTenantId) {
WebResource.Builder builder =
client().resource("/v2.0/log/single?tenant_id=" + crossTenantId).header("X-Tenant-Id", tenantId).header("X-Roles", roles)
.header("Content-Type", MediaType.APPLICATION_JSON);
WebResource.Builder builder = client()
.resource("/v2.0/log/single?tenant_id=" + crossTenantId)
.header("X-Tenant-Id", tenantId)
.header("X-Roles", roles)
.header(HttpHeaders.CONTENT_LENGTH, message != null ? message.length() : null)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON);
if (applicationType != null)
builder = builder.header("X-Application-Type", applicationType);
if (dimensions != null)

View File

@ -0,0 +1,47 @@
/*
* Copyright 2015 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package monasca.log.api.utils;
import java.nio.charset.Charset;
public class TestUtils {
public static String generateRandomStr(final int length) {
final StringBuilder builder = new StringBuilder();
for (int i = 0; i < length; i++) {
builder.append(i);
}
return builder.toString();
}
public static String generateByteLengthString(final double length) {
int currentLength = 0;
int size;
String currentString;
do {
currentString = generateRandomStr(currentLength++);
size = currentString.getBytes(Charset.forName("UTF-8")).length;
if (size == length) {
break;
}
} while (currentLength < length);
return currentString;
}
}

View File

@ -58,7 +58,8 @@ class TestLogs(testing.TestBase):
headers={
headers.X_ROLES.name: 'some_role',
headers.X_DIMENSIONS.name: 'a:1',
'Content-Type': 'application/json'
'Content-Type': 'application/json',
'Content-Length': '0'
}
)
self.assertEqual(falcon.HTTP_204, self.srmock.status)
@ -81,7 +82,8 @@ class TestLogs(testing.TestBase):
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: 'a:1',
'Content-Type': 'application/json'
'Content-Type': 'application/json',
'Content-Length': '0'
}
)
self.assertEqual(falcon.HTTP_204, self.srmock.status)
@ -105,7 +107,8 @@ class TestLogs(testing.TestBase):
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: 'a:1',
'Content-Type': 'application/json'
'Content-Type': 'application/json',
'Content-Length': '0'
}
)
self.assertEqual(falcon.HTTP_204, self.srmock.status)
@ -124,7 +127,8 @@ class TestLogs(testing.TestBase):
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: '',
'Content-Type': 'application/json'
'Content-Type': 'application/json',
'Content-Length': '0'
}
)
self.assertEqual(log_api_exceptions.HTTP_422, self.srmock.status)
@ -136,7 +140,80 @@ class TestLogs(testing.TestBase):
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: '',
'Content-Type': 'video/3gpp'
'Content-Type': 'video/3gpp',
'Content-Length': '0'
}
)
self.assertEqual(falcon.HTTP_406, self.srmock.status)
self.assertEqual(falcon.HTTP_415, self.srmock.status)
def test_should_pass_payload_size_not_exceeded(self):
max_log_size = 1000
content_length = max_log_size - 100
self.conf.config(max_log_size=max_log_size, group='service')
self.logs_resource._log_creator = mock.Mock()
self.logs_resource._kafka_publisher = mock.Mock()
self.simulate_request(
'/log/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: '',
'Content-Type': 'application/json',
'Content-Length': str(content_length)
}
)
self.assertEqual(falcon.HTTP_204, self.srmock.status)
def test_should_fail_payload_size_exceeded(self):
max_log_size = 1000
content_length = max_log_size + 100
self.conf.config(max_log_size=max_log_size, group='service')
self.logs_resource._log_creator = mock.Mock()
self.logs_resource._kafka_publisher = mock.Mock()
self.simulate_request(
'/log/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: '',
'Content-Type': 'application/json',
'Content-Length': str(content_length)
}
)
self.assertEqual(falcon.HTTP_413, self.srmock.status)
def test_should_fail_payload_size_equal(self):
max_log_size = 1000
content_length = max_log_size
self.conf.config(max_log_size=max_log_size, group='service')
self.logs_resource._log_creator = mock.Mock()
self.logs_resource._kafka_publisher = mock.Mock()
self.simulate_request(
'/log/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: '',
'Content-Type': 'application/json',
'Content-Length': str(content_length)
}
)
self.assertEqual(falcon.HTTP_413, self.srmock.status)
def test_should_fail_content_length(self):
self.simulate_request(
'/log/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: '',
'Content-Type': 'application/json'
}
)
self.assertEqual(falcon.HTTP_411, self.srmock.status)

View File

@ -14,13 +14,17 @@
# under the License.
import datetime
import random
import string
import unittest
from falcon import errors
from falcon import testing
import mock
from monasca_log_api.api import exceptions
from monasca_log_api.api import logs_api
from monasca_log_api.tests import base
from monasca_log_api.v2.common import service as common_service
@ -210,6 +214,142 @@ class DimensionsValidations(unittest.TestCase):
common_service.Validations.validate_dimensions(dimensions)
class ContentTypeValidations(unittest.TestCase):
def test_should_pass_text_plain(self):
content_type = 'text/plain'
req = mock.Mock()
req.content_type = content_type
common_service.Validations.validate_content_type(req)
def test_should_pass_application_json(self):
content_type = 'application/json'
req = mock.Mock()
req.content_type = content_type
common_service.Validations.validate_content_type(req)
def test_should_fail_invalid_content_type(self):
content_type = 'no/such/type'
req = mock.Mock()
req.content_type = content_type
self.assertRaises(
errors.HTTPUnsupportedMediaType,
common_service.Validations.validate_content_type,
req
)
def test_should_fail_missing_header(self):
content_type = None
req = mock.Mock()
req.content_type = content_type
self.assertRaises(
errors.HTTPMissingHeader,
common_service.Validations.validate_content_type,
req
)
class PayloadSizeValidations(testing.TestBase):
def setUp(self):
self.conf = base.mock_config(self)
return super(PayloadSizeValidations, self).setUp()
def test_should_fail_missing_header(self):
content_length = None
req = mock.Mock()
req.content_length = content_length
self.assertRaises(
errors.HTTPLengthRequired,
common_service.Validations.validate_payload_size,
req
)
def test_should_pass_limit_not_exceeded(self):
content_length = 120
max_log_size = 240
self.conf.config(max_log_size=max_log_size,
group='service')
req = mock.Mock()
req.content_length = content_length
common_service.Validations.validate_payload_size(req)
def test_should_fail_limit_exceeded(self):
content_length = 120
max_log_size = 60
self.conf.config(max_log_size=max_log_size,
group='service')
req = mock.Mock()
req.content_length = content_length
self.assertRaises(
errors.HTTPRequestEntityTooLarge,
common_service.Validations.validate_payload_size,
req
)
def test_should_fail_limit_equal(self):
content_length = 120
max_log_size = 120
self.conf.config(max_log_size=max_log_size,
group='service')
req = mock.Mock()
req.content_length = content_length
self.assertRaises(
errors.HTTPRequestEntityTooLarge,
common_service.Validations.validate_payload_size,
req
)
class EnvelopeSizeValidations(testing.TestBase):
@staticmethod
def _rand_str(size):
return ''.join((random.choice(string.letters) for _ in range(size)))
def setUp(self):
self.conf = base.mock_config(self)
return super(EnvelopeSizeValidations, self).setUp()
def test_should_pass_envelope_size_ok(self):
envelope = self._rand_str(120)
max_log_size = 240
self.conf.config(max_log_size=max_log_size,
group='service')
common_service.Validations.validate_envelope_size(envelope)
def test_should_pass_envelope_size_exceeded(self):
envelope = self._rand_str(360)
max_log_size = 240
self.conf.config(max_log_size=max_log_size,
group='service')
self.assertRaises(
errors.HTTPInternalServerError,
common_service.Validations.validate_envelope_size,
envelope
)
def test_should_pass_envelope_size_equal(self):
envelope = self._rand_str(240)
max_log_size = 240
self.conf.config(max_log_size=max_log_size,
group='service')
self.assertRaises(
errors.HTTPInternalServerError,
common_service.Validations.validate_envelope_size,
envelope
)
class LogsCreatorNewLog(unittest.TestCase):
def setUp(self):
self.instance = common_service.LogCreator()

View File

@ -18,6 +18,8 @@ from oslo_config import cfg
from oslo_log import log
import simplejson as json
from monasca_log_api.v2.common import service
LOG = log.getLogger(__name__)
CONF = cfg.CONF
@ -107,7 +109,7 @@ class LogPublisher(object):
* log
* creation_time
If keys are found, each key must have a value.
If keys are found, each key must have a valueH.
If at least none of the conditions is met
:py:class:`.InvalidMessageException` is raised
@ -159,6 +161,8 @@ class LogPublisher(object):
sort_keys=False,
ensure_ascii=False).encode('utf8')
service.Validations.validate_envelope_size(msg)
# TODO(feature) next version of monasca-common
LOG.debug('Build key [%s] for message' % key)
LOG.debug('Sending message {topics=%s,key=%s,message=%s}' %

View File

@ -15,7 +15,9 @@
import datetime
import re
import sys
from falcon import errors as falcon_errors
from oslo_config import cfg
from oslo_log import log
@ -26,10 +28,16 @@ from monasca_log_api.api import rest_utils
LOG = log.getLogger(__name__)
CONF = cfg.CONF
_DEFAULT_MAX_LOG_SIZE = 1024 * 1024
service_opts = [
cfg.StrOpt('region',
default=None,
help='Region')
help='Region'),
cfg.IntOpt('max_log_size',
default=_DEFAULT_MAX_LOG_SIZE,
help=('Refers to payload/envelope size. If either is exceeded'
'API will throw an error'))
]
service_group = cfg.OptGroup(name='service', title='service')
@ -60,6 +68,7 @@ DIMENSION_VALUE_CONSTRAINTS = {
See :py:func:`Validations.validate_dimensions`
"""
EPOCH_START = datetime.datetime(1970, 1, 1)
SUPPORTED_CONTENT_TYPE = {'application/json', 'text/plain'}
class LogEnvelopeException(Exception):
@ -156,6 +165,103 @@ class Validations(object):
raise exceptions.HTTPUnprocessableEntity(
'Dimensions %s must be a dictionary (map)' % dimensions)
@staticmethod
def validate_content_type(req):
"""Validates content type.
Method validates request against correct
content type.
If content-type cannot be established (i.e. header is missing),
:py:class:`falcon.HTTPMissingHeader` is thrown.
If content-type is not **application/json** or **text/plain**,
:py:class:`falcon.HTTPUnsupportedMediaType` is thrown.
:param :py:class:`falcon.Request` req: current request
:exception: :py:class:`falcon.HTTPMissingHeader`
:exception: :py:class:`falcon.HTTPUnsupportedMediaType`
"""
content_type = req.content_type
LOG.debug('Content-Type is %s', content_type)
if content_type is None or len(content_type) == 0:
raise falcon_errors.HTTPMissingHeader(u'Content-Type')
if content_type not in SUPPORTED_CONTENT_TYPE:
sup_types = ', '.join(SUPPORTED_CONTENT_TYPE)
details = u'Only [%s] are accepted as logs representations' % str(
sup_types)
raise falcon_errors.HTTPUnsupportedMediaType(description=details)
@staticmethod
def validate_payload_size(req):
"""Validates payload size.
Method validates sent payload size.
It expects that http header **Content-Length** is present.
If it does not, method raises :py:class:`falcon.HTTPLengthRequired`.
Otherwise values is being compared with ::
[service]
max_log_size = 1048576
**max_log_size** refers to the maximum allowed content length.
If it is exceeded :py:class:`falcon.HTTPRequestEntityTooLarge` is
thrown.
:param :py:class:`falcon.Request` req: current request
:exception: :py:class:`falcon.HTTPLengthRequired`
:exception: :py:class:`falcon.HTTPRequestEntityTooLarge`
"""
payload_size = req.content_length
max_size = CONF.service.max_log_size
LOG.debug('Payload (content-length) is %s', str(payload_size))
if payload_size is None:
raise falcon_errors.HTTPLengthRequired(
title=u'Content length header is missing',
description=u'Content length is required to estimate if '
u'payload can be processed'
)
if payload_size >= max_size:
raise falcon_errors.HTTPRequestEntityTooLarge(
title=u'Log payload size exceeded',
description=u'Maximum allowed size is %d bytes' % max_size
)
@staticmethod
def validate_envelope_size(envelope=None):
"""Validates envelope size before sending to kafka.
Validates the case similar to what
:py:meth:`.Validations.validate_payload_size`. Difference is
that this method checks if log envelope (already serialized)
can be safely sent to Kafka.
For more information check kafka documentation regarding
Message Size Too Large exception.
:param str envelope: serialized envelope
:exception: :py:class:`falcon.HTTPInternalServerError`
"""
max_size = CONF.service.max_log_size
envelope_size = sys.getsizeof(envelope) if envelope is not None else -1
LOG.debug('Envelope size is %s', envelope_size)
if envelope_size >= max_size:
raise falcon_errors.HTTPInternalServerError(
title=u'Envelope size exceeded',
description=u'Maximum allowed size is %d bytes' % max_size
)
class LogCreator(object):
"""Transforms logs,
@ -221,16 +327,16 @@ class LogCreator(object):
application_type = parse_application_type(application_type)
dimensions = parse_dimensions(dimensions)
self._log.debug(
'application_type=%s,dimensions=%s' % (
application_type, dimensions)
)
if validate:
self._log.debug('Validation enabled, proceeding with validation')
Validations.validate_application_type(application_type)
Validations.validate_dimensions(dimensions)
self._log.debug(
'application_type=%s,dimensions=%s' % (
application_type, dimensions)
)
log_object = {}
if content_type == 'application/json':
log_object.update(payload)

View File

@ -37,8 +37,6 @@ def _before_logs_post(req, res, payload, params):
class Logs(logs_api.LogsApi):
"""Logs Api V1."""
_supported_c_types = {'application/json', 'text/plain'}
def __init__(self):
self._log_creator = service.LogCreator()
self._kafka_publisher = log_publisher.LogPublisher()
@ -46,12 +44,8 @@ class Logs(logs_api.LogsApi):
@falcon.before(_before_logs_post)
def on_post(self, req, res):
content_type = req.content_type
if content_type not in self._supported_c_types:
raise falcon.HTTPNotAcceptable(
description='Only %s are accepted as logs representations'
% self._supported_c_types)
service.Validations.validate_payload_size(req)
service.Validations.validate_content_type(req)
cross_tenant_id = req.get_param('tenant_id')
tenant_id = req.get_header(*headers.X_TENANT_ID)

View File

@ -44,3 +44,7 @@ class LogApiClient(service_client.ServiceClient):
resp, body = self.post(_uri(uri), msg, default_headers)
return resp, body
def custom_request(self, method, headers=None, body=None):
uri = '/' + _uri("/log/single")
self.request(method=method, url=uri, headers=headers, body=body)

View File

@ -38,7 +38,7 @@ def _get_message_size(size_base):
_SMALL_MESSAGE_SIZE = _get_message_size(0.001)
_MEDIUM_MESSAGE_SIZE = _get_message_size(0.01)
_LARGE_MESSAGE_SIZE = _get_message_size(0.1)
_REJECTABLE_MESSAGE_SIZE = _get_message_size(10)
_REJECTABLE_MESSAGE_SIZE = _get_message_size(1)
def generate_unique_message(message=None, size=50):

View File

@ -0,0 +1,99 @@
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import test
from tempest_lib import exceptions
from monasca_log_api_tempest.tests import base
class TestLogApiConstraints(base.BaseLogsTestCase):
@test.attr(type='gate')
def test_should_reject_if_content_length_missing(self):
headers = base._get_headers()
try:
self.logs_client.custom_request('POST', headers, None)
except exceptions.UnexpectedResponseCode as urc:
self.assertIn('411', str(urc)) # Only possible way to detect that
return
self.assertTrue(False, 'API should respond with 411')
@test.attr(type='gate')
def test_should_reject_if_content_type_missing(self):
headers = base._get_headers(content_type='')
try:
self.logs_client.custom_request('POST', headers, '{}')
except exceptions.BadRequest as urc:
self.assertEqual(400, urc.resp.status)
return
self.assertTrue(False, 'API should respond with 400')
@test.attr(type='gate')
def test_should_reject_if_wrong_content_type(self):
headers = base._get_headers(content_type='video/3gpp')
try:
self.logs_client.custom_request('POST', headers, '{}')
except exceptions.InvalidContentType as urc:
self.assertEqual(415, urc.resp.status)
return
self.assertTrue(False, 'API should respond with 400')
@test.attr(type='gate')
def test_should_reject_too_big_message(self):
_, message = base.generate_rejectable_message()
headers = base._get_headers(self.logs_client.get_headers())
try:
self.logs_client.send_single_log(message, headers)
except exceptions.OverLimit as urc:
self.assertEqual(413, urc.resp.status)
return
self.assertTrue(False, 'API should respond with 413')
@test.attr(type='gate')
def test_should_reject_too_big_message_multiline(self):
_, message = base.generate_rejectable_message()
message = message.replace(' ', '\n')
headers = base._get_headers(self.logs_client.get_headers())
try:
self.logs_client.send_single_log(message, headers)
except exceptions.OverLimit as urc:
self.assertEqual(413, urc.resp.status)
return
self.assertTrue(False, 'API should respond with 413')
@test.attr(type='gate')
def test_should_accept_message_but_reject_after_adding_metadata(self):
_, message = base.generate_unique_message(
size=base._get_message_size(0.9999))
headers = base._get_headers(self.logs_client.get_headers())
data = base._get_data(message)
try:
self.logs_client.send_single_log(data, headers)
except exceptions.ServerFault as urc:
self.assertEqual(500, urc.resp.status)
msg = urc.resp_body.get('title', None)
# in Java that is under message
if msg is None:
msg = urc.resp_body.get('message', None)
self.assertIsNotNone(msg, 'Should get status message')
self.assertEqual('Envelope size exceeded', msg)
return
self.assertTrue(False, 'API should respond with 500')