Use quoting for CSV Writing

An attacker could create an instance with a malicious name beginning
with an equals sign (=) or at sign (‘@’).
These are both recognized in Excel as metacharacters for a formula. The
attacker can create an instance name that includes a payload that will
execute code such as:
=cmd|' /C calc'!A0
This payload opens the calculator program when the resulting CSV is
opened on a Windows machine with Microsoft Excel. An attacker could
easily substitute this payload with another that runs any arbitrary
shell commands.

Quote the CSV output so this is no longer a possibility.

Closes-Bug: #1842749
Change-Id: I937fa2a14bb483d87f057b3e8be219ecdc9363eb
This commit is contained in:
Adam Harwell 2019-08-28 16:59:06 -07:00
parent 1d072dbe69
commit 70629916fe
6 changed files with 92 additions and 53 deletions

View File

@ -12,9 +12,7 @@
from __future__ import division
from csv import DictWriter
from csv import writer
import csv
from django.http import HttpResponse
from django.http import StreamingHttpResponse
@ -38,10 +36,11 @@ class CsvDataMixin(object):
super(CsvDataMixin, self).__init__()
if hasattr(self, "columns"):
columns = [self.encode(col) for col in self.columns]
self.writer = DictWriter(self.out, columns)
self.writer = csv.DictWriter(self.out, columns,
quoting=csv.QUOTE_ALL)
self.is_dict = True
else:
self.writer = writer(self.out)
self.writer = csv.writer(self.out, quoting=csv.QUOTE_ALL)
self.is_dict = False
def write_csv_header(self):

View File

