1
0
Fork 0
mirror of https://codeberg.org/Mo8it/AdvLabDB.git synced 2024-11-08 21:21:06 +00:00
AdvLabDB/advlabdb/customClasses.py

345 lines
11 KiB
Python

from flask import flash, redirect, request, url_for
from flask_admin import AdminIndexView, BaseView, expose
from flask_admin.contrib.sqla import ModelView
from flask_admin.helpers import get_form_data
from flask_admin.model.helpers import get_mdict_item_or_list
from flask_security import current_user
from sqlalchemy import and_, select
from . import db
from .exceptions import DataBaseException, ModelViewException
from .model_independent_funs import get_count, reportBadAttempt
from .models import (
Assistant,
ExperimentMark,
GroupExperiment,
Part,
PartStudent,
SemesterExperiment,
)
def adminViewIsAccessible():
    """Return the result of asking the current user for the "admin" role."""
    has_admin_role = current_user.has_role("admin")
    return has_admin_role
def assistantViewIsAccessible():
    """Return the result of asking the current user for the "assistant" role."""
    has_assistant_role = current_user.has_role("assistant")
    return has_assistant_role
def get_url(kwargs):
    """
    Read the "url" entry of a view's keyword arguments and validate it.

    The entry is left in ``kwargs``; it is only looked up, not removed.

    Raises:
        KeyError: if ``kwargs`` has no "url" entry.
        ModelViewException: if the url contains a slash.
    """
    view_url = kwargs["url"]

    if "/" not in view_url:
        return view_url

    raise ModelViewException("url can not contain a slash!")
class CustomIndexView(AdminIndexView):
    """Index view whose inaccessible callback sends the user to the login page."""

    def inaccessible_callback(self, name, **kwargs):
        """Redirect to "security.login", handing over the requested URL as ``next``."""
        login_url = url_for("security.login", next=request.url)
        return redirect(login_url)
class SecureAdminIndexView(CustomIndexView):
    """Index page of the admin space with an overview of experiment mark counts."""

    def is_accessible(self):
        """Grant access according to ``adminViewIsAccessible``."""
        return adminViewIsAccessible()

    @expose("/")
    def index(self):
        """
        Render "admin_index.html" with two numbers for the current user's active semester:
        the count of all experiment marks and the count of those without a final mark.
        """
        # Final marks of the experiment marks that are reachable through a
        # part student whose part belongs to the user's active semester.
        active_semester_marks_stmt = (
            select(ExperimentMark.final_experiment_mark)
            .join(PartStudent)
            .join(Part)
            .where(Part.semester == current_user.active_semester)
        )

        # `.is_(None)` is the explicit SQLAlchemy spelling of the IS NULL test
        # (a `== None` comparison only works through operator overloading).
        missing_final_marks_stmt = active_semester_marks_stmt.where(
            ExperimentMark.final_experiment_mark.is_(None)
        )

        # NOTE(review): get_count is defined elsewhere; presumably it counts the rows of a statement.
        number_of_all_experiment_marks = get_count(active_semester_marks_stmt)
        number_of_missing_final_experiment_marks = get_count(missing_final_marks_stmt)

        return self.render(
            "admin_index.html",
            number_of_missing_final_experiment_marks=number_of_missing_final_experiment_marks,
            number_of_all_experiment_marks=number_of_all_experiment_marks,
        )
class SecureAssistantIndexView(CustomIndexView):
    """Index page of the assistant space with an overview of experiment mark counts."""

    def is_accessible(self):
        """Grant access according to ``assistantViewIsAccessible``."""
        return assistantViewIsAccessible()

    @expose("/")
    def index(self):
        """
        Render "assistant_index.html" with two numbers restricted to the current user's
        active semester and to semester experiments the user is an assistant of:
        the count of all experiment marks and the count of those without a final mark.
        """
        # Final marks of the experiment marks whose group experiment belongs to a
        # semester experiment of the active semester that lists the current user
        # as one of its assistants.
        assistant_marks_stmt = (
            select(ExperimentMark.final_experiment_mark)
            .join(GroupExperiment)
            .join(SemesterExperiment)
            .where(SemesterExperiment.semester == current_user.active_semester)
            .join(SemesterExperiment.assistants)
            .where(Assistant.user == current_user)
        )

        # `.is_(None)` is the explicit SQLAlchemy spelling of the IS NULL test
        # (a `== None` comparison only works through operator overloading).
        missing_final_marks_stmt = assistant_marks_stmt.where(
            ExperimentMark.final_experiment_mark.is_(None)
        )

        # NOTE(review): get_count is defined elsewhere; presumably it counts the rows of a statement.
        number_of_all_experiment_marks = get_count(assistant_marks_stmt)
        number_of_missing_final_experiment_marks = get_count(missing_final_marks_stmt)

        return self.render(
            "assistant_index.html",
            number_of_missing_final_experiment_marks=number_of_missing_final_experiment_marks,
            number_of_all_experiment_marks=number_of_all_experiment_marks,
        )
