diff --git a/almanach/collector/main.py b/almanach/collector/main.py index 79c667b..b2b9971 100644 --- a/almanach/collector/main.py +++ b/almanach/collector/main.py @@ -34,11 +34,8 @@ def main(): database_driver.connect() application_controller = controller.Controller(config, database_driver) - connection = kombu.Connection(hostname=config.collector.rabbit_host, - port=config.collector.rabbit_port, - userid=config.collector.rabbit_username, - password=config.collector.rabbit_password, - heartbeat=540) + + connection = kombu.Connection(config.collector.url, heartbeat=config.collector.heartbeat) retry_listener = retry_adapter.RetryAdapter(config, connection) bus_listener = bus_adapter.BusAdapter(config, application_controller, connection, retry_listener) diff --git a/almanach/collector/retry_adapter.py b/almanach/collector/retry_adapter.py index 91346f3..3c6fe3b 100644 --- a/almanach/collector/retry_adapter.py +++ b/almanach/collector/retry_adapter.py @@ -20,18 +20,18 @@ LOG = log.getLogger(__name__) class RetryAdapter(object): - def __init__(self, config, connection): + def __init__(self, config, connection, retry_producer=None, dead_producer=None): self.config = config self.connection = connection retry_exchange = self._configure_retry_exchanges(self.connection) dead_exchange = self._configure_dead_exchange(self.connection) - self._retry_producer = kombu.Producer(self.connection, exchange=retry_exchange) - self._dead_producer = kombu.Producer(self.connection, exchange=dead_exchange) + self._retry_producer = retry_producer or kombu.Producer(self.connection, exchange=retry_exchange) + self._dead_producer = dead_producer or kombu.Producer(self.connection, exchange=dead_exchange) def publish_to_dead_letter(self, message): - death_count = self._rejected_count(message) + death_count = self._get_rejected_count(message) LOG.info('Message die %d times', death_count) if death_count < self.config.collector.max_retries: @@ -114,7 +114,7 @@ class RetryAdapter(object): "x-dead-letter-routing-key": self.config.collector.routing_key, } - def _rejected_count(self, message): + def _get_rejected_count(self, message): if 'x-death' in message.headers: return len(message.headers['x-death']) return 0 diff --git a/almanach/core/opts.py b/almanach/core/opts.py index e029cbb..bd09aa1 100644 --- a/almanach/core/opts.py +++ b/almanach/core/opts.py @@ -24,6 +24,7 @@ database_opts = [ default='mongodb', help='Database driver'), cfg.StrOpt('connection_url', + secret=True, default='mongodb://almanach:almanach@localhost:27017/almanach', help='Database connection URL'), ] @@ -38,16 +39,13 @@ api_opts = [ ] collector_opts = [ - cfg.HostnameOpt('rabbit_host', - default='localhost', - help='RabbitMQ Hostname'), - cfg.PortOpt('rabbit_port', - default=5672, - help='RabbitMQ TCP port'), - cfg.StrOpt('rabbit_username', - help='RabbitMQ Username'), - cfg.StrOpt('rabbit_password', - help='RabbitMQ Password'), + cfg.StrOpt('url', + secret=True, + default='amqp://guest:guest@localhost:5672', + help='RabbitMQ connection URL'), + cfg.IntOpt('heartbeat', + default=540, + help='RabbitMQ connection heartbeat'), cfg.StrOpt('queue', default='almanach.info', help='Default queue name'), @@ -85,11 +83,13 @@ auth_opts = [ default='private_key', help='Authentication driver for the API'), cfg.StrOpt('private_key', + secret=True, default='secret', help='Private key for private key authentication'), cfg.StrOpt('keystone_username', help='Keystone service username'), cfg.StrOpt('keystone_password', + secret=True, help='Keystone service password'), cfg.StrOpt('keystone_tenant', help='Keystone service tenant'), diff --git a/etc/almanach/almanach.docker.conf b/etc/almanach/almanach.docker.conf index 51664c8..176b9e6 100644 --- a/etc/almanach/almanach.docker.conf +++ b/etc/almanach/almanach.docker.conf @@ -47,19 +47,8 @@ bind_port = 8000 # From almanach # -# RabbitMQ Hostname (hostname value) -rabbit_host = messaging - -# RabbitMQ TCP port (port value) -# Minimum value: 0 -# Maximum value: 65535 -#rabbit_port = 5672 - -# RabbitMQ Username (string value) -rabbit_username = guest - -# RabbitMQ Password (string value) -rabbit_password = guest +# RabbitMQ connection URL (string value) +url = amqp://guest:guest@messaging:5672 # Default queue name (string value) #default_queue = almanach.info diff --git a/requirements.txt b/requirements.txt index 06402d9..d045ec8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ Flask==0.10.1 PyYAML==3.11 jsonpickle==0.7.1 pymongo>=3.0.2,!=3.1 # Apache-2.0 -kombu>=3.0.30 +kombu>=3.0.25 # BSD pytz>=2014.10 voluptuous==0.8.11 python-keystoneclient>=1.6.0 diff --git a/test-requirements.txt b/test-requirements.txt index 7be5dea..1729c6b 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -12,3 +12,4 @@ sphinxcontrib-httpdomain # BSD flake8>=2.5.4,<2.6.0 # MIT hacking<0.12,>=0.11.0 # Apache-2.0 testtools>=1.4.0 # MIT +mock>=2.0 # BSD \ No newline at end of file diff --git a/tests/collector/test_retry_adapter.py b/tests/collector/test_retry_adapter.py index 2c52b01..b8104d2 100644 --- a/tests/collector/test_retry_adapter.py +++ b/tests/collector/test_retry_adapter.py @@ -12,10 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from flexmock import flexmock -from kombu import Connection -from kombu.tests import mocks -from kombu.transport import pyamqp +import mock from almanach.collector import retry_adapter from tests import base @@ -25,73 +22,28 @@ class BusAdapterTest(base.BaseTestCase): def setUp(self): super(BusAdapterTest, self).setUp() - self.setup_connection_mock() - self.retry_adapter = retry_adapter.RetryAdapter(self.config, self.connection) + self.connection = mock.Mock() + self.retry_producer = mock.Mock() + self.dead_producer = mock.Mock() + self.retry_adapter = retry_adapter.RetryAdapter(self.config, self.connection, + self.retry_producer, self.dead_producer) - def setup_connection_mock(self): - mocks.Transport.recoverable_connection_errors = pyamqp.Transport.recoverable_connection_errors - self.connection = flexmock(Connection(transport=mocks.Transport)) - self.channel_mock = flexmock(self.connection.default_channel) - self.connection.should_receive('channel').and_return(self.channel_mock) - - def test_declare_retry_exchanges_retries_if_it_fails(self): - connection = flexmock(Connection(transport=mocks.Transport)) - connection.should_receive('_establish_connection').times(3)\ - .and_raise(IOError)\ - .and_raise(IOError)\ - .and_return(connection.transport.establish_connection()) - - self.retry_adapter = retry_adapter.RetryAdapter(self.config, connection) - - def test_publish_to_retry_queue_happy_path(self): - message = self.build_message() - - self.expect_publish_with(message, 'almanach.retry').once() - self.retry_adapter.publish_to_dead_letter(message) - - def test_publish_to_retry_queue_retries_if_it_fails(self): - message = self.build_message() - - self.expect_publish_with(message, 'almanach.retry').times(4)\ - .and_raise(IOError)\ - .and_raise(IOError)\ - .and_raise(IOError)\ - .and_return(message) + def test_message_is_published_to_retry_queue(self): + message = mock.Mock(headers=dict()) + message.delivery_info = dict(routing_key='test') self.retry_adapter.publish_to_dead_letter(message) + self.connection.ensure.assert_called_with(self.retry_producer, self.retry_producer.publish, + errback=self.retry_adapter._error_callback, + interval_max=30, interval_start=0, interval_step=5) - def build_message(self, headers=dict()): - message = MyObject() - message.headers = headers - message.body = b'Now that the worst is behind you, it\'s time we get you back. - Mr. Robot' - message.delivery_info = {'routing_key': 42} - message.content_type = 'xml/rapture' - message.content_encoding = 'iso8859-1' - return message - - def test_publish_to_dead_letter_messages_retried_more_than_twice(self): - message = self.build_message(headers={'x-death': [0, 1, 2, 3]}) - - self.expect_publish_with(message, 'almanach.dead').once() + def test_message_is_published_to_dead_queue(self): + message = mock.Mock(headers={'x-death': [0, 1, 2, 3]}) + message.delivery_info = dict(routing_key='test') self.retry_adapter.publish_to_dead_letter(message) + self.assertEqual(self.connection.ensure.call_count, 3) - def expect_publish_with(self, message, exchange): - expected_message = {'body': message.body, - 'priority': 0, - 'content_encoding': message.content_encoding, - 'content_type': message.content_type, - 'headers': message.headers, - 'properties': {'delivery_mode': 2}} - - return self.channel_mock.should_receive('basic_publish')\ - .with_args(expected_message, exchange=exchange, routing_key=message.delivery_info['routing_key'], - mandatory=False, immediate=False) - - -class MyObject(object): - headers = None - body = None - delivery_info = None - content_type = None - content_encoding = None + self.connection.ensure.assert_called_with(self.dead_producer, self.dead_producer.publish, + errback=self.retry_adapter._error_callback, + interval_max=30, interval_start=0, interval_step=5)