Update to REST API defined by ameade

This commit is contained in:
John Bresnahan 2013-05-29 15:10:26 -10:00
parent 8ec88b8c84
commit e06e47ca9d
8 changed files with 131 additions and 74 deletions

View File

@ -3,6 +3,7 @@ import logging
import urlparse
import routes
import webob
from webob.exc import (HTTPError,
HTTPNotFound,
HTTPConflict,
@ -62,10 +63,19 @@ class XferController(object):
raise HTTPBadRequest(explanation=msg,
content_type="text/plain")
def urlxfer(self, request, srcurl, dsturl, dstopts=None, srcopts=None,
start_ndx=0, end_ndx=None):
srcurl_parts = urlparse.urlparse(srcurl)
dsturl_parts = urlparse.urlparse(dsturl)
def newtransfer(self, request, source_url, destination_url, owner,
source_options=None, destination_options=None,
start_offset=0, end_offset=None):
srcurl_parts = urlparse.urlparse(source_url)
dsturl_parts = urlparse.urlparse(destination_url)
dstopts={}
srcopts={}
if source_options is not None:
srcopts = source_options
if destination_options is not None:
dstopts = destination_options
plugin_policy = config.get_protocol_policy(self.conf)
src_module_name = utils.find_protocol_module_name(plugin_policy,
@ -76,26 +86,26 @@ class XferController(object):
src_module = utils.load_protocol_module(src_module_name, self.conf)
dst_module = utils.load_protocol_module(dst_module_name, self.conf)
write_info = dst_module.new_write(dsturl_parts, dstopts)
read_info = src_module.new_write(srcurl_parts, srcopts)
dstopts = dst_module.new_write(dsturl_parts, dstopts)
srcopts = src_module.new_read(srcurl_parts, srcopts)
db_con = db.StaccatoDB(self.conf)
xfer = db_con.get_new_xfer(request.context.owner,
srcurl,
dsturl,
xfer = db_con.get_new_xfer(owner,
source_url,
destination_url,
src_module_name,
dst_module_name,
start_ndx=start_ndx,
end_ndx=end_ndx,
read_info=read_info,
write_info=write_info)
start_ndx=start_offset,
end_ndx=end_offset,
source_opts=srcopts,
dest_opts=dstopts)
return xfer
def status(self, request, xfer_id):
def status(self, request, xfer_id, owner):
xfer = self._xfer_from_db(xfer_id, request)
return xfer
def list(self, request, limit=None):
def list(self, request, owner):
return self.db_con.lookup_xfer_request_all(owner=request.context.owner)
def _xfer_to_dict(self, x):
@ -107,13 +117,13 @@ class XferController(object):
d['progress'] = x.next_ndx
return d
def delete(self, request, xfer_id):
def delete(self, request, xfer_id, owner):
xfer_request = self._xfer_from_db(xfer_id, request)
self._to_state_machine(Events.EVENT_DELETE,
xfer_request,
'delete')
def cancel(self, request, xfer_id):
def cancel(self, request, xfer_id, owner):
xfer_request = self._xfer_from_db(xfer_id, request)
self._to_state_machine(Events.EVENT_CANCEL,
xfer_request,
@ -125,26 +135,57 @@ class XferController(object):
self.log.exception(msg)
self.log.log(level, msg)
class XferHeaderDeserializer(os_wsgi.RequestHeadersDeserializer):
def default(self, request):
return {'owner': request.context.owner}
class XferDeserializer(os_wsgi.RequestHeadersDeserializer):
class XferDeserializer(os_wsgi.JSONDeserializer):
"""Default request headers deserializer"""
meta_string = 'x-xfer-'
def _validate(self, body, required, optional):
body = self._from_json(body)
request = {}
for k in body:
if k not in required and k not in optional:
msg = '%s is an unknown option.' % k
raise webob.exc.HTTPBadRequest(explanation=msg)
for k in required:
if k not in body:
msg = 'The option %s must be specified.' % k
raise webob.exc.HTTPBadRequest(explanation=msg)
request[k] = body[k]
for k in optional:
if k in body:
request[k] = body[k]
return request
def _pullout_xxfers(self, request):
d = {}
for h in request.headers:
if h.lower().startswith(self.meta_string):
key = h[len(self.meta_string):].lower().replace("-", "_")
val = request.headers[h]
d[key] = val
return d
def newtransfer(self, body):
_required = ['source_url', 'destination_url']
_optional = ['source_options', 'destination_options', 'start_offset',
'end_offset', ]
request = self._validate(body, _required, _optional)
return request
def default(self, request):
return self._pullout_xxfers(request)
def list(self, body):
_required = []
_optional = ['limit', 'next', 'filter',]
request = self._validate(body, _required, _optional)
return request
def status(self, body):
request = self._validate(body, [], [])
return request
def delete(self, body):
request = self._validate(body, [], [])
return request
def cancel(self, body):
request = self._validate(body, [], [])
return request
class XferSerializer(os_wsgi.DictSerializer):
class XferSerializer(os_wsgi.JSONDictSerializer):
def serialize(self, data, action='default', *args):
return super(XferSerializer, self).serialize(data, args[0])
@ -158,17 +199,21 @@ class XferSerializer(os_wsgi.DictSerializer):
def delete(self, data):
return self._xfer_to_json(data)
def urlxfer(self, data):
def newtransfer(self, data):
return self._xfer_to_json(data)
def _xfer_to_json(self, data):
x = data
d = {}
d['id'] = x.id
d['srcurl'] = x.srcurl
d['dsturl'] = x.dsturl
d['source_url'] = x.srcurl
d['destination_url'] = x.dsturl
d['state'] = x.state
d['start_offset'] = x.start_ndx
d['end_offset'] = x.end_ndx
d['progress'] = x.next_ndx
d['source_options'] = x.source_opts
d['destination_options'] = x.dest_opts
return json.dumps(d)
def list(self, data):
@ -191,32 +236,34 @@ class API(os_wsgi.Router):
controller = XferController(self.db_con, self.sm, self.conf)
mapper = routes.Mapper()
body_deserializers = {'application/json': XferDeserializer()}
deserializer = os_wsgi.RequestDeserializer(
headers_deserializer=XferDeserializer())
body_deserializers=body_deserializers,
headers_deserializer=XferHeaderDeserializer())
serializer = XferSerializer()
sc = os_wsgi.Resource(controller,
transfer_resource = os_wsgi.Resource(controller,
deserializer=deserializer,
serializer=serializer)
mapper.connect(None,
"/urlxfer",
controller=sc,
action="urlxfer")
mapper.connect(None,
"/status/{xfer_id}",
controller=sc,
action="status")
mapper.connect(None,
"/cancel/{xfer_id}",
controller=sc,
action="cancel")
mapper.connect(None,
"/delete/{xfer_id}",
controller=sc,
action="delete")
mapper.connect(None,
"/list",
controller=sc,
action="list")
mapper.connect('/transfers',
controller=transfer_resource,
action='newtransfer',
conditions={'method': ['POST']})
mapper.connect('/transfers',
controller=transfer_resource,
action='list',
conditions={'method': ['GET']})
mapper.connect('/transfers/{xfer_id}',
controller=transfer_resource,
action='status',
conditions={'method': ['GET']})
mapper.connect('/transfers/{xfer_id}',
controller=transfer_resource,
action='delete',
conditions={'method': ['DELETE']})
mapper.connect('/transfers/{xfer_id}/action',
controller=transfer_resource,
action='action',
conditions={'method': ['POST']})
super(API, self).__init__(mapper)

View File

@ -35,8 +35,8 @@ class StaccatoDB(object):
dst_module_name,
start_ndx=0,
end_ndx=-1,
read_info=None,
write_info=None,
source_opts=None,
dest_opts=None,
session=None):
if session is None:
@ -52,6 +52,8 @@ class StaccatoDB(object):
xfer_request.start_ndx = start_ndx
xfer_request.next_ndx = start_ndx
xfer_request.end_ndx = end_ndx
xfer_request.dest_opts = dest_opts
xfer_request.source_opts = source_opts
xfer_request.state = "STATE_NEW"
session.add(xfer_request)

View File

@ -5,6 +5,7 @@ SQLAlchemy models for staccato data
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import Integer
from sqlalchemy import PickleType
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
@ -42,8 +43,8 @@ class XferRequest(BASE, ModelBase):
next_ndx = Column(Integer(), nullable=False)
end_ndx = Column(Integer(), nullable=False, default=-1)
# TODO add protocol specific json documents
write_info = Column(String(512))
read_info = Column(String(512))
source_opts = Column(PickleType())
dest_opts = Column(PickleType())
def register_models(engine):

View File

@ -11,12 +11,12 @@ class FileProtocol(base.BaseProtocolInterface):
pass
def new_write(self, dsturl_parts, dst_opts):
return {}
return dst_opts
def new_read(self, srcurl_parts, src_opts):
return
return src_opts
def get_reader(self, url_parts, writer, monitor, start=0,
def get_reader(self, url_parts, writer, monitor, source_opts, start=0,
end=None, **kwvals):
self._validate_url(url_parts)
@ -28,7 +28,7 @@ class FileProtocol(base.BaseProtocolInterface):
buflen=65536,
**kwvals)
def get_writer(self, url_parts, checkpointer, **kwvals):
def get_writer(self, url_parts, dest_opts, checkpointer, **kwvals):
self._validate_url(url_parts)
return FileWriteConnection(url_parts.path, checkpointer=checkpointer,

View File

@ -12,13 +12,18 @@ class HttpProtocol(base.BaseProtocolInterface):
def _validate_url(self, url_parts):
pass
def _parse_opts(self, opts):
return opts
def new_write(self, dsturl_parts, dst_opts):
return {}
opts = self._parse_opts(dst_opts)
return opts
def new_read(self, srcurl_parts, src_opts):
return
opts = self._parse_opts(src_opts)
return opts
def get_reader(self, url_parts, writer, monitor, start=0,
def get_reader(self, url_parts, writer, monitor, source_opts, start=0,
end=None, **kwvals):
self._validate_url(url_parts)
@ -29,7 +34,7 @@ class HttpProtocol(base.BaseProtocolInterface):
end=end,
**kwvals)
def get_writer(self, url_parts, checkpointer, **kwvals):
def get_writer(self, url_parts, dest_opts, checkpointer, **kwvals):
raise exceptions.StaccatoNotImplementedException(
_('The HTTP protocol is read only'))

View File

@ -4,20 +4,20 @@ from staccato.common import utils
class BaseProtocolInterface(object):
@utils.not_implemented_decorator
def get_reader(self, url_parts, writer, monitor, start=0, end=None,
**kwvals):
def get_reader(self, url_parts, writer, monitor, source_opts, start=0,
end=None, **kwvals):
pass
@utils.not_implemented_decorator
def get_writer(self, url_parts, checkpointer, **kwvals):
def get_writer(self, url_parts, dest_opts, checkpointer, **kwvals):
pass
@utils.not_implemented_decorator
def new_write(self, dsturl_parts, dst_opts):
def new_write(self, request, dsturl_parts, dst_opts):
pass
@utils.not_implemented_decorator
def new_read(self, srcurl_parts, src_opts):
def new_read(self, request, srcurl_parts, src_opts):
pass

View File

@ -24,13 +24,15 @@ def do_transfer(CONF, xfer_id, state_machine):
dsturl_parts = urlparse.urlparse(request.dsturl)
writer = dst_module.get_writer(dsturl_parts,
checkpointer=checkpointer)
request.dest_opts,
checkpointer=checkpointer)
# it is up to the reader/writer to put on the bw limits
srcurl_parts = urlparse.urlparse(request.srcurl)
reader = src_module.get_reader(srcurl_parts,
writer,
monitor,
request.source_opts,
request.next_ndx,
request.end_ndx)

View File

@ -20,7 +20,7 @@ def xfer_new(CONF, srcurl, dsturl, src_opts, dst_opts, start_ndx=0,
dst_module = utils.load_protocol_module(dst_module_name, CONF)
write_info = dst_module.new_write(dsturl_parts, dst_opts)
read_info = src_module.new_write(srcurl_parts, src_opts)
read_info = src_module.new_read(srcurl_parts, src_opts)
db_con = db.StaccatoDB(CONF)
xfer = db_con.get_new_xfer(srcurl,