Source code for clink.ctl.auth

from clink import stamp, mapper, Controller
from clink.service import OAuthSv, AccSv, SmtpSv, TemplateSv
from clink.error.http import Http406Error, Http400Error, Http401Error
from clink.util import fs


@stamp(OAuthSv, AccSv, SmtpSv, TemplateSv)
@mapper.path('/auth')
[docs]class AuthCtl(Controller): def __init__(self, oauth_sv, acc_sv, smtp_sv, tpl_sv): self._oauth_sv = oauth_sv self._acc_sv = acc_sv self._smtp_sv = smtp_sv self._tpl_sv = tpl_sv self._init_tpl_files() @mapper.post('/token') def get_token(self, req, res): info = req.body if info is None or 'grant_type' not in info: raise Http400Error(req, 'Require grant_type') grant_type = info['grant_type'] if grant_type == 'password': if 'username' not in info: raise Http400Error(req, 'Require username') if 'password' not in info: raise Http400Error(req, 'Require password') acc = self._acc_sv.find_pwd(info['username'], info['password']) if acc is None: raise Http401Error(req, 'Name or password is invalid') res.body = self._oauth_sv.mktoken_pwd( info['username'], info['password'] ) values = { 'acc_name': acc['name'], 'remote_addr': req.remote_addr } txt_body = self._tpl_sv.build_file(self._LOGIN_TPL, values) subject = 'New login' self._smtp_sv.send(acc['email'], subject, txt_body) elif grant_type == 'refresh_token': if 'refresh_token' not in info: raise Http400Error(req, 'Require refresh_token') res.body = self._oauth_sv.mktoken_rtoken(info['refresh_token']) else: raise Http406Error(req, 'Not support grant_type=%s' % grant_type) def _init_tpl_files(self): ''' Detect template files ''' self._LOGIN_TPL = fs.asset_path('tpl/login.txt')