move to RPC mode

* support tracer list/start
* support old style stop command

Change-Id: Ia0437eb7bd8775d5c8f50fc3d60cf4ffdc82b494
This commit is contained in:
Kun Huang 2015-11-10 17:53:31 +08:00
parent 2b4fa30af0
commit 0de31ed183
11 changed files with 147 additions and 88 deletions

83
scalpels/agents/server.py Normal file
View File

@ -0,0 +1,83 @@
from oslo_config import cfg
import oslo_messaging
from scalpels.db import api as db_api
from scalpels.agents.base import run_agent
import psutil
import signal
class ServerControlEndpoint(object):
target = oslo_messaging.Target(topic="test", version='1.0')
def __init__(self, server):
self.server = server
def stop(self, ctx):
if server:
self.server.stop()
class TraceEndpoint(object):
target = oslo_messaging.Target(topic="test", version='1.0')
def tracer_list(self, ctx):
# TODO db_api
# XXX ctx required?
from scalpels.cli.actions.start import agents_map
return agents_map
def start_tracers(self, ctx, tracers):
print locals()
task = db_api.task_create(results=[], pids=[])
pids = []
for tr in tracers:
pid = run_agent(task.uuid, tr)
pids.append(pid)
task = db_api.task_update(task.uuid, pids=pids)
print "task <%s> runs successfully!" % task.uuid
class TaskEndpoint(object):
target = oslo_messaging.Target(topic="test", version='1.0')
LOWEST = 8
def get_last_task(self):
# XXX put it tu utils?
last_task = db_api.task_get_last()
return last_task
def stop_task(self, ctx):
uuid = ctx.get("uuid")
last = ctx.get("last")
if last and uuid:
raise ValueError("can't assign last and uuid togther")
elif not last and not uuid:
task = self.get_last_task()
elif last:
task = self.get_last_task()
elif uuid and len(uuid) < self.LOWEST:
print "at least %d to find a task" % self.LOWEST
return
else:
# len(uuid) > LOWEST
task = db_api.task_get(uuid, fuzzy=True)
print "command stop: %s" % ctx
print "task: <%s>" % task.uuid
for pid in task.pids:
p = psutil.Process(int(pid))
p.send_signal(signal.SIGINT)
transport = oslo_messaging.get_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test', server='localhost')
endpoints = [
ServerControlEndpoint(None),
TraceEndpoint(),
TaskEndpoint(),
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
executor='blocking')

View File

@ -4,11 +4,7 @@
import os
import json
from scalpels.db import api as db_api
import subprocess
import time
import signal
from scalpels.agents.base import run_agent
from scalpels.cli.api import api as agent_api
def _parse_agents_from_args(config):
parsed_agents = set()
@ -50,16 +46,4 @@ def run(config):
print "command start: %s" % config
agents = _parse_agents_from_args(config)
agents |= _parse_agents_from_file(config)
task = db_api.task_create(results=[], pids=[])
data_dir = db_api.setup_config_get()["data_dir"].rstrip("/")
pids = []
for ag in agents:
ag_exec = agents_map.get(ag) % data_dir
if ag_exec:
pid = run_agent(task.uuid, ag)
pids.append(pid)
task = db_api.task_update(task.uuid, pids=pids)
print "task <%s> runs successfully!" % task.uuid
agent_api.start_tracers(agents)

View File

@ -5,6 +5,7 @@
from scalpels.db import api as db_api
import psutil
import signal
from scalpels.cli.api import api as agent_api
LOWEST=8
@ -13,24 +14,4 @@ def get_last_task():
return last_task
def run(config):
uuid = config.get("uuid")
last = config.get("last")
if last and uuid:
raise ValueError("can't assign last and uuid togther")
elif not last and not uuid:
task = get_last_task()
elif last:
task = get_last_task()
elif uuid and len(uuid) < LOWEST:
print "at least %d to find a task" % LOWEST
return
else:
# len(uuid) > LOWEST
task = db_api.task_get(uuid, fuzzy=True)
print "command stop: %s" % config
print "task: <%s>" % task.uuid
for pid in task.pids:
p = psutil.Process(int(pid))
p.send_signal(signal.SIGINT)
agent_api.stop_task(config)

