807 lines
32 KiB
Python
807 lines
32 KiB
Python
import os
|
|
import base64
|
|
import traceback
|
|
import bcrypt
|
|
import pyotp
|
|
import ldap
|
|
import ldap.filter
|
|
from collections import OrderedDict
|
|
from flask import current_app
|
|
from flask_login import AnonymousUserMixin
|
|
from sqlalchemy import orm
|
|
import qrcode as qrc
|
|
import qrcode.image.svg as qrc_svg
|
|
from io import BytesIO
|
|
|
|
from .base import db
|
|
from .role import Role
|
|
from .setting import Setting
|
|
from .domain_user import DomainUser
|
|
from .account_user import AccountUser
|
|
|
|
|
|
class Anonymous(AnonymousUserMixin):
|
|
def __init__(self):
|
|
self.username = 'Anonymous'
|
|
|
|
|
|
class User(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(64), index=True, unique=True)
|
|
password = db.Column(db.String(64))
|
|
firstname = db.Column(db.String(64))
|
|
lastname = db.Column(db.String(64))
|
|
email = db.Column(db.String(128))
|
|
otp_secret = db.Column(db.String(16))
|
|
confirmed = db.Column(db.SmallInteger, nullable=False, default=0)
|
|
role_id = db.Column(db.Integer, db.ForeignKey('role.id'))
|
|
role = db.relationship('Role', back_populates="users", lazy=True)
|
|
accounts = None
|
|
|
|
def __init__(self,
|
|
id=None,
|
|
username=None,
|
|
password=None,
|
|
plain_text_password=None,
|
|
firstname=None,
|
|
lastname=None,
|
|
role_id=None,
|
|
email=None,
|
|
otp_secret=None,
|
|
confirmed=False,
|
|
reload_info=True):
|
|
self.id = id
|
|
self.username = username
|
|
self.password = password
|
|
self.plain_text_password = plain_text_password
|
|
self.firstname = firstname
|
|
self.lastname = lastname
|
|
self.role_id = role_id
|
|
self.email = email
|
|
self.otp_secret = otp_secret
|
|
self.confirmed = confirmed
|
|
|
|
if reload_info:
|
|
user_info = self.get_user_info_by_id(
|
|
) if id else self.get_user_info_by_username()
|
|
|
|
if user_info:
|
|
self.id = user_info.id
|
|
self.username = user_info.username
|
|
self.firstname = user_info.firstname
|
|
self.lastname = user_info.lastname
|
|
self.email = user_info.email
|
|
self.role_id = user_info.role_id
|
|
self.otp_secret = user_info.otp_secret
|
|
self.confirmed = user_info.confirmed
|
|
|
|
def is_authenticated(self):
|
|
return True
|
|
|
|
def is_active(self):
|
|
return True
|
|
|
|
def is_anonymous(self):
|
|
return False
|
|
|
|
def get_id(self):
|
|
return str(self.id)
|
|
|
|
def __repr__(self):
|
|
return '<User {0}>'.format(self.username)
|
|
|
|
def get_totp_uri(self):
|
|
return "otpauth://totp/{0}:{1}?secret={2}&issuer=PowerDNS-Admin".format(
|
|
Setting().get('site_name'), self.username, self.otp_secret)
|
|
|
|
def verify_totp(self, token):
|
|
totp = pyotp.TOTP(self.otp_secret)
|
|
return totp.verify(token, valid_window = 5)
|
|
|
|
def get_hashed_password(self, plain_text_password=None):
|
|
# Hash a password for the first time
|
|
# (Using bcrypt, the salt is saved into the hash itself)
|
|
if plain_text_password is None:
|
|
return plain_text_password
|
|
|
|
pw = plain_text_password if plain_text_password else self.plain_text_password
|
|
return bcrypt.hashpw(pw.encode('utf-8'), bcrypt.gensalt())
|
|
|
|
def check_password(self, hashed_password):
|
|
# Check hashed password. Using bcrypt, the salt is saved into the hash itself
|
|
if hasattr(self, "plain_text_password"):
|
|
if self.plain_text_password != None:
|
|
return bcrypt.checkpw(self.plain_text_password.encode('utf-8'),
|
|
hashed_password.encode('utf-8'))
|
|
return False
|
|
|
|
def get_user_info_by_id(self):
|
|
user_info = User.query.get(int(self.id))
|
|
return user_info
|
|
|
|
def get_user_info_by_username(self):
|
|
user_info = User.query.filter(User.username == self.username).first()
|
|
return user_info
|
|
|
|
def ldap_init_conn(self):
|
|
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
|
|
conn = ldap.initialize(Setting().get('ldap_uri'))
|
|
conn.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF)
|
|
conn.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
|
conn.set_option(ldap.OPT_X_TLS_DEMAND, True)
|
|
conn.set_option(ldap.OPT_DEBUG_LEVEL, 255)
|
|
conn.protocol_version = ldap.VERSION3
|
|
return conn
|
|
|
|
def escape_filter_chars(self, filter_str):
|
|
"""
|
|
Escape chars for ldap search
|
|
"""
|
|
escape_chars = ['\\', '*', '(', ')', '\x00']
|
|
replace_chars = ['\\5c', '\\2a', '\\28', '\\29', '\\00']
|
|
for escape_char in escape_chars:
|
|
filter_str = filter_str.replace(escape_char, replace_chars[escape_chars.index(escape_char)])
|
|
return filter_str
|
|
|
|
def ldap_search(self, searchFilter, baseDN, retrieveAttributes=None):
|
|
searchScope = ldap.SCOPE_SUBTREE
|
|
|
|
try:
|
|
conn = self.ldap_init_conn()
|
|
if Setting().get('ldap_type') == 'ad':
|
|
conn.simple_bind_s(
|
|
"{0}@{1}".format(self.username,
|
|
Setting().get('ldap_domain')),
|
|
self.password)
|
|
else:
|
|
conn.simple_bind_s(Setting().get('ldap_admin_username'),
|
|
Setting().get('ldap_admin_password'))
|
|
ldap_result_id = conn.search(baseDN, searchScope, searchFilter,
|
|
retrieveAttributes)
|
|
result_set = []
|
|
|
|
while 1:
|
|
result_type, result_data = conn.result(ldap_result_id, 0)
|
|
if (result_data == []):
|
|
break
|
|
else:
|
|
if result_type == ldap.RES_SEARCH_ENTRY:
|
|
result_set.append(result_data)
|
|
return result_set
|
|
|
|
except ldap.LDAPError as e:
|
|
current_app.logger.error(e)
|
|
current_app.logger.debug('baseDN: {0}'.format(baseDN))
|
|
current_app.logger.debug(traceback.format_exc())
|
|
|
|
def ldap_auth(self, ldap_username, password):
|
|
try:
|
|
conn = self.ldap_init_conn()
|
|
conn.simple_bind_s(ldap_username, password)
|
|
return True
|
|
except ldap.LDAPError as e:
|
|
current_app.logger.error(e)
|
|
return False
|
|
|
|
def is_validate(self, method, src_ip='', trust_user=False):
|
|
"""
|
|
Validate user credential
|
|
"""
|
|
role_name = 'User'
|
|
|
|
if method == 'LOCAL':
|
|
user_info = User.query.filter(
|
|
User.username == self.username).first()
|
|
|
|
if user_info:
|
|
if trust_user or (user_info.password and self.check_password(
|
|
user_info.password)):
|
|
current_app.logger.info(
|
|
'User "{0}" logged in successfully. Authentication request from {1}'
|
|
.format(self.username, src_ip))
|
|
return True
|
|
current_app.logger.error(
|
|
'User "{0}" inputted a wrong password. Authentication request from {1}'
|
|
.format(self.username, src_ip))
|
|
return False
|
|
|
|
current_app.logger.warning(
|
|
'User "{0}" does not exist. Authentication request from {1}'.
|
|
format(self.username, src_ip))
|
|
return False
|
|
|
|
if method == 'LDAP':
|
|
LDAP_TYPE = Setting().get('ldap_type')
|
|
LDAP_BASE_DN = Setting().get('ldap_base_dn')
|
|
LDAP_FILTER_BASIC = Setting().get('ldap_filter_basic')
|
|
LDAP_FILTER_USERNAME = Setting().get('ldap_filter_username')
|
|
LDAP_FILTER_GROUP = Setting().get('ldap_filter_group')
|
|
LDAP_FILTER_GROUPNAME = Setting().get('ldap_filter_groupname')
|
|
LDAP_ADMIN_GROUP = Setting().get('ldap_admin_group')
|
|
LDAP_OPERATOR_GROUP = Setting().get('ldap_operator_group')
|
|
LDAP_USER_GROUP = Setting().get('ldap_user_group')
|
|
LDAP_GROUP_SECURITY_ENABLED = Setting().get('ldap_sg_enabled')
|
|
|
|
# validate AD user password
|
|
if Setting().get('ldap_type') == 'ad' and not trust_user:
|
|
ldap_username = "{0}@{1}".format(self.username,
|
|
Setting().get('ldap_domain'))
|
|
if not self.ldap_auth(ldap_username, self.password):
|
|
current_app.logger.error(
|
|
'User "{0}" input a wrong LDAP password. Authentication request from {1}'
|
|
.format(self.username, src_ip))
|
|
return False
|
|
|
|
searchFilter = "(&({0}={1}){2})".format(LDAP_FILTER_USERNAME,
|
|
self.username,
|
|
LDAP_FILTER_BASIC)
|
|
current_app.logger.debug('Ldap searchFilter {0}'.format(searchFilter))
|
|
|
|
ldap_result = self.ldap_search(searchFilter, LDAP_BASE_DN)
|
|
current_app.logger.debug('Ldap search result: {0}'.format(ldap_result))
|
|
|
|
if not ldap_result:
|
|
current_app.logger.warning(
|
|
'LDAP User "{0}" does not exist. Authentication request from {1}'
|
|
.format(self.username, src_ip))
|
|
return False
|
|
else:
|
|
try:
|
|
ldap_username = ldap.filter.escape_filter_chars(
|
|
ldap_result[0][0][0])
|
|
|
|
if Setting().get('ldap_type') != 'ad' and not trust_user:
|
|
# validate ldap user password
|
|
if not self.ldap_auth(ldap_username, self.password):
|
|
current_app.logger.error(
|
|
'User "{0}" input a wrong LDAP password. Authentication request from {1}'
|
|
.format(self.username, src_ip))
|
|
return False
|
|
|
|
# check if LDAP_GROUP_SECURITY_ENABLED is True
|
|
# user can be assigned to ADMIN or USER role.
|
|
if LDAP_GROUP_SECURITY_ENABLED:
|
|
try:
|
|
if LDAP_TYPE == 'ldap':
|
|
groupSearchFilter = "(&({0}={1}){2})".format(LDAP_FILTER_GROUPNAME, ldap_username, LDAP_FILTER_GROUP)
|
|
current_app.logger.debug('Ldap groupSearchFilter {0}'.format(groupSearchFilter))
|
|
if (LDAP_ADMIN_GROUP and self.ldap_search(groupSearchFilter, LDAP_ADMIN_GROUP)):
|
|
role_name = 'Administrator'
|
|
current_app.logger.info(
|
|
'User {0} is part of the "{1}" group that allows admin access to PowerDNS-Admin'
|
|
.format(self.username, LDAP_ADMIN_GROUP))
|
|
elif (LDAP_OPERATOR_GROUP and self.ldap_search(groupSearchFilter, LDAP_OPERATOR_GROUP)):
|
|
role_name = 'Operator'
|
|
current_app.logger.info(
|
|
'User {0} is part of the "{1}" group that allows operator access to PowerDNS-Admin'
|
|
.format(self.username, LDAP_OPERATOR_GROUP))
|
|
elif (LDAP_USER_GROUP and self.ldap_search(groupSearchFilter, LDAP_USER_GROUP)):
|
|
current_app.logger.info(
|
|
'User {0} is part of the "{1}" group that allows user access to PowerDNS-Admin'
|
|
.format(self.username, LDAP_USER_GROUP))
|
|
else:
|
|
current_app.logger.error(
|
|
'User {0} is not part of any security groups that allow access to PowerDNS-Admin'
|
|
.format(self.username))
|
|
return False
|
|
elif LDAP_TYPE == 'ad':
|
|
ldap_group_security_roles = OrderedDict(
|
|
Administrator=LDAP_ADMIN_GROUP,
|
|
Operator=LDAP_OPERATOR_GROUP,
|
|
User=LDAP_USER_GROUP,
|
|
)
|
|
user_dn = self.escape_filter_chars(ldap_result[0][0][0])
|
|
sf_groups = ""
|
|
|
|
for group in ldap_group_security_roles.values():
|
|
if not group:
|
|
continue
|
|
|
|
sf_groups += f"(distinguishedName={group})"
|
|
|
|
sf_member_user = f"(member:1.2.840.113556.1.4.1941:={user_dn})"
|
|
search_filter = f"(&(|{sf_groups}){sf_member_user})"
|
|
current_app.logger.debug(f"LDAP groupSearchFilter '{search_filter}'")
|
|
|
|
ldap_user_groups = [
|
|
group[0][0]
|
|
for group in self.ldap_search(
|
|
search_filter,
|
|
LDAP_BASE_DN
|
|
)
|
|
]
|
|
|
|
if not ldap_user_groups:
|
|
current_app.logger.error(
|
|
f"User '{self.username}' "
|
|
"does not belong to any group "
|
|
"while LDAP_GROUP_SECURITY_ENABLED is ON"
|
|
)
|
|
return False
|
|
|
|
current_app.logger.debug(
|
|
"LDAP User security groups "
|
|
f"for user '{self.username}': "
|
|
" ".join(ldap_user_groups)
|
|
)
|
|
|
|
for role, ldap_group in ldap_group_security_roles.items():
|
|
# Continue when groups is not defined or
|
|
# user is'nt member of LDAP group
|
|
if not ldap_group or not ldap_group in ldap_user_groups:
|
|
continue
|
|
|
|
role_name = role
|
|
current_app.logger.info(
|
|
f"User '{self.username}' member of "
|
|
f"the '{ldap_group}' group that allows "
|
|
f"'{role}' access to to PowerDNS-Admin"
|
|
)
|
|
|
|
# Stop loop on first found
|
|
break
|
|
|
|
else:
|
|
current_app.logger.error('Invalid LDAP type')
|
|
return False
|
|
except Exception as e:
|
|
current_app.logger.error(
|
|
'LDAP group lookup for user "{0}" has failed. Authentication request from {1}'
|
|
.format(self.username, src_ip))
|
|
current_app.logger.debug(traceback.format_exc())
|
|
return False
|
|
|
|
except Exception as e:
|
|
current_app.logger.error('Wrong LDAP configuration. {0}'.format(e))
|
|
current_app.logger.debug(traceback.format_exc())
|
|
return False
|
|
|
|
# create user if not exist in the db
|
|
if not User.query.filter(User.username == self.username).first():
|
|
self.firstname = self.username
|
|
self.lastname = ''
|
|
try:
|
|
# try to get user's firstname, lastname and email address from LDAP attributes
|
|
if LDAP_TYPE == 'ldap':
|
|
self.firstname = ldap_result[0][0][1]['givenName'][
|
|
0].decode("utf-8")
|
|
self.lastname = ldap_result[0][0][1]['sn'][0].decode(
|
|
"utf-8")
|
|
self.email = ldap_result[0][0][1]['mail'][0].decode(
|
|
"utf-8")
|
|
elif LDAP_TYPE == 'ad':
|
|
self.firstname = ldap_result[0][0][1]['name'][
|
|
0].decode("utf-8")
|
|
self.email = ldap_result[0][0][1]['userPrincipalName'][
|
|
0].decode("utf-8")
|
|
except Exception as e:
|
|
current_app.logger.warning(
|
|
"Reading ldap data threw an exception {0}".format(e))
|
|
current_app.logger.debug(traceback.format_exc())
|
|
|
|
# first register user will be in Administrator role
|
|
if User.query.count() == 0:
|
|
self.role_id = Role.query.filter_by(
|
|
name='Administrator').first().id
|
|
else:
|
|
self.role_id = Role.query.filter_by(
|
|
name=role_name).first().id
|
|
|
|
self.create_user()
|
|
current_app.logger.info('Created user "{0}" in the DB'.format(
|
|
self.username))
|
|
|
|
# user already exists in database, set their role based on group membership (if enabled)
|
|
if LDAP_GROUP_SECURITY_ENABLED:
|
|
self.set_role(role_name)
|
|
|
|
return True
|
|
else:
|
|
current_app.logger.error('Unsupported authentication method')
|
|
return False
|
|
|
|
def create_user(self):
|
|
"""
|
|
If user logged in successfully via LDAP in the first time
|
|
We will create a local user (in DB) in order to manage user
|
|
profile such as name, roles,...
|
|
"""
|
|
|
|
# Set an invalid password hash for non local users
|
|
self.password = '*'
|
|
|
|
db.session.add(self)
|
|
db.session.commit()
|
|
|
|
def create_local_user(self):
|
|
"""
|
|
Create local user witch stores username / password in the DB
|
|
"""
|
|
# check if username existed
|
|
user = User.query.filter(str(User.username).lower() == self.username.lower()).first()
|
|
if user:
|
|
return {'status': False, 'msg': 'Username is already in use'}
|
|
|
|
# check if email existed
|
|
user = User.query.filter(str(User.email).lower() == self.email.lower()).first()
|
|
if user:
|
|
return {'status': False, 'msg': 'Email address is already in use'}
|
|
|
|
# first register user will be in Administrator role
|
|
if self.role_id is None:
|
|
self.role_id = Role.query.filter_by(name='User').first().id
|
|
if User.query.count() == 0:
|
|
self.role_id = Role.query.filter_by(
|
|
name='Administrator').first().id
|
|
|
|
if hasattr(self, "plain_text_password"):
|
|
if self.plain_text_password != None:
|
|
self.password = self.get_hashed_password(
|
|
self.plain_text_password)
|
|
else:
|
|
self.password = '*'
|
|
|
|
if self.password and self.password != '*':
|
|
self.password = self.password.decode("utf-8")
|
|
|
|
db.session.add(self)
|
|
db.session.commit()
|
|
return {'status': True, 'msg': 'Created user successfully'}
|
|
|
|
def update_local_user(self):
|
|
"""
|
|
Update local user
|
|
"""
|
|
# Sanity check - account name
|
|
if self.username == "":
|
|
return {'status': False, 'msg': 'No user name specified'}
|
|
|
|
# read user and check that it exists
|
|
user = User.query.filter(User.username == self.username).first()
|
|
if not user:
|
|
return {'status': False, 'msg': 'User does not exist'}
|
|
|
|
# check if new email exists (only if changed)
|
|
if user.email != self.email:
|
|
checkuser = User.query.filter(User.email == self.email).first()
|
|
if checkuser:
|
|
return {
|
|
'status': False,
|
|
'msg': 'New email address is already in use'
|
|
}
|
|
|
|
user.firstname = self.firstname
|
|
user.lastname = self.lastname
|
|
user.email = self.email
|
|
|
|
# store new password hash (only if changed)
|
|
if hasattr(self, "plain_text_password"):
|
|
if self.plain_text_password != None:
|
|
user.password = self.get_hashed_password(
|
|
self.plain_text_password).decode("utf-8")
|
|
|
|
db.session.commit()
|
|
return {'status': True, 'msg': 'User updated successfully'}
|
|
|
|
def update_profile(self, enable_otp=None):
|
|
"""
|
|
Update user profile
|
|
"""
|
|
user = User.query.filter(User.username == self.username).first()
|
|
if not user:
|
|
return False
|
|
|
|
user.firstname = self.firstname if self.firstname else user.firstname
|
|
user.lastname = self.lastname if self.lastname else user.lastname
|
|
|
|
if hasattr(self, "plain_text_password"):
|
|
if self.plain_text_password != None:
|
|
user.password = self.get_hashed_password(
|
|
self.plain_text_password).decode("utf-8")
|
|
|
|
if self.email:
|
|
# Can not update to a new email that
|
|
# already been used.
|
|
existing_email = User.query.filter(
|
|
User.email == self.email,
|
|
User.username != self.username).first()
|
|
if existing_email:
|
|
return False
|
|
# If need to verify new email,
|
|
# update the "confirmed" status.
|
|
if user.email != self.email:
|
|
user.email = self.email
|
|
if Setting().get('verify_user_email'):
|
|
user.confirmed = 0
|
|
|
|
if enable_otp is not None:
|
|
user.otp_secret = ""
|
|
|
|
if enable_otp == True:
|
|
# generate the opt secret key
|
|
user.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8')
|
|
|
|
try:
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return True
|
|
except Exception:
|
|
db.session.rollback()
|
|
return False
|
|
|
|
def update_confirmed(self, confirmed):
|
|
"""
|
|
Update user email confirmation status
|
|
"""
|
|
self.confirmed = confirmed
|
|
db.session.commit()
|
|
|
|
def get_domains(self):
|
|
"""
|
|
Get list of zones which the user is granted to have
|
|
access.
|
|
|
|
Note: This doesn't include the permission granting from Account
|
|
which user belong to
|
|
"""
|
|
return self.get_domain_query().all()
|
|
|
|
def get_user_domains(self):
|
|
from ..models.base import db
|
|
from .account import Account
|
|
from .domain import Domain
|
|
from .account_user import AccountUser
|
|
from .domain_user import DomainUser
|
|
|
|
domains = db.session.query(Domain) \
|
|
.outerjoin(DomainUser, Domain.id == DomainUser.domain_id) \
|
|
.outerjoin(Account, Domain.account_id == Account.id) \
|
|
.outerjoin(AccountUser, Account.id == AccountUser.account_id) \
|
|
.filter(
|
|
db.or_(
|
|
DomainUser.user_id == self.id,
|
|
AccountUser.user_id == self.id
|
|
)).all()
|
|
return domains
|
|
|
|
def delete(self):
|
|
"""
|
|
Delete a user
|
|
"""
|
|
# revoke all user privileges first
|
|
self.revoke_privilege()
|
|
|
|
try:
|
|
User.query.filter(User.username == self.username).delete()
|
|
db.session.commit()
|
|
return True
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
current_app.logger.error('Cannot delete user {0} from DB. DETAIL: {1}'.format(
|
|
self.username, e))
|
|
return False
|
|
|
|
def revoke_privilege(self, update_user=False):
|
|
"""
|
|
Revoke all privileges from a user
|
|
"""
|
|
user = User.query.filter(User.username == self.username).first()
|
|
|
|
if user:
|
|
user_id = user.id
|
|
try:
|
|
DomainUser.query.filter(DomainUser.user_id == user_id).delete()
|
|
if (update_user)==True:
|
|
AccountUser.query.filter(AccountUser.user_id == user_id).delete()
|
|
db.session.commit()
|
|
return True
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
current_app.logger.error(
|
|
'Cannot revoke user {0} privileges. DETAIL: {1}'.format(
|
|
self.username, e))
|
|
return False
|
|
return False
|
|
|
|
def set_role(self, role_name):
|
|
role = Role.query.filter(Role.name == role_name).first()
|
|
if role:
|
|
user = User.query.filter(User.username == self.username).first()
|
|
user.role_id = role.id
|
|
db.session.commit()
|
|
return {'status': True, 'msg': 'Set user role successfully'}
|
|
else:
|
|
return {'status': False, 'msg': 'Role does not exist'}
|
|
|
|
@orm.reconstructor
|
|
def set_account(self):
|
|
self.accounts = self.get_accounts()
|
|
|
|
def get_accounts(self):
|
|
"""
|
|
Get accounts associated with this user
|
|
"""
|
|
from .account import Account
|
|
from .account_user import AccountUser
|
|
accounts = []
|
|
query = db.session\
|
|
.query(
|
|
AccountUser,
|
|
Account)\
|
|
.filter(self.id == AccountUser.user_id)\
|
|
.filter(Account.id == AccountUser.account_id)\
|
|
.order_by(Account.name)\
|
|
.all()
|
|
for q in query:
|
|
accounts.append(q[1])
|
|
return accounts
|
|
|
|
def get_qrcode_value(self):
|
|
img = qrc.make(self.get_totp_uri(),
|
|
image_factory=qrc_svg.SvgPathImage)
|
|
stream = BytesIO()
|
|
img.save(stream)
|
|
return stream.getvalue()
|
|
|
|
|
|
def read_entitlements(self, key):
|
|
"""
|
|
Get entitlements from ldap server associated with this user
|
|
"""
|
|
LDAP_BASE_DN = Setting().get('ldap_base_dn')
|
|
LDAP_FILTER_USERNAME = Setting().get('ldap_filter_username')
|
|
LDAP_FILTER_BASIC = Setting().get('ldap_filter_basic')
|
|
searchFilter = "(&({0}={1}){2})".format(LDAP_FILTER_USERNAME,
|
|
self.username,
|
|
LDAP_FILTER_BASIC)
|
|
current_app.logger.debug('Ldap searchFilter {0}'.format(searchFilter))
|
|
ldap_result = self.ldap_search(searchFilter, LDAP_BASE_DN, [key])
|
|
current_app.logger.debug('Ldap search result: {0}'.format(ldap_result))
|
|
entitlements=[]
|
|
if ldap_result:
|
|
dict=ldap_result[0][0][1]
|
|
if len(dict)!=0:
|
|
for entitlement in dict[key]:
|
|
entitlements.append(entitlement.decode("utf-8"))
|
|
else:
|
|
e="Not found value in the autoprovisioning attribute field "
|
|
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
|
|
return entitlements
|
|
|
|
def updateUser(self, Entitlements):
|
|
"""
|
|
Update user associations based on ldap attribute
|
|
"""
|
|
entitlements= getCorrectEntitlements(Entitlements)
|
|
if len(entitlements)!=0:
|
|
self.revoke_privilege(True)
|
|
for entitlement in entitlements:
|
|
arguments=entitlement.split(':')
|
|
entArgs=arguments[arguments.index('powerdns-admin')+1:]
|
|
role= entArgs[0]
|
|
self.set_role(role)
|
|
if (role=="User") and len(entArgs)>1:
|
|
current_domains=getUserInfo(self.get_user_domains())
|
|
current_accounts=getUserInfo(self.get_accounts())
|
|
domain=entArgs[1]
|
|
self.addMissingDomain(domain, current_domains)
|
|
if len(entArgs)>2:
|
|
account=entArgs[2]
|
|
self.addMissingAccount(account, current_accounts)
|
|
|
|
def addMissingDomain(self, autoprovision_domain, current_domains):
|
|
"""
|
|
Add domain gathered by autoprovisioning to the current zones list of a user
|
|
"""
|
|
from ..models.domain import Domain
|
|
user = db.session.query(User).filter(User.username == self.username).first()
|
|
if autoprovision_domain not in current_domains:
|
|
domain= db.session.query(Domain).filter(Domain.name == autoprovision_domain).first()
|
|
if domain!=None:
|
|
domain.add_user(user)
|
|
|
|
def addMissingAccount(self, autoprovision_account, current_accounts):
|
|
"""
|
|
Add account gathered by autoprovisioning to the current accounts list of a user
|
|
"""
|
|
from ..models.account import Account
|
|
user = db.session.query(User).filter(User.username == self.username).first()
|
|
if autoprovision_account not in current_accounts:
|
|
account= db.session.query(Account).filter(Account.name == autoprovision_account).first()
|
|
if account!=None:
|
|
account.add_user(user)
|
|
|
|
def getCorrectEntitlements(Entitlements):
|
|
"""
|
|
Gather a list of valid records from the ldap attribute given
|
|
"""
|
|
from ..models.role import Role
|
|
urn_value=Setting().get('urn_value')
|
|
urnArgs=[x.lower() for x in urn_value.split(':')]
|
|
entitlements=[]
|
|
for Entitlement in Entitlements:
|
|
arguments=Entitlement.split(':')
|
|
|
|
if ('powerdns-admin' in arguments):
|
|
prefix=arguments[0:arguments.index('powerdns-admin')]
|
|
prefix=[x.lower() for x in prefix]
|
|
if (prefix!=urnArgs):
|
|
e= "Typo in first part of urn value"
|
|
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
|
|
continue
|
|
|
|
else:
|
|
e="Entry not a PowerDNS-Admin record"
|
|
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
|
|
continue
|
|
|
|
if len(arguments)<=len(urnArgs)+1: #prefix:powerdns-admin
|
|
e="No value given after the prefix"
|
|
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
|
|
continue
|
|
|
|
entArgs=arguments[arguments.index('powerdns-admin')+1:]
|
|
role=entArgs[0]
|
|
roles= Role.query.all()
|
|
role_names=get_role_names(roles)
|
|
|
|
if role not in role_names:
|
|
e="Role given by entry not a role availabe in PowerDNS-Admin. Check for spelling errors"
|
|
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
|
|
continue
|
|
|
|
if len(entArgs)>1:
|
|
if (role!="User"):
|
|
e="Too many arguments for Admin or Operator"
|
|
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
|
|
continue
|
|
else:
|
|
if len(entArgs)<=3:
|
|
if entArgs[1] and not checkIfDomainExists(entArgs[1]):
|
|
continue
|
|
if len(entArgs)==3:
|
|
if entArgs[2] and not checkIfAccountExists(entArgs[2]):
|
|
continue
|
|
else:
|
|
e="Too many arguments"
|
|
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
|
|
continue
|
|
|
|
entitlements.append(Entitlement)
|
|
|
|
return entitlements
|
|
|
|
|
|
def checkIfDomainExists(domainName):
|
|
from ..models.domain import Domain
|
|
domain= db.session.query(Domain).filter(Domain.name == domainName)
|
|
if len(domain.all())==0:
|
|
e= domainName + " is not found in the database"
|
|
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
|
|
return False
|
|
return True
|
|
|
|
def checkIfAccountExists(accountName):
|
|
from ..models.account import Account
|
|
account= db.session.query(Account).filter(Account.name == accountName)
|
|
if len(account.all())==0:
|
|
e= accountName + " is not found in the database"
|
|
current_app.logger.warning("Cannot apply autoprovisioning on user: {}".format(e))
|
|
return False
|
|
return True
|
|
|
|
def get_role_names(roles):
|
|
"""
|
|
returns all the roles available in database in string format
|
|
"""
|
|
roles_list=[]
|
|
for role in roles:
|
|
roles_list.append(role.name)
|
|
return roles_list
|
|
|
|
def getUserInfo(DomainsOrAccounts):
|
|
current=[]
|
|
for DomainOrAccount in DomainsOrAccounts:
|
|
current.append(DomainOrAccount.name)
|
|
return current
|