Extend cluster provision logging of MapR plugin

- Add provision events for Node Manager
- Add provision events for Services
- Fix a minor bug that displayed zero
  progress in the event log

Change-Id: I19b5d9f212b7405f0b4d5d7a6f726d0b8c674c13
Vitaliy Levitski 2017-01-20 12:21:32 +02:00 committed by Vitaly Gridnev
parent 108a10bb55
commit 1d77977a5e
9 changed files with 81 additions and 25 deletions
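For readers skimming the hunks below, the pattern applied throughout is roughly the following (a simplified sketch, not code taken from the plugin; start_nodes and _start_node are made-up names, while el, cpo and the step/event calls are the ones used in the diff):

import sahara.plugins.mapr.util.event_log as el
from sahara.i18n import _
from sahara.utils import cluster_progress_ops as cpo


@el.provision_event()  # the instance is positional argument 0 here
def _start_node(instance):
    pass  # real per-node work goes here


def start_nodes(cluster_context, instances):
    # Register the step only when there is something to start, so the
    # event log no longer shows a step stuck at zero progress.
    if instances:
        cpo.add_provisioning_step(cluster_context.cluster.id,
                                  _("Start nodes"), len(instances))
        for instance in instances:
            _start_node(instance)  # reports one event per node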

View File

@@ -213,15 +213,24 @@ class BaseConfigurer(ac.AbstractConfigurer):
     def _configure_database(self, cluster_context, instances):
         mysql_instance = mysql.MySQL.get_db_instance(cluster_context)
-        distro_name = cluster_context.distro.name
-        mysql.MySQL.install_mysql(mysql_instance, distro_name)
-        mysql.MySQL.start_mysql_server(cluster_context)
-        mysql.MySQL.create_databases(cluster_context, instances)
+
+        @el.provision_event(instance=mysql_instance,
+                            name=_("Configure database"))
+        def decorated():
+            distro_name = cluster_context.distro.name
+            mysql.MySQL.install_mysql(mysql_instance, distro_name)
+            mysql.MySQL.start_mysql_server(cluster_context)
+            mysql.MySQL.create_databases(cluster_context, instances)
+
+        decorated()
 
     def _post_install_services(self, cluster_context, instances):
         LOG.debug('Executing service post install hooks')
         for s in cluster_context.cluster_services:
-            s.post_install(cluster_context, instances)
+            service_instances = cluster_context.filter_instances(instances,
+                                                                 service=s)
+            if service_instances:
+                s.post_install(cluster_context, instances)
         LOG.info(_LI('Post install hooks execution successfully executed'))
 
     def _update_cluster_info(self, cluster_context):
@@ -230,8 +239,8 @@ class BaseConfigurer(ac.AbstractConfigurer):
                                            'Password': pu.get_mapr_password
                                            (cluster_context.cluster)}}
         for service in cluster_context.cluster_services:
-            for title, node_process, ui_info in (service.get_ui_info
-                                                 (cluster_context)):
+            for title, node_process, ui_info in (
+                    service.get_ui_info(cluster_context)):
                 removed = cluster_context.removed_instances(node_process)
                 instances = cluster_context.get_instances(node_process)
                 instances = [i for i in instances if i not in removed]

View File

@@ -26,6 +26,8 @@ import sahara.plugins.exceptions as ex
 import sahara.plugins.mapr.abstract.node_manager as s
 import sahara.plugins.mapr.services.management.management as mng
 import sahara.plugins.mapr.services.maprfs.maprfs as mfs
+import sahara.plugins.mapr.util.event_log as el
+from sahara.utils import cluster_progress_ops as cpo
 
 LOG = logging.getLogger(__name__)
@@ -79,15 +81,27 @@ class BaseNodeManager(s.AbstractNodeManager):
         others = filter(
             lambda i: not cluster_context.check_for_process(i, mfs.CLDB),
             instances)
+        cpo.add_provisioning_step(cluster_context.cluster.id,
+                                  _("Start ZooKeepers nodes"), len(zookeepers))
         self._start_zk_nodes(zookeepers)
+        cpo.add_provisioning_step(cluster_context.cluster.id,
+                                  _("Start CLDB nodes"), len(cldbs))
         self._start_cldb_nodes(cldbs)
-        self._start_non_cldb_nodes(others)
+        if others:
+            cpo.add_provisioning_step(cluster_context.cluster.id,
+                                      _("Start non-CLDB nodes"),
+                                      len(list(others)))
+            self._start_non_cldb_nodes(others)
         self._await_cldb(cluster_context, instances)
 
     def stop(self, cluster_context, instances=None):
         instances = instances or cluster_context.get_instances()
         zookeepers = cluster_context.filter_instances(instances, mng.ZOOKEEPER)
+        cpo.add_provisioning_step(cluster_context.cluster.id,
+                                  _("Stop ZooKeepers nodes"), len(zookeepers))
         self._stop_zk_nodes(zookeepers)
+        cpo.add_provisioning_step(cluster_context.cluster.id,
+                                  _("Stop Warden nodes"), len(instances))
         self._stop_warden_on_nodes(instances)
 
     def _await_cldb(self, cluster_context, instances=None, timeout=600):
@@ -163,9 +177,11 @@ class BaseNodeManager(s.AbstractNodeManager):
                 command=cmd, ip=instance.internal_ip))
             r.execute_command(cmd, run_as_root=True)
 
+    @el.provision_event(instance_reference=1)
     def _start_service(self, instance, service):
         return self._do_service_action(instance, service, START)
 
+    @el.provision_event(instance_reference=1)
     def _stop_service(self, instance, service):
         return self._do_service_action(instance, service, STOP)

View File

@@ -93,8 +93,8 @@ class Service(object):
            g.execute_on_instances(_instances,
                                   self._install_packages_on_instance,
                                   _context)
-        _install(cluster_context, service_instances)
+        if service_instances:
+            _install(cluster_context, service_instances)
 
     @el.provision_event(instance_reference=1)
     def _install_packages_on_instance(self, instance, cluster_context):

View File

@@ -15,10 +15,12 @@
 from oslo_log import log as logging
 
+from sahara.i18n import _
 import sahara.plugins.mapr.domain.configuration_file as bcf
 import sahara.plugins.mapr.domain.node_process as np
 import sahara.plugins.mapr.domain.service as s
 import sahara.plugins.mapr.services.sentry.sentry as sentry
+import sahara.plugins.mapr.util.event_log as el
 import sahara.plugins.mapr.util.validation_utils as vu
 import sahara.utils.files as files
@@ -137,7 +139,17 @@ class Hive(s.Service):
         self._create_sentry_role(cluster_context)
 
     def _create_sentry_role(self, cluster_context):
-        instance = cluster_context.get_instance(HIVE_METASTORE)
+        @el.provision_event(name=_("Create Sentry role for Hive"))
+        def _create_role(instance):
+            cmd = 'sudo -u mapr hive -e "create role admin_role;' \
+                  'grant all on server HS2 to role admin_role;' \
+                  'grant role admin_role to group mapr;"'
+            with instance.remote() as r:
+                LOG.debug("Creating hive role for sentry")
+                r.execute_command(cmd, raise_when_error=False)
+
+        hive_host = cluster_context.get_instance(HIVE_METASTORE)
         sentry_host = cluster_context.get_instance(sentry.SENTRY)
         if sentry_host:
             sentry_mode = cluster_context._get_cluster_config_value(
@@ -148,12 +160,7 @@ class Hive(s.Service):
             sentry_service = cluster_context. \
                 _find_service_instance(ui_name, sentry_version)
             if sentry_service.supports(self, sentry_mode):
-                cmd = 'sudo -u mapr hive -e "create role admin_role;' \
-                      'grant all on server HS2 to role admin_role;' \
-                      'grant role admin_role to group mapr;"'
-                with instance.remote() as r:
-                    LOG.debug("Creating hive role for sentry")
-                    r.execute_command(cmd, raise_when_error=False)
+                _create_role(hive_host)
 
 
 class HiveV013(Hive):

View File

@@ -34,6 +34,7 @@ import sahara.plugins.mapr.services.sentry.sentry as sentry
 import sahara.plugins.mapr.services.spark.spark as spark
 import sahara.plugins.mapr.services.sqoop.sqoop2 as sqoop
 import sahara.plugins.mapr.services.yarn.yarn as yarn
+import sahara.plugins.mapr.util.event_log as el
 import sahara.plugins.mapr.util.general as g
 import sahara.plugins.mapr.util.password_utils as pu
 from sahara.plugins.mapr.util import service_utils as su
@@ -192,6 +193,8 @@ class Hue(s.Service):
     def post_install(self, cluster_context, instances):
         hue_instance = cluster_context.get_instance(HUE)
 
+        @el.provision_event(name=_("Migrating Hue database"),
+                            instance=hue_instance)
         def migrate_database(remote, cluster_context):
             hue_home = self.home_dir(cluster_context)
             cmd = '%(activate)s && %(syncdb)s && %(migrate)s'
@@ -272,6 +275,8 @@ class Hue(s.Service):
         }
         return path % args
 
+    @el.provision_event(name="Install Hue Job Tracker plugin",
+                        instance_reference=2)
     def _install_jt_plugin(self, cluster_context, hue_instance):
         LOG.debug("Copying Hue JobTracker plugin")
         job_trackers = cluster_context.get_instances(mr.JOB_TRACKER)

View File

@@ -21,9 +21,11 @@ from sahara.i18n import _LI
 import sahara.plugins.mapr.domain.configuration_file as bcf
 import sahara.plugins.mapr.domain.node_process as np
 import sahara.plugins.mapr.domain.service as s
+import sahara.plugins.mapr.util.event_log as el
 import sahara.plugins.mapr.util.general as g
 import sahara.plugins.mapr.util.validation_utils as vu
 import sahara.plugins.provisioning as p
+from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import files
 
 LOG = logging.getLogger(__name__)
@@ -105,12 +107,15 @@ class MapRFS(s.Service):
         LOG.debug('Initializing MapR FS')
         instances = instances or cluster_context.get_instances()
         file_servers = cluster_context.filter_instances(instances, FILE_SERVER)
+        cpo.add_provisioning_step(cluster_context.cluster.id,
+                                  _("Initializing MapR-FS"), len(file_servers))
         with context.ThreadGroup() as tg:
             for instance in file_servers:
                 tg.spawn('init-mfs-%s' % instance.id,
                          self._init_mfs_instance, instance)
         LOG.info(_LI('MapR FS successfully initialized'))
 
+    @el.provision_event(instance_reference=1)
     def _init_mfs_instance(self, instance):
         self._generate_disk_list_file(instance, self._CREATE_DISK_LIST)
         self._execute_disksetup(instance)

View File

@@ -15,10 +15,12 @@
 from oslo_log import log as logging
 
 import sahara.context as context
+from sahara.i18n import _
 import sahara.plugins.mapr.domain.configuration_file as bcf
 import sahara.plugins.mapr.domain.node_process as np
 import sahara.plugins.mapr.domain.service as s
 import sahara.plugins.mapr.services.mysql.mysql as mysql
+import sahara.plugins.mapr.util.event_log as el
 import sahara.plugins.mapr.util.general as g
 import sahara.plugins.mapr.util.validation_utils as vu
@@ -115,6 +117,7 @@ class Oozie(s.Service):
         instances = cluster_context.filter_instances(instances, OOZIE)
         self._rebuild(cluster_context, instances)
 
+    @el.provision_event(instance_reference=1)
     @g.remote_command(1)
     def _rebuild_oozie_war(self, remote, cluster_context):
         cmd = '%(home)s/bin/oozie-setup.sh -hadoop %(version)s' \
@@ -128,6 +131,7 @@ class Oozie(s.Service):
         instances = cluster_context.filter_instances(instances, OOZIE)
         self._rebuild(cluster_context, instances)
 
+    @el.provision_step(_("Rebuilt Oozie war"))
     def _rebuild(self, cluster_context, instances):
         OOZIE.stop(filter(OOZIE.is_started, instances))
         g.execute_on_instances(

View File

@@ -17,6 +17,7 @@ import sahara.plugins.mapr.domain.configuration_file as cf
 import sahara.plugins.mapr.domain.node_process as np
 import sahara.plugins.mapr.domain.service as s
 import sahara.plugins.mapr.services.mysql.mysql as mysql
+import sahara.plugins.mapr.util.event_log as el
 import sahara.plugins.mapr.util.maprfs_helper as mfs
 import sahara.plugins.provisioning as p
 import sahara.utils.files as files
@@ -57,9 +58,9 @@ class Sentry(s.Service):
         return [Sentry.SENTRY_STORAGE_MODE]
 
     def get_config_files(self, cluster_context, configs, instance=None):
-        sentry_default =\
+        sentry_default = \
             'plugins/mapr/services/sentry/resources/sentry-default.xml'
-        global_policy_template =\
+        global_policy_template = \
             'plugins/mapr/services/sentry/resources/global-policy.ini'
         sentry_site = cf.HadoopXML('sentry-site.xml')
         sentry_site.remote_path = self.conf_dir(cluster_context)
@@ -112,8 +113,13 @@ class Sentry(s.Service):
               ' --conffile %(home)s/conf/sentry-site.xml' \
               ' --dbType mysql --initSchema' % {
                   'home': self.home_dir(cluster_context)}
-        with instance.remote() as r:
-            r.execute_command(cmd, run_as_root=True)
+
+        @el.provision_event(name=_("Init Sentry DB schema"), instance=instance)
+        def decorated():
+            with instance.remote() as r:
+                r.execute_command(cmd, run_as_root=True)
+
+        decorated()
 
     def post_start(self, cluster_context, instances):
         sentry_host = cluster_context.get_instance(SENTRY)

View File

@@ -35,16 +35,20 @@ def provision_step(name, cluster_context_reference=1, instances_reference=2):
     return wrapper
 
 
-def provision_event(instance_reference=0):
+def provision_event(instance_reference=0, name=None, instance=None):
     def wrapper(function):
         def wrapped(*args, **kwargs):
-            instance = _find_argument(instance_reference, *args, **kwargs)
+            event_instance = instance or _find_argument(instance_reference,
+                                                        *args, **kwargs)
+            if name:
+                cpo.add_provisioning_step(event_instance.node_group.cluster.id,
+                                          name, 1)
             try:
                 result = function(*args, **kwargs)
-                cpo.add_successful_event(instance)
+                cpo.add_successful_event(event_instance)
                 return result
             except Exception as exception:
-                cpo.add_fail_event(instance, exception)
+                cpo.add_fail_event(event_instance, exception)
                 raise exception
         return wrapped
     return wrapper
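Taken together with the call sites in the earlier hunks, the extended decorator supports two usage styles. A minimal sketch, assuming the module is imported as el as above; MyService, _restart and configure are illustrative names only, not plugin code:

import sahara.plugins.mapr.util.event_log as el
from sahara.i18n import _


class MyService(object):
    # Style 1: the instance is taken from the wrapped call's arguments
    # (argument 1 here, because argument 0 is self); the step itself is
    # registered separately via cpo.add_provisioning_step().
    @el.provision_event(instance_reference=1)
    def _restart(self, instance):
        pass  # real work against `instance` goes here

    # Style 2: pass the instance and a step name explicitly; the decorator
    # then registers a dedicated one-event step before running the call.
    def configure(self, cluster_context, instance):
        @el.provision_event(instance=instance, name=_("Configure MyService"))
        def decorated():
            pass  # real work against `instance` goes here

        decorated()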