Merge "Moved get_oozie_server from plugin SPI to edp_engine"
This commit is contained in:
commit
22b49d6ecd
|
@ -129,13 +129,6 @@ When user terminates cluster, Sahara simply shuts down all the cluster VMs. This
|
|||
|
||||
*Returns*: None
|
||||
|
||||
get_oozie_server(cluster)
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Returns the instance object for the host running the Oozie server (this service may be referenced by a vendor-dependent identifier)
|
||||
|
||||
*Returns*: The Oozie server instance object
|
||||
|
||||
Object Model
|
||||
============
|
||||
|
||||
|
|
|
@ -37,3 +37,6 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
|
|||
def get_resource_manager_uri(self, cluster):
|
||||
resourcemanager_ip = cu.get_resourcemanager(cluster).fqdn()
|
||||
return '%s:8032' % resourcemanager_ip
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return cu.get_oozie(cluster)
|
||||
|
|
|
@ -75,9 +75,6 @@ class CDHPluginProvider(p.ProvisioningPluginBase):
|
|||
vl.validate_existing_ng_scaling(cluster, existing)
|
||||
vl.validate_additional_ng_scaling(cluster, additional)
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return cu.get_oozie(cluster)
|
||||
|
||||
def _set_cluster_info(self, cluster):
|
||||
mng = cu.get_manager(cluster)
|
||||
info = {
|
||||
|
|
|
@ -137,9 +137,6 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
|||
"node_groups": node_groups,
|
||||
"cluster_configs": cluster_configs})
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return u.get_instance(cluster, "OOZIE_SERVER")
|
||||
|
||||
def get_edp_engine(self, cluster, job_type):
|
||||
version_handler = (
|
||||
self.version_factory.get_version_handler(cluster.hadoop_version))
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sahara.plugins.general import utils as u
|
||||
from sahara.service.edp.oozie import engine as edp_engine
|
||||
|
||||
|
||||
|
@ -26,3 +27,6 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
|
|||
|
||||
def get_oozie_server_uri(self, cluster):
|
||||
return cluster['info']['JobFlow']['Oozie'] + "/oozie/"
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return u.get_instance(cluster, "OOZIE_SERVER")
|
||||
|
|
|
@ -61,10 +61,6 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
|
|||
def scale_cluster(self, cluster, instances):
|
||||
pass
|
||||
|
||||
@plugins_base.optional
|
||||
def get_oozie_server(self, cluster):
|
||||
pass
|
||||
|
||||
@plugins_base.optional
|
||||
def validate_edp(self, cluster):
|
||||
pass
|
||||
|
|
|
@ -53,10 +53,6 @@ class AbstractVersionHandler():
|
|||
def validate_scaling(self, cluster, existing, additional):
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_oozie_server(self, cluster):
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_edp_engine(self, cluster, job_type):
|
||||
return
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sahara.plugins.vanilla import utils as vu
|
||||
from sahara.service.edp.oozie import engine as edp_engine
|
||||
|
||||
|
||||
|
@ -25,3 +26,6 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
|
|||
|
||||
def get_oozie_server_uri(self, cluster):
|
||||
return cluster['info']['JobFlow']['Oozie'] + "/oozie/"
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return vu.get_oozie(cluster)
|
||||
|
|
|
@ -70,10 +70,6 @@ class VanillaProvider(p.ProvisioningPluginBase):
|
|||
cluster.hadoop_version).validate_scaling(cluster, existing,
|
||||
additional)
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return self._get_version_handler(
|
||||
cluster.hadoop_version).get_oozie_server(cluster)
|
||||
|
||||
def validate_edp(self, cluster):
|
||||
oo_count = u.get_instances_count(cluster, 'oozie')
|
||||
if oo_count != 1:
|
||||
|
|
|
@ -55,9 +55,6 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||
"Hive": ["hiveserver"]
|
||||
}
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return vu.get_oozie(cluster)
|
||||
|
||||
def validate(self, cluster):
|
||||
nn_count = sum([ng.count for ng
|
||||
in utils.get_node_groups(cluster, "namenode")])
|
||||
|
|
|
@ -137,9 +137,6 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||
ctx = context.ctx()
|
||||
conductor.cluster_update(ctx, cluster, {'info': info})
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return vu.get_oozie(cluster)
|
||||
|
||||
def get_edp_engine(self, cluster, job_type):
|
||||
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
|
||||
return edp_engine.EdpOozieEngine(cluster)
|
||||
|
|
|
@ -132,9 +132,6 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||
ctx = context.ctx()
|
||||
conductor.cluster_update(ctx, cluster, {'info': info})
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return vu.get_oozie(cluster)
|
||||
|
||||
def get_edp_engine(self, cluster, job_type):
|
||||
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
|
||||
return edp_engine.EdpOozieEngine(cluster)
|
||||
|
|
|
@ -46,7 +46,7 @@ class OozieJobEngine(base_engine.JobEngine):
|
|||
|
||||
def _get_client(self):
|
||||
return o.OozieClient(self.get_oozie_server_uri(self.cluster),
|
||||
self.plugin.get_oozie_server(self.cluster))
|
||||
self.get_oozie_server(self.cluster))
|
||||
|
||||
def _get_oozie_job_params(self, hdfs_user, path_to_workflow):
|
||||
rm_path = self.get_resource_manager_uri(self.cluster)
|
||||
|
@ -89,9 +89,8 @@ class OozieJobEngine(base_engine.JobEngine):
|
|||
hdfs_user = self.get_hdfs_user()
|
||||
|
||||
# TODO(tmckay): this should probably be "get_namenode"
|
||||
# but that call does not exist in the plugin api now.
|
||||
# However, other engines may need it.
|
||||
oozie_server = self.plugin.get_oozie_server(self.cluster)
|
||||
# but that call does not exist in the oozie engine api now.
|
||||
oozie_server = self.get_oozie_server(self.cluster)
|
||||
|
||||
wf_dir = self._create_hdfs_workflow_dir(oozie_server, job)
|
||||
self._upload_job_files_to_hdfs(oozie_server, wf_dir, job)
|
||||
|
@ -129,6 +128,10 @@ class OozieJobEngine(base_engine.JobEngine):
|
|||
def get_oozie_server_uri(self, cluster):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_oozie_server(self, cluster):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_name_node_uri(self, cluster):
|
||||
pass
|
||||
|
|
|
@ -61,6 +61,7 @@ class InstanceInfo:
|
|||
|
||||
class TestCluster():
|
||||
def __init__(self, node_groups):
|
||||
self.plugin_name = 'hdp'
|
||||
self.hadoop_version = None
|
||||
self.cluster_configs = {}
|
||||
self.node_groups = node_groups
|
||||
|
|
|
@ -305,15 +305,19 @@ class AmbariPluginTest(sahara_base.SaharaTestCase):
|
|||
'ng1', [test_host], ["AMBARI_SERVER", "NAMENODE", "DATANODE",
|
||||
"JOBTRACKER", "TASKTRACKER", "OOZIE_SERVER"])
|
||||
cluster = base.TestCluster([node_group])
|
||||
cluster.hadoop_version = '2.0.6'
|
||||
plugin = ap.AmbariPlugin()
|
||||
|
||||
self.assertIsNotNone(plugin.get_oozie_server(cluster))
|
||||
self.assertIsNotNone(plugin.get_edp_engine(
|
||||
cluster, edp.JOB_TYPE_PIG).get_oozie_server(cluster))
|
||||
|
||||
node_group = base.TestNodeGroup(
|
||||
'ng1', [test_host], ["AMBARI_SERVER", "NAMENODE", "DATANODE",
|
||||
"JOBTRACKER", "TASKTRACKER", "NOT_OOZIE"])
|
||||
cluster = base.TestCluster([node_group])
|
||||
self.assertIsNone(plugin.get_oozie_server(cluster))
|
||||
cluster.hadoop_version = '2.0.6'
|
||||
self.assertIsNone(plugin.get_edp_engine(
|
||||
cluster, edp.JOB_TYPE_PIG).get_oozie_server(cluster))
|
||||
|
||||
@mock.patch('sahara.service.edp.hdfs_helper.create_dir_hadoop1')
|
||||
def test_edp132_calls_hadoop1_create_dir(self, create_dir):
|
||||
|
|
|
@ -85,6 +85,9 @@ class FakeOozieJobEngine(oe.OozieJobEngine):
|
|||
def get_oozie_server_uri(self, cluster):
|
||||
return 'http://localhost:11000/oozie'
|
||||
|
||||
def get_oozie_server(self, cluster):
|
||||
return None
|
||||
|
||||
def get_name_node_uri(self, cluster):
|
||||
return 'hdfs://localhost:8020'
|
||||
|
||||
|
|
Loading…
Reference in New Issue