import random
from datetime import datetime, timedelta
from time import time
from string import ascii_lowercase, ascii_uppercase, digits
from hashlib import sha224
from clink.type import Service, AuthConf
from clink.com import stamp
from clink.dflow import verify, ExistError, NonExistError, ExpiredError
from clink.model.std import acc_name as name_model, acc_pwd as pwd_model
from clink.model.std import email as email_model, phone as phone_model
from clink.model.acc import confirm_code as confirm_code_model
from .authdb_sv import AuthDbSv
from .type import ConfirmCodeSpec
_ACT_REGISTERED = 'REGISTERED'
_ACT_CHANGE_PWD = 'CHANGE_PWD'
_ACT_RESET_PWD = 'RESET_PWD'
_ACT_ADD_TO_GRP = 'ADD_TO_GRP'
_ACT_RM_FRM_GRP = 'RM_FRM_GRP'
_PWD_CHARS = ascii_lowercase + ascii_uppercase + digits
_CODE_CHARS = ascii_uppercase
def _hash_pwd(password):
return sha224(password.encode('utf-8')).hexdigest()
def _rand_pwd():
return ''.join(random.sample(_PWD_CHARS, 6))
def rand_code():
a = ''.join(random.sample(_CODE_CHARS, 4))
b = ''.join(random.sample(_CODE_CHARS, 4))
c = ''.join(random.sample(_CODE_CHARS, 4))
d = ''.join(random.sample(_CODE_CHARS, 4))
return '-'.join([a, b, c, d])
@stamp(AuthDbSv, AuthConf)
[docs]class AccSv(Service):
'''
Manage accounts and related concepts
'''
def __init__(self, authdb_sv, auth_conf):
'''
:param AuthDbSv authdb_sv:
:param AuthConf auth_conf:
'''
self._acc_doc = authdb_sv.acc_doc()
self._grp_doc = authdb_sv.grp_doc()
self._rpwd_doc = authdb_sv.rpwd_doc()
self._acctmp_doc = authdb_sv.acctmp_doc()
self.rpwd_time = 3600
self.create_time = 3600
root_acc = self.find_name('root')
if root_acc is None:
self.mk_acc('root', auth_conf.root_pwd, auth_conf.root_email)
@verify(None, name_model, pwd_model, email_model, phone_model)
[docs] def mk_acc(self, name, password, email, phone=None):
'''
Create new account
:param str name:
:param str password:
:param str email:
:param str phone:
:rtype: bson.objectid.ObjectId
:raise TypeError:
'''
account = {
'name': name,
'hashpwd': _hash_pwd(password),
'email': email,
'phone': phone,
'groups': [],
'created_date': datetime.utcnow(),
'modified_date': datetime.utcnow(),
'last_action': _ACT_REGISTERED
}
result = self._acc_doc.insert_one(account)
return result.inserted_id
@verify(None, name_model, pwd_model, email_model, phone_model)
[docs] def mk_reg_code(self, name, password, email, phone=None):
'''
Create a registration code. Use returned code with cf_reg_code()
to create account
:param str name:
:param str password:
:param str email:
:param str phone:
:rtype: ConfirmCodeSpec
:raise TypeError:
:raise ExistError:
'''
if self._acc_doc.find_one({'name': name}) is not None:
raise ExistError({'name': name})
if self._acc_doc.find_one({'email': email}) is not None:
raise ExistError({'email': email})
if phone is not None:
if self._acc_doc.find_one({'phone': phone}) is not None:
raise ExistError({'phone': phone})
if self._acctmp_doc.find_one({'name': name}) is not None:
raise ExistError({'name': name})
if self._acctmp_doc.find_one({'email': email}) is not None:
raise ExistError({'email': email})
if phone is not None:
if self._acctmp_doc.find_one({'phone': phone}) is not None:
raise ExistError({'phone': 'phone'})
datetime_now = datetime.utcnow().timestamp()
self._acctmp_doc.delete_many({'_expired_date': {'$lt': datetime_now}})
creation_code = rand_code()
expired_date = datetime.utcnow() + timedelta(hours=self.create_time)
acctmp = {
'name': name,
'hashpwd': _hash_pwd(password),
'email': email,
'phone': phone,
'groups': [],
'created_date': datetime.utcnow(),
'modified_date': datetime.utcnow(),
'last_action': _ACT_REGISTERED,
'_expired_date': expired_date.timestamp(),
'_creation_code': creation_code
}
self._acctmp_doc.insert_one(acctmp)
return ConfirmCodeSpec(creation_code, expired_date)
@verify(None, confirm_code_model)
[docs] def cf_reg_code(self, code):
'''
Use registration code to create account
:param str code:
:rtype: dict
:raise ExistError:
:raise ExpiredError:
'''
acctmp = self._acctmp_doc.find_one({'_creation_code': code})
if acctmp is None:
raise NonExistError({'code': code})
if acctmp['_expired_date'] < datetime.utcnow().timestamp():
raise ExpiredError({'code': time()})
self._acctmp_doc.delete_one({'_creation_code': code})
del acctmp['_id']
del acctmp['_expired_date']
del acctmp['_creation_code']
self._acc_doc.insert_one(acctmp)
del acctmp['hashpwd']
return acctmp
[docs] def find_id(self, id):
'''
Find account by identity
:param bson.objectid.ObjectId id:
:rtype: dict
'''
return self._acc_doc.find_one({'_id': id})
@verify(None, name_model)
[docs] def find_name(self, name):
'''
Find account by name
:param str name:
:rtype: dict
:raise TypeError:
'''
return self._acc_doc.find_one({'name': name})
@verify(None, email_model)
[docs] def find_email(self, email):
'''
Find account by email
:param str email:
:rtype: dict
:raise TypeError:
'''
return self._acc_doc.find_one({'email': email})
@verify(None, phone_model)
[docs] def find_phone(self, phone):
'''
Find account by phone number
:param str phone:
:rtype: dict
:raise TypeError:
'''
return self._acc_doc.find_one({'phone': phone})
@verify(None, name_model, pwd_model)
[docs] def find_pwd(self, name, pwd):
'''
Find account by name and password
:param str name:
:param str pwd:
:rtype: dict
:raise TypeError:
'''
hashpwd = _hash_pwd(pwd)
return self._acc_doc.find_one({'name': name, 'hashpwd': hashpwd})
[docs] def rm_acc(self, id):
'''
Remove account by identity
:param bson.objectid.ObjectId id:
'''
result = self._acc_doc.delete_one({'_id': id})
if result.deleted_count != 1:
raise NonExistError({'id': id})
@verify(None, None, pwd_model)
[docs] def ch_pwd(self, id, new_pwd):
'''
Change password of account by identity
:param bson.objectid.ObjectId id:
:param str new_pwd:
:raise TypeError:
'''
upd = {
'$set': {
'hashpwd': _hash_pwd(new_pwd),
'modified_date': datetime.utcnow(),
'last_action': _ACT_CHANGE_PWD
}
}
result = self._acc_doc.update_one({'_id': id}, upd)
if result.modified_count != 1:
raise NonExistError({'id': id})
@verify(None, email_model)
[docs] def mk_rpwd_code(self, email):
'''
Create reset password code from email.
Use returned code with cf_rpwd_code() to reset to new password
:param str email:
:rtype: ConfirmCodeSpec
:raise TypeError:
'''
acc = self._acc_doc.find_one({'email': email})
if acc is None:
raise NonExistError({'email': email})
self._rpwd_doc.delete_many({'acc_id': acc['_id']})
reset_code = rand_code()
exp_date = datetime.utcnow() + timedelta(hours=self.rpwd_time)
code_spec = {
'code': reset_code,
'expired_date': exp_date.timestamp(),
'acc_id': acc['_id'],
'acc_email': acc['email']
}
self._rpwd_doc.insert_one(code_spec)
return ConfirmCodeSpec(reset_code, exp_date)
@verify(None, confirm_code_model, pwd_model)
[docs] def cf_rpwd_code(self, code, new_pwd):
'''
Reset password from code
:param str code:
:param str new_pwd:
:rtype: bson.objectid.ObjectId
:raise TypeError:
'''
code_spec = self._rpwd_doc.find_one({'code': code})
if code_spec is None:
raise NonExistError({'code': code})
if code_spec['expired_date'] < time():
raise ExpiredError({'code': time()})
acc_id = code_spec['acc_id']
self._rpwd_doc.delete_many({'acc_id': acc_id})
new_hashpwd = _hash_pwd(new_pwd)
upd = {
'$set': {
'hashpwd': new_hashpwd,
'last_action': _ACT_RESET_PWD
}
}
self._acc_doc.update_one({'_id': acc_id}, upd)
return acc_id
[docs] def mk_group(self, group_name):
'''
Create new account group
:param str group_name:
'''
self._grp_doc.insert_one({'name': group_name})
[docs] def rm_group(self, group_name):
'''
Remove account group
:param str group_name:
'''
result = self._grp_doc.delete_one({'name': group_name})
if result.deleted_count != 1:
raise NonExistError({'group_name': group_name})
[docs] def add_to_group(self, acc_id, group_name):
'''
Put an account into group
:param bson.objectid.ObjectId acc_id:
:param str group_name:
'''
acc = self._acc_doc.find_one({'_id': acc_id})
if acc is None:
raise NonExistError({'id': acc_id})
grp = self._grp_doc.find_one({'name': group_name})
if grp is None:
raise NonExistError({'group_name': group_name})
if group_name in acc['groups']:
raise ExistError({'group_name': group_name})
upd = {
'$push': {'groups': group_name},
'$set': {'last_action': _ACT_ADD_TO_GRP}
}
self._acc_doc.update_one({'_id': acc_id}, upd)
[docs] def del_fm_group(self, acc_id, group_name):
'''
Remove an account from group
:param bson.objectid.ObjectId acc_id:
:param str group_name:
'''
acc = self._acc_doc.find_one({'_id': acc_id})
if acc is None:
raise NonExistError({'id': acc_id})
if group_name not in acc['groups']:
raise NonExistError({'group_name': group_name})
upd = {
'$pull': {'groups': group_name},
'$set': {'last_action': _ACT_RM_FRM_GRP}
}
self._acc_doc.update_one({'_id': acc_id}, upd)