aggent support resource tracker for FPGA

use PeriodicTasks to track resources.

Info:
 https://docs.openstack.org/oslo.service/latest/reference/periodic_task.html

Aggent is a bridge for FPGA driver and conductor.
It gets the lastest informations and update them to conductor.

This patch we call conductor API directly instead of object
remotable method.  Will support it in the next version.

This patch depends on the FPGA driver patch and conductor patch.

How to test this patch:
1. apply the fpga driver patch
   https://review.openstack.org/#/c/531129
2. generate a sysfs fpga data
   $ ./cyborg/tests/unit/accelerator/drivers/fpga/intel/prepare_test_data.py
3. Change the SYS_FPGA in cyborg/accelerator/drivers/fpga/intel/sysinfo.py to
   "/tmp/sys/class/fpga"
4. change the SYS_FPGA_PATH in cyborg/accelerator/drivers/fpga/utils.py to
   "/tmp/sys/class/fpga"
5. restart agent.

The unittest for this patch will be in a separated patch, and comes out ASAP.

Co-Authored-By: Dolpher Du <Dolpher.Du@intel.com>

Change-Id: I5e487cf939aa65d0fc79399ddd5d1337a8c2fa98
This commit is contained in:
Shaohe Feng 2018-02-02 09:11:15 +00:00
parent b22761ab0d
commit b96d0e2570
3 changed files with 178 additions and 3 deletions

View File

@ -14,24 +14,42 @@
# under the License.
import oslo_messaging as messaging
from oslo_service import periodic_task
from cyborg.accelerator.drivers.fpga.base import FPGADriver
from cyborg.agent.resource_tracker import ResourceTracker
from cyborg.conductor import rpcapi as cond_api
from cyborg.conf import CONF
class AgentManager(object):
class AgentManager(periodic_task.PeriodicTasks):
"""Cyborg Agent manager main class."""
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, topic, host=None):
super(AgentManager, self).__init__()
super(AgentManager, self).__init__(CONF)
self.topic = topic
self.host = host or CONF.host
self.fpga_driver = FPGADriver()
self.cond_api = cond_api.ConductorAPI()
self._rt = ResourceTracker(host, self.cond_api)
def periodic_tasks(self, context, raise_on_error=False):
pass
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def hardware_list(self, context, values):
"""List installed hardware."""
pass
def fpga_program(self, context, accelerator, image):
""" Program a FPGA regoin, image can be a url or local file"""
# TODO (Shaohe Feng) Get image from glance.
# And add claim and rollback logical.
raise NotImplementedError()
@periodic_task.periodic_task(run_immediately=True)
def update_available_resource(self, context, startup=True):
"""update all kinds of accelerator resources from their drivers."""
self._rt.update_usage(context)

View File

@ -0,0 +1,153 @@
# Copyright (c) 2018 Intel.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Track resources like FPGA GPU and QAT for a host. Provides the
conductor with useful information about availability through the accelerator
model.
"""
from oslo_log import log as logging
from oslo_messaging.rpc.client import RemoteError
from oslo_utils import uuidutils
from cyborg.accelerator.drivers.fpga.base import FPGADriver
from cyborg.common import utils
from cyborg import objects
LOG = logging.getLogger(__name__)
AGENT_RESOURCE_SEMAPHORE = "agent_resources"
DEPLOYABLE_VERSION = "1.0"
# need to change the driver field name
DEPLOYABLE_HOST_MAPS = {"assignable": "assignable",
"pcie_address": "devices",
"board": "product_id",
"type": "function",
"vendor": "vendor_id",
"name": "name"}
class ResourceTracker(object):
"""Agent helper class for keeping track of resource usage as instances
are built and destroyed.
"""
def __init__(self, host, cond_api):
# FIXME (Shaohe) local cache for Accelerator.
# Will fix it in next release.
self.fpgas = None
self.host = host
self.conductor_api = cond_api
self.fpga_driver = FPGADriver()
@utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
def claim(self, context):
pass
def _fpga_compare_and_update(self, host_dev, acclerator):
need_updated = False
for k, v in DEPLOYABLE_HOST_MAPS.items():
if acclerator[k] != host_dev[v]:
need_updated = True
acclerator[k] = host_dev[v]
return need_updated
def _gen_deployable_from_host_dev(self, host_dev):
dep = {}
for k, v in DEPLOYABLE_HOST_MAPS.items():
dep[k] = host_dev[v]
dep["host"] = self.host
dep["version"] = DEPLOYABLE_VERSION
dep["availability"] = "free"
dep["uuid"] = uuidutils.generate_uuid()
return dep
@utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
def update_usage(self, context):
"""Update the resource usage and stats after a change in an
instance
"""
def create_deployable(fpgas, bdf, parent_uuid=None):
fpga = fpgas[bdf]
dep = self._gen_deployable_from_host_dev(fpga)
# if parent_uuid:
dep["parent_uuid"] = parent_uuid
obj_dep = objects.Deployable(context, **dep)
new_dep = self.conductor_api.deployable_create(context, obj_dep)
return new_dep
# NOTE(Shaohe Feng) need more agreement on how to keep consistency.
fpgas = self._get_fpga_devices()
bdfs = set(fpgas.keys())
deployables = self.conductor_api.deployable_get_by_host(
context, self.host)
# NOTE(Shaohe Feng) when no "pcie_address" in deployable?
accls = dict([(v["pcie_address"], v) for v in deployables])
accl_bdfs = set(accls.keys())
# Firstly update
for mutual in accl_bdfs & bdfs:
accl = accls[mutual]
if self._fpga_compare_and_update(fpgas[mutual], accl):
try:
self.conductor_api.deployable_update(context, accl)
except RemoteError as e:
LOG.error(e)
# Add
new = bdfs - accl_bdfs
new_pf = set([n for n in new if fpgas[n]["function"] == "pf"])
for n in new_pf:
new_dep = create_deployable(fpgas, n)
accls[n] = new_dep
sub_vf = set()
if "regions" in n:
sub_vf = set([sub["devices"] for sub in fpgas[n]["regions"]])
for vf in sub_vf & new:
new_dep = create_deployable(fpgas, vf, new_dep["uuid"])
accls[vf] = new_dep
new.remove(vf)
for n in new - new_pf:
p_bdf = fpgas[n]["parent_devices"]
p_accl = accls[p_bdf]
p_uuid = p_accl["uuid"]
new_dep = create_deployable(fpgas, n, p_uuid)
# Delete
for obsolete in accl_bdfs - bdfs:
try:
self.conductor_api.deployable_delete(context, accls[obsolete])
except RemoteError as e:
LOG.error(e)
del accls[obsolete]
def _get_fpga_devices(self):
def form_dict(devices, fpgas):
for v in devices:
fpgas[v["devices"]] = v
if "regions" in v:
form_dict(v["regions"], fpgas)
fpgas = {}
vendors = self.fpga_driver.discover_vendors()
for v in vendors:
driver = self.fpga_driver.create(v)
form_dict(driver.discover(), fpgas)
return fpgas

View File

@ -16,12 +16,16 @@
"""Utilities and helper functions."""
from oslo_log import log
from oslo_concurrency import lockutils
import six
LOG = log.getLogger(__name__)
synchronized = lockutils.synchronized_with_prefix('cyborg-')
def safe_rstrip(value, chars=None):
"""Removes trailing characters from a string if that does not make it empty