class CustomModelView(ModelView):
    """
    Base model view for this module's admin and assistant model views.

    It adds hooks on top of ModelView:
    - query_modifier: applied to both the list query and the count query.
    - customCreateModel / customUpdateModel: build or update the model from a form.
    - CreateForm / EditForm / CreateAndEditForm: optional form classes on subclasses.
    """

    create_modal = True
    edit_modal = True
    details_modal = True

    can_view_details = False

    # If True, the filters cache is refreshed on every call of index_view.
    refreshFiltersCache = False

    # Used in the UserView because of create_user
    # Should not be touched in other views
    _skip_session_addition_on_model_creation = False

    @expose("/")
    def index_view(self):
        """Show the list view, refreshing the filter options first if requested."""
        if self.refreshFiltersCache:
            # Update filter options
            self._refresh_filters_cache()

        return super().index_view()

    def inaccessible_callback(self, name, **kwargs):
        """Redirect to the login page if the user doesn't have access."""
        return redirect(url_for("security.login", next=request.url))

    def query_modifier(self, query):
        """Return the query unchanged. Subclasses can override this to restrict it."""
        return query

    def get_query(self):
        """Return the parent's list query after passing it through query_modifier."""
        return self.query_modifier(super().get_query())

    def get_count_query(self):
        """Return the parent's count query after passing it through query_modifier."""
        return self.query_modifier(super().get_count_query())

    def handle_view_exception(self, exc):
        """
        Flash ModelViewException and DataBaseException (including subclasses) as errors.

        Returns True if the exception was flashed here. Everything else is
        delegated to the parent implementation.
        """
        # isinstance instead of an exact type comparison so that subclasses
        # of the two exception classes are handled the same way.
        if isinstance(exc, (ModelViewException, DataBaseException)):
            flash(str(exc), "error")
            return True

        return super().handle_view_exception(exc)

    def customCreateModel(self, form):
        """Build a new model instance and populate it from the form."""
        model = self.build_new_instance()
        form.populate_obj(model)
        return model

    def create_model(self, form):
        """
        Create a model from the form and commit it.

        Returns the created model on success. On any exception, the error is
        flashed, the session is rolled back and False is returned.
        """
        try:
            model = self.customCreateModel(form)

            if not self._skip_session_addition_on_model_creation:
                self.session.add(model)

            self.on_model_change(form, model, True)
            self.session.commit()
        except Exception as ex:
            if not self.handle_view_exception(ex):
                flash(str(ex), "error")

            self.session.rollback()

            # Signal the failure like update_model does. Without this return,
            # a failed creation could hand back the model (or hit an unbound
            # name if customCreateModel itself raised).
            return False
        else:
            self.after_model_change(form, model, True)

        return model

    def customUpdateModel(self, form, model):
        """
        Return True if something changed during update, False otherwise.
        """
        form.populate_obj(model)

        # No way to know if something changed. Therefore, return True anyway.
        return True

    def update_model(self, form, model):
        """
        Update the model from the form and commit the change.

        Returns True on success (also if customUpdateModel reports that nothing
        changed). On any exception, the error is flashed, the session is rolled
        back and False is returned.
        """
        try:
            if self.customUpdateModel(form, model) is False:
                # Nothing changed
                return True

            self.on_model_change(form, model, False)
            self.session.commit()
        except Exception as ex:
            if not self.handle_view_exception(ex):
                flash(str(ex), "error")

            self.session.rollback()

            return False
        else:
            self.after_model_change(form, model, False)

        return True

    def create_form(self, obj=None):
        """
        Instantiate the create form.

        Uses the attribute CreateForm if present, otherwise CreateAndEditForm.
        If neither exists, the parent implementation is used.
        """
        if hasattr(self, "CreateForm"):
            formClass = self.CreateForm
        elif hasattr(self, "CreateAndEditForm"):
            formClass = self.CreateAndEditForm
        else:
            return super().create_form(obj)

        return formClass(get_form_data(), obj=obj)

    def edit_form(self, obj=None):
        """
        Instantiate the edit form.

        Uses the attribute EditForm if present, otherwise CreateAndEditForm.
        If neither exists, the parent implementation is used.
        """
        if hasattr(self, "EditForm"):
            formClass = self.EditForm
        elif hasattr(self, "CreateAndEditForm"):
            formClass = self.CreateAndEditForm
        else:
            return super().edit_form(obj)

        return formClass(get_form_data(), obj=obj)
class SecureAdminModelView(CustomModelView):
    """Model view of the admin space, registered under an "admin_" prefixed endpoint."""

    # List view features
    can_export = True
    can_set_page_size = True
    column_display_actions = True

    # Allowed operations
    can_create = True
    can_edit = True
    can_delete = True
    can_view_details = True

    # Templates
    list_template = "admin_list.html"
    create_template = "admin_create.html"
    edit_template = "admin_edit.html"
    details_template = "admin_details.html"

    def __init__(self, model, **kwargs):
        """
        Set up the view on db.session.

        The keyword argument "url" is required. It is validated by get_url,
        used to build the endpoint name and passed on to the parent with the
        remaining keyword arguments.
        """
        endpoint = "admin_" + get_url(kwargs)
        super().__init__(model, db.session, endpoint=endpoint, **kwargs)

    def is_accessible(self):
        """Grant access according to ``adminViewIsAccessible``."""
        return adminViewIsAccessible()
