Adding Python Jobs using Pyleus

In order to allow users to develop Storm topologies, which we can also
call Storm jobs, in pure Python, we are adding support for Pyleus in
Sahara.
Pyleus is a framework that allows the creation of Storm topologies in
Python and uses a YAML file to wire together how the data flow works.
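
For illustration only, a Pyleus component is plain Python; a minimal bolt
might look like the sketch below, which assumes the pyleus.storm API from
the Pyleus documentation and is not part of this change (the topology YAML
then wires such spouts and bolts together):

    from pyleus.storm import SimpleBolt

    class EchoBolt(SimpleBolt):

        def process_tuple(self, tup):
            # Re-emit the incoming values unchanged; SimpleBolt takes care
            # of acking the tuple after this method returns.
            self.emit(tuple(tup.values), anchors=[tup])

    if __name__ == '__main__':
        EchoBolt().run()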

Change-Id: I3a657164c91f1c7705d47882a1334925adf8db39
Partially-implements: blueprint python-storm-jobs
Telles Nobrega 2016-07-08 19:46:57 -03:00
parent 27253b9969
commit 9789e6c596
11 changed files with 174 additions and 64 deletions

View File

@@ -47,6 +47,7 @@ def generate_storm_config(master_hostname, zk_hostnames, version):
if version == '1.0.1':
host_cfg = 'nimbus.seeds'
master_value = [master_hostname.encode('ascii', 'ignore')]
else:
host_cfg = 'nimbus.host'
master_value = master_hostname.encode('ascii', 'ignore')
@@ -62,6 +63,13 @@ def generate_storm_config(master_hostname, zk_hostnames, version):
"storm.local.dir": "/app/storm"
}
# Since Pyleus is built using previous versions of Storm, we need this
# option to allow the cluster to be compatible with Pyleus topologies as
# well as with topologies built using older versions of Storm.
if version == '1.0.1':
cfg['client.jartransformer.class'] = (
"org.apache.storm.hack.StormShadeTransformer")
return cfg
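
For illustration, on a 1.0.1 cluster the dict returned above would look
roughly like this (hostnames and ZooKeeper entries are hypothetical
placeholders):

    cfg = {
        "nimbus.seeds": ["master-node"],
        "storm.zookeeper.servers": ["zk-1", "zk-2"],
        "storm.local.dir": "/app/storm",
        # Rewrites package names inside submitted jars so that topologies
        # built against pre-1.0 Storm releases (such as Pyleus jars) still
        # run on a 1.0.1 cluster.
        "client.jartransformer.class":
            "org.apache.storm.hack.StormShadeTransformer",
    }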

View File

@@ -18,18 +18,35 @@ from sahara.i18n import _
from sahara.service.edp.storm import engine as edp_engine
class EdpEngine(edp_engine.StormJobEngine):
class EdpStormEngine(edp_engine.StormJobEngine):
edp_base_version = "0.9.2"
@staticmethod
def edp_supported(version):
return version >= EdpEngine.edp_base_version
return version >= EdpStormEngine.edp_base_version
def validate_job_execution(self, cluster, job, data):
if not self.edp_supported(cluster.hadoop_version):
raise ex.InvalidDataException(
_('Storm {base} required to run {type} jobs').format(
base=EdpEngine.edp_base_version, type=job.type))
base=EdpStormEngine.edp_base_version, type=job.type))
super(EdpEngine, self).validate_job_execution(cluster, job, data)
super(EdpStormEngine, self).validate_job_execution(cluster, job, data)
class EdpPyleusEngine(edp_engine.StormPyleusJobEngine):
edp_base_version = "0.9.2"
@staticmethod
def edp_supported(version):
return version >= EdpPyleusEngine.edp_base_version
def validate_job_execution(self, cluster, job, data):
if not self.edp_supported(cluster.hadoop_version):
raise ex.InvalidDataException(
_('Storm {base} required to run {type} jobs').format(
base=EdpPyleusEngine.edp_base_version, type=job.type))
super(EdpPyleusEngine, self).validate_job_execution(cluster, job, data)

View File

@@ -103,22 +103,27 @@ class StormProvider(p.ProvisioningPluginBase):
self._set_cluster_info(cluster)
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EdpEngine.get_supported_job_types():
return edp_engine.EdpEngine(cluster)
if job_type in edp_engine.EdpStormEngine.get_supported_job_types():
return edp_engine.EdpStormEngine(cluster)
if job_type in edp_engine.EdpPyleusEngine.get_supported_job_types():
return edp_engine.EdpPyleusEngine(cluster)
return None
def get_edp_job_types(self, versions=None):
res = {}
for vers in self.get_versions():
if not versions or vers in versions:
if edp_engine.EdpEngine.edp_supported(vers):
res[vers] = edp_engine.EdpEngine.get_supported_job_types()
storm_engine = edp_engine.EdpStormEngine
pyleus_engine = edp_engine.EdpPyleusEngine
res[vers] = (storm_engine.get_supported_job_types() +
pyleus_engine.get_supported_job_types())
return res
def get_edp_config_hints(self, job_type, version):
if edp_engine.EdpEngine.edp_supported(version):
return edp_engine.EdpEngine.get_possible_job_config(job_type)
if edp_engine.EdpStormEngine.edp_supported(version):
return edp_engine.EdpStormEngine.get_possible_job_config(job_type)
if edp_engine.EdpPyleusEngine.edp_supported(version):
return edp_engine.EdpPyleusEngine.get_possible_job_config(job_type)
return {}
def get_open_ports(self, node_group):
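
For illustration, with both engines registered the plugin would report both
job types for every supported version; a rough sketch of the behaviour:

    # plugin.get_edp_job_types() would now return something like:
    # {'0.9.2': ['Storm', 'Storm.Pyleus'],
    #  '1.0.1': ['Storm', 'Storm.Pyleus']}
    # and plugin.get_edp_engine(cluster, 'Storm.Pyleus') would return an
    # EdpPyleusEngine instance instead of None.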

View File

@@ -43,7 +43,8 @@ conductor = c.API
ENGINES = [oozie_engine.OozieJobEngine,
spark_engine.SparkJobEngine,
storm_engine.StormJobEngine]
storm_engine.StormJobEngine,
storm_engine.StormPyleusJobEngine]
def _get_job_type(job_execution):

View File

@@ -74,11 +74,16 @@ class StormJobEngine(base_engine.JobEngine):
return topology_name
def _set_topology_name(self, job_execution, name):
return self._generate_topology_name(name)
def _generate_topology_name(self, name):
return name + "_" + six.text_type(uuid.uuid4())
def _get_job_status_from_remote(self, job_execution):
topology_name, inst_id = self._get_instance_if_running(job_execution)
def _get_job_status_from_remote(self, job_execution, retries=3):
topology_name, inst_id = self._get_instance_if_running(
job_execution)
if topology_name is None or inst_id is None:
return edp.JOB_STATUSES_TERMINATED
@@ -93,14 +98,16 @@ class StormJobEngine(base_engine.JobEngine):
"host": master.hostname(),
"topology_name": topology_name
})
with remote.get_remote(master) as r:
ret, stdout = r.execute_command("%s " % (cmd))
# If the status is ACTIVE is there, it's still running
if stdout.strip() == "ACTIVE":
return {"status": edp.JOB_STATUS_RUNNING}
else:
return {"status": edp.JOB_STATUS_KILLED}
for i in range(retries):
with remote.get_remote(master) as r:
ret, stdout = r.execute_command("%s " % (cmd))
# If ACTIVE shows up in the output, the topology is still running
if stdout.strip() == "ACTIVE":
return {"status": edp.JOB_STATUS_RUNNING}
else:
if i == retries - 1:
return {"status": edp.JOB_STATUS_KILLED}
context.sleep(10)
def _job_script(self):
path = "service/edp/resources/launch_command.py"
@@ -163,7 +170,47 @@ class StormJobEngine(base_engine.JobEngine):
def get_job_status(self, job_execution):
topology_name, instance = self._get_instance_if_running(job_execution)
if instance is not None:
return self._get_job_status_from_remote(job_execution)
return self._get_job_status_from_remote(job_execution, retries=3)
def _execute_remote_job(self, master, wf_dir, cmd):
# If an exception is raised here, the job_manager will mark
# the job failed and log the exception
# The redirects of stdout and stderr will preserve output in the wf_dir
with remote.get_remote(master) as r:
# Upload the command launch script
launch = os.path.join(wf_dir, "launch_command")
r.write_file_to(launch, self._job_script())
r.execute_command("chmod +x %s" % launch)
ret, stdout = r.execute_command(
"cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!"
% (wf_dir, cmd))
return ret, stdout
def _build_command(self, paths, updated_job_configs, host, topology_name):
app_jar = paths.pop(0)
job_class = updated_job_configs["configs"]["edp.java.main_class"]
args = updated_job_configs.get('args', [])
args = " ".join([arg for arg in args])
if args:
args = " " + args
cmd = (
'%(storm_jar)s -c nimbus.host=%(host)s %(job_jar)s '
'%(main_class)s %(topology_name)s%(args)s' % (
{
"storm_jar": "/usr/local/storm/bin/storm jar",
"main_class": job_class,
"job_jar": app_jar,
"host": host,
"topology_name": topology_name,
"args": args
}))
return cmd
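
For reference, the command string assembled by _build_command above would
look roughly as follows (host, jar, class and topology names are
hypothetical):

    # Hypothetical value returned by StormJobEngine._build_command:
    cmd = ('/usr/local/storm/bin/storm jar -c nimbus.host=master-1 '
           'wordcount.jar org.example.WordCountTopology '
           'wordcount_<uuid> arg1 arg2')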
def run_job(self, job_execution):
ctx = context.ctx()
@@ -202,47 +249,18 @@ class StormJobEngine(base_engine.JobEngine):
# We can shorten the paths in this case since we'll run out of wf_dir
paths = [os.path.basename(p) for p in paths]
app_jar = paths.pop(0)
job_class = updated_job_configs["configs"]["edp.java.main_class"]
topology_name = self._generate_topology_name(job.name)
topology_name = self._set_topology_name(job_execution, job.name)
# Launch the storm job using storm jar
host = master.hostname()
args = updated_job_configs.get('args', [])
args = " ".join([arg for arg in args])
if args:
args = " " + args
cmd = (
'%(storm_jar)s -c nimbus.host=%(host)s %(job_jar)s '
'%(main_class)s %(topology_name)s%(args)s' % (
{
"storm_jar": "/usr/local/storm/bin/storm jar",
"main_class": job_class,
"job_jar": app_jar,
"host": host,
"topology_name": topology_name,
"args": args
}))
cmd = self._build_command(paths, updated_job_configs, host,
topology_name)
job_execution = conductor.job_execution_get(ctx, job_execution.id)
if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
return (None, edp.JOB_STATUS_KILLED, None)
# If an exception is raised here, the job_manager will mark
# the job failed and log the exception
# The redirects of stdout and stderr will preserve output in the wf_dir
with remote.get_remote(master) as r:
# Upload the command launch script
launch = os.path.join(wf_dir, "launch_command")
r.write_file_to(launch, self._job_script())
r.execute_command("chmod +x %s" % launch)
ret, stdout = r.execute_command(
"cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!"
% (wf_dir, cmd))
ret, stdout = self._execute_remote_job(master, wf_dir, cmd)
if ret == 0:
# Success, we'll add the wf_dir in job_execution.extra and store
# topology_name@instance_id as the job id
@@ -271,3 +289,36 @@ class StormJobEngine(base_engine.JobEngine):
@staticmethod
def get_supported_job_types():
return [edp.JOB_TYPE_STORM]
class StormPyleusJobEngine(StormJobEngine):
def _build_command(self, paths, updated_job_configs, host, topology_name):
jar_file = paths.pop(0)
cmd = ("{pyleus} -n {nimbus_host} {jar_file}").format(
pyleus='pyleus submit', nimbus_host=host, jar_file=jar_file)
return cmd
def validate_job_execution(self, cluster, job, data):
j.check_topology_name_present(data, job)
def _set_topology_name(self, job_execution, name):
topology_name = job_execution["configs"]["topology_name"]
return topology_name
def _execute_remote_job(self, master, wf_dir, cmd):
with remote.get_remote(master) as r:
ret, stdout = r.execute_command(
"cd %s; %s > /dev/null 2>&1 & echo $!"
% (wf_dir, cmd))
return ret, stdout
@staticmethod
def get_possible_job_config(job_type):
return {'job_config': {'configs': [], 'args': []}}
@staticmethod
def get_supported_job_types():
return [edp.JOB_TYPE_PYLEUS]
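
For illustration, a Pyleus job execution carries its topology name in its
configs rather than having one generated, and the submit command is built
from the jar alone (names below are hypothetical):

    # Hypothetical configs attached to a Storm.Pyleus job execution:
    job_configs = {"configs": {"topology_name": "my_pyleus_topology"}}
    # StormPyleusJobEngine._build_command would then produce roughly:
    # 'pyleus submit -n master-1 topology.jar'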

View File

@@ -37,9 +37,10 @@ def check_mains_libs(data, **kwargs):
# These types must have a value in mains and may also use libs
if job_type in [edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE,
edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK,
edp.JOB_TYPE_STORM]:
edp.JOB_TYPE_STORM, edp.JOB_TYPE_PYLEUS]:
if not mains:
if job_type in [edp.JOB_TYPE_SPARK, edp.JOB_TYPE_STORM]:
if job_type in [edp.JOB_TYPE_SPARK, edp.JOB_TYPE_STORM,
edp.JOB_TYPE_PYLEUS]:
msg = _(
"%s job requires main application jar") % data.get("type")
else:

View File

@@ -48,6 +48,21 @@ def check_main_class_present(data, job):
_('%s job must specify edp.java.main_class') % job.type)
def _is_topology_name_present(data):
if data:
val = data.get(
'job_configs', {}).get(
'configs', {}).get('topology_name', None)
return val and isinstance(val, six.string_types)
return False
def check_topology_name_present(data, job):
if not _is_topology_name_present(data):
raise ex.InvalidDataException(
_('%s job must specify topology_name') % job.type)
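
For illustration, the check above expects job data shaped roughly like this
(values are hypothetical):

    data = {
        "type": "Storm.Pyleus",
        "job_configs": {
            "configs": {"topology_name": "my_pyleus_topology"}
        },
    }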
def _streaming_present(data):
try:
streaming = set(('edp.streaming.mapper',

View File

@@ -206,10 +206,18 @@ class StormPluginTest(base.SaharaWithDbTestCase):
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
self.assertIsInstance(plugin.get_edp_engine(cluster, job_type), eng)
def test_plugin092_edp_engine(self):
def test_plugin092_edp_storm_engine(self):
self._test_engine('0.9.2', edp.JOB_TYPE_STORM,
engine.StormJobEngine)
def test_plugin101_edp_engine(self):
def test_plugin092_edp_storm_pyleus_engine(self):
self._test_engine('0.9.2', edp.JOB_TYPE_PYLEUS,
engine.StormJobEngine)
def test_plugin101_edp_storm_engine(self):
self._test_engine('1.0.1', edp.JOB_TYPE_STORM,
engine.StormJobEngine)
def test_plugin101_edp_storm_pyleus_engine(self):
self._test_engine('1.0.1', edp.JOB_TYPE_PYLEUS,
engine.StormJobEngine)

View File

@@ -155,7 +155,7 @@ class TestStorm(base.SaharaTestCase):
edp.JOB_STATUS_RUNNING}
status = eng.get_job_status(job_exec)
_get_job_status_from_remote.assert_called_with(eng,
job_exec)
job_exec, 3)
self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status)
@mock.patch.object(se.StormJobEngine,

View File

@@ -80,7 +80,8 @@ _job_types = [
_job(edp.JOB_TYPE_JAVA, [_configs(), _args()]),
_job(edp.JOB_TYPE_SHELL, [_configs(), _params(), _args()]),
_job(edp.JOB_TYPE_SPARK, [_configs(), _args()]),
_job(edp.JOB_TYPE_STORM, [_args()])
_job(edp.JOB_TYPE_STORM, [_args()]),
_job(edp.JOB_TYPE_PYLEUS, [])
]

View File

@@ -57,6 +57,7 @@ JOB_TYPE_JAVA = 'Java'
JOB_TYPE_MAPREDUCE = 'MapReduce'
JOB_TYPE_SPARK = 'Spark'
JOB_TYPE_STORM = 'Storm'
JOB_TYPE_PYLEUS = 'Storm.Pyleus'
JOB_TYPE_MAPREDUCE_STREAMING = (JOB_TYPE_MAPREDUCE + JOB_TYPE_SEP +
JOB_SUBTYPE_STREAMING)
JOB_TYPE_PIG = 'Pig'
@@ -71,7 +72,8 @@ JOB_TYPES_ALL = [
JOB_TYPE_PIG,
JOB_TYPE_SHELL,
JOB_TYPE_SPARK,
JOB_TYPE_STORM
JOB_TYPE_STORM,
JOB_TYPE_PYLEUS
]
JOB_TYPES_ACCEPTABLE_CONFIGS = {
@@ -82,7 +84,8 @@ JOB_TYPES_ACCEPTABLE_CONFIGS = {
JOB_TYPE_JAVA: {"configs", "args"},
JOB_TYPE_SHELL: {"configs", "params", "args"},
JOB_TYPE_SPARK: {"configs", "args"},
JOB_TYPE_STORM: {"args"}
JOB_TYPE_STORM: {"args"},
JOB_TYPE_PYLEUS: set()
}
# job actions