Add compute per model feature

Change-Id: I11b66c3eb9e195f49871456084a12db42676cd75
This commit is contained in:
bharath 2018-11-13 22:35:01 +05:30
parent 371c754b0d
commit 40b7b896ac
20 changed files with 293 additions and 61 deletions

View File

@ -164,11 +164,9 @@ class FlavorController(base.Controller):
policy.enforce(context, "flavor:create",
action="flavor:create")
LOG.debug("bhaaaaaaaaaaaaaaaaaaaaaaaaaaa")
LOG.debug(flavor_dict)
flavor_dict["additional_details"] = json.dumps(flavor_dict["additional_details"])
LOG.debug(flavor_dict)
# flavor_dict["model_data"] = open("/home/bharath/model.zip", "rb").read()
new_flavor = objects.Flavor(context, **flavor_dict)
flavor = new_flavor.create(context)
LOG.debug(new_flavor)
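`additional_details` arrives as a dict but is persisted in a string field, hence the `json.dumps` before the object is created. A minimal sketch of the round trip (values are hypothetical):

```python
import json

flavor_dict = {
    "name": "small-gpu",  # hypothetical flavor
    "additional_details": {"gpu_count": "1", "cuda": "9.0"},
}

# Serialize before persisting: the backing column stores a string.
flavor_dict["additional_details"] = json.dumps(flavor_dict["additional_details"])

# Deserialize when the flavor is read back.
details = json.loads(flavor_dict["additional_details"])
assert details["cuda"] == "9.0"
```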

View File

@ -12,6 +12,11 @@
import base64
import shlex
import os
import time
import yaml
from eventlet import greenthread
from oslo_log import log as logging
from oslo_utils import strutils
@ -31,6 +36,7 @@ from gyan.common import context as gyan_context
from gyan.common import exception
from gyan.common.i18n import _
from gyan.common.policies import ml_model as policies
from gyan.common import clients
from gyan.common import policy
from gyan.common import utils
import gyan.conf
@ -38,7 +44,7 @@ from gyan import objects
CONF = gyan.conf.CONF
LOG = logging.getLogger(__name__)
BASE_TEMPLATE = os.path.join(CONF.state_path, "base.yaml")
def check_policy_on_ml_model(ml_model, action):
context = pecan.request.context
@ -81,7 +87,6 @@ class MLModelController(base.Controller):
'predict': ['POST']
}
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
def get_all(self, **kwargs):
@ -107,7 +112,7 @@ class MLModelController(base.Controller):
expand = kwargs.pop('expand', None)
ml_model_allowed_filters = ['name', 'status', 'project_id', 'user_id',
'type']
filters = {}
for filter_key in ml_model_allowed_filters:
if filter_key in kwargs:
@ -119,23 +124,23 @@ class MLModelController(base.Controller):
marker = kwargs.pop('marker', None)
if marker:
marker_obj = objects.ML_Model.get_by_uuid(context,
marker)
if kwargs:
unknown_params = [str(k) for k in kwargs]
msg = _("Unknown parameters: %s") % ", ".join(unknown_params)
raise exception.InvalidValue(msg)
ml_models = objects.ML_Model.list(context,
limit,
marker_obj,
sort_key,
sort_dir,
filters=filters)
return MLModelCollection.convert_with_links(ml_models, limit,
url=resource_url,
expand=expand,
sort_key=sort_key,
sort_dir=sort_dir)
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
@ -152,7 +157,7 @@ class MLModelController(base.Controller):
ml_model = utils.get_ml_model(ml_model_ident)
check_policy_on_ml_model(ml_model.as_dict(), "ml_model:get_one")
return view.format_ml_model(context, pecan.request.host_url,
ml_model.as_dict())
@base.Controller.api_version("1.0")
@pecan.expose('json')
@ -168,9 +173,9 @@ class MLModelController(base.Controller):
compute_api = pecan.request.compute_api
new_model = view.format_ml_model(context, pecan.request.host_url,
ml_model.as_dict())
compute_api.ml_model_create(context, new_model)
# compute_api.ml_model_create(context, new_model)
return new_model
@base.Controller.api_version("1.0")
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
@ -180,10 +185,13 @@ class MLModelController(base.Controller):
ml_model = utils.get_ml_model(ml_model_ident)
pecan.response.status = 200
compute_api = pecan.request.compute_api
host_ip = ml_model.deployed_on
LOG.debug(pecan.request.POST['file'])
predict_dict = {
"data": base64.b64encode(pecan.request.POST['file'].file.read())
}
prediction = compute_api.ml_model_predict(context, ml_model_ident, **predict_dict)
LOG.debug(predict_dict)
prediction = compute_api.ml_model_predict(context, ml_model_ident, host_ip, **predict_dict)
return prediction
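The upload is base64-encoded so the raw bytes survive RPC/JSON serialization on the way to the compute node that hosts the model. A standalone sketch of the round trip:

```python
import base64

payload = b"\x89PNG\r\n..."  # stand-in for the uploaded file's bytes

# API side: encode before putting the bytes into the RPC payload.
predict_dict = {"data": base64.b64encode(payload)}

# Compute side: decode before feeding the model.
assert base64.b64decode(predict_dict["data"]) == payload
```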
@base.Controller.api_version("1.0")
@ -210,20 +218,22 @@ class MLModelController(base.Controller):
ml_model_dict['status'] = consts.CREATED
ml_model_dict['ml_type'] = ml_model_dict['type']
ml_model_dict['flavor_id'] = ml_model_dict['flavor_id']
extra_spec = {}
extra_spec['hints'] = ml_model_dict.get('hints', None)
# ml_model_dict["model_data"] = open("/home/bharath/model.zip", "rb").read()
new_ml_model = objects.ML_Model(context, **ml_model_dict)
# heat_client = clients.OpenStackClients(context).heat()
ml_model = new_ml_model.create(context)
LOG.debug(new_ml_model)
# compute_api.ml_model_create(context, new_ml_model)
# Set the HTTP Location Header
pecan.response.location = link.build_url('ml_models',
ml_model.id)
pecan.response.status = 201
return view.format_ml_model(context, pecan.request.host_url,
ml_model.as_dict())
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
@ -241,8 +251,7 @@ class MLModelController(base.Controller):
compute_api = pecan.request.compute_api
ml_model = compute_api.ml_model_update(context, ml_model, patch)
return view.format_ml_model(context, pecan.request.node_url,
ml_model.as_dict())
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
@ -259,6 +268,58 @@ class MLModelController(base.Controller):
ml_model.destroy(context)
pecan.response.status = 204
def _do_compute_node_schedule(self, context, ml_model, url, compute_api, host_url):
    target_status = "COMPLETE"
    timeout = 500
    sleep_interval = 5
    stack_data = {
        "files": {},
        "disable_rollback": True,
        "parameters": {},
        "stack_name": "TENSORFLOW",
        "environment": {}
    }
    with open(BASE_TEMPLATE) as f:
        stack_data["template"] = yaml.safe_load(f)
    LOG.debug(stack_data)
    heat_client = clients.OpenStackClients(context).heat()
    stack = heat_client.stacks.create(**stack_data)
    LOG.debug(stack)
    stack_id = stack["stack"]["id"]
    # Poll the stack until it completes, sleeping between attempts and
    # honoring the timeout (previously defined but never used).
    elapsed = 0
    while elapsed < timeout:
        stack_result = heat_client.stacks.get(stack_id)
        status = stack_result.status
        if status == target_status:
            break
        greenthread.sleep(sleep_interval)
        elapsed += sleep_interval
    if status == target_status:
        ml_model.status = consts.DEPLOYED_COMPUTE_NODE
        ml_model.save(context)
    else:
        ml_model.status = consts.DEPLOYMENT_FAILED
        ml_model.save(context)
        return
    host_ip = None
    stack_result = heat_client.stacks.get(stack_id)
    for output in stack_result.outputs:
        if "public" in output["output_key"]:
            host_ip = output["output_value"]
    ml_model.deployed_on = host_ip
    ml_model.save(context)
    # Wait for the new compute node to register itself, then hand the
    # model over to it.
    while True:
        hosts = objects.ComputeHost.list(context)
        LOG.debug(hosts)
        LOG.debug(host_ip)
        for host in hosts:
            if host_ip == host.hostname:
                ml_model.status = consts.DEPLOYED
                ml_model.url = url
                ml_model.save(context)
                ml_model.ml_data = None
                new_model = view.format_ml_model(context, host_url,
                                                 ml_model.as_dict())
                compute_api.ml_model_create(context, new_model, host_ip)
                return
        greenthread.sleep(sleep_interval)
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
@ -267,19 +328,27 @@ class MLModelController(base.Controller):
:param ml_model_ident: UUID or Name of a ML Model.
"""
context = pecan.request.context
ml_model = utils.get_ml_model(ml_model_ident)
@utils.synchronized(ml_model.id)
def do_compute_schedule(context, ml_model, url, compute_api, host_url):
self._do_compute_node_schedule(context, ml_model, url, compute_api, host_url)
check_policy_on_ml_model(ml_model.as_dict(), "ml_model:deploy")
utils.validate_ml_model_state(ml_model, 'deploy')
LOG.debug('Calling compute.ml_model_deploy with %s',
ml_model.id)
ml_model.status = consts.DEPLOYED
ml_model.status = consts.DEPLOYMENT_STARTED
url = pecan.request.url.replace("deploy", "predict")
ml_model.url = url
# ml_model.url = url
compute_api = pecan.request.compute_api
utils.spawn_n(do_compute_schedule, context, ml_model, url, compute_api, pecan.request.host_url)
ml_model.save(context)
pecan.response.status = 202
return view.format_ml_model(context, pecan.request.host_url,
ml_model.as_dict())
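Deploy now returns 202 immediately and runs the Heat orchestration in a background greenthread, serialized per model id via the new `utils.synchronized` lock. A minimal sketch of the same pattern, assuming only eventlet and oslo.concurrency:

```python
import eventlet
from oslo_concurrency import lockutils

# Same prefix the commit introduces as consts.NAME_PREFIX.
synchronized = lockutils.synchronized_with_prefix('gyan-')

def deploy(ml_model_id):
    @synchronized(ml_model_id)
    def _do_schedule():
        # Long-running orchestration work happens here.
        eventlet.sleep(1)

    # Fire and forget: the API thread can return 202 right away.
    eventlet.spawn_n(_do_schedule)

deploy('gyan-1234')
```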
@pecan.expose('json')
@exception.wrap_pecan_controller_exception
@ -294,9 +363,9 @@ class MLModelController(base.Controller):
utils.validate_ml_model_state(ml_model, 'undeploy')
LOG.debug('Calling compute.ml_model_undeploy with %s',
ml_model.id)
ml_model.status = consts.SCHEDULED
ml_model.status = consts.CREATED
ml_model.url = None
ml_model.save(context)
pecan.response.status = 202
return view.format_ml_model(context, pecan.request.host_url,
ml_model.as_dict())

View File

@ -20,7 +20,8 @@ ml_model_create = {
'type': 'object',
'properties': {
"name": parameter_types.ml_model_name,
"type": parameter_types.ml_model_type
"type": parameter_types.ml_model_type,
"flavor_id": parameter_types.ml_model_flavor_id
},
'required': ['name', 'type'],
'additionalProperties': False
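A request body accepted by the extended schema would look like this (a sketch with the property definitions inlined; the real code pulls them from `parameter_types`):

```python
import jsonschema

ml_model_create = {
    'type': 'object',
    'properties': {
        'name': {'type': 'string', 'pattern': '^[a-zA-Z0-9-._]*$'},
        'type': {'type': 'string'},
        'flavor_id': {'type': 'string', 'minLength': 1, 'maxLength': 255,
                      'pattern': '^[a-zA-Z0-9-._]*$'},
    },
    'required': ['name', 'type'],
    'additionalProperties': False,
}

payload = {'name': 'mnist-cnn', 'type': 'tensorflow', 'flavor_id': 'gpu.small'}
jsonschema.validate(payload, ml_model_create)  # raises ValidationError on bad input
```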

View File

@ -147,6 +147,13 @@ ml_model_name = {
'pattern': '^[a-zA-Z0-9-._]*$'
}
ml_model_flavor_id = {
'type': 'string',
'minLength': 1,
'maxLength': 255,
'pattern': '^[a-zA-Z0-9-._]*$'
}
ml_model_type = {
'type': 'string',
'minLength': 1,

View File

@ -27,8 +27,9 @@ _basic_keys = (
'status',
'status_reason',
'host_id',
'deployed',
'ml_type'
'ml_type',
'flavor_id',
'deployed_on'
)
LOG = logging.getLogger(__name__)
@ -36,8 +37,6 @@ LOG = logging.getLogger(__name__)
def format_ml_model(context, url, ml_model):
def transform(key, value):
LOG.debug(key)
LOG.debug(value)
if key not in _basic_keys:
return
# strip the key if it is not allowed by policy
@ -57,4 +56,4 @@ def format_ml_model(context, url, ml_model):
yield (key, value)
return dict(itertools.chain.from_iterable(
transform(k, v) for k, v in ml_model.items()))
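`format_ml_model` filters the model dict down to `_basic_keys` with a generator that yields nothing for disallowed keys; a standalone sketch of the idiom:

```python
import itertools

_basic_keys = ('id', 'name', 'status', 'flavor_id', 'deployed_on')

def format_ml_model(ml_model):
    def transform(key, value):
        if key not in _basic_keys:
            return              # yields nothing: key is dropped
        yield (key, value)

    return dict(itertools.chain.from_iterable(
        transform(k, v) for k, v in ml_model.items()))

assert format_ml_model({'name': 'm1', 'ml_data': b'...'}) == {'name': 'm1'}
```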

View File

@ -63,7 +63,7 @@ def get_resource(resource, resource_ident):
context = pecan.request.context
if context.is_admin:
context.all_projects = True
if uuidutils.is_uuid_like(resource_ident):
if uuidutils.is_uuid_like(resource_ident) or 'gyan-' in resource_ident:
return resource.get_by_uuid(context, resource_ident)
return resource.get_by_name(context, resource_ident)
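Model ids are now prefixed strings (`'gyan-' + uuid`, see the db api change below), which `uuidutils.is_uuid_like` rejects, so the extra substring check keeps prefixed ids on the by-uuid lookup path:

```python
from oslo_utils import uuidutils

ident = 'gyan-' + uuidutils.generate_uuid()
assert not uuidutils.is_uuid_like(ident)                    # prefix defeats the UUID check
assert uuidutils.is_uuid_like(ident) or 'gyan-' in ident    # the new condition passes
```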

62
gyan/common/clients.py Normal file
View File

@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heatclient import client as heatclient

from gyan.common import exception
from gyan.common import keystone
import gyan.conf


class OpenStackClients(object):
    """Convenience class to create and cache client instances."""

    def __init__(self, context):
        self.context = context
        self._keystone = None
        self._heat = None

    def url_for(self, **kwargs):
        return self.keystone().session.get_endpoint(**kwargs)

    def gyan_url(self):
        endpoint_type = self._get_client_option('gyan', 'endpoint_type')
        region_name = self._get_client_option('gyan', 'region_name')
        return self.url_for(service_type='ml-infra',
                            interface=endpoint_type,
                            region_name=region_name)

    @property
    def auth_token(self):
        return self.context.auth_token or self.keystone().auth_token

    def keystone(self):
        if self._keystone:
            return self._keystone
        self._keystone = keystone.KeystoneClientV3(self.context)
        return self._keystone

    def _get_client_option(self, client, option):
        return getattr(getattr(gyan.conf.CONF, '%s_client' % client), option)

    @exception.wrap_keystone_exception
    def heat(self):
        if self._heat:
            return self._heat
        heatclient_version = self._get_client_option('heat', 'api_version')
        session = self.keystone().session
        self._heat = heatclient.Client(heatclient_version,
                                       session=session)
        return self._heat
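Typical usage from anywhere a request context is available (a sketch; clients are cached per `OpenStackClients` instance, so repeated `heat()` calls are cheap):

```python
from gyan.common import clients

def create_stack(context, stack_data):
    heat = clients.OpenStackClients(context).heat()
    return heat.stacks.create(**stack_data)
```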

View File

@ -17,4 +17,8 @@ UNDEPLOYED = 'undeployed'
DEPLOYED = 'deployed'
CREATING = 'CREATING'
CREATED = 'CREATED'
SCHEDULED = 'SCHEDULED'
DEPLOYED_COMPUTE_NODE = 'DEPLOYED COMPUTE NODE'
DEPLOYMENT_FAILED = 'DEPLOYMENT FAILED'
DEPLOYMENT_STARTED = 'DEPLOYMENT STARTED'
NAME_PREFIX = 'gyan-'

View File

@ -25,8 +25,11 @@ import json
import mimetypes
import os
import zipfile
import ctypes
from oslo_concurrency import lockutils
from oslo_concurrency import processutils
from oslo_utils import importutils
from oslo_context import context as common_context
from oslo_log import log as logging
from oslo_utils import excutils
@ -42,12 +45,16 @@ from gyan.common import privileged
import gyan.conf
from gyan import objects
eventlet.monkey_patch()
CONF = gyan.conf.CONF
LOG = logging.getLogger(__name__)
synchronized = lockutils.synchronized_with_prefix(consts.NAME_PREFIX)
VALID_STATES = {
'deploy': [consts.CREATED, consts.UNDEPLOYED, consts.SCHEDULED],
'undeploy': [consts.DEPLOYED]
'undeploy': [consts.DEPLOYED, consts.DEPLOYED_COMPUTE_NODE, consts.DEPLOYMENT_FAILED, consts.DEPLOYMENT_STARTED]
}
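`utils.validate_ml_model_state`, called by the controller before deploy/undeploy, presumably checks the requested action against this map; a minimal sketch of that guard, reusing the `VALID_STATES` dict above:

```python
def validate_ml_model_state(ml_model, action):
    # e.g. refuse to undeploy a model that was never deployed.
    if ml_model.status not in VALID_STATES[action]:
        raise ValueError('Cannot %s ml_model in state %s'
                         % (action, ml_model.status))
```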
def safe_rstrip(value, chars=None):
"""Removes trailing characters from a string if that does not make it empty
@ -101,6 +108,7 @@ def spawn_n(func, *args, **kwargs):
def context_wrapper(*args, **kwargs):
# NOTE: If update_store is not called after spawn_n it won't be
# available for the logger to pull from threadlocal storage.
_context.update_store()
if _context is not None:
_context.update_store()
func(*args, **kwargs)
@ -271,4 +279,4 @@ def save_model(path, model):
f.write(model.ml_data)
zip_ref = zipfile.ZipFile(file_path+'.zip', 'r')
zip_ref.extractall(file_path)
zip_ref.close()

View File

@ -23,7 +23,6 @@ from gyan.compute import rpcapi
import gyan.conf
from gyan import objects
CONF = gyan.conf.CONF
LOG = logging.getLogger(__name__)
@ -35,11 +34,11 @@ class API(object):
self.rpcapi = rpcapi.API(context=context)
super(API, self).__init__()
def ml_model_create(self, context, new_ml_model, **extra_spec):
def ml_model_create(self, context, new_ml_model, host_ip, **extra_spec):
try:
host_state = {
"host": "localhost"
} #self._schedule_ml_model(context, ml_model, extra_spec)
"host": host_ip
}
except exception.NoValidHost:
new_ml_model.status = consts.ERROR
new_ml_model.status_reason = _(
@ -53,15 +52,15 @@ class API(object):
raise
LOG.debug(host_state)
return self.rpcapi.ml_model_create(context, host_state['host'],
new_ml_model)
def ml_model_predict(self, context, ml_model_id, **kwargs):
return self.rpcapi.ml_model_predict(context, ml_model_id,
**kwargs)
def ml_model_predict(self, context, ml_model_id, host_ip, **kwargs):
return self.rpcapi.ml_model_predict(context, ml_model_id, host_ip,
**kwargs)
def ml_model_delete(self, context, ml_model, *args):
self._record_action_start(context, ml_model, ml_model_actions.DELETE)
return self.rpcapi.ml_model_delete(context, ml_model, *args)
def ml_model_show(self, context, ml_model):
return self.rpcapi.ml_model_show(context, ml_model)

View File

@ -53,8 +53,8 @@ class Manager(periodic_task.PeriodicTasks):
db_ml_model = objects.ML_Model.get_by_uuid_db(context, ml_model["id"])
utils.save_model(CONF.state_path, db_ml_model)
obj_ml_model = objects.ML_Model.get_by_uuid(context, ml_model["id"])
obj_ml_model.status = consts.SCHEDULED
obj_ml_model.status_reason = "The ML Model is scheduled and saved to the host %s" % self.host
#obj_ml_model.status = consts.SCHEDULED
#obj_ml_model.status_reason = "The ML Model is scheduled and saved to the host %s" % self.host
obj_ml_model.save(context)
def ml_model_predict(self, context, ml_model_id, kwargs):

View File

@ -48,8 +48,8 @@ class API(rpc_service.API):
self._cast(host, 'ml_model_create',
ml_model=ml_model)
def ml_model_predict(self, context, ml_model_id, **kwargs):
return self._call("localhost", 'ml_model_predict',
def ml_model_predict(self, context, ml_model_id, host_ip, **kwargs):
return self._call(host_ip, 'ml_model_predict',
ml_model_id=ml_model_id, kwargs=kwargs)
@check_ml_model_host
@ -66,4 +66,4 @@ class API(rpc_service.API):
@check_ml_model_host
def ml_model_update(self, context, ml_model, patch):
return self._call(ml_model.host, 'ml_model_update',
ml_model=ml_model, patch=patch)

View File

@ -24,6 +24,7 @@ from gyan.conf import services
from gyan.conf import ssl
from gyan.conf import utils
from gyan.conf import gyan_client
from gyan.conf import heat_client
CONF = cfg.CONF
@ -36,6 +37,7 @@ path.register_opts(CONF)
scheduler.register_opts(CONF)
services.register_opts(CONF)
gyan_client.register_opts(CONF)
heat_client.register_opts(CONF)
ssl.register_opts(CONF)
profiler.register_opts(CONF)
utils.register_opts(CONF)

54
gyan/conf/heat_client.py Normal file
View File

@ -0,0 +1,54 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg

heat_group = cfg.OptGroup(name='heat_client',
                          title='Options for the Heat client')

common_security_opts = [
    cfg.StrOpt('ca_file',
               help='Optional CA cert file to use in SSL connections.'),
    cfg.StrOpt('cert_file',
               help='Optional PEM-formatted certificate chain file.'),
    cfg.StrOpt('key_file',
               help='Optional PEM-formatted file that contains the '
                    'private key.'),
    cfg.BoolOpt('insecure',
                default=False,
                help="If set, then the server's certificate will not "
                     "be verified.")]

heat_client_opts = [
    cfg.StrOpt('region_name',
               help='Region in Identity service catalog to use for '
                    'communication with the OpenStack service.'),
    cfg.StrOpt('endpoint_type',
               default='publicURL',
               help='Type of endpoint in Identity service catalog to use '
                    'for communication with the OpenStack service.'),
    cfg.StrOpt('api_version',
               default='1',
               help='Version of Heat API to use in heatclient.')]

ALL_OPTS = (heat_client_opts + common_security_opts)


def register_opts(conf):
    conf.register_group(heat_group)
    conf.register_opts(ALL_OPTS, group=heat_group)


def list_opts():
    return {heat_group: ALL_OPTS}
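Once registered, the options surface under the `heat_client` group of the global config, which is what `_get_client_option('heat', 'api_version')` in `clients.py` reads (a sketch):

```python
import gyan.conf

CONF = gyan.conf.CONF
# Defaults unless overridden: api_version='1', endpoint_type='publicURL',
# insecure=False; region_name and the SSL options default to None.
print(CONF.heat_client.api_version)
```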

View File

@ -0,0 +1,26 @@
"""Add flavor column to ML Model
Revision ID: 319fc86b7f72
Revises: 395aff469925
Create Date: 2018-11-06 15:31:35.600670
"""
# revision identifiers, used by Alembic.
revision = '319fc86b7f72'
down_revision = '395aff469925'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('ml_model', schema=None) as batch_op:
        batch_op.add_column(sa.Column('deployed_on', sa.Text(), nullable=True))
        batch_op.add_column(sa.Column('flavor_id', sa.String(length=255),
                                      nullable=False))
        batch_op.drop_column('deployed')
    # ### end Alembic commands ###
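Two notes on this migration: adding `flavor_id` with `nullable=False` will fail on a table that already holds rows unless a server default is supplied, and no `downgrade()` is shown. If one were wanted, it would mirror the upgrade (a sketch reusing the module's `op` and `sa` imports; the old `deployed` column was a nullable Text column per the model diff below):

```python
def downgrade():
    with op.batch_alter_table('ml_model', schema=None) as batch_op:
        batch_op.add_column(sa.Column('deployed', sa.Text(), nullable=True))
        batch_op.drop_column('flavor_id')
        batch_op.drop_column('deployed_on')
```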

View File

@ -34,6 +34,5 @@ def upgrade():
batch_op.add_column(sa.Column('ml_type', sa.String(length=255), nullable=True))
batch_op.add_column(sa.Column('started_at', sa.DateTime(), nullable=True))
batch_op.create_unique_constraint('uniq_mlmodel0uuid', ['id'])
batch_op.drop_constraint(u'ml_model_ibfk_1', type_='foreignkey')
# ### end Alembic commands ###

View File

@ -91,6 +91,8 @@ def add_identity_filter(query, value):
return query.filter_by(id=value)
elif uuidutils.is_uuid_like(value):
return query.filter_by(id=value)
elif "gyan-" in value:
return query.filter_by(id=value)
else:
raise exception.InvalidIdentity(identity=value)
@ -240,7 +242,7 @@ class Connection(object):
def create_ml_model(self, context, values):
# ensure defaults are present for new ml_models
if not values.get('id'):
values['id'] = uuidutils.generate_uuid()
values['id'] = 'gyan-' + uuidutils.generate_uuid()
ml_model = models.ML_Model()
ml_model.update(values)
try:

View File

@ -122,7 +122,8 @@ class ML_Model(Base):
status = Column(String(20))
status_reason = Column(Text, nullable=True)
host_id = Column(String(255), nullable=True)
deployed = Column(Text, nullable=True)
deployed_on = Column(Text, nullable=True)
flavor_id = Column(String(255), nullable=False)
url = Column(Text, nullable=True)
hints = Column(Text, nullable=True)
ml_type = Column(String(255), nullable=True)

View File

@ -33,7 +33,7 @@ class Flavor(base.GyanPersistentObject, base.GyanObject):
'cpu': fields.StringField(nullable=True),
'memory': fields.StringField(nullable=True),
'python_version': fields.StringField(nullable=True),
'disk': fields.BooleanField(nullable=True),
'disk': fields.StringField(nullable=True),
'additional_details': fields.StringField(nullable=True),
'created_at': fields.DateTimeField(tzinfo_aware=False, nullable=True),
'updated_at': fields.DateTimeField(tzinfo_aware=False, nullable=True),

View File

@ -28,14 +28,15 @@ class ML_Model(base.GyanPersistentObject, base.GyanObject):
VERSION = '1'
fields = {
'id': fields.UUIDField(nullable=True),
'id': fields.StringField(nullable=True),
'name': fields.StringField(nullable=True),
'project_id': fields.StringField(nullable=True),
'user_id': fields.StringField(nullable=True),
'status': fields.StringField(nullable=True),
'status_reason': fields.StringField(nullable=True),
'url': fields.StringField(nullable=True),
'deployed': fields.BooleanField(nullable=True),
'deployed_on': fields.StringField(nullable=True),
'flavor_id': fields.StringField(nullable=True),
'hints': fields.StringField(nullable=True),
'created_at': fields.DateTimeField(tzinfo_aware=False, nullable=True),
'updated_at': fields.DateTimeField(tzinfo_aware=False, nullable=True),