@ -56,7 +56,7 @@ class InstanceViewTest(test.BaseAdminViewTests):
mock.call('AdminActions', test.IsHttpRequest()),
mock.call('AdminActions', test.IsHttpRequest()),
mock.call('Shelve', test.IsHttpRequest())] * 4)
self.assertEqual(12, self.mock_extension_supported.call_count)
self.assertEqual(15, self.mock_extension_supported.call_count)
self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
self.mock_image_list_detailed_by_ids.assert_called_once_with(
test.IsHttpRequest(), instances_img_ids)
@ -105,7 +105,7 @@ class InstanceViewTest(test.BaseAdminViewTests):
mock.call('AdminActions', test.IsHttpRequest()),
mock.call('AdminActions', test.IsHttpRequest()),
mock.call('Shelve', test.IsHttpRequest())] * 4)
self.assertEqual(12, self.mock_extension_supported.call_count)
self.assertEqual(15, self.mock_extension_supported.call_count)
self.mock_flavor_list.assert_called_once_with(test.IsHttpRequest())
self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
self.mock_flavor_get.assert_has_calls(
@ -157,7 +157,7 @@ class InstanceViewTest(test.BaseAdminViewTests):
mock.call('AdminActions', test.IsHttpRequest()),
mock.call('AdminActions', test.IsHttpRequest()),
mock.call('Shelve', test.IsHttpRequest())] * 4)
self.assertEqual(12, self.mock_extension_supported.call_count)
self.assertEqual(15, self.mock_extension_supported.call_count)
self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
self.mock_flavor_get.assert_has_calls(
[mock.call(test.IsHttpRequest(), s.flavor['id']) for s in servers])
@ -265,7 +265,7 @@ class InstanceViewTest(test.BaseAdminViewTests):
mock.call('AdminActions', test.IsHttpRequest()),
mock.call('AdminActions', test.IsHttpRequest()),
mock.call('Shelve', test.IsHttpRequest())] * 4)
self.assertEqual(12, self.mock_extension_supported.call_count)
self.assertEqual(15, self.mock_extension_supported.call_count)
@test.create_mocks({
api.nova: ['flavor_list', 'server_list_paged', 'extension_supported'],
@ -299,7 +299,7 @@ class InstanceViewTest(test.BaseAdminViewTests):
mock.call('AdminActions', test.IsHttpRequest()),
mock.call('AdminActions', test.IsHttpRequest()),
mock.call('Shelve', test.IsHttpRequest())] * 4)
self.assertEqual(12, self.mock_extension_supported.call_count)
self.assertEqual(15, self.mock_extension_supported.call_count)
search_opts = {'marker': None, 'paginate': True, 'all_tenants': True}
self.mock_server_list_paged.assert_called_once_with(
test.IsHttpRequest(),

View File

@ -166,16 +166,17 @@ class UsageViewTests(test.BaseAdminViewTests):
res = self.client.get(csv_url)
self.assertTemplateUsed(res, 'admin/overview/usage.csv')
self.assertIsInstance(res.context['usage'], usage.GlobalUsage)
hdr = 'Project Name,VCPUs,RAM (MB),Disk (GB),Usage (Hours)'
hdr = '"Project Name","VCPUs","RAM (MB)","Disk (GB)","Usage (Hours)"'
self.assertContains(res, '%s\r\n' % hdr)
if nova_stu_enabled:
for obj in usage_obj:
row = u'{0},{1},{2},{3},{4:.2f}\r\n'.format(obj.project_name,
obj.vcpus,
obj.memory_mb,
obj.disk_gb_hours,
obj.vcpu_hours)
row = u'"{0}","{1}","{2}","{3}","{4:.2f}"\r\n'.format(
obj.project_name,
obj.vcpus,
obj.memory_mb,
obj.disk_gb_hours,
obj.vcpu_hours)
self.assertContains(res, row)
self.assert_mock_multiple_calls_with_same_arguments(

View File

@ -1269,8 +1269,8 @@ class UsageViewTests(test.BaseAdminViewTests):
self.assertTemplateUsed(res, 'project/overview/usage.csv')
self.assertIsInstance(res.context['usage'], usage.ProjectUsage)
hdr = ('Instance Name,VCPUs,RAM (MB),Disk (GB),Usage (Hours),'
'Age (Seconds),State')
hdr = ('"Instance Name","VCPUs","RAM (MB)","Disk (GB)",'
'"Usage (Hours)","Age (Seconds)","State"')
self.assertContains(res, '%s\r\n' % hdr)
self.assert_mock_multiple_calls_with_same_arguments(
@ -1283,6 +1283,29 @@ class UsageViewTests(test.BaseAdminViewTests):
else:
self.mock_usage_get.assert_not_called()
@test.create_mocks({api.nova: ('usage_get',
'extension_supported')})
def test_usage_csv_quoting(self):
# Explicitly test the values of the third usage for correct quoting
usage_obj = api.nova.NovaUsage(self.usages.list()[2])
self.mock_usage_get.return_value = usage_obj
project_id = self.usages.list()[2].tenant_id
csv_url = reverse('horizon:identity:projects:usage',
args=[project_id]) + "?format=csv"
res = self.client.get(csv_url)
self.assertIsInstance(res.context['usage'], usage.ProjectUsage)
hdr = ('"Instance Name","VCPUs","RAM (MB)","Disk (GB)",'
'"Usage (Hours)","Age (Seconds)","State"')
self.assertContains(res, '%s\r\n' % hdr)
usage_1_quoted = ('"=cmd|\' /C calc\'!A0","1","512","0","122.87",'
'"442321","Active"')
self.assertContains(res, '%s\r\n' % usage_1_quoted)
usage_2_quoted = ('"=cmd|\' /C calc\'!A0","1","512","0","2.61",'
'"9367","Active"')
self.assertContains(res, '%s\r\n' % usage_2_quoted)
class DetailProjectViewTests(test.BaseAdminViewTests):

View File

@ -229,7 +229,7 @@ class InstanceTableTests(InstanceTestBase, InstanceTableTestMixin):
return self.client.get(INDEX_URL)
def _check_get_index(self, use_servers_update_address=True,
multiplier=4):
multiplier=5):
expected_extension_count = {'AdminActions': 4 * multiplier,
'Shelve': 1 * multiplier}
expected_feature_count = 2 * multiplier
@ -357,10 +357,10 @@ class InstanceTableTests(InstanceTestBase, InstanceTableTestMixin):
self.assertItemsEqual(instances, self.servers.list())
self._check_extension_supported({'AdminActions': 16,
'Shelve': 4})
self._check_extension_supported({'AdminActions': 20,
'Shelve': 5})
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_is_feature_available, 8,
self.mock_is_feature_available, 10,
mock.call(helpers.IsHttpRequest(), 'locked_attribute'))
self.mock_server_list_paged.assert_called_once_with(
helpers.IsHttpRequest(),
@ -375,10 +375,10 @@ class InstanceTableTests(InstanceTestBase, InstanceTableTestMixin):
self.mock_tenant_absolute_limits, 2,
mock.call(helpers.IsHttpRequest(), reserved=True))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_supported, 8,
self.mock_floating_ip_supported, 10,
mock.call(helpers.IsHttpRequest()))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_simple_associate_supported, 4,
self.mock_floating_ip_simple_associate_supported, 5,
mock.call(helpers.IsHttpRequest()))
@helpers.create_mocks({
@ -435,10 +435,10 @@ class InstanceTableTests(InstanceTestBase, InstanceTableTestMixin):
if expected_image_name:
self.assertContains(res, expected_image_name)
self._check_extension_supported({'AdminActions': 16,
'Shelve': 4})
self._check_extension_supported({'AdminActions': 20,
'Shelve': 5})
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_is_feature_available, 8,
self.mock_is_feature_available, 10,
mock.call(helpers.IsHttpRequest(), 'locked_attribute'))
self.mock_flavor_list.assert_called_once_with(helpers.IsHttpRequest())
self._assert_mock_image_list_detailed_calls()
@ -454,10 +454,10 @@ class InstanceTableTests(InstanceTestBase, InstanceTableTestMixin):
self.mock_tenant_absolute_limits, 2,
mock.call(helpers.IsHttpRequest(), reserved=True))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_supported, 8,
self.mock_floating_ip_supported, 10,
mock.call(helpers.IsHttpRequest()))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_simple_associate_supported, 4,
self.mock_floating_ip_simple_associate_supported, 5,
mock.call(helpers.IsHttpRequest()))
self.mock_volume_list.assert_called_once_with(helpers.IsHttpRequest())
@ -497,7 +497,7 @@ class InstanceTableTests(InstanceTestBase, InstanceTableTestMixin):
if console_link_rendered:
break
self.assertTrue(console_link_rendered)
self._check_get_index(multiplier=5)
self._check_get_index(multiplier=6)
@django.test.utils.override_settings(CONSOLE_TYPE=None)
def test_index_without_console_link(self):
@ -508,7 +508,7 @@ class InstanceTableTests(InstanceTestBase, InstanceTableTestMixin):
for instance in instances:
for action in instances_table.get_row_actions(instance):
self.assertNotIsInstance(action, tables.ConsoleLink)
self._check_get_index(multiplier=8)
self._check_get_index(multiplier=10)
@helpers.create_mocks({api.nova: ('server_list_paged',
'flavor_list',
@ -1928,10 +1928,10 @@ class InstanceTests(InstanceTestBase):
else:
self.assertNotContains(res, _action_id)
self._check_extension_supported({'AdminActions': 16,
'Shelve': 4})
self._check_extension_supported({'AdminActions': 20,
'Shelve': 5})
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_is_feature_available, 8,
self.mock_is_feature_available, 10,
mock.call(helpers.IsHttpRequest(), 'locked_attribute'))
self.mock_flavor_list.assert_called_once_with(helpers.IsHttpRequest())
self._assert_mock_image_list_detailed_calls()
@ -1947,10 +1947,10 @@ class InstanceTests(InstanceTestBase):
self.mock_tenant_absolute_limits, 2,
mock.call(helpers.IsHttpRequest(), reserved=True))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_supported, 8,
self.mock_floating_ip_supported, 10,
mock.call(helpers.IsHttpRequest()))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_simple_associate_supported, 4,
self.mock_floating_ip_simple_associate_supported, 5,
mock.call(helpers.IsHttpRequest()))
@helpers.create_mocks({api.nova: ('get_password',)})
@ -4224,10 +4224,10 @@ class InstanceLaunchInstanceTests(InstanceTestBase,
self.assertEqual((('compute', 'os_compute_api:servers:create'),),
launch_action.policy_rules)
self._check_extension_supported({'AdminActions': 16,
'Shelve': 4})
self._check_extension_supported({'AdminActions': 20,
'Shelve': 5})
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_is_feature_available, 8,
self.mock_is_feature_available, 10,
mock.call(helpers.IsHttpRequest(), 'locked_attribute'))
self.mock_flavor_list.assert_called_once_with(helpers.IsHttpRequest())
self._assert_mock_image_list_detailed_calls()
@ -4243,10 +4243,10 @@ class InstanceLaunchInstanceTests(InstanceTestBase,
self.mock_tenant_absolute_limits, 3,
mock.call(helpers.IsHttpRequest(), reserved=True))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_supported, 8,
self.mock_floating_ip_supported, 10,
mock.call(helpers.IsHttpRequest()))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_simple_associate_supported, 4,
self.mock_floating_ip_simple_associate_supported, 5,
mock.call(helpers.IsHttpRequest()))
@helpers.create_mocks({
@ -4289,10 +4289,10 @@ class InstanceLaunchInstanceTests(InstanceTestBase,
self.assertEqual('Launch Instance (Quota exceeded)',
six.text_type(launch_action.verbose_name))
self._check_extension_supported({'AdminActions': 16,
'Shelve': 4})
self._check_extension_supported({'AdminActions': 20,
'Shelve': 5})
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_is_feature_available, 8,
self.mock_is_feature_available, 10,
mock.call(helpers.IsHttpRequest(), 'locked_attribute'))
self.mock_flavor_list.assert_called_once_with(helpers.IsHttpRequest())
self._assert_mock_image_list_detailed_calls()
@ -4308,10 +4308,10 @@ class InstanceLaunchInstanceTests(InstanceTestBase,
self.mock_tenant_absolute_limits, 3,
mock.call(helpers.IsHttpRequest(), reserved=True))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_supported, 8,
self.mock_floating_ip_supported, 10,
mock.call(helpers.IsHttpRequest()))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_simple_associate_supported, 4,
self.mock_floating_ip_simple_associate_supported, 5,
mock.call(helpers.IsHttpRequest()))
@helpers.create_mocks({api.glance: ('image_list_detailed',),
@ -4472,10 +4472,10 @@ class InstanceTests2(InstanceTestBase, InstanceTableTestMixin):
self.assertContains(res, "instances__confirm")
self.assertContains(res, "instances__revert")
self._check_extension_supported({'AdminActions': 16,
'Shelve': 4})
self._check_extension_supported({'AdminActions': 20,
'Shelve': 5})
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_is_feature_available, 8,
self.mock_is_feature_available, 10,
mock.call(helpers.IsHttpRequest(), 'locked_attribute'))
self.mock_flavor_list.assert_called_once_with(helpers.IsHttpRequest())
self._assert_mock_image_list_detailed_calls()
@ -4491,10 +4491,10 @@ class InstanceTests2(InstanceTestBase, InstanceTableTestMixin):
self.mock_tenant_absolute_limits, 2,
mock.call(helpers.IsHttpRequest(), reserved=True))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_supported, 8,
self.mock_floating_ip_supported, 10,
mock.call(helpers.IsHttpRequest()))
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_floating_ip_simple_associate_supported, 4,
self.mock_floating_ip_simple_associate_supported, 5,
mock.call(helpers.IsHttpRequest()))
@helpers.create_mocks({api.nova: ('extension_supported',

View File

@ -362,7 +362,7 @@ def data(TEST):
vals.update({"name": u'\u4e91\u89c4\u5219',
"status": "ACTIVE",
"tenant_id": tenant3.id,
"server_id": "3"})
"server_id": "3"})
server_3 = servers.Server(servers.ServerManager(None),
json.loads(SERVER_DATA % vals)['server'])
vals.update({"name": "server_4",
@ -370,7 +370,13 @@ def data(TEST):
"server_id": "4"})
server_4 = servers.Server(servers.ServerManager(None),
json.loads(SERVER_DATA % vals)['server'])
TEST.servers.add(server_1, server_2, server_3, server_4)
vals.update({"name": "=cmd|' /C calc'!A0",
"status": "PAUSED",
"tenant_id": TEST.tenants.first().id,
"server_id": "5"})
server_5 = servers.Server(servers.ServerManager(None),
json.loads(SERVER_DATA % vals)['server'])
TEST.servers.add(server_1, server_2, server_3, server_4, server_5)
# VNC Console Data
console = {
@ -426,6 +432,16 @@ def data(TEST):
json.loads(USAGE_DATA % usage_2_vals))
TEST.usages.add(usage_obj_2)
usage_3_vals = {"tenant_id": TEST.tenants.first(),
"instance_name": server_5.name,
"flavor_name": flavor_1.name,
"flavor_vcpus": flavor_1.vcpus,
"flavor_disk": flavor_1.disk,
"flavor_ram": flavor_1.ram}
usage_obj_3 = usage.Usage(usage.UsageManager(None),
json.loads(USAGE_DATA % usage_3_vals))
TEST.usages.add(usage_obj_3)
# Availability Zones
TEST.availability_zones.add(availability_zones.AvailabilityZone(
availability_zones.AvailabilityZoneManager(None),