Remove eventlet usage

This entirely removes our usage of eventlet and its ugly monkey-patching
in favor of a threaded approach.

Implements: remove-eventlet
Change-Id: Ib5f623e2d1ff9e9254601ad091bf5b53ab32000d
Julien Danjou 2015-11-02 18:51:13 +01:00
parent 6bc86f75ea
commit 867ad8d6ab
18 changed files with 25 additions and 196 deletions
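
For context, the heart of the change is in the messaging helpers further down: the oslo.messaging executors move from 'eventlet' to 'threading', so incoming messages are dispatched on plain worker threads and no interpreter-wide monkey-patching has to run at start-up. A minimal sketch of the new pattern, assuming an already configured transport; the topic and endpoint names here are placeholders rather than code from this commit:

    import oslo_messaging

    def make_rpc_server(transport, topic, endpoint):
        # With executor='threading' each request is handled by an OS thread,
        # so no eventlet.monkey_patch() call is needed before imports.
        target = oslo_messaging.Target(topic=topic)
        return oslo_messaging.get_rpc_server(transport, target, [endpoint],
                                             executor='threading')

    def make_notification_listener(transport, targets, endpoints):
        # Same executor switch for notification consumers.
        return oslo_messaging.get_notification_listener(
            transport, targets, endpoints, executor='threading')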


@@ -1,7 +1,5 @@
 # Copyright 2014 eNovance
 #
-# Authors: Julien Danjou <julien@danjou.info>
-#
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain
 # a copy of the License at
@@ -14,13 +12,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
-# This must be set before the initial import of eventlet because if
-# dnspython is present in your environment then eventlet monkeypatches
-# socket.getaddrinfo() with an implementation which doesn't work for IPv6.
-import os
-os.environ['EVENTLET_NO_GREENDNS'] = 'yes'
 class NotImplementedError(NotImplementedError):
     # FIXME(jd) This is used by WSME to return a correct HTTP code. We should


@@ -1,22 +0,0 @@
-# -*- encoding: utf-8 -*-
-#
-# Copyright 2014 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import eventlet
-# NOTE(jd) We need to monkey patch the socket and select module for,
-# at least, oslo.messaging, otherwise everything's blocked on its
-# first read() or select(), thread need to be patched too, because
-# oslo.messaging use threading.local
-eventlet.monkey_patch(socket=True, select=True, thread=True, time=True)
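
The module deleted above was the single eventlet bootstrap: every console script imported it first so that socket, select, thread and time were replaced by cooperative green versions before oslo.messaging was loaded. For illustration only (this is the removed behaviour, not something the new entry points do), the effect of that call can be observed like this:

    import eventlet

    # What the deleted module did at import time: swap blocking stdlib
    # primitives for eventlet's cooperative versions.
    eventlet.monkey_patch(socket=True, select=True, thread=True, time=True)

    from eventlet import patcher
    # The patched modules are detectable afterwards.
    assert patcher.is_monkey_patched('socket')
    assert patcher.is_monkey_patched('thread')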


@@ -13,7 +13,6 @@
 # under the License.
 """Implementation of Inspector abstraction for XenAPI."""
-from eventlet import timeout
 from oslo_config import cfg
 from oslo_utils import units
 try:
@@ -38,9 +37,6 @@ OPTS = [
     cfg.StrOpt('connection_password',
                help='Password for connection to XenServer/Xen Cloud Platform.',
                secret=True),
-    cfg.IntOpt('login_timeout',
-               default=10,
-               help='Timeout in seconds for XenAPI login.'),
 ]
 CONF = cfg.CONF
@@ -63,13 +59,10 @@ def get_api_session():
         raise XenapiException(_('Must specify connection_url, and '
                                 'connection_password to use'))
-    exception = api.Failure(_("Unable to log in to XenAPI "
-                              "(is the Dom0 disk full?)"))
     try:
         session = (api.xapi_local() if url == 'unix://local'
                    else api.Session(url))
-        with timeout.Timeout(CONF.xenapi.login_timeout, exception):
-            session.login_with_password(username, password)
+        session.login_with_password(username, password)
     except api.Failure as e:
         msg = _("Could not connect to XenAPI: %s") % e.details[0]
         raise XenapiException(msg)
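
With the eventlet timeout gone, login_with_password() now simply blocks until XenAPI answers, which is why the login_timeout option is dropped as well. If a bound on the login time were ever wanted again in a threaded world, it could be approximated with a worker thread; the helper below is a hypothetical sketch under that assumption, not code from this commit:

    from concurrent import futures

    def login_with_timeout(session, username, password, timeout=10):
        # Run the blocking login in a worker thread and bound the wait on
        # its result; raises futures.TimeoutError if XenAPI does not answer.
        pool = futures.ThreadPoolExecutor(max_workers=1)
        future = pool.submit(session.login_with_password, username, password)
        try:
            return future.result(timeout=timeout)
        finally:
            # Do not block on a login that is still hanging.
            pool.shutdown(wait=False)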


@@ -61,7 +61,7 @@ def get_rpc_server(transport, topic, endpoint):
     serializer = oslo_serializer.RequestContextSerializer(
         oslo_serializer.JsonPayloadSerializer())
     return oslo_messaging.get_rpc_server(transport, target,
-                                         [endpoint], executor='eventlet',
+                                         [endpoint], executor='threading',
                                          serializer=serializer)
@@ -79,7 +79,7 @@ def get_notification_listener(transport, targets, endpoints,
                               allow_requeue=False):
     """Return a configured oslo_messaging notification listener."""
     return oslo_messaging.get_notification_listener(
-        transport, targets, endpoints, executor='eventlet',
+        transport, targets, endpoints, executor='threading',
         allow_requeue=allow_requeue)


@@ -16,7 +16,7 @@ import itertools
 import ceilometer.agent.manager
 import ceilometer.api
 import ceilometer.api.app
-import ceilometer.cmd.eventlet.polling
+import ceilometer.cmd.polling
 import ceilometer.collector
 import ceilometer.compute.discovery
 import ceilometer.compute.notifications
@@ -57,7 +57,7 @@ def list_opts():
         ('DEFAULT',
          itertools.chain(ceilometer.agent.manager.OPTS,
                          ceilometer.api.app.OPTS,
-                         ceilometer.cmd.eventlet.polling.CLI_OPTS,
+                         ceilometer.cmd.polling.CLI_OPTS,
                          ceilometer.compute.notifications.OPTS,
                          ceilometer.compute.util.OPTS,
                          ceilometer.compute.virt.inspector.OPTS,


@@ -139,7 +139,6 @@ class MessagingPublisher(publisher.PublisherBase):
     def flush(self):
         # NOTE(sileht):
-        # IO of the rpc stuff in handled by eventlet,
         # this is why the self.local_queue, is emptied before processing the
         # queue and the remaining messages in the queue are added to
         # self.local_queue after in case of a other call have already added
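
The comment above only loses its reference to eventlet; the drain logic it describes stays the same and is what keeps flush() safe when several threads publish at once: the queue is swapped out before sending, and whatever could not be sent is put back in front of messages queued in the meantime. A simplified, self-contained stand-in for that pattern (the real publisher's send and retry logic is more involved):

    class QueueingPublisher(object):
        """Toy stand-in illustrating the local_queue drain pattern."""

        def __init__(self, send):
            self._send = send        # callable returning True on success
            self.local_queue = []

        def publish(self, message):
            self.local_queue.append(message)
            self.flush()

        def flush(self):
            # Swap the queue out first so messages appended by other
            # threads while we are sending are not lost.
            queue, self.local_queue = self.local_queue, []
            remaining = [m for m in queue if not self._send(m)]
            if remaining:
                # Failed messages go back in front of anything queued since.
                self.local_queue = remaining + self.local_queue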


@@ -18,7 +18,6 @@
 import functools
 import os.path
-import eventlet
 import oslo_messaging.conffixture
 from oslo_utils import timeutils
 from oslotest import base
@@ -39,10 +38,6 @@ class BaseTestCase(base.BaseTestCase):
         exchange = 'ceilometer'
         conf.set_override("control_exchange", exchange)
-        # oslo.messaging fake driver needs time and thread
-        # to be patched, otherwise there are chances of deadlocks
-        eventlet.monkey_patch(time=True, thread=True)
         # NOTE(sileht): Ensure a new oslo.messaging driver is loaded
         # between each tests
         self.transport = messaging.get_transport("fake://", cache=False)


@@ -16,7 +16,6 @@
 import shutil
-import eventlet
 import mock
 from oslo_config import fixture as fixture_config
 from oslo_context import context
@@ -255,7 +254,6 @@ class BaseRealNotification(tests_base.BaseTestCase):
             if (len(self.publisher.samples) >= self.expected_samples and
                     len(self.publisher.events) >= self.expected_events):
                 break
-            eventlet.sleep(0)
         self.assertNotEqual(self.srv.listeners, self.srv.pipeline_listeners)
         self.srv.stop()
@@ -284,114 +282,45 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
         self.assertIn(pipeline_poller_call,
                       self.srv.tg.add_timer.call_args_list)
-    @mock.patch('ceilometer.publisher.test.TestPublisher')
-    def test_notification_reloaded_pipeline(self, fake_publisher_cls):
-        fake_publisher_cls.return_value = self.publisher
+    def test_notification_reloaded_pipeline(self):
         pipeline_cfg_file = self.setup_pipeline(['instance'])
         self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
-        self.expected_samples = 1
         self.srv.start()
-        notifier = messaging.get_notifier(self.transport,
-                                          "compute.vagrant-precise")
-        notifier.info(context.RequestContext(), 'compute.instance.create.end',
-                      TEST_NOTICE_PAYLOAD)
-        start = timeutils.utcnow()
-        while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
-            if (len(self.publisher.samples) >= self.expected_samples and
-                    len(self.publisher.events) >= self.expected_events):
-                break
-            eventlet.sleep(0)
-        self.assertEqual(self.expected_samples, len(self.publisher.samples))
-        # Flush publisher samples to test reloading
-        self.publisher.samples = []
+        pipeline = self.srv.pipe_manager
         # Modify the collection targets
         updated_pipeline_cfg_file = self.setup_pipeline(['vcpus',
                                                          'disk.root.size'])
         # Move/re-name the updated pipeline file to the original pipeline
         # file path as recorded in oslo config
         shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file)
-        self.expected_samples = 2
-        # Random sleep to let the pipeline poller complete the reloading
-        eventlet.sleep(3)
-        # Send message again to verify the reload works
-        notifier = messaging.get_notifier(self.transport,
-                                          "compute.vagrant-precise")
-        notifier.info(context.RequestContext(), 'compute.instance.create.end',
-                      TEST_NOTICE_PAYLOAD)
-        start = timeutils.utcnow()
-        while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
-            if (len(self.publisher.samples) >= self.expected_samples and
-                    len(self.publisher.events) >= self.expected_events):
-                break
-            eventlet.sleep(0)
-        self.assertEqual(self.expected_samples, len(self.publisher.samples))
-        (self.assertIn(sample.name, ['disk.root.size', 'vcpus'])
-         for sample in self.publisher.samples)
+        self.srv.refresh_pipeline()
+        self.assertNotEqual(pipeline, self.srv.pipe_manager)
-    @mock.patch('ceilometer.publisher.test.TestPublisher')
-    def test_notification_reloaded_event_pipeline(self, fake_publisher_cls):
-        fake_publisher_cls.return_value = self.publisher
+    def test_notification_reloaded_event_pipeline(self):
         ev_pipeline_cfg_file = self.setup_event_pipeline(
             ['compute.instance.create.start'])
         self.CONF.set_override("event_pipeline_cfg_file", ev_pipeline_cfg_file)
         self.CONF.set_override("store_events", True, group="notification")
-        self.expected_events = 1
         self.srv.start()
-        notifier = messaging.get_notifier(self.transport,
-                                          "compute.vagrant-precise")
-        notifier.info(context.RequestContext(),
-                      'compute.instance.create.start',
-                      TEST_NOTICE_PAYLOAD)
-        start = timeutils.utcnow()
-        while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
-            if len(self.publisher.events) >= self.expected_events:
-                break
-            eventlet.sleep(0)
-        self.assertEqual(self.expected_events, len(self.publisher.events))
-        # Flush publisher events to test reloading
-        self.publisher.events = []
+        pipeline = self.srv.event_pipe_manager
         # Modify the collection targets
         updated_ev_pipeline_cfg_file = self.setup_event_pipeline(
             ['compute.instance.*'])
         # Move/re-name the updated pipeline file to the original pipeline
         # file path as recorded in oslo config
         shutil.move(updated_ev_pipeline_cfg_file, ev_pipeline_cfg_file)
-        self.expected_events = 1
-        # Random sleep to let the pipeline poller complete the reloading
-        eventlet.sleep(3)
-        # Send message again to verify the reload works
-        notifier = messaging.get_notifier(self.transport,
-                                          "compute.vagrant-precise")
-        notifier.info(context.RequestContext(), 'compute.instance.create.end',
-                      TEST_NOTICE_PAYLOAD)
-        start = timeutils.utcnow()
-        while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
-            if len(self.publisher.events) >= self.expected_events:
-                break
-            eventlet.sleep(0)
-        self.assertEqual(self.expected_events, len(self.publisher.events))
-        self.assertEqual(self.publisher.events[0].event_type,
-                         'compute.instance.create.end')
+        self.srv.refresh_pipeline()
+        self.assertNotEqual(pipeline, self.srv.pipe_manager)
 class TestRealNotification(BaseRealNotification):
@@ -417,7 +346,6 @@ class TestRealNotification(BaseRealNotification):
         while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
             if len(self.publisher.events) >= self.expected_events:
                 break
-            eventlet.sleep(0)
         self.srv.stop()
         self.assertEqual(self.expected_events, len(self.publisher.events))
@@ -582,7 +510,6 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
             if (len(self.publisher.samples + self.publisher2.samples) >=
                     self.expected_samples):
                 break
-            eventlet.sleep(0)
         self.srv.stop()
         self.srv2.stop()
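
The reload tests above no longer publish a notification and spin on eventlet.sleep(0); they call refresh_pipeline() directly and compare pipeline managers. For the wait loops that do remain, plain OS threads are preempted by the scheduler, so a short time.sleep() is all that is needed to yield; a generic helper of that shape could look like the sketch below (an illustration, not code from this commit):

    import time

    def wait_for(predicate, timeout=600, interval=0.1):
        # Thread-friendly replacement for an eventlet.sleep(0) busy-wait.
        deadline = time.time() + timeout
        while time.time() < deadline:
            if predicate():
                return True
            time.sleep(interval)
        return False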


@@ -12,18 +12,15 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
-"""Tests for ceilometer/central/manager.py
-"""
+"""Tests for ceilometer agent manager"""
 import shutil
-import eventlet
 from keystoneclient import exceptions as ks_exceptions
 import mock
 from novaclient import client as novaclient
 from oslo_service import service as os_service
 from oslo_utils import fileutils
-from oslo_utils import timeutils
 from oslotest import base
 from oslotest import mockpatch
 import requests
@@ -408,11 +405,9 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
         self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
         self.mgr.start()
-        start = timeutils.utcnow()
-        while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
-            if len(self.notified_samples) >= expected_samples:
-                break
-            eventlet.sleep(0)
+        # Manually executes callbacks
+        for timer in self.mgr.pollster_timers:
+            timer.f(*timer.args, **timer.kw)
         samples = self.notified_samples
         self.assertEqual(expected_samples, len(samples))
@@ -442,12 +437,6 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
         self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
         self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
         self.mgr.start()
-        expected_samples = 1
-        start = timeutils.utcnow()
-        while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
-            if len(self.notified_samples) >= expected_samples:
-                break
-            eventlet.sleep(0)
         # we only got the old name of meters
         for sample in self.notified_samples:
@@ -475,20 +464,10 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
         # file path as recorded in oslo config
         shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file)
-        # Random sleep to let the pipeline poller complete the reloading
-        eventlet.sleep(3)
         # Flush notified samples to test only new, nothing latent on
         # fake message bus.
         self.notified_samples = []
-        expected_samples = 1
-        start = timeutils.utcnow()
-        while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
-            if len(self.notified_samples) >= expected_samples:
-                break
-            eventlet.sleep(0)
         # we only got the new name of meters
         for sample in self.notified_samples:
             self.assertEqual('testanother', sample['counter_name'])


@@ -17,7 +17,6 @@
 import datetime
 import uuid
-import eventlet
 import mock
 from oslo_config import fixture as fixture_config
 from oslo_context import context
@@ -116,7 +115,6 @@ class RpcOnlyPublisherTest(BasePublisherTestCase):
                         collector.stop())
         collector.start()
-        eventlet.sleep()
         publisher.publish_samples(context.RequestContext(),
                                   self.test_sample_data)
         collector.wait()
@@ -219,32 +217,6 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
 class TestPublisherPolicy(TestPublisher):
-    def test_published_concurrency(self):
-        """Test concurrent access to the local queue of the rpc publisher."""
-        publisher = self.publisher_cls(
-            netutils.urlsplit('%s://' % self.protocol))
-        with mock.patch.object(publisher, '_send') as fake_send:
-            def fake_send_wait(ctxt, topic, meters):
-                fake_send.side_effect = mock.Mock()
-                # Sleep to simulate concurrency and allow other threads to work
-                eventlet.sleep(0)
-            fake_send.side_effect = fake_send_wait
-            job1 = eventlet.spawn(getattr(publisher, self.pub_func),
-                                  mock.MagicMock(), self.test_data)
-            job2 = eventlet.spawn(getattr(publisher, self.pub_func),
-                                  mock.MagicMock(), self.test_data)
-            job1.wait()
-            job2.wait()
-        self.assertEqual('default', publisher.policy)
-        self.assertEqual(2, len(fake_send.mock_calls))
-        self.assertEqual(0, len(publisher.local_queue))
     @mock.patch('ceilometer.publisher.messaging.LOG')
     def test_published_with_no_policy(self, mylog):
         publisher = self.publisher_cls(


@@ -3,7 +3,6 @@
 # process, which may cause wedges in the gate later.
 retrying!=1.3.0,>=1.2.3 # Apache-2.0
-eventlet>=0.17.4
 jsonpath-rw-ext>=0.1.9
 jsonschema!=2.5.0,<3.0.0,>=2.0.0
 kafka-python>=0.9.2 # Apache-2.0


@@ -240,13 +240,13 @@ ceilometer.event.trait_plugin =
 console_scripts =
     ceilometer-api = ceilometer.cmd.api:main
-    ceilometer-polling = ceilometer.cmd.eventlet.polling:main
-    ceilometer-agent-notification = ceilometer.cmd.eventlet.agent_notification:main
-    ceilometer-send-sample = ceilometer.cmd.eventlet.sample:send_sample
-    ceilometer-dbsync = ceilometer.cmd.eventlet.storage:dbsync
-    ceilometer-expirer = ceilometer.cmd.eventlet.storage:expirer
+    ceilometer-polling = ceilometer.cmd.polling:main
+    ceilometer-agent-notification = ceilometer.cmd.agent_notification:main
+    ceilometer-send-sample = ceilometer.cmd.sample:send_sample
+    ceilometer-dbsync = ceilometer.cmd.storage:dbsync
+    ceilometer-expirer = ceilometer.cmd.storage:expirer
     ceilometer-rootwrap = oslo_rootwrap.cmd:main
-    ceilometer-collector = ceilometer.cmd.eventlet.collector:main
+    ceilometer-collector = ceilometer.cmd.collector:main
 ceilometer.dispatcher.meter =
     database = ceilometer.dispatcher.database:DatabaseDispatcher


@@ -9,7 +9,6 @@ deps = -r{toxinidir}/requirements.txt
 install_command = pip install -U {opts} {packages}
 usedevelop = True
 setenv = VIRTUAL_ENV={envdir}
-         EVENTLET_NO_GREENDNS=yes
          OS_TEST_PATH=ceilometer/tests/unit
 passenv = OS_TEST_TIMEOUT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTURE
 commands =
@@ -44,7 +43,6 @@ commands =
 [testenv:functional]
 setenv = VIRTUAL_ENV={envdir}
-         EVENTLET_NO_GREENDNS=yes
          OS_TEST_PATH=ceilometer/tests/functional/
 passenv = CEILOMETER_*
 commands =
@@ -52,7 +50,6 @@ commands =
 [testenv:py34-functional]
 setenv = VIRTUAL_ENV={envdir}
-         EVENTLET_NO_GREENDNS=yes
          OS_TEST_PATH=ceilometer/tests/functional/
 basepython = python3.4
 passenv = CEILOMETER_*
@@ -61,7 +58,6 @@ commands =
 [testenv:integration]
 setenv = VIRTUAL_ENV={envdir}
-         EVENTLET_NO_GREENDNS=yes
          OS_TEST_PATH=./ceilometer/tests/integration
          OS_TEST_TIMEOUT=2400
         GABBI_LIVE_FAIL_IF_NO_TEST=1