Fix base64 usage for Python 3

Python 3 now handles str and bytes differently to Python 2, so when
using b64encode/decode, the input and output must specifically by
bytes.

This commit adds some base64 helpers to coerce the input to become
bytes if the input is a str and decodes the output back to str after
the encode or decode.

It also adds a test case to replicate the issue and cover the new
functions.

Change-Id: I8f64da03c7cfc76e620cb3b98201f6287b752bb3
This commit is contained in:
Andy Botting 2020-10-08 10:53:00 +11:00
parent dd1131146f
commit 3c41b4f1b9
5 changed files with 161 additions and 9 deletions

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import os
import shutil
@ -27,6 +26,7 @@ from oslo_log import log as logging
from muranoagent import bunch
from muranoagent.common import config
from muranoagent import util
CONF = config.CONF
LOG = logging.getLogger(__name__)
@ -58,13 +58,14 @@ class ExecutionPlanQueue(object):
os.mkdir(folder_path)
plan_file_path = os.path.join(
folder_path, ExecutionPlanQueue.plan_filename)
json_plan = json.dumps({
'Data': util.b64encode(execution_plan),
'Signature': util.b64encode(signature or ''),
'ID': msg_id,
'ReplyTo': reply_to
})
with open(plan_file_path, 'wb') as out_file:
out_file.write(json.dumps({
'Data': base64.b64encode(execution_plan),
'Signature': base64.b64encode(signature or ''),
'ID': msg_id,
'ReplyTo': reply_to
}))
out_file.write(json_plan)
def _get_first_timestamp(self, filename):
def predicate(folder):
@ -94,9 +95,9 @@ class ExecutionPlanQueue(object):
return None
try:
data = base64.b64decode(ep_info['Data'])
data = util.b64decode(ep_info['Data'])
if self._key:
signature = base64.b64decode(ep_info['Signature'])
signature = util.b64decode(ep_info['Signature'])
self._verify_signature(data, signature)
ep = json.loads(data)

View File

@ -0,0 +1,69 @@
# Copyright (c) 2015 Telefonica I+D
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import builtins
import fixtures
from muranoagent import execution_plan_queue
from muranoagent import app
from muranoagent import bunch
from muranoagent.common import config as cfg
from muranoagent.common.messaging import mqclient
from muranoagent import exceptions as exc
from muranoagent.tests.unit import base
from muranoagent.tests.unit import execution_plan as ep
from muranoagent import validation
CONF = cfg.CONF
class TestExecutionPlanQueue(base.MuranoAgentTestCase,
fixtures.FunctionFixture):
@mock.patch('os.chmod')
@mock.patch('os.path.exists')
def setUp(self, mock_path, mock_chmod):
super(TestExecutionPlanQueue, self).setUp()
mock_path.side_effect = self._exists
self.epq = execution_plan_queue.ExecutionPlanQueue()
CONF.set_override('storage', 'cache')
self.addCleanup(CONF.clear_override, 'storage')
@staticmethod
def _exists(path):
return 'stamp' not in path
@mock.patch('os.path.lexists')
@mock.patch('os.path.isdir')
@mock.patch('os.mkdir')
def test_put_execution_plan(self, mock_makedir, mock_path,
mock_exists):
mock_path.return_value = True
mock_makedir.return_value = None
mock_exists.return_value = True
mock_write = mock.mock_open()
execution_plan = 'myplan'
signature = None
msg_id = 1
reply_to = 'test'
expected_content = ('{"Data": "bXlwbGFu", "Signature": "", '
'"ID": 1, "ReplyTo": "test"}')
with mock.patch.object(builtins, 'open', mock_write) as mocked_file:
self.epq.put_execution_plan(execution_plan, signature,
msg_id, reply_to)
mocked_file().write.assert_called_once_with(expected_content)

View File

@ -0,0 +1,39 @@
# Copyright (c) 2015 Telefonica I+D
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from muranoagent.tests.unit import base
from muranoagent import util
class TestUtils(base.MuranoAgentTestCase):
def test_str_to_bytes(self):
self.assertEqual(util._to_bytes('test'), b'test')
def test_bytes_to_bytes(self):
self.assertEqual(util._to_bytes(b'test'), b'test')
def test_b64encode_str(self):
self.assertEqual(util.b64encode('test'), 'dGVzdA==')
def test_b64encode_bytes(self):
self.assertEqual(util.b64encode(b'test'), 'dGVzdA==')
def test_b64decode_str(self):
self.assertEqual(util.b64decode('dGVzdA=='), 'test')
def test_b64decode_bytes(self):
self.assertEqual(util.b64decode(b'dGVzdA=='), 'test')

40
muranoagent/util.py Normal file
View File

@ -0,0 +1,40 @@
# Copyright (c) 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
def _to_bytes(string):
"""Coerce a string into bytes
Since Python 3 now handles bytes and str differently, this helper
will coerce a string to bytes if possible for use with base64
"""
try:
string = string.encode()
except AttributeError:
pass
return string
def b64encode(string):
"""Base64 encode a string to a string"""
string = _to_bytes(string)
return base64.b64encode(string).decode()
def b64decode(string):
"""Base64 decode a string to a string"""
string = _to_bytes(string)
return base64.b64decode(string).decode()

View File

@ -0,0 +1,3 @@
---
fixes:
- Fixed a string handling issue with base64 when using Python 3