from flask import flash, redirect, request, url_for from flask_admin import AdminIndexView, BaseView, expose from flask_admin.contrib.sqla import ModelView from flask_admin.helpers import get_form_data from flask_admin.model.helpers import get_mdict_item_or_list from flask_login import current_user from sqlalchemy import func, select from .exceptions import DatabaseException, ModelViewException from .model_independent_funs import reportBadAttempt from .models import Assistant, ExperimentMark, GroupExperiment, SemesterExperiment, db def adminViewIsAccessible(): return current_user.has_role("admin") def assistantViewIsAccessible(): return current_user.has_role("assistant") def get_url(kwargs): url = kwargs["url"] if "/" in url: raise ModelViewException("url can not contain a slash!") return url class CustomIndexView(AdminIndexView): def inaccessible_callback(self, name, **kwargs): # Redirect to login page if user doesn't have access return redirect(url_for("security.login", next=request.url)) class SecureAdminIndexView(CustomIndexView): def is_accessible(self): return adminViewIsAccessible() @expose("/") def index(self): assistants_num_missing = db.session.execute( select(Assistant, func.count()) .join(Assistant.semester_experiments) .where(SemesterExperiment.semester == current_user.active_semester) .join(SemesterExperiment.group_experiments) .where(GroupExperiment.experiment_marks_missing is True) .join(GroupExperiment.experiment_marks) .where(ExperimentMark.final_experiment_mark is None) .group_by(Assistant.id) .order_by(func.count().desc()) ) return self.render( "admin_index.jinja.html", assistants_num_missing=assistants_num_missing, ) class SecureAssistantIndexView(CustomIndexView): def is_accessible(self): return assistantViewIsAccessible() @expose("/") def index(self): number_of_missing_final_experiment_marks = db.session.scalar( select(func.count()) .select_from(ExperimentMark) .join(GroupExperiment) .join(SemesterExperiment) .where(SemesterExperiment.semester == current_user.active_semester) .join(SemesterExperiment.assistants) .where(Assistant.user == current_user) .where(ExperimentMark.final_experiment_mark is None) ) return self.render( "assistant_index.jinja.html", number_of_missing_final_experiment_marks=number_of_missing_final_experiment_marks, ) class CustomModelView(ModelView): create_modal = True edit_modal = True details_modal = True can_view_details = False refreshFiltersCache = False # Should not be a copy of column_formatters # because of link formatting. column_formatters_export: dict = {} # Used in the UserView because of create_user # Should not be touched in other views _skip_session_addition_on_model_creation = False @expose("/") def index_view(self): if self.refreshFiltersCache: # Update filter options self._refresh_filters_cache() return super().index_view() def inaccessible_callback(self, name, **kwargs): # Redirect to login page if user doesn't have access return redirect(url_for("security.login", next=request.url)) def query_modifier(self, query): return query def get_query(self): return self.query_modifier(super().get_query()) def get_count_query(self): return self.query_modifier(super().get_count_query()) def handle_view_exception(self, exc): if type(exc) in (ModelViewException, DatabaseException): flash(str(exc), "error") return True return super().handle_view_exception(exc) def customCreateModel(self, form): model = self.build_new_instance() form.populate_obj(model) return model def create_model(self, form): try: model = self.customCreateModel(form) if not self._skip_session_addition_on_model_creation: self.session.add(model) self.on_model_change(form, model, True) self.session.commit() except Exception as ex: if not self.handle_view_exception(ex): flash(str(ex), "error") self.session.rollback() else: self.after_model_change(form, model, True) return model def customUpdateModel(self, form, model): """ Return True if something changed during update, False otherwise. """ form.populate_obj(model) # No way to know if something changed. Therefore, return True anyway. return True def update_model(self, form, model): try: if self.customUpdateModel(form, model) is False: # Nothing changed return True self.on_model_change(form, model, False) self.session.commit() except Exception as ex: if not self.handle_view_exception(ex): flash(str(ex), "error") self.session.rollback() return False else: self.after_model_change(form, model, False) return True def create_form(self, obj=None): if hasattr(self, "CreateForm"): formClass = self.CreateForm elif hasattr(self, "CreateAndEditForm"): formClass = self.CreateAndEditForm else: return super().create_form(obj) return formClass(get_form_data(), obj=obj) def edit_form(self, obj=None): if hasattr(self, "EditForm"): formClass = self.EditForm elif hasattr(self, "CreateAndEditForm"): formClass = self.CreateAndEditForm else: return super().edit_form(obj) return formClass(get_form_data(), obj=obj) def get_export_columns(self): # Use column_details_list instead of column_list only_columns = self.column_export_list or self.column_details_list or self.scaffold_list_columns() return self.get_column_names( only_columns=only_columns, excluded_columns=self.column_export_exclude_list, ) class SecureAdminModelView(CustomModelView): can_export = True can_set_page_size = True can_create = True can_edit = True can_delete = True column_display_actions = True can_view_details = True list_template = "admin_list.jinja.html" create_template = "admin_create.jinja.html" edit_template = "admin_edit.jinja.html" details_template = "admin_details.jinja.html" def __init__(self, model, **kwargs): url = get_url(kwargs) super().__init__(model, db.session, endpoint="admin_" + url, category="Tables", **kwargs) def is_accessible(self): return adminViewIsAccessible() class SecureAssistantModelView(CustomModelView): can_export = False can_set_page_size = False can_edit = False column_display_actions = False list_template = "assistant_list.jinja.html" create_template = "assistant_create.jinja.html" edit_template = "assistant_edit.jinja.html" details_template = "assistant_details.jinja.html" """ SECURITY NOTES: - Every variable and method defined below in this class is NOT ALLOWED TO BE (completely) OVERWRITTEN! You can only extend the predefined methods. - The method query_modifier(self, query) has to be implemented! """ # Assistants are not allowed to create or delete. can_create = False can_delete = False def __init__(self, model, **kwargs): url = get_url(kwargs) super().__init__(model, db.session, endpoint="assistant_" + url, **kwargs) def is_accessible(self): return assistantViewIsAccessible() def query_modifier(self, query): """ A default query modifier has to be implemented to restrict assistant's read/write access. See on_model_change! """ raise NotImplementedError() def on_model_change(self, form, model, is_created): """ This method uses the modified query returned by query_modifier (which has to be implemented!) to prevent assistants from modifying models not listed on their view by sending a POST request with a different id. You can extend this method by implementing a custom on_model_change and then calling super().on_model_change within it. """ if is_created: reportBadAttempt("An assistant tried to create a model!") raise ModelViewException("Assistants can not create models!") if model not in self.get_query(): reportBadAttempt("An assistant tried to change a model not in his filter!") self.handle_view_exception(ModelViewException("Unauthorized action!")) return redirect(self.url) return None def on_model_delete(self, model): reportBadAttempt("An assistant tried to delete a model!") raise ModelViewException("Assistants can not delete models!") @expose("/edit/", methods=("GET", "POST")) def edit_view(self): """ Prevent an assistant from seeing the edit form of a model not in his filter by using the GET method and entering an id. This is important if can_edit is set to True. """ id = get_mdict_item_or_list(request.args, "id") if id is not None: model = self.get_one(id) if model not in self.get_query(): reportBadAttempt("An assistant tried to edit a model not in his filter!") self.handle_view_exception(ModelViewException("Unauthorized action!")) return redirect(self.url) return super().edit_view() @expose("/details/") def details_view(self): """ Prevent an assistant from seeing the details of a model not in his filter by using the GET method and entering an id. This is important if can_view_details is set to True. """ id = get_mdict_item_or_list(request.args, "id") if id is not None: model = self.get_one(id) if model not in self.get_query(): reportBadAttempt("An assistant tried to see details of a model not in his filter!") self.handle_view_exception(ModelViewException("Unauthorized action!")) return redirect(self.url) return super().details_view() def get_details_columns(self): """ Prevent showing unintended data if can_view_details is set to True and column_details_list was not set (which equals None). """ if not self.column_details_list: self.column_details_list = self.column_list return super().get_details_columns() class CustomBaseView(BaseView): def inaccessible_callback(self, name, **kwargs): # Redirect to login page if user doesn't have access return redirect(url_for("security.login", next=request.url)) class SecureAdminBaseView(CustomBaseView): def __init__(self, **kwargs): url = get_url(kwargs) super().__init__(endpoint="admin_" + url, **kwargs) def is_accessible(self): return adminViewIsAccessible() class SecureAssistantBaseView(CustomBaseView): def __init__(self, **kwargs): url = get_url(kwargs) super().__init__(endpoint="assistant_" + url, **kwargs) def is_accessible(self): return assistantViewIsAccessible()