Refactor FD handling
* Fix duplicate closure of _input_read_fd * Delegate closure of _input_write_fd to the dedicated write thread Change-Id: I74128fe1f55abafea322111cf289fb0a978fc2b4
This commit is contained in:
parent
639fee6cd2
commit
b75465ab51
|
@ -558,8 +558,8 @@ class StorletInvocationProtocol(object):
|
|||
Close all of the container side descriptors
|
||||
"""
|
||||
fds = [self.data_write_fd, self.metadata_write_fd]
|
||||
if not self.srequest.data.has_fd:
|
||||
fds.append(self.input_data_read_fd)
|
||||
if self._input_data_read_fd is not None:
|
||||
fds.append(self._input_data_read_fd)
|
||||
fds.extend([source['read_fd'] for source in self.extra_data_sources])
|
||||
for fd in fds:
|
||||
os.close(fd)
|
||||
|
@ -569,7 +569,8 @@ class StorletInvocationProtocol(object):
|
|||
Close all of the host side descriptors
|
||||
"""
|
||||
fds = [self.data_read_fd, self.metadata_read_fd]
|
||||
fds.extend([source['write_fd'] for source in self.extra_data_sources])
|
||||
# NOTE(tkajinam): Local FDs for data input are closed by
|
||||
# _write_input_data
|
||||
self._safe_close(fds)
|
||||
|
||||
def _cancel(self):
|
||||
|
@ -675,10 +676,6 @@ class StorletInvocationProtocol(object):
|
|||
if fd not in w:
|
||||
raise StorletRuntimeException('Write fd is not ready')
|
||||
|
||||
def _close_input_data_descriptors(self):
|
||||
fds = [self._input_data_read_fd, self._input_data_write_fd]
|
||||
self._safe_close(fds)
|
||||
|
||||
def communicate(self):
|
||||
try:
|
||||
self._invoke()
|
||||
|
@ -715,8 +712,6 @@ class StorletInvocationProtocol(object):
|
|||
return StorletResponse(data)
|
||||
except Exception:
|
||||
self._close_local_side_descriptors()
|
||||
if not self.srequest.data.has_fd:
|
||||
self._close_input_data_descriptors()
|
||||
raise
|
||||
|
||||
@contextmanager
|
||||
|
@ -726,21 +721,21 @@ class StorletInvocationProtocol(object):
|
|||
|
||||
def _write_input_data(self, fd, data_iter):
|
||||
try:
|
||||
# double try/except block saving from unexpected errors
|
||||
try:
|
||||
with self._open_writer(fd) as writer:
|
||||
for chunk in data_iter:
|
||||
with StorletTimeout(self.timeout):
|
||||
writer.write(chunk)
|
||||
except (OSError, TypeError, ValueError):
|
||||
self.logger.exception('fdopen failed')
|
||||
except IOError:
|
||||
# this will happen at sort of broken pipe while writer.write
|
||||
self.logger.exception('IOError with writing fd %s' % fd)
|
||||
except StorletTimeout:
|
||||
self.logger.exception(
|
||||
'Timeout (%s)s with writing fd %s' % (self.timeout, fd))
|
||||
with self._open_writer(fd) as writer:
|
||||
for chunk in data_iter:
|
||||
with StorletTimeout(self.timeout):
|
||||
writer.write(chunk)
|
||||
except (OSError, TypeError, ValueError):
|
||||
self.logger.exception('fdopen failed')
|
||||
except IOError:
|
||||
# this will happen at sort of broken pipe while writer.write
|
||||
self.logger.exception('IOError with writing fd %s' % fd)
|
||||
except StorletTimeout:
|
||||
self.logger.exception(
|
||||
'Timeout (%s)s with writing fd %s' % (self.timeout, fd))
|
||||
except Exception:
|
||||
# _write_input_data is designed to run eventlet thread
|
||||
# so that we should catch and suppress it here
|
||||
# so we should catch an exception and suppress it here
|
||||
self.logger.exception('Unexpected error at writing input data')
|
||||
finally:
|
||||
self._safe_close(fd)
|
||||
|
|
Loading…
Reference in New Issue