Source code for fardel.core.auth.views

"""
Objects
=======

Object are json objects.

1. Permission
    :name: Name of the permission to display it.
    :code_name: the code for the permission.

2. Group
    :id: ID of the group in database.
    :name: Name of the group to display it.
    :permissions: List of permission object.

3. User
    :id: ID of the user in database.
    :first_name: First name of the user.
    :last_name: Last name of the user.
    :email: email of the user.


4. Access
    At login API, if the user is staffmember or admin we have such an object.

    :group: The group object which user is member of it.
    :is_admin: If the user is admin then its true.
    :is_staff: If the user is staffmember then its true.  
    

Errors
======

If a token needs to be refreshed:

:status_code: 401
:response:
    .. code-block:: json
        
        {"message": "Fresh token required"}

Invalid token:

:status_code: 422
:response:
    .. code-block:: json

        {"message": "reason"}

Expired token:

:status_code: 401
:response:
    .. code-block:: json

        {"message": "Token has expired"}

Revoked token:

:status_code: 401
:response:
    .. code-block:: json

        {"message":"Token has been revoked"}
"""


from sqlalchemy import or_
from flask import render_template, redirect, url_for, jsonify, request, make_response

from fardel.core.rest import create_api, abort, Resource
from flask_jwt_extended import (
    create_access_token, create_refresh_token,
    jwt_required, jwt_refresh_token_required,
    get_jwt_identity, get_raw_jwt, current_user,
    jwt_optional
)

from fardel.ext import jwt, db
from fardel.core.base import BaseResource

from . import mod
from .models import *


auth_api = create_api(mod)

@mod.before_app_first_request
def create_permissions():
    setup_permissions()


def rest_resource(resource_cls):
    """ Decorator for adding resources to Api App """
    auth_api.add_resource(resource_cls, *resource_cls.endpoints)
    return resource_cls


[docs]@rest_resource class RegistrationApi(BaseResource): """ :URL: ``/api/auth/register/`` """ endpoints = ['/register/'] def already_exists(self): return { 'message': 'A user with this email already exists.' }, 409
[docs] def post(self): """ :required arguments: * email * password :optional arguments: * first_name * last_name :response: .. code-block:: json { "message":"Successfully registered", "access_token":"access_token", "refresh_token":"refresh_token" } :errors: if email or password does not provided: :status_code: 400 :response: .. code-block:: json {"message":"Unvalid form submitted"} If email already exists: :status_code: 409 :response: .. code-block:: json {"message": "A user with this email already exists."} """ data = request.get_json() if not self.check_data(data, ['password', 'email']): return self.bad_request() password = data['password'] email = data['email'] first_name = data.get('first_name') last_name = data.get('last_name') if User.query.filter_by(email=email).first(): return self.already_exists() u = User(password=password, email=email, first_name=first_name, last_name=last_name) db.session.add(u) db.session.commit() access_token = create_access_token(identity=u.email) refresh_token = create_refresh_token(identity=u.email) return { 'message':'Successfully registered', 'access_token': access_token, 'refresh_token': refresh_token }
[docs]@rest_resource class LoginApi(BaseResource): """ :URL: ``/api/auth/login/`` """ endpoints = ['/login/']
[docs] def post(self): """ :required arguments: * email * password :response: .. code-block:: python { "message":"Successfully registered", "access_token":"access_token", "refresh_token":"refresh_token", "access": AccessObject } :errors: if email or password does not provided: :status_code: 400 :response: .. code-block:: json {"message":"Unvalid form submitted"} If email or password is not correct: :status_code: 401 :response: .. code-block:: json {"message":"Username or password is not correct"} """ data = request.get_json() if not self.check_data(data, ['email', 'password']): return self.bad_request() password = data['password'] email = data['email'] u = User.query.filter_by(email=email).scalar() if u and u.check_password(password): access_token = create_access_token(identity=u.email) refresh_token = create_refresh_token(identity=u.email) obj = { 'message':'Successfully logined', 'access_token': access_token, 'refresh_token': refresh_token, } access = u.access_dict() if access: obj['access'] = access return obj return { 'message':'Username or password is not correct' }, 401
[docs]@rest_resource class LogoutApi(BaseResource): """ :URL: ``/api/auth/logout/`` """ endpoints = ['/logout/'] method_decorators = [jwt_required]
[docs] def post(self): """ * Authorization header containing access token is required :status_code: 200 :response: .. code-block:: json { "message": "Access token has been revoked" } """ jti = get_raw_jwt()['jti'] revoked_token = RevokedToken(jti=jti) revoked_token.add() return {'message': 'Access token has been revoked'}
[docs]@rest_resource class LogoutRefreshApi(BaseResource): """ :URL: ``/api/auth/logout-refresh/`` """ endpoints = ['/logout-refresh/'] method_decorators = [jwt_refresh_token_required]
[docs] def post(self): """ * Authorization header containing refresh token is required :status_code: 200 :response: .. code-block:: json { "message": "Refresh token has been revoked" } """ jti = get_raw_jwt()['jti'] revoked_token = RevokedToken(jti=jti) revoked_token.add() return {'message': 'Refresh token has been revoked'}
[docs]@rest_resource class RefreshTokenApi(BaseResource): """ :URL: ``/api/auth/refresh-token/`` """ endpoints = ['/refresh-token/'] method_decorators = [jwt_refresh_token_required]
[docs] def post(self): """ * Authorization header containing refresh token is required :status_code: 200 :response: .. code-block:: json { "access_token": "access_token" } """ current_user = get_jwt_identity() access_token = create_access_token(identity=current_user) return {'access_token': access_token}
[docs]@rest_resource class ProfileApi(BaseResource): """ :URL: ``/api/auth/profile/`` """ endpoints = ['/profile/'] method_decorators = { 'get':[jwt_required], 'put':[jwt_required] }
[docs] def get(self): """ * Authorization header containing access token is required :status_code: 200 :response: .. code-block:: python { "user": UserObject } """ return {"user":current_user.dict()}
[docs] def put(self): """ * Authorization header containing refresh token is required :status_code: 200 :response: .. code-block:: python { "message": "Profile successfully updated" "user": UserObject } """ data = request.get_json() fields = { "first_name": data.get('first_name'), "last_name": data.get('last_name'), "email": data.get('email'), "password": data.get('password'), } for field in fields: if data.get(field): setattr(current_user, field, data[field]) db.session.commit() return {"message":"Profile successfully updated", 'user':current_user.dict()}