python-muranoclient/portas/portas/api/v1/sessions.py

99 lines
3.6 KiB
Python

from amqplib.client_0_8 import Message
import anyjson
import eventlet
from eventlet.semaphore import Semaphore
from webob import exc
from portas.common import config
from portas.db.models import Session, Status, Environment
from portas.db.session import get_session
from portas.openstack.common import wsgi
from portas.openstack.common import log as logging
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
rabbitmq = config.CONF.rabbitmq
log = logging.getLogger(__name__)
class Controller(object):
def __init__(self):
self.write_lock = Semaphore(1)
connection = amqp.Connection(rabbitmq.host, virtual_host=rabbitmq.virtual_host,
userid=rabbitmq.userid, password=rabbitmq.password,
ssl=rabbitmq.use_ssl, insist=True)
self.ch = connection.channel()
self.ch.exchange_declare('tasks', 'direct', durable=True, auto_delete=False)
def index(self, request, environment_id):
filters = {'environment_id': environment_id, 'user_id': request.context.user}
unit = get_session()
configuration_sessions = unit.query(Session).filter_by(**filters)
return {"sessions": [session.to_dict() for session in configuration_sessions if
session.environment.tenant_id == request.context.tenant]}
def configure(self, request, environment_id):
params = {'environment_id': environment_id, 'user_id': request.context.user, 'state': 'open'}
session = Session()
session.update(params)
unit = get_session()
if unit.query(Session).filter_by(**{'environment_id': environment_id, 'state': 'open'}).first():
log.info('There is already open session for this environment')
raise exc.HTTPConflict
#create draft for apply later changes
environment = unit.query(Environment).get(environment_id)
session.description = environment.description
with unit.begin():
unit.add(session)
return session.to_dict()
def show(self, request, environment_id, session_id):
unit = get_session()
session = unit.query(Session).get(session_id)
if session.environment.tenant_id != request.context.tenant:
log.info('User is not authorized to access this tenant resources.')
raise exc.HTTPUnauthorized
return session.to_dict()
def delete(self, request, environment_id, session_id):
unit = get_session()
session = unit.query(Session).get(session_id)
if session.state == 'deploying':
log.info('Session is in \'deploying\' state. Could not be deleted.')
raise exc.HTTPForbidden(comment='Session object in \'deploying\' state could not be deleted')
with unit.begin():
unit.delete(session)
return None
def reports(self, request, environment_id, session_id):
unit = get_session()
statuses = unit.query(Status).filter_by(session_id=session_id)
return {"reports": [status.to_dict() for status in statuses]}
def deploy(self, request, environment_id, session_id):
unit = get_session()
session = unit.query(Session).get(session_id)
if session.state != 'open':
log.warn(_('Could not deploy session. Session is already deployed or in deployment state'))
session.state = 'deploying'
session.save(unit)
with self.write_lock:
self.ch.basic_publish(Message(body=anyjson.serialize(session.description)), 'tasks', 'tasks')
def create_resource():
return wsgi.Resource(Controller())