keystone/keystone/identity/backends/pam.py

143 lines
4.0 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
try:
import pam
except ImportError:
pam = None
import PAM
from keystone import identity
def PAM_authenticate(username, password):
def _pam_conv(auth, query_list):
resp = []
for query, q_type in query_list:
if q_type in [PAM.PAM_PROMPT_ECHO_ON, PAM.PAM_PROMPT_ECHO_OFF]:
resp.append((password, 0))
elif q_type in [PAM.PAM_PROMPT_ERROR_MSG,
PAM.PAM_PROMPT_TEXT_INFO]:
resp.append(('', 0))
return resp
auth = PAM.pam()
auth.start('passwd')
auth.set_item(PAM.PAM_USER, username)
auth.set_item(PAM.PAM_CONV, _pam_conv)
try:
auth.authenticate()
auth.acct_mgmt()
except PAM.error:
raise AssertionError('Invalid user / password')
return True
class PamIdentity(identity.Driver):
"""Very basic identity based on PAM.
Tenant is always the same as User, root user has admin role.
"""
def is_domain_aware(self):
return False
def authenticate(self, user_id, password):
auth = pam.authenticate if pam else PAM_authenticate
if not auth(user_id, password):
raise AssertionError('Invalid user / password')
user = {'id': user_id, 'name': user_id}
return user
def get_project(self, tenant_id):
return {'id': tenant_id, 'name': tenant_id}
def get_project_by_name(self, tenant_name, domain_id):
# TODO(henry-nash): Used domain_id once domains are implemented
# in LDAP backend
return {'id': tenant_name, 'name': tenant_name}
def get_user(self, user_id):
return {'id': user_id, 'name': user_id}
def get_user_by_name(self, user_name, domain_id):
# TODO(henry-nash): Used domain_id once domains are implemented
# in LDAP backend
return {'id': user_name, 'name': user_name}
def get_role(self, role_id):
raise NotImplementedError()
def list_users(self):
raise NotImplementedError()
def list_roles(self):
raise NotImplementedError()
def add_user_to_project(self, tenant_id, user_id):
pass
def remove_user_from_project(self, tenant_id, user_id):
pass
def get_projects_for_user(self, user_id):
return [user_id]
def get_roles_for_user_and_project(self, user_id, tenant_id):
raise NotImplementedError()
def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
raise NotImplementedError()
def remove_role_from_user_and_project(self, user_id, tenant_id, role_id):
raise NotImplementedError()
def create_user(self, user_id, user):
raise NotImplementedError()
def update_user(self, user_id, user):
raise NotImplementedError()
def delete_user(self, user_id):
raise NotImplementedError()
def _get_metadata(self, user_id, tenant_id):
metadata = {}
if user_id == 'root':
metadata['is_admin'] = True
return metadata
def _create_metadata(self, user_id, tenant_id, metadata):
raise NotImplementedError()
def _update_metadata(self, user_id, tenant_id, metadata):
raise NotImplementedError()
def create_role(self, role_id, role):
raise NotImplementedError()
def update_role(self, role_id, role):
raise NotImplementedError()
def delete_role(self, role_id):
raise NotImplementedError()