Skip offline nodes

Apparently, shotgun never tested which nodes are online
prior to execute snapshot method of objects. Therefore, for offline
nodes shotgun will then consume a lot of time waiting for connection
timeouts to be happened on per object basis.

This patch totally rework how shotgun handles the objects:
* objects being processed on per host basis
* on NetworkError, all affected objects on per host basis will be
  postponed for processing at the end of queue.
* if postponed objects still couldn't be proccessed on the last
  attempt, they will then be converted into offline objects.
* adds `Offline` object driver. Being snapshotted, this object just
  creates a file with friendly reminder that particular node was
  offline inside of snapshot target directory. So, one which's
  looking at snapshot archive will effectively realize that node
  was offline without deep diving into shotgun log file.

So, shotgun won't waste time or per object basis, as it was before.

Change-Id: Ib99476c7b67a04d4f472f3fa3803b3fb92d4fec4
Closes-Bug: #1397038
This commit is contained in:
Alexander Gordeev 2015-11-03 19:32:24 +03:00
parent 25dd78a311
commit c377d16351
7 changed files with 234 additions and 9 deletions

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from collections import deque
import copy
import logging
import time
@ -23,8 +25,16 @@ logger = logging.getLogger(__name__)
class Config(object):
def __init__(self, data=None):
self.data = data
self.data = data or {}
self.time = time.localtime()
self.offline_hosts = set()
self.objs = deque()
self.try_again = deque()
for properties in self.data.get("dump", {}).itervalues():
for host in properties.get("hosts", []):
for object_ in properties.get("objects", []):
object_["host"] = host
self.objs.append(copy.copy(object_))
def _timestamp(self, name):
return "{0}-{1}".format(
@ -55,13 +65,44 @@ class Config(object):
def lastdump(self):
return self.data.get("lastdump", settings.LASTDUMP)
@staticmethod
def get_network_address(obj):
"""Returns network address of object."""
return obj["host"].get('address', '127.0.0.1')
def on_network_error(self, obj):
"""Lets the object to have another attempt for being proccessed."""
host = self.get_network_address(obj)
logger.debug("Remote host %s is unreachable. "
"Processing of its objects postponed.", host)
self.try_again.append(obj)
self.offline_hosts.add(host)
@property
def objects(self):
for role, properties in self.data["dump"].iteritems():
for host in properties.get("hosts", []):
for object_ in properties.get("objects", []):
object_["host"] = host
yield object_
"""Stateful generator for processing objects.
It should be used in conjunction with on_network_error() to give
another try for objects which threw NetworkError.
"""
for _ in range(settings.ATTEMPTS):
while self.objs:
obj = self.objs.popleft()
if self.get_network_address(obj) not in self.offline_hosts:
yield obj
else:
self.try_again.append(obj)
self.offline_hosts.clear()
self.objs, self.try_again = self.try_again, deque()
for obj in self.objs:
obj["type"] = 'offline'
host = self.get_network_address(obj)
if host not in self.offline_hosts:
self.offline_hosts.add(host)
yield obj
else:
logger.debug("Skipping offline object processing: %s", obj)
@property
def timeout(self):

View File

@ -22,6 +22,7 @@ import sys
import xmlrpclib
import fabric.api
import fabric.exceptions
from shotgun import utils
@ -53,6 +54,7 @@ class Driver(object):
"postgres": Postgres,
"xmlrpc": XmlRpc,
"command": Command,
"offline": Offline,
}.get(driver_type, cls)(data, conf)
def __init__(self, data, conf):
@ -98,8 +100,11 @@ class Driver(object):
logger.debug("Running local command: %s", command)
out.return_code, out.stdout, out.stderr = utils.execute(
command)
except fabric.exceptions.NetworkError as e:
logger.error("NetworkError occured: %s", str(e))
raise
except Exception as e:
logger.error("Error occured: %s", str(e))
logger.error("Unexpected error occured: %s", str(e))
out.stdout = raw_stdout.getvalue()
return out
@ -131,8 +136,11 @@ class Driver(object):
utils.execute('mkdir -p "{0}"'.format(target_path))
return utils.execute('cp -r "{0}" "{1}"'.format(path,
target_path))
except fabric.exceptions.NetworkError as e:
logger.error("NetworkError occured: %s", str(e))
raise
except Exception as e:
logger.error("Error occured: %s", str(e))
logger.error("Unexpected error occured: %s", str(e))
class File(Driver):
@ -256,3 +264,19 @@ class Command(Driver):
f.write("\n===== STDERR =====:\n")
if out.stderr:
f.write(out.stderr)
class Offline(Driver):
def __init__(self, data, conf):
super(Offline, self).__init__(data, conf)
self.target_path = os.path.join(
self.conf.target, self.host, "OFFLINE_NODE.txt")
def snapshot(self):
if not os.path.exists(self.target_path):
utils.execute('mkdir -p "{0}"'.format(os.path.dirname(
self.target_path)))
with open(self.target_path, "w") as f:
f.write("Host {0} with IP {1} was offline/unreachable during "
"logs obtaining.\n".format(self.host, self.addr))

View File

@ -15,6 +15,8 @@
import logging
import os
import fabric.exceptions
from shotgun.driver import Driver
from shotgun import utils
@ -33,7 +35,10 @@ class Manager(object):
for obj_data in self.conf.objects:
logger.debug("Dumping: %s", obj_data)
driver = Driver.getDriver(obj_data, self.conf)
driver.snapshot()
try:
driver.snapshot()
except fabric.exceptions.NetworkError:
self.conf.on_network_error(obj_data)
logger.debug("Archiving dump directory: %s", self.conf.target)
utils.compress(self.conf.target, self.conf.compression_level)

View File