View File

@ -2,12 +2,13 @@
#-*- coding:utf-8 -*-
# Author: Kun Huang <academicgareth@gmail.com>
from scalpels.cli.actions.start import agents_map
from scalpels.cli.api import api as agent_api
from prettytable import PrettyTable
def run(config):
tracers = agent_api.get_tracer_list()
t = PrettyTable(["tracer", "tracer script"])
for ag in agents_map:
t.add_row([ag, agents_map[ag]])
for tracer, script in tracers.items():
t.add_row([tracer, script])
print t

21
scalpels/cli/api.py Normal file
View File

@ -0,0 +1,21 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author: Kun Huang <academicgareth@gmail.com>
from scalpels.cli.rpcapi import rpcapi
class API(object):
def __init__(self):
pass
def get_tracer_list(self):
return rpcapi.tracer_list()
def start_tracers(self, tracers):
rpcapi.start_tracers(tracers=tracers)
def stop_task(self, config):
rpcapi.stop_task(config)
api = API()

12
scalpels/cli/manage.py Normal file
View File

@ -0,0 +1,12 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author: Kun Huang <academicgareth@gmail.com>
from scalpels.agents.server import server
def main():
server.start()
server.wait()
if __name__ == "__main__":
main()

20
scalpels/cli/rpcapi.py Normal file
View File

@ -0,0 +1,20 @@
import oslo_messaging as messaging
from oslo_config import cfg
class RPCAPI(object):
def __init__(self, transport):
target = messaging.Target(topic='test', version='1.0')
self._client = messaging.RPCClient(transport, target)
def tracer_list(self, ctxt={}):
return self._client.call(ctxt, "tracer_list")
def start_tracers(self, ctxt={}, tracers=None):
self._client.cast(ctxt, "start_tracers", tracers=tracers)
def stop_task(self, ctxt={}):
self._client.cast(ctxt, "stop_task")
transport = messaging.get_transport(cfg.CONF)
rpcapi = RPCAPI(transport)

View File

@ -50,8 +50,8 @@ def main():
result.add_argument("--short", action="store_true", dest="short", help="report uuid only")
# agent command
agent = subparsers.add_parser("agent")
agent.add_argument("-l", "--list", action="store_true", dest="list", help="list all agents")
tracer = subparsers.add_parser("tracer")
tracer.add_argument("-l", "--list", action="store_true", dest="list", help="list all agents")
parser = rootparser.parse_args()

View File

@ -1,13 +0,0 @@
import oslo_messaging as messaging
from oslo_config import cfg
class TestClient(object):
def __init__(self, transport):
target = messaging.Target(topic='test', version='2.0')
self._client = messaging.RPCClient(transport, target)
def test(self, ctxt, arg):
return self._client.call(ctxt, 'test', arg=arg)
transport = messaging.get_transport(cfg.CONF)
print TestClient(transport).test({"ctx":"this is context"}, 'ping')

View File

@ -1,31 +0,0 @@
from oslo_config import cfg
import oslo_messaging
class ServerControlEndpoint(object):
#target = oslo_messaging.Target(namespace='control', version='2.0')
target = oslo_messaging.Target(topic="test", version='2.0')
def __init__(self, server):
self.server = server
def stop(self, ctx):
if server:
self.server.stop()
class TestEndpoint(object):
target = oslo_messaging.Target(topic="test", version='2.0')
def test(self, ctx, arg):
return arg
transport = oslo_messaging.get_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test', server='localhost')
endpoints = [
ServerControlEndpoint(None),
TestEndpoint(),
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
executor='blocking')
server.start()
server.wait()

View File

@ -28,3 +28,4 @@ packages = scalpels
[entry_points]
console_scripts =
sca = scalpels.cli.shell:main
sca-m = scalpels.cli.manage:main