25 changed files with 665 additions and 29 deletions
@ -0,0 +1,7 @@ |
|||
FLASK_APP=monsun_backend |
|||
FLASK_ENV=development |
|||
SECRET=\\\x10#\xd9\x89,2|hBN\xe4\xdf\xe0\xf7W |
|||
POSTGRES_PASSWORD=pass |
|||
POSTGRES_USER=usr |
|||
POSTGRES_HOST=localhost |
|||
POSTGRES_DB_NAME=monsun_backend |
|||
@ -0,0 +1,3 @@ |
|||
from .admin import make_admin_user # noqa |
|||
from .init import init_app # noqa |
|||
from .permissions import admin_permission # noqa |
|||
@ -0,0 +1,25 @@ |
|||
from flask import Flask |
|||
from monsun_backend import models |
|||
|
|||
from ..container import get_initialize_container |
|||
|
|||
|
|||
def init_app(app: Flask): |
|||
app.before_first_request(make_admin_user) |
|||
|
|||
|
|||
def make_admin_user() -> models.User: |
|||
"""Makes the admin user if he does not exist |
|||
|
|||
The admin is always the first user (id==1) |
|||
""" |
|||
container = get_initialize_container() |
|||
admin_user: models.User = models.user_datastore.get_user(identifier=1) |
|||
if admin_user is None: |
|||
admin_user = models.user_datastore.create_user( |
|||
email=container.config.admin_user_email(), |
|||
password=container.config.admin_user_password(), |
|||
) |
|||
admin_user.save() |
|||
|
|||
return admin_user |
|||
@ -0,0 +1,10 @@ |
|||
"""Cannot be part ot __init__.py because it must be executed AFTER imports""" |
|||
from flask import Flask |
|||
|
|||
from . import admin |
|||
from . import permissions |
|||
|
|||
|
|||
def init_app(app: Flask): |
|||
admin.init_app(app=app) |
|||
permissions.init_app(app=app) |
|||
@ -0,0 +1,24 @@ |
|||
from flask import Flask |
|||
from flask_login import current_user |
|||
from flask_principal import AnonymousIdentity |
|||
from flask_principal import Identity |
|||
from flask_principal import Permission |
|||
from flask_principal import Principal |
|||
from flask_principal import RoleNeed |
|||
from monsun_backend import models |
|||
|
|||
principals = Principal() |
|||
|
|||
|
|||
@principals.identity_loader |
|||
def read_identity_from_flask_login(): |
|||
if current_user.is_authenticated: |
|||
return Identity(current_user.id) |
|||
return AnonymousIdentity() |
|||
|
|||
|
|||
def init_app(app: Flask): |
|||
principals.init_app(app=app) |
|||
|
|||
|
|||
admin_permission = Permission(RoleNeed(models.RoleType.admin.name)) |
|||
@ -0,0 +1,40 @@ |
|||
from flask_sqlalchemy import SQLAlchemy |
|||
from sqlalchemy import Column |
|||
from sqlalchemy import Integer |
|||
from sqlalchemy.ext.declarative import as_declarative |
|||
from sqlalchemy.ext.declarative import declared_attr |
|||
|
|||
|
|||
@as_declarative() |
|||
class BaseModel: |
|||
"""This class provides helper methods that can be used by its subclasses""" |
|||
|
|||
id = Column(Integer, primary_key=True) |
|||
|
|||
@declared_attr |
|||
def __tablename__(cls): |
|||
return cls.__name__.lower() |
|||
|
|||
def save(self): |
|||
db.session.add(self) |
|||
db.session.commit() |
|||
return self |
|||
|
|||
def delete(self): |
|||
db.session.delete(self) |
|||
db.session.commit() |
|||
|
|||
@classmethod |
|||
def get_all(cls): |
|||
return cls.query.all() |
|||
|
|||
def __repr__(self): |
|||
return f"<{self.__class__.__name__} {self.name!r}>" |
|||
|
|||
|
|||
def session_commit(): |
|||
"""Shortcut for ``db.session.commit()``""" |
|||
db.session.commit() |
|||
|
|||
|
|||
db = SQLAlchemy(model_class=BaseModel) |
|||
@ -0,0 +1,15 @@ |
|||
from flask import blueprints |
|||
from flask_api import status |
|||
|
|||
from .. import access |
|||
|
|||
bp = blueprints.Blueprint("admin", __name__) |
|||
|
|||
|
|||
@bp.route( |
|||
rule="/admin", |
|||
methods=["GET"], |
|||
) |
|||
@access.admin_permission.require(http_exception=status.HTTP_403_FORBIDDEN) |
|||
def admin(): |
|||
return "success", 200 |
|||
@ -0,0 +1,38 @@ |
|||
from typing import Optional |
|||
|
|||
from flask import blueprints |
|||
from flask import make_response |
|||
from flask import request |
|||
from flask_api import status |
|||
from flask_login import login_user |
|||
from monsun_backend import error |
|||
from monsun_backend import marshmallow_schemas |
|||
from monsun_backend import models |
|||
|
|||
bp = blueprints.Blueprint("login", __name__) |
|||
|
|||
|
|||
@bp.route( |
|||
rule="/login", |
|||
methods=["POST"], |
|||
) |
|||
def login(): |
|||
email_: Optional[str] = request.form.get("email") |
|||
if email_ is None: |
|||
raise error.BadRequest("email is missing") |
|||
|
|||
user_: models.User = models.User.query.filter_by(email=email_).first() |
|||
if not user_: |
|||
raise error.BadRequest("User does not exist") |
|||
|
|||
if user_.check_password(password=request.form.get("password")): |
|||
if login_user(user_): |
|||
return make_response( |
|||
marshmallow_schemas.user_schema.jsonify(user_), |
|||
status.HTTP_200_OK, |
|||
) |
|||
else: |
|||
# this shouldn't happen, but one never knows... |
|||
raise error.InternalServerError("Could not login user") |
|||
else: |
|||
raise error.Unauthorized("Password is incorrect") |
|||
@ -0,0 +1,23 @@ |
|||
from flask import blueprints |
|||
from flask import jsonify |
|||
from flask import make_response |
|||
from flask_api import status |
|||
from flask_login import login_required |
|||
from flask_login import logout_user |
|||
|
|||
bp = blueprints.Blueprint("logout", __name__) |
|||
|
|||
|
|||
@bp.route( |
|||
rule="/logout", |
|||
methods=["DELETE"], |
|||
) |
|||
@login_required |
|||
def logout(): |
|||
# Remove the user information from the session |
|||
logout_user() |
|||
|
|||
return make_response( |
|||
jsonify(message="logout successful"), |
|||
status.HTTP_200_OK, |
|||
) |
|||
@ -0,0 +1,52 @@ |
|||
from typing import Dict |
|||
from typing import Optional |
|||
from typing import Union |
|||
|
|||
from flask import Response |
|||
from flask import jsonify |
|||
from flask_api import status |
|||
|
|||
|
|||
class MonsunError(Exception): |
|||
status_code = status.HTTP_501_NOT_IMPLEMENTED |
|||
|
|||
data: Dict |
|||
message: Optional[str] |
|||
|
|||
def __init__( |
|||
self, |
|||
content: Union[str, Dict], |
|||
): |
|||
if isinstance(content, str): |
|||
self.message = content |
|||
self.data = {"message": self.message} |
|||
elif isinstance(content, Dict): |
|||
self.data = content |
|||
else: |
|||
raise ValueError("'content' must be either a string or an dict") |
|||
|
|||
@property |
|||
def response(self) -> Response: |
|||
response: Response = jsonify(self.data) |
|||
response.status_code = self.status_code |
|||
return response |
|||
|
|||
|
|||
class BadRequest(MonsunError): |
|||
status_code = status.HTTP_400_BAD_REQUEST |
|||
|
|||
|
|||
class Unauthorized(MonsunError): |
|||
status_code = status.HTTP_401_UNAUTHORIZED |
|||
|
|||
|
|||
class Forbidden(MonsunError): |
|||
status_code = status.HTTP_403_FORBIDDEN |
|||
|
|||
|
|||
class InternalServerError(MonsunError): |
|||
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR |
|||
|
|||
|
|||
class ConfigurationError(InternalServerError): |
|||
"""raised in case of invalid configuration""" |
|||
@ -0,0 +1,19 @@ |
|||
from flask_marshmallow import Marshmallow |
|||
from marshmallow import fields |
|||
|
|||
from .models import User |
|||
|
|||
ma = Marshmallow() |
|||
|
|||
|
|||
class UserSchema(ma.SQLAlchemySchema): # type: ignore |
|||
class Meta: |
|||
model = User |
|||
|
|||
id = fields.Int() |
|||
email = fields.Str() |
|||
active = fields.Bool() |
|||
|
|||
|
|||
user_schema = UserSchema() |
|||
user_schema_public = UserSchema(only=("id", "email")) |
|||
@ -0,0 +1,3 @@ |
|||
from .user_role import RoleType # noqa |
|||
from .user_role import User # noqa |
|||
from .user_role import user_datastore # noqa |
|||
@ -0,0 +1,152 @@ |
|||
from __future__ import annotations |
|||
|
|||
import enum |
|||
from typing import Dict |
|||
from typing import Optional |
|||
from typing import Sequence |
|||
|
|||
from flask_security import RoleMixin |
|||
from flask_security import SQLAlchemyUserDatastore |
|||
from flask_security import UserMixin |
|||
from werkzeug.security import check_password_hash |
|||
from werkzeug.security import generate_password_hash |
|||
|
|||
from ..database import BaseModel |
|||
from ..database import db |
|||
|
|||
roles_users = db.Table( |
|||
"roles_users", |
|||
db.Column("user_id", db.Integer(), db.ForeignKey("user.id")), |
|||
db.Column("role_id", db.Integer(), db.ForeignKey("role.id")), |
|||
) |
|||
|
|||
|
|||
class RoleType(enum.Enum): |
|||
"""The type of a role""" |
|||
|
|||
admin: int = 1 |
|||
|
|||
|
|||
ROLE_DESCRIPTIONS: Dict[RoleType, str] = { |
|||
RoleType.admin: "Has admin rights", |
|||
} |
|||
|
|||
|
|||
class Role(BaseModel, RoleMixin): |
|||
id: int # type: ignore |
|||
"""The DB id""" |
|||
|
|||
name: str = db.Column(db.String(80), unique=True) |
|||
"""The name of the role, see :class:`RoleType`""" |
|||
|
|||
description: str = db.Column(db.String(255)) |
|||
"""A short description of the role""" |
|||
|
|||
def __init__( |
|||
self, |
|||
name: str, |
|||
description: str, |
|||
): |
|||
""" |
|||
:param name: The name of the role, see :class:`RoleType` |
|||
:param description: A short description of the role |
|||
""" |
|||
self.name = name |
|||
self.description = description |
|||
|
|||
@classmethod |
|||
def get_role( |
|||
cls, |
|||
role_type: RoleType, |
|||
) -> Role: |
|||
"""Fetches the role from the DB or creates it if necessary |
|||
|
|||
:param role_type: The type of the role |
|||
:return: The role instance |
|||
""" |
|||
role_: Optional[Role] = cls.query.filter_by(name=role_type.name).first() |
|||
|
|||
if role_ is None: |
|||
role_ = cls( |
|||
name=role_type.name, |
|||
description=ROLE_DESCRIPTIONS[role_type], |
|||
) |
|||
role_.save() |
|||
|
|||
return role_ |
|||
|
|||
|
|||
class User(BaseModel, UserMixin): |
|||
id: int # type: ignore |
|||
"""The DB id""" |
|||
|
|||
email: str = db.Column(db.String(255), unique=True) |
|||
"""The email address""" |
|||
|
|||
password: str = db.Column(db.String(512)) |
|||
"""The password as hash""" |
|||
|
|||
active: bool = db.Column(db.Boolean()) |
|||
"""Is the user activated?""" |
|||
|
|||
roles: Sequence[Role] = db.relationship( |
|||
"Role", |
|||
secondary=roles_users, |
|||
backref=db.backref("users", lazy="dynamic"), |
|||
) |
|||
"""The roles which the user has""" |
|||
|
|||
def __init__( |
|||
self, |
|||
email: str, |
|||
password: str, |
|||
active: bool = True, |
|||
roles: Sequence[Role] = None, |
|||
): |
|||
""" |
|||
|
|||
:param email: The email address |
|||
:param password: The password as plain text |
|||
:param active: *True* if the user is active |
|||
:param roles: The Roles of the user. |
|||
""" |
|||
self.email = email |
|||
self.set_password(password=password) |
|||
self.active = active |
|||
self.roles = roles or [Role.get_role(RoleType.admin)] |
|||
|
|||
def set_password( |
|||
self, |
|||
password: str, |
|||
): |
|||
"""Create hashed password.""" |
|||
self.password = generate_password_hash( |
|||
password=password, |
|||
) |
|||
|
|||
def check_password( |
|||
self, |
|||
password: str, |
|||
) -> bool: |
|||
"""Check hashed password |
|||
|
|||
:param password: The password in plain text |
|||
:return: True if equal |
|||
""" |
|||
return bool( |
|||
check_password_hash( |
|||
pwhash=self.password, |
|||
password=password, |
|||
), |
|||
) |
|||
|
|||
def __repr__(self): |
|||
return ( |
|||
f"<{self.__class__.__name__} " |
|||
f"email={self.email!r}, " |
|||
f"active={self.active!r}, " |
|||
f"roles={self.roles!r}>" |
|||
) |
|||
|
|||
|
|||
user_datastore = SQLAlchemyUserDatastore(db, User, Role) |
|||
@ -0,0 +1,5 @@ |
|||
[pytest] |
|||
addopts = -s |
|||
|
|||
markers = |
|||
client: required client device (dongle) |
|||
@ -1,4 +1,13 @@ |
|||
dependency-injector[yaml]>=4.34.0,<5 |
|||
email_validator |
|||
flask-api>=3.0.post1,<4 |
|||
flask-migrate |
|||
flask-script |
|||
flask-sqlalchemy |
|||
flask_marshmallow |
|||
flask_security |
|||
marshmallow-sqlalchemy |
|||
psycopg2-binary |
|||
pyserial>=3.5,<4 |
|||
sqlalchemy-utils |
|||
uwsgi |
|||
|
|||
@ -0,0 +1,60 @@ |
|||
import os |
|||
from typing import Iterator |
|||
from typing import Type |
|||
|
|||
import pytest |
|||
import utilities |
|||
from flask.testing import FlaskClient |
|||
from monsun_backend.container import get_initialize_container |
|||
from sqlalchemy import create_engine |
|||
from sqlalchemy_utils import create_database |
|||
from sqlalchemy_utils import database_exists |
|||
from utilities import setup_app |
|||
|
|||
|
|||
@pytest.fixture(scope="session", autouse=True) |
|||
def db_test(): |
|||
"""makes sure that the DB exists""" |
|||
engine = create_engine(os.getenv("DATABASE_URI")) |
|||
if not database_exists(engine.url): |
|||
create_database(engine.url) |
|||
|
|||
|
|||
@pytest.fixture(scope="class") |
|||
def app_class_scope(db_test): |
|||
with setup_app() as app: |
|||
yield app |
|||
|
|||
|
|||
@pytest.fixture(scope="class") |
|||
def client_class_scope(app_class_scope): |
|||
return app_class_scope.test_client |
|||
|
|||
|
|||
@pytest.fixture() |
|||
def app(db_test): |
|||
with setup_app() as app: |
|||
yield app |
|||
|
|||
|
|||
@pytest.fixture() |
|||
def client(app): |
|||
return app.test_client |
|||
|
|||
|
|||
@pytest.fixture(scope="class") |
|||
def as_admin( |
|||
client_class_scope: Type[FlaskClient], |
|||
) -> Iterator[utilities.TestUser]: |
|||
container = get_initialize_container() |
|||
with client_class_scope() as client_: |
|||
yield utilities.TestUser( |
|||
client_.post( |
|||
"/login", |
|||
data={ |
|||
"email": container.config.admin_user_email(), |
|||
"password": container.config.admin_user_password(), |
|||
}, |
|||
), |
|||
client_, |
|||
) |
|||
@ -0,0 +1,50 @@ |
|||
from flask import Response |
|||
from flask_api import status |
|||
from monsun_backend.container import get_initialize_container |
|||
from utilities import TestUser |
|||
|
|||
|
|||
def test_login_as_admin_then_get_status_ok(client): |
|||
container = get_initialize_container() |
|||
|
|||
response: Response = client().post( |
|||
"/login", |
|||
data={ |
|||
"email": container.config.admin_user_email(), |
|||
"password": container.config.admin_user_password(), |
|||
}, |
|||
) |
|||
|
|||
assert response.status_code == status.HTTP_200_OK |
|||
|
|||
|
|||
def test_logout_as_not_logged_in_user_then_get_status_found(client): |
|||
response: Response = client().get( |
|||
"/logout", |
|||
) |
|||
assert response.status_code == status.HTTP_302_FOUND |
|||
|
|||
|
|||
def test_logout_as_admin_then_get_status_found(as_admin: TestUser): |
|||
response: Response = as_admin.client.get( |
|||
"/logout", |
|||
) |
|||
assert response.status_code == status.HTTP_302_FOUND |
|||
|
|||
|
|||
def test_as_admin_access_admin_endpoint_then_get_status_ok(as_admin: TestUser): |
|||
response: Response = as_admin.client.get( |
|||
"/admin", |
|||
) |
|||
|
|||
assert response.status_code == status.HTTP_200_OK |
|||
|
|||
|
|||
def test_as_not_logged_in_user_access_admin_endpoint_then_get_status_forbidden( |
|||
client, |
|||
): # noqa |
|||
response: Response = client().get( |
|||
"/admin", |
|||
) |
|||
|
|||
assert response.status_code == status.HTTP_403_FORBIDDEN |
|||
@ -0,0 +1 @@ |
|||
pytest |
|||
@ -0,0 +1,40 @@ |
|||
from contextlib import contextmanager |
|||
from dataclasses import dataclass |
|||
from pathlib import Path |
|||
from typing import Iterator |
|||
|
|||
from flask import Flask |
|||
from flask import Response |
|||
from flask.testing import FlaskClient |
|||
from monsun_backend import access |
|||
from monsun_backend import create_app |
|||
from monsun_backend import database |
|||
|
|||
TESTS_DIR = Path(__file__).parent |
|||
|
|||
|
|||
@contextmanager |
|||
def setup_app() -> Iterator[Flask]: |
|||
app: Flask = create_app() |
|||
app.config["SERVER_NAME"] = "monsun_backend.localdomain" |
|||
app.config["TESTING"] = True |
|||
|
|||
# binds the app to the current context |
|||
with app.app_context(): |
|||
# create all tables |
|||
database.db.create_all() |
|||
|
|||
access.make_admin_user() |
|||
|
|||
yield app |
|||
with app.app_context(): |
|||
# drop all tables |
|||
database.db.session.remove() |
|||
database.db.drop_all() |
|||
|
|||
|
|||
@dataclass |
|||
class TestUser: |
|||
__test__ = False |
|||
response: Response |
|||
client: FlaskClient |
|||
Loading…
Reference in new issue