Python 3: encode or decode i/o data of Popen.communicate()

In Python 3, input and output for Popen.communicate() is bytes type.
Therefore, encode input data and decode return data for Popen.communicate().

Change-Id: I70f009e3366f0eeda5790652ea14f3627b934664
Blueprint: neutron-python3
Closes-Bug: #1479159
This commit is contained in:
fumihiko kakuma 2015-07-22 14:00:25 +09:00
parent bd7e6d267c
commit 048316e981
3 changed files with 87 additions and 3 deletions

View File

@ -108,15 +108,31 @@ def execute(cmd, process_input=None, addl_env=None,
check_exit_code=True, return_stderr=False, log_fail_as_error=True,
extra_ok_codes=None, run_as_root=False):
try:
if (process_input is None or
isinstance(process_input, six.binary_type)):
_process_input = process_input
else:
_process_input = process_input.encode('utf-8')
if run_as_root and cfg.CONF.AGENT.root_helper_daemon:
returncode, _stdout, _stderr = (
execute_rootwrap_daemon(cmd, process_input, addl_env))
else:
obj, cmd = create_process(cmd, run_as_root=run_as_root,
addl_env=addl_env)
_stdout, _stderr = obj.communicate(process_input)
_stdout, _stderr = obj.communicate(_process_input)
returncode = obj.returncode
obj.stdin.close()
if six.PY3:
if isinstance(_stdout, bytes):
try:
_stdout = _stdout.decode(encoding='utf-8')
except UnicodeError:
pass
if isinstance(_stderr, bytes):
try:
_stderr = _stderr.decode(encoding='utf-8')
except UnicodeError:
pass
m = _("\nCommand: {cmd}\nExit code: {code}\nStdin: {stdin}\n"
"Stdout: {stdout}\nStderr: {stderr}").format(

View File

@ -18,6 +18,7 @@ import os
from eventlet.green import subprocess
from eventlet import greenthread
from oslo_log import log as logging
import six
from neutron.common import utils
@ -45,12 +46,29 @@ def create_process(cmd, addl_env=None):
def execute(cmd, process_input=None, addl_env=None,
check_exit_code=True, return_stderr=False, log_fail_as_error=True,
extra_ok_codes=None, run_as_root=False):
extra_ok_codes=None, run_as_root=False, do_decode=True):
try:
if (process_input is None or
isinstance(process_input, six.binary_type)):
_process_input = process_input
else:
_process_input = process_input.encode('utf-8')
obj, cmd = create_process(cmd, addl_env=addl_env)
_stdout, _stderr = obj.communicate(process_input)
_stdout, _stderr = obj.communicate(_process_input)
obj.stdin.close()
if six.PY3:
if isinstance(_stdout, bytes):
try:
_stdout = _stdout.decode(encoding='utf-8')
except UnicodeError:
pass
if isinstance(_stderr, bytes):
try:
_stderr = _stderr.decode(encoding='utf-8')
except UnicodeError:
pass
m = _("\nCommand: %(cmd)s\nExit code: %(code)s\nStdin: %(stdin)s\n"
"Stdout: %(stdout)s\nStderr: %(stderr)s") % \
{'cmd': cmd,

View File

@ -15,6 +15,7 @@
import socket
import mock
import six
import testtools
from neutron.agent.linux import utils
@ -107,6 +108,55 @@ class AgentUtilsExecuteTest(base.BaseTestCase):
['ls'], log_fail_as_error=False)
self.assertFalse(log.error.called)
def test_encode_process_input(self):
str_idata = "%s\n" % self.test_file[:-1]
str_odata = "%s\n" % self.test_file
if six.PY3:
bytes_idata = str_idata.encode(encoding='utf-8')
bytes_odata = str_odata.encode(encoding='utf-8')
self.mock_popen.return_value = [bytes_odata, b'']
result = utils.execute(['cat'], process_input=str_idata)
self.mock_popen.assert_called_once_with(bytes_idata)
self.assertEqual(str_odata, result)
else:
self.mock_popen.return_value = [str_odata, '']
result = utils.execute(['cat'], process_input=str_idata)
self.mock_popen.assert_called_once_with(str_idata)
self.assertEqual(str_odata, result)
def test_return_str_data(self):
str_data = "%s\n" % self.test_file
self.mock_popen.return_value = [str_data, '']
result = utils.execute(['ls', self.test_file], return_stderr=True)
self.assertEqual((str_data, ''), result)
def test_raise_unicodeerror_in_decoding_out_data(self):
class m_bytes(bytes):
def decode(self, encoding=None):
raise UnicodeError
err_data = 'UnicodeError'
bytes_err_data = b'UnicodeError'
out_data = "%s\n" % self.test_file
bytes_out_data = m_bytes(out_data.encode(encoding='utf-8'))
if six.PY3:
self.mock_popen.return_value = [bytes_out_data, bytes_err_data]
result = utils.execute(['ls', self.test_file],
return_stderr=True)
self.assertEqual((bytes_out_data, err_data), result)
class AgentUtilsExecuteEncodeTest(base.BaseTestCase):
def setUp(self):
super(AgentUtilsExecuteEncodeTest, self).setUp()
self.test_file = self.get_temp_file_path('test_execute.tmp')
open(self.test_file, 'w').close()
def test_decode_return_data(self):
str_data = "%s\n" % self.test_file
result = utils.execute(['ls', self.test_file], return_stderr=True)
self.assertEqual((str_data, ''), result)
class AgentUtilsGetInterfaceMAC(base.BaseTestCase):
def test_get_interface_mac(self):