Handle XSS attacks using various service fields

Change-Id: I746bcc7856bbc99149067b1b9641ec4c3b64c1fe
This commit is contained in:
amitgandhinz 2015-01-02 14:53:10 -05:00
parent 4939d2a533
commit 433507a7a2
10 changed files with 169 additions and 15 deletions

View File

@ -53,7 +53,7 @@ class ServicesController(base.ServicesController):
services = []
for i in self.created_service_ids:
services = [{'service_id': i,
'name': uuid.uuid4(),
'service_name': uuid.uuid4(),
'domains': [json.dumps(
{'domain': 'www.mywebsite.com'})
],
@ -114,7 +114,7 @@ class ServicesController(base.ServicesController):
[{'operator_url': 'mockcf123.fastly.prod.com'}]})}
service_dict = {'service_id': service_id,
'name': uuid.uuid4(),
'service_name': uuid.uuid4(),
'domains': [domain_json],
'origins': [origin_json],
'flavor_id': 'standard',
@ -185,7 +185,7 @@ class ServicesController(base.ServicesController):
@staticmethod
def format_result(result):
service_id = result.get('service_id')
name = result.get('service_name')
name = str(result.get('service_name'))
origins = [json.loads(o) for o in result.get('origins', [])]
domains = [json.loads(d) for d in result.get('domains', [])]
origins = [origin.Origin(o['origin'],

View File

@ -51,6 +51,11 @@ class FlavorsController(base.Controller, hooks.HookController):
}
@pecan.expose('json')
@decorators.validate(
flavor_id=rule.Rule(
helpers.is_valid_flavor_id(),
helpers.abort_with_message)
)
def get_one(self, flavor_id):
"""get_one

View File

@ -12,7 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
try:
import ordereddict as collections
except ImportError: # pragma: no cover
@ -27,7 +27,7 @@ class Model(collections.OrderedDict):
def __init__(self, caching):
super(Model, self).__init__()
self['name'] = caching.name
self['name'] = cgi.escape(caching.name)
self['ttl'] = caching.ttl
if caching.rules != []:
self['rules'] = [rule.Model(r) for r in caching.rules]

View File

@ -12,7 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
try:
import ordereddict as collections
except ImportError: # pragma: no cover
@ -25,5 +25,5 @@ class Model(collections.OrderedDict):
def __init__(self, domain):
super(Model, self).__init__()
self['domain'] = domain.domain
self['domain'] = cgi.escape(domain.domain)
self['protocol'] = domain.protocol

View File

@ -12,7 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
try:
import ordereddict as collections
except ImportError: # pragma: no cover
@ -27,7 +27,7 @@ class Model(collections.OrderedDict):
def __init__(self, origin):
super(Model, self).__init__()
self['origin'] = origin.origin
self['origin'] = cgi.escape(origin.origin)
self['port'] = origin.port
self['ssl'] = origin.ssl
self['rules'] = [rule.Model(r) for r in origin.rules]

View File

@ -12,7 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
try:
import ordereddict as collections
except ImportError: # pragma: no cover
@ -27,5 +27,5 @@ class Model(collections.OrderedDict):
def __init__(self, restriction):
super(Model, self).__init__()
self['name'] = restriction.name
self['name'] = cgi.escape(restriction.name)
self['rules'] = [rule.Model(r) for r in restriction.rules]

View File

@ -12,7 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
try:
import ordereddict as collections
except ImportError: # pragma: no cover
@ -25,10 +25,10 @@ class Model(collections.OrderedDict):
def __init__(self, rule):
super(Model, self).__init__()
self['name'] = rule.name
self['name'] = cgi.escape(rule.name)
for attr_name in ['http_host', 'http_method',
'client_ip', 'request_url',
'referrer']:
attr = getattr(rule, attr_name, None)
if attr is not None:
self[attr_name] = attr
self[attr_name] = cgi.escape(attr)

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
try:
import ordereddict as collections
except ImportError: # pragma: no cover
@ -32,7 +33,7 @@ class Model(collections.OrderedDict):
def __init__(self, service_obj, controller):
super(Model, self).__init__()
self["name"] = service_obj.name
self["name"] = cgi.escape(service_obj.name)
self["id"] = str(service_obj.service_id)
self["domains"] = [domain.Model(d) for d in service_obj.domains]
self["origins"] = [origin.Model(o) for o in service_obj.origins]

View File

@ -0,0 +1,75 @@
{
"name_injection": {
"name": "<SCRIPT SRC=http://ha.ckers.org/xss.js></SCRIPT>",
"domain_list": [{"domain": "mywebsite.com", "protocol": "http"}],
"origin_list": [{"origin": "mywebsite1.com",
"port": 443,
"ssl": false}],
"caching_list": [{"name": "default", "ttl": 3600},
{"name": "home",
"ttl": 1200,
"rules": [{"name" : "index",
"request_url" : "/index.htm"}]}]
},
"domain_injection": {
"name": "bad_domain",
"domain_list": [
{
"domain": "<SCRIPT SRC=http://ha.ckers.org/xss.js></SCRIPT>",
"protocol": "http"
}
],
"origin_list": [{"origin": "mywebsite1.com",
"port": 443,
"ssl": false}],
"caching_list": []
},
"origin_injection": {
"name": "bad_origin",
"domain_list": [{"domain": "mywebsite.com", "protocol": "http"}],
"origin_list": [{"origin": "<SCRIPT SRC=http://ha.ckers.org/xss.js></SCRIPT>",
"port": 443,
"ssl": false}],
"caching_list": [{"name": "default", "ttl": 3600},
{"name": "home",
"ttl": 1200,
"rules": [{"name" : "index",
"request_url" : "/index.htm"}]}]
},
"caching_name_injection": {
"name": "bad_caching_name",
"domain_list": [{"domain": "mywebsite.com", "protocol": "http"}],
"origin_list": [{"origin": "mywebsite1.com",
"port": 443,
"ssl": false}],
"caching_list": [{"name": "default", "ttl": 3600},
{"name": "<SCRIPT SRC=http://ha.ckers.org/xss.js></SCRIPT>",
"ttl": 1200,
"rules": [{"name" : "index",
"request_url" : "/index.htm"}]}]
},
"caching_rule_injection": {
"name": "bad_caching_name",
"domain_list": [{"domain": "mywebsite.com", "protocol": "http"}],
"origin_list": [{"origin": "mywebsite1.com",
"port": 443,
"ssl": false}],
"caching_list": [{"name": "default", "ttl": 3600},
{"name": "images",
"ttl": 1200,
"rules": [{"name" : "index",
"request_url" : "<SCRIPT SRC=http://ha.ckers.org/xss.js></SCRIPT>"}]}]
},
"caching_rule_name_injection": {
"name": "bad_caching_name",
"domain_list": [{"domain": "mywebsite.com", "protocol": "http"}],
"origin_list": [{"origin": "mywebsite1.com",
"port": 443,
"ssl": false}],
"caching_list": [{"name": "default", "ttl": 3600},
{"name": "images",
"ttl": 1200,
"rules": [{"name" : "<SCRIPT SRC=http://ha.ckers.org/xss.js></SCRIPT>",
"request_url" : "/images"}]}]
}
}

View File

@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
import time
import uuid
@ -142,6 +143,78 @@ class TestCreateService(providers.TestProviderBase):
super(TestCreateService, self).tearDown()
@ddt.file_data("data_create_service_xss.json")
def test_create_service_with_xss_injection(self, test_data):
# create with hacker data
service_name = test_data['name']
domain_list = test_data['domain_list']
for item in domain_list:
item['domain'] = str(uuid.uuid1()) + '.com'
origin_list = test_data['origin_list']
caching_list = test_data['caching_list']
if 'flavor_id' in test_data:
flavor_id = test_data['flavor_id']
else:
flavor_id = self.flavor_id
resp = self.client.create_service(service_name=service_name,
domain_list=domain_list,
origin_list=origin_list,
caching_list=caching_list,
flavor_id=flavor_id)
if 'location' in resp.headers:
self.service_url = resp.headers['location']
self.assertEqual(resp.status_code, 202)
self.assertEqual(resp.text, '')
self.service_url = resp.headers['location']
resp = self.client.get_service(location=self.service_url)
self.assertEqual(resp.status_code, 200)
body = resp.json()
# validate name
self.assertEqual(
body['name'],
cgi.escape(test_data['name'])
)
# validate domain
self.assertEqual(
body['domains'][0]['domain'],
cgi.escape(domain_list[0]['domain'])
)
# validate origin
self.assertEqual(
body['origins'][0]['origin'],
cgi.escape(origin_list[0]['origin'])
)
if len(caching_list) > 0:
# validate caching name
self.assertEqual(
body['caching'][1]['name'],
cgi.escape(caching_list[1]['name'])
)
if len(caching_list) > 1:
# we have more than just the default
if len(caching_list[1]['rules']) > 0:
# validate caching rule name
self.assertEqual(
body['caching'][1]['rules'][0]['name'],
cgi.escape(caching_list[1]['rules'][0]['name'])
)
# validate caching rule request_url
self.assertEqual(
body['caching'][1]['rules'][0]['request_url'],
cgi.escape(caching_list[1]['rules'][0]['request_url'])
)
@ddt.ddt
class TestListServices(base.TestBase):