Support post and list messages
Allow post and list messages with CLI so that user can easily verify Zaqar's messaging functions with command line. Change-Id: Ib1b82917cc67b604e0bf9d94a18763bc79de0f2b
This commit is contained in:
parent
60ce284184
commit
96370f27ed
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Allow post and list messages with CLI so that user can easily verify
|
||||
Zaqar's messaging functions with command line.
|
|
@ -102,6 +102,8 @@ openstack.messaging.v2 =
|
|||
queue_signed_url = zaqarclient.queues.v2.cli:CreateSignedUrl
|
||||
messaging_ping = zaqarclient.queues.v2.cli:Ping
|
||||
messaging_health = zaqarclient.queues.v2.cli:Health
|
||||
message_post = zaqarclient.queues.v2.cli:PostMessages
|
||||
message_list = zaqarclient.queues.v2.cli:ListMessages
|
||||
|
||||
openstack.cli.extension =
|
||||
messaging = zaqarclient.queues.cli
|
||||
|
|
|
@ -389,8 +389,7 @@ class ListPools(command.Lister):
|
|||
help="Page size limit")
|
||||
parser.add_argument(
|
||||
"--detailed",
|
||||
type=bool,
|
||||
metavar="<detailed>",
|
||||
action="store_true",
|
||||
help="Detailed output")
|
||||
|
||||
return parser
|
||||
|
@ -558,9 +557,7 @@ class ListFlavors(command.Lister):
|
|||
help="Page size limit")
|
||||
parser.add_argument(
|
||||
"--detailed",
|
||||
type=bool,
|
||||
default=False,
|
||||
metavar="<detailed>",
|
||||
action="store_true",
|
||||
help="If show detailed capabilities of flavor")
|
||||
return parser
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
import json
|
||||
import os
|
||||
|
||||
from osc_lib.command import command
|
||||
from osc_lib import utils
|
||||
|
@ -57,6 +58,115 @@ class GetQueueMetadata(cli.GetQueueMetadata):
|
|||
pass
|
||||
|
||||
|
||||
class PostMessages(command.Command):
|
||||
"""Post messages for a given queue"""
|
||||
|
||||
_description = _("Post messages for a given queue")
|
||||
log = logging.getLogger(__name__ + ".PostMessages")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(PostMessages, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
"queue_name",
|
||||
metavar="<queue_name>",
|
||||
help="Name of the queue")
|
||||
parser.add_argument(
|
||||
"messages",
|
||||
type=json.loads,
|
||||
metavar="<messages>",
|
||||
help="Messages to be posted.")
|
||||
parser.add_argument(
|
||||
"--client-id",
|
||||
metavar="<client_id>",
|
||||
default=os.environ.get("OS_MESSAGE_CLIENT_ID"),
|
||||
help="A UUID for each client instance.")
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = _get_client(self, parsed_args)
|
||||
|
||||
if not parsed_args.client_id:
|
||||
raise AttributeError("<--client-id> option is missing and "
|
||||
"environment variable OS_MESSAGE_CLIENT_ID "
|
||||
"is not set. Please at least either pass in "
|
||||
"the client id or set the environment "
|
||||
"variable")
|
||||
else:
|
||||
client.client_uuid = parsed_args.client_id
|
||||
|
||||
queue = client.queue(parsed_args.queue_name)
|
||||
queue.post(parsed_args.messages)
|
||||
|
||||
|
||||
class ListMessages(command.Lister):
|
||||
"""List all messages for a given queue"""
|
||||
|
||||
_description = _("List all messages for a given queue")
|
||||
log = logging.getLogger(__name__ + ".ListMessages")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ListMessages, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
"queue_name",
|
||||
metavar="<queue_name>",
|
||||
help="Name of the queue")
|
||||
parser.add_argument(
|
||||
"--message-ids",
|
||||
metavar="<message_ids>",
|
||||
help="List of messages' ids to retrieve")
|
||||
parser.add_argument(
|
||||
"--limit",
|
||||
metavar="<limit>",
|
||||
type=int,
|
||||
help="Maximum number of messages to get")
|
||||
parser.add_argument(
|
||||
"--echo",
|
||||
action="store_true",
|
||||
help="Whether to get this client's own messages")
|
||||
parser.add_argument(
|
||||
"--include-claimed",
|
||||
action="store_true",
|
||||
help="Whether to include claimed messages")
|
||||
parser.add_argument(
|
||||
"--client-id",
|
||||
metavar="<client_id>",
|
||||
default=os.environ.get("OS_MESSAGE_CLIENT_ID"),
|
||||
help="A UUID for each client instance.")
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = _get_client(self, parsed_args)
|
||||
|
||||
if not parsed_args.client_id:
|
||||
raise AttributeError("<--client-id> option is missing and "
|
||||
"environment variable OS_MESSAGE_CLIENT_ID "
|
||||
"is not set. Please at least either pass in "
|
||||
"the client id or set the environment "
|
||||
"variable")
|
||||
else:
|
||||
client.client_uuid = parsed_args.client_id
|
||||
|
||||
kwargs = {}
|
||||
if parsed_args.limit is not None:
|
||||
kwargs["limit"] = parsed_args.limit
|
||||
if parsed_args.echo is not None:
|
||||
kwargs["echo"] = parsed_args.echo
|
||||
if parsed_args.include_claimed is not None:
|
||||
kwargs["include_claimed"] = parsed_args.include_claimed
|
||||
|
||||
queue = client.queue(parsed_args.queue_name)
|
||||
|
||||
if parsed_args.message_ids:
|
||||
messages = queue.messages(parsed_args.message_ids.split(','),
|
||||
**kwargs)
|
||||
else:
|
||||
messages = queue.messages(**kwargs)
|
||||
|
||||
columns = ("ID", "Body", "TTL", "Age", "Claim ID")
|
||||
return (columns,
|
||||
(utils.get_item_properties(s, columns) for s in messages))
|
||||
|
||||
|
||||
class PurgeQueue(command.Command):
|
||||
"""Purge a queue"""
|
||||
|
||||
|
|
|
@ -14,10 +14,45 @@
|
|||
# limitations under the License.
|
||||
|
||||
from zaqarclient.queues.v1 import message
|
||||
from zaqarclient.queues.v2 import core
|
||||
|
||||
|
||||
class Message(message.Message):
|
||||
pass
|
||||
def __init__(self, queue, ttl, age, body, href=None, id=None,
|
||||
claim_id=None):
|
||||
self.queue = queue
|
||||
self.href = href
|
||||
self.ttl = ttl
|
||||
self.age = age
|
||||
self.body = body
|
||||
|
||||
# NOTE(flaper87): Is this really
|
||||
# necessary? Should this be returned
|
||||
# by Zaqar?
|
||||
# The url has two forms depending on if it has been claimed.
|
||||
# /v1/queues/worker-jobs/messages/5c6939a8?claim_id=63c9a592
|
||||
# or
|
||||
# /v1/queues/worker-jobs/messages/5c6939a8
|
||||
if id is None:
|
||||
self.id = href.split('/')[-1]
|
||||
if '?' in self.id:
|
||||
self.id = self.id.split('?')[0]
|
||||
else:
|
||||
self.id = id
|
||||
|
||||
def __repr__(self):
|
||||
return '<Message id:{id} ttl:{ttl}>'.format(id=self.id,
|
||||
ttl=self.ttl)
|
||||
|
||||
@property
|
||||
def claim_id(self):
|
||||
if '=' in self.href:
|
||||
return self.href.split('=')[-1]
|
||||
|
||||
def delete(self):
|
||||
req, trans = self.queue.client._request_and_transport()
|
||||
core.message_delete(trans, req, self.queue._name,
|
||||
self.id, self.claim_id)
|
||||
|
||||
|
||||
def create_object(parent):
|
||||
|
|
Loading…
Reference in New Issue