@ -18,3 +18,4 @@ TIMESTAMP = True
COMPRESSION_LEVEL = 3
LOG_FILE = "/var/log/shotgun.log"
DEFAULT_TIMEOUT = 10
ATTEMPTS = 2

View File

@ -56,3 +56,75 @@ class TestConfig(base.BaseTestCase):
'timeout': timeout,
})
self.assertEqual(conf.timeout, timeout)
def test_on_network_error(self):
data = {
"dump": {
"master": {
"objects":
[{"path": "/etc/nailgun",
"type": "dir"},
],
"hosts": [{"ssh-key": "/root/.ssh/id_rsa",
"address": "10.109.2.2"}]},
}
}
conf = Config(data)
obj = conf.objects.next()
host = conf.get_network_address(obj)
self.assertNotIn(obj, conf.try_again)
self.assertNotIn(host, conf.offline_hosts)
conf.on_network_error(obj)
self.assertIn(obj, conf.try_again)
self.assertIn(host, conf.offline_hosts)
def test_get_network_address(self):
data = {
"dump": {
"master": {
"objects":
[{"path": "/etc/nailgun",
"type": "dir"},
],
"hosts": [{"ssh-key": "/root/.ssh/id_rsa",
"address": "10.109.2.2"}]},
}
}
conf = Config(data)
obj = conf.objects.next()
self.assertEqual('10.109.2.2', conf.get_network_address(obj))
def test_get_network_address_default(self):
data = {
"dump": {
"master": {
"objects":
[{"path": "/etc/nailgun",
"type": "dir"},
],
"hosts": [{"ssh-key": "/root/.ssh/id_rsa",
"hostname": "fuel.tld"}]},
}
}
conf = Config(data)
obj = conf.objects.next()
self.assertEqual('127.0.0.1', conf.get_network_address(obj))
def test_init(self):
data = {
"dump": {
"fake_role1": {
"objects":
[{"fake_obj_1": '1'}, {"fake_obj_2": '2'}],
"hosts": ["fake_host1", "fake_host2", "fake_host3"]},
}
}
conf = Config(data)
expected_objs = [
{'host': 'fake_host1', 'fake_obj_1': '1'},
{'host': 'fake_host1', 'fake_obj_2': '2'},
{'host': 'fake_host2', 'fake_obj_1': '1'},
{'host': 'fake_host2', 'fake_obj_2': '2'},
{'host': 'fake_host3', 'fake_obj_1': '1'},
{'host': 'fake_host3', 'fake_obj_2': '2'}]
self.assertItemsEqual(expected_objs, conf.objs)

View File

@ -223,3 +223,35 @@ class TestFile(base.BaseTestCase):
mget.assert_called_with(data["path"], target_path)
mremove.assert_called_with(dir_driver.full_dst_path, data['exclude'])
class TestOffline(base.BaseTestCase):
@mock.patch('shotgun.driver.open', create=True,
new_callable=mock.mock_open)
@mock.patch('shotgun.driver.utils.execute', autospec=True)
@mock.patch('shotgun.driver.os', autospec=True)
def test_snapshot(self, mos, mexec, mopen):
data = {
"type": "offline",
"path": "/remote_dir/remote_file",
"host": {
"hostname": "remote_host",
"address": "10.109.0.2",
},
}
conf = mock.MagicMock()
conf.target = "/target"
target_path = "/target/remote_host/OFFLINE_NODE.txt"
mos.path.exists.return_value = False
mos.path.dirname.return_value = '/target/remote_host'
mos.path.join.return_value = target_path
offline_driver = shotgun.driver.Offline(data, conf)
offline_driver.snapshot()
file_handle_mock = mopen.return_value.__enter__.return_value
file_handle_mock.write.assert_called_once_with(
'Host remote_host with IP 10.109.0.2 was offline/unreachable '
'during logs obtaining.\n')
mopen.assert_called_once_with(target_path, 'w')
mexec.assert_called_once_with('mkdir -p "/target/remote_host"')
self.assertEqual(target_path, offline_driver.target_path)

View File

@ -12,10 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from collections import deque
import tempfile
import fabric.exceptions
import mock
from shotgun.config import Config
from shotgun.manager import Manager
from shotgun.test import base
@ -41,3 +44,50 @@ class TestManager(base.BaseTestCase):
manager.snapshot()
mget.assert_called_once_with(data, conf)
mexecute.assert_called_once_with('rm -rf /target')
@mock.patch('shotgun.manager.Driver.getDriver')
@mock.patch('shotgun.manager.utils.execute')
@mock.patch('shotgun.manager.utils.compress')
def test_snapshot_network_error(self, mcompress, mexecute, mget):
objs = [
{"type": "file",
"path": "/remote_file1",
"host": {"address": "remote_host1"},
},
{"type": "dir",
"path": "/remote_dir1",
"host": {"address": "remote_host1"},
},
{"type": "file",
"path": "/remote_file1",
"host": {"address": "remote_host2"},
},
]
drv = mock.MagicMock()
drv.snapshot.side_effect = [
fabric.exceptions.NetworkError,
None,
fabric.exceptions.NetworkError,
None,
]
mget.return_value = drv
conf = Config()
conf.objs = deque(objs)
offline_obj = {
'path': '/remote_file1',
'host': {'address': 'remote_host1'},
'type': 'offline',
}
processed_obj = {
'path': '/remote_file1',
'host': {'address': 'remote_host2'},
'type': 'file',
}
manager = Manager(conf)
manager.snapshot()
self.assertEquals([mock.call(offline_obj, conf),
mock.call(processed_obj, conf),
mock.call(offline_obj, conf),
mock.call(offline_obj, conf)],
mget.call_args_list)
mexecute.assert_called_once_with('rm -rf /tmp')