diff --git a/storlets/gateway/gateways/container/runtime.py b/storlets/gateway/gateways/container/runtime.py index a46ed9f1..9fe28862 100644 --- a/storlets/gateway/gateways/container/runtime.py +++ b/storlets/gateway/gateways/container/runtime.py @@ -558,8 +558,8 @@ class StorletInvocationProtocol(object): Close all of the container side descriptors """ fds = [self.data_write_fd, self.metadata_write_fd] - if not self.srequest.data.has_fd: - fds.append(self.input_data_read_fd) + if self._input_data_read_fd is not None: + fds.append(self._input_data_read_fd) fds.extend([source['read_fd'] for source in self.extra_data_sources]) for fd in fds: os.close(fd) @@ -569,7 +569,8 @@ class StorletInvocationProtocol(object): Close all of the host side descriptors """ fds = [self.data_read_fd, self.metadata_read_fd] - fds.extend([source['write_fd'] for source in self.extra_data_sources]) + # NOTE(tkajinam): Local FDs for data input are closed by + # _write_input_data self._safe_close(fds) def _cancel(self): @@ -675,10 +676,6 @@ class StorletInvocationProtocol(object): if fd not in w: raise StorletRuntimeException('Write fd is not ready') - def _close_input_data_descriptors(self): - fds = [self._input_data_read_fd, self._input_data_write_fd] - self._safe_close(fds) - def communicate(self): try: self._invoke() @@ -715,8 +712,6 @@ class StorletInvocationProtocol(object): return StorletResponse(data) except Exception: self._close_local_side_descriptors() - if not self.srequest.data.has_fd: - self._close_input_data_descriptors() raise @contextmanager @@ -726,21 +721,21 @@ class StorletInvocationProtocol(object): def _write_input_data(self, fd, data_iter): try: - # double try/except block saving from unexpected errors - try: - with self._open_writer(fd) as writer: - for chunk in data_iter: - with StorletTimeout(self.timeout): - writer.write(chunk) - except (OSError, TypeError, ValueError): - self.logger.exception('fdopen failed') - except IOError: - # this will happen at sort of broken pipe while writer.write - self.logger.exception('IOError with writing fd %s' % fd) - except StorletTimeout: - self.logger.exception( - 'Timeout (%s)s with writing fd %s' % (self.timeout, fd)) + with self._open_writer(fd) as writer: + for chunk in data_iter: + with StorletTimeout(self.timeout): + writer.write(chunk) + except (OSError, TypeError, ValueError): + self.logger.exception('fdopen failed') + except IOError: + # this will happen at sort of broken pipe while writer.write + self.logger.exception('IOError with writing fd %s' % fd) + except StorletTimeout: + self.logger.exception( + 'Timeout (%s)s with writing fd %s' % (self.timeout, fd)) except Exception: # _write_input_data is designed to run eventlet thread - # so that we should catch and suppress it here + # so we should catch an exception and suppress it here self.logger.exception('Unexpected error at writing input data') + finally: + self._safe_close(fd)