class SecureAssistantModelView(CustomModelView):
    """
    Model view of the assistant space, registered under an "assistant_" prefixed endpoint.

    Subclasses have to implement query_modifier. The query it returns is used to
    check that a model is listed on the assistant's view before it is edited,
    shown in detail or changed.
    """

    # List view features
    can_export = False
    can_set_page_size = False
    can_edit = False
    column_display_actions = False

    # Templates
    list_template = "assistant_list.html"
    create_template = "assistant_create.html"
    edit_template = "assistant_edit.html"
    details_template = "assistant_details.html"

    # SECURITY NOTES:
    # - Every variable and method defined below in this class is NOT ALLOWED TO BE (completely) OVERWRITTEN!
    #   You can only extend the predefined methods.
    # - The method query_modifier(self, query) has to be implemented!

    # Assistants are not allowed to create or delete.
    can_create = False
    can_delete = False

    def __init__(self, model, **kwargs):
        """
        Set up the view on db.session.

        The keyword argument "url" is required. It is validated by get_url,
        used to build the endpoint name and passed on to the parent with the
        remaining keyword arguments.
        """
        endpoint = "assistant_" + get_url(kwargs)
        super().__init__(model, db.session, endpoint=endpoint, **kwargs)

    def is_accessible(self):
        """Grant access according to ``assistantViewIsAccessible``."""
        return assistantViewIsAccessible()

    def query_modifier(self, query):
        """
        A default query modifier has to be implemented to restrict assistant's read/write access.
        See on_model_change!
        """
        raise NotImplementedError()

    def on_model_change(self, form, model, is_created):
        """
        Reject creations and reject changes to models that are not part of the
        modified query returned by query_modifier (which has to be implemented!).
        This prevents assistants from modifying models not listed on their view
        by sending a POST request with a different id.

        To extend this method, implement a custom on_model_change and call
        super().on_model_change within it.
        """
        if is_created:
            reportBadAttempt("An assistant tried to create a model!")
            raise ModelViewException("Assistants can not create models!")

        visible_models = self.get_query()
        if model in visible_models:
            return

        reportBadAttempt("An assistant tried to change a model not in his filter!")
        raise ModelViewException("Unauthorized action!")

    def on_model_delete(self, model):
        """Report and reject every deletion attempt."""
        reportBadAttempt("An assistant tried to delete a model!")
        raise ModelViewException("Assistants can not delete models!")

    @expose("/edit/", methods=("GET", "POST"))
    def edit_view(self):
        """
        Show the edit form only for models that are part of the assistant's query.
        Otherwise, an assistant could see the edit form of a model not in his
        filter by using the GET method and entering an id.
        This is important if can_edit is set to True.
        """
        requested_id = get_mdict_item_or_list(request.args, "id")

        if requested_id is not None:
            requested_model = self.get_one(requested_id)
            visible_models = self.get_query()

            if requested_model not in visible_models:
                reportBadAttempt("An assistant tried to edit a model not in his filter!")
                raise ModelViewException("Unauthorized action!")

        return super().edit_view()

    @expose("/details/")
    def details_view(self):
        """
        Show the details only for models that are part of the assistant's query.
        Otherwise, an assistant could see the details of a model not in his
        filter by using the GET method and entering an id.
        This is important if can_view_details is set to True.
        """
        requested_id = get_mdict_item_or_list(request.args, "id")

        if requested_id is not None:
            requested_model = self.get_one(requested_id)
            visible_models = self.get_query()

            if requested_model not in visible_models:
                reportBadAttempt("An assistant tried to see details of a model not in his filter!")
                raise ModelViewException("Unauthorized action!")

        return super().details_view()

    def get_details_columns(self):
        """
        Fall back to column_list if column_details_list is not set (which equals None).
        This prevents showing unintended data if can_view_details is set to True.
        """
        details_unset = not self.column_details_list
        if details_unset:
            self.column_details_list = self.column_list

        return super().get_details_columns()
class SecureAdminBaseView(BaseView):
    """Base view of the admin space, registered under an "admin_" prefixed endpoint."""

    def __init__(self, **kwargs):
        """
        The keyword argument "url" is required. It is validated by get_url,
        used to build the endpoint name and passed on to the parent with the
        remaining keyword arguments.
        """
        endpoint = "admin_" + get_url(kwargs)
        super().__init__(endpoint=endpoint, **kwargs)

    def is_accessible(self):
        """Grant access according to ``adminViewIsAccessible``."""
        return adminViewIsAccessible()
class SecureAssistantBaseView(BaseView):
    """Base view of the assistant space, registered under an "assistant_" prefixed endpoint."""

    def __init__(self, **kwargs):
        """
        The keyword argument "url" is required. It is validated by get_url,
        used to build the endpoint name and passed on to the parent with the
        remaining keyword arguments.
        """
        endpoint = "assistant_" + get_url(kwargs)
        super().__init__(endpoint=endpoint, **kwargs)

    def is_accessible(self):
        """Grant access according to ``assistantViewIsAccessible``."""
        return assistantViewIsAccessible()