"""Flask-Admin views for the advlabdb models.

Defines one ``SecureModelView`` subclass per model, registers the views on
``admin`` and adds the semester selection and logout menu links. Several views
restrict their queries to the semester returned by ``userActiveSemester()``.
"""

from flask import flash, request, url_for
from flask_admin.contrib.sqla.fields import QuerySelectField, QuerySelectMultipleField
from flask_admin.contrib.sqla.filters import BaseSQLAFilter
from flask_admin.helpers import get_form_data
from flask_admin.menu import MenuLink
from flask_admin.model.template import EndpointLinkRowAction
from flask_security import current_user, hash_password
from sqlalchemy import func
from wtforms import BooleanField, FloatField, Form, RadioField, SelectField, TextField
from wtforms.fields.html5 import DateField
from wtforms.validators import DataRequired, Email, Optional

from advlabdb import admin, app, db, user_datastore
from advlabdb.configUtils import getConfig
from advlabdb.customClasses import SecureModelView
from advlabdb.models import (
    Appointment,
    Assistant,
    Experiment,
    ExperimentMark,
    Group,
    GroupExperiment,
    Part,
    PartStudent,
    Role,
    Semester,
    SemesterExperiment,
    Student,
    User,
)
from advlabdb.utils import (
    partFromLabelInUserActiveSemester,
    randomPassword,
    setUserActiveSemester,
    userActiveSemester,
)


def _activeSemesterPartIds():
    """Return the ids of all parts of the semester given by ``userActiveSemester()``."""
    return [part.id for part in userActiveSemester().parts]


def _semesterMenuLink(semester):
    """Build the "Active semester" menu link that selects ``semester``."""
    return MenuLink(
        name=semester.label,
        url=url_for("set_semester") + "?semester_id=" + str(semester.id),
        category="Active semester",
    )


class UserView(SecureModelView):
    """Admin view for users; new users get a random password."""

    column_list = ["email", "active", "roles", "assistant"]
    column_searchable_list = ["email"]
    column_filters = ["active"]
    form_columns = ["email", "active", "roles"]
    column_editable_list = ["active"]

    form_args = {
        "email": {"validators": [Email()]},
        "active": {"default": True},
        "roles": {"validators": [DataRequired(message="A role is required!")]},
    }

    # Messages of the exceptions raised by the hooks below; they are matched
    # again in handle_view_exception.
    deleteSelfException = "Tried to delete yourself as user!"
    deactivateSelfException = "Tried to deactiavte yourself as user!"

    def create_model(self, form):
        """Create a user with a random password from ``form``.

        Returns the created user, or ``False`` if creation failed (the error
        is flashed and the session rolled back).
        """
        password = randomPassword()
        passwordHash = hash_password(password)
        email = form.email.data.lower()
        roles = [role.name for role in form.roles.data]

        try:
            model = user_datastore.create_user(email=email, password=passwordHash, roles=roles)
            self.on_model_change(form, model, True)
            self.session.commit()
        except Exception as ex:
            # Flash the text, not the exception object, so the flashed
            # message is always a plain string.
            flash(str(ex), "error")
            self.session.rollback()
            # Without this early return ``model`` could be unbound (or a
            # rolled-back instance reported as success).
            return False

        # Only announce the new admin once the user really exists.
        if "admin" in roles:
            flash("You have registered a new admin!", "danger")
        flash(f"{email} registered with roles: {', '.join(roles)}.", category="success")
        flash(f"Random password: {password}", category="warning")
        return model

    def on_model_delete(self, model):
        """Forbid users from deleting their own account."""
        if model == current_user:
            raise Exception(self.deleteSelfException)

    def on_model_change(self, form, model, is_created):
        """Forbid users from deactivating their own account."""
        if model == current_user and not form.active.data:
            raise Exception(self.deactivateSelfException)

    def handle_view_exception(self, exc):
        """Skip the parent handler for the exceptions raised by this view.

        NOTE(review): a falsy return is expected to make Flask-Admin flash the
        error message itself - confirm against the installed version.
        """
        ownMessages = (self.deleteSelfException, self.deactivateSelfException)
        # ``exc.args`` may be empty, so guard before indexing.
        if exc.args and exc.args[0] in ownMessages:
            return False
        return super().handle_view_exception(exc)


class RoleView(SecureModelView):
    """Read-only list of roles."""

    can_create = False
    can_edit = False
    can_delete = False
    column_display_actions = False
    column_list = ["name", "description"]


class SemesterView(SecureModelView):
    """Admin view for semesters; semesters can only be created."""

    can_edit = False
    can_delete = False
    column_display_actions = False
    column_list = ["label", "parts"]
    form_columns = ["semester_label", "year", "transfer_parts", "transfer_assistants"]

    semesterLabels = ["WS", "SS"]

    form_extra_fields = {
        "semester_label": RadioField(
            "Semester", choices=list(zip(semesterLabels, semesterLabels)), validators=[DataRequired()]
        ),
        "year": TextField("Year", validators=[DataRequired()]),
        "transfer_parts": BooleanField(
            "Transfer parts",
            description="This option transfers the parts you have in your current active semester. Make sure that your semester is the last semester before creating a new one (recommended)!",
            default=True,
        ),
        "transfer_assistants": BooleanField(
            "Transfer Assistants",
            description="This option transfers assistants of your active semester to active experiments in the new semester. Make sure that your semester is the last semester before creating a new one (recommended)! Active experiments are transfered anyway. If you don't want an experiment to be transfered, set it to inactive before creating the new semester. Experiments which are switched to active before creating the new semester will be created in the new semester without assistants. It is important to check the assistants of all experiments after creating a new semester.",
            default=True,
        ),
    }

    def create_model(self, form):
        """Create a semester labelled ``<semester_label><year>``.

        Returns the created semester, or ``False`` if creation failed (the
        error is flashed and the session rolled back).
        """
        try:
            model = Semester(label=form.semester_label.data + form.year.data)
            self.session.add(model)
            self.on_model_change(form, model, True)
            self.session.commit()
        except Exception as ex:
            flash(str(ex), "error")
            self.session.rollback()
            # Report the failure instead of returning an unbound or
            # rolled-back model.
            return False

        self.after_model_change(form, model, True)
        return model

    def after_model_change(self, form, model, is_created):
        """Activate the new semester and populate it from the previous one.

        Adds a menu link for ``model``, makes it the active semester,
        optionally transfers the parts of the previously active semester and
        creates a ``SemesterExperiment`` for every active experiment,
        optionally with the assistants it had in the previous semester.
        """
        admin.add_link(_semesterMenuLink(model))

        # Read the old semester before switching to the new one.
        oldSemester = userActiveSemester()
        setUserActiveSemester(model.id)

        try:
            if form.transfer_parts.data:
                model.transferPartsFrom(oldSemester)

            for experiment in Experiment.query.filter(Experiment.active == True):
                newSemesterExperiment = SemesterExperiment(experiment=experiment, semester=model)

                if form.transfer_assistants.data:
                    for oldSemesterExperiment in oldSemester.semester_experiments:
                        if oldSemesterExperiment.experiment == experiment:
                            newSemesterExperiment.assistants = oldSemesterExperiment.assistants

                self.session.add(newSemesterExperiment)

            self.session.commit()
        except Exception as ex:
            flash(str(ex), "error")
            self.session.rollback()


class PartView(SecureModelView):
    """Admin view for the parts of the active semester."""

    can_view_details = True
    column_details_list = ["label", "semester", "part_students", "groups"]
    form_columns = ["label", "semester"]

    def get_query(self):
        """Restrict the list to parts of the active semester."""
        return super().get_query().filter(Part.id.in_(_activeSemesterPartIds()))

    def get_count_query(self):
        """Count only the rows that ``get_query`` returns."""
        return (
            self.session.query(func.count("*"))
            .select_from(self.model)
            .filter(Part.id.in_(_activeSemesterPartIds()))
        )


class StudentView(SecureModelView):
    """Admin view for students."""

    can_view_details = True
    column_list = ["student_number", "first_name", "last_name", "uni_email", "contact_email", "part_students"]
    column_details_list = column_list + ["bachelor_thesis", "bachelor_thesis_work_group", "note"]
    column_sortable_list = ["student_number", "first_name", "last_name"]
    column_searchable_list = column_sortable_list + ["uni_email", "contact_email"]
    form_excluded_columns = ["part_students"]

    form_args = {
        "uni_email": {"validators": [Email()]},
        "contact_email": {"validators": [Email()]},
    }

    # Row action linking to the experiment mark list with the row id passed
    # as the ``flt1_0`` filter argument.
    column_extra_row_actions = [
        EndpointLinkRowAction(
            "glyphicon glyphicon-time",
            id_arg="flt1_0",
            title="Experiments history",
            endpoint="experimentmark.index_view",
        )
    ]


def partQueryFactory():
    """Query the parts of the active semester."""
    return Part.query.filter(Part.id.in_(_activeSemesterPartIds()))


def groupQueryFactory():
    """Query the groups belonging to parts of the active semester."""
    return Group.query.filter(Group.part_id.in_(_activeSemesterPartIds()))


class PartStudentView(SecureModelView):
    """Admin view for part students of the active semester."""

    class CreateForm(Form):
        def studentQueryFactory():
            return Student.query

        student = QuerySelectField(
            "Student", query_factory=studentQueryFactory, validators=[DataRequired()], allow_blank=True, blank_text="-"
        )
        part = QuerySelectField(
            "Part", query_factory=partQueryFactory, validators=[DataRequired()], allow_blank=True, blank_text="-"
        )
        group = QuerySelectField("Group", query_factory=groupQueryFactory, allow_blank=True, blank_text="-")

    class EditForm(CreateForm):
        # Student and part cannot be changed after creation.
        student = None
        part = None
        final_part_mark = FloatField("Final Part Mark", validators=[Optional()])

    form = EditForm

    column_filters = ["part", "student", "group"]

    partGroupPartMismatchException = "Part and groups part don't match!"

    def create_form(self, obj=None):
        """Use ``CreateForm`` (with student and part fields) for new records."""
        form = self.CreateForm
        return form(get_form_data(), obj=obj)

    def on_model_change(self, form, model, is_created):
        """Reject a group that belongs to a different part than the student."""
        if model.group and model.part != model.group.part:
            raise Exception(self.partGroupPartMismatchException)

    def handle_view_exception(self, exc):
        """Skip the parent handler for the mismatch exception of this view."""
        # Compare for equality: ``x in (message)`` without a trailing comma is
        # a substring test against the message, not a tuple membership test.
        if exc.args and exc.args[0] == self.partGroupPartMismatchException:
            return False
        return super().handle_view_exception(exc)

    def get_query(self):
        """Restrict the list to part students of the active semester."""
        return super().get_query().filter(PartStudent.part_id.in_(_activeSemesterPartIds()))

    def get_count_query(self):
        """Count only the rows that ``get_query`` returns."""
        return (
            self.session.query(func.count("*"))
            .select_from(self.model)
            .filter(PartStudent.part_id.in_(_activeSemesterPartIds()))
        )


class GroupView(SecureModelView):
    """Admin view for groups of the active semester."""

    class CreateForm(Form):
        def partStudentsQueryFactory():
            return PartStudent.query.filter(PartStudent.part_id.in_(_activeSemesterPartIds()))

        part = QuerySelectField(
            "Part", query_factory=partQueryFactory, validators=[DataRequired()], allow_blank=True, blank_text="-"
        )
        part_students = QuerySelectMultipleField("Part Students", query_factory=partStudentsQueryFactory)

    class EditForm(CreateForm):
        # The part cannot be changed after creation.
        part = None

    form = EditForm

    column_list = ["number", "part", "part_students", "group_experiments"]
    column_filters = ["number", "part"]

    partStudentPartPartMismatchException = "Part and StudentParts part don't match!"

    def create_model(self, form):
        """Create a group with the next free number within its part.

        Returns the created group, or ``False`` if creation failed (the error
        is flashed and the session rolled back).
        """
        try:
            # Fetch only the group with the highest number instead of using a
            # negative index on the query.
            lastGroup = Group.query.filter(Group.part == form.part.data).order_by(Group.number.desc()).first()
            lastTakenGroupNumber = lastGroup.number if lastGroup is not None else 0

            model = Group(
                number=lastTakenGroupNumber + 1,
                part_students=form.part_students.data,
                part=form.part.data,
            )
            self.session.add(model)
            self.on_model_change(form, model, True)
            self.session.commit()
        except Exception as ex:
            flash(str(ex), "error")
            self.session.rollback()
            return False

        self.after_model_change(form, model, True)
        return model

    def on_model_change(self, form, model, is_created):
        """Reject part students that belong to a different part than the group."""
        for partStudent in model.part_students:
            if model.part != partStudent.part:
                raise Exception(self.partStudentPartPartMismatchException)

    def handle_view_exception(self, exc):
        """Skip the parent handler for the mismatch exception of this view."""
        # Equality instead of the accidental substring test ``x in (message)``.
        if exc.args and exc.args[0] == self.partStudentPartPartMismatchException:
            return False
        return super().handle_view_exception(exc)

    def create_form(self, obj=None):
        """Use ``CreateForm`` (with the part field) for new records."""
        form = self.CreateForm
        return form(get_form_data(), obj=obj)

    def get_query(self):
        """Restrict the list to groups of the active semester."""
        return super().get_query().filter(Group.part_id.in_(_activeSemesterPartIds()))

    def get_count_query(self):
        """Count only the rows that ``get_query`` returns."""
        return (
            self.session.query(func.count("*"))
            .select_from(self.model)
            .filter(Group.part_id.in_(_activeSemesterPartIds()))
        )


class ExperimentView(SecureModelView):
    """Admin view for experiments."""

    can_view_details = True
    column_filters = ["active"]
    column_list = ["number", "name", "active"]
    column_editable_list = ["active"]


class SemesterExperimentView(SecureModelView):
    """Admin view for the semester experiments of the active semester."""

    column_list = ["experiment", "semester", "assistants"]

    def get_query(self):
        """Restrict the list to the active semester."""
        return super().get_query().filter(SemesterExperiment.semester == userActiveSemester())

    def get_count_query(self):
        """Count only the rows that ``get_query`` returns."""
        return (
            self.session.query(func.count("*"))
            .select_from(self.model)
            .filter(SemesterExperiment.semester == userActiveSemester())
        )


class AssistantView(SecureModelView):
    """Admin view for assistants."""

    can_view_details = True
    column_list = ["first_name", "last_name", "email", "user", "semester_experiments"]
    column_details_list = column_list + [
        "phone_number",
        "mobile_phone_number",
        "room",
        "building",
        "appointments",
        "experiment_marks",
    ]
    column_filters = ["user.active"]
    form_excluded_columns = ["experiment_marks"]


class GroupExperimentView(SecureModelView):
    """Admin view for group experiments; up to two appointments are created with each."""

    class CreateForm(Form):
        def semesterExperimentQueryFactory():
            return SemesterExperiment.query.filter(SemesterExperiment.semester == userActiveSemester())

        def assistantQueryFactory():
            # Only assistants whose user is active.
            return Assistant.query.filter(
                Assistant.user_id.in_([user.id for user in User.query.filter(User.active == True)])
            )

        group = QuerySelectField(
            "Group", query_factory=groupQueryFactory, validators=[DataRequired()], allow_blank=True, blank_text="-"
        )
        semester_experiment = QuerySelectField(
            "Semester Experiment",
            query_factory=semesterExperimentQueryFactory,
            validators=[DataRequired()],
            allow_blank=True,
            blank_text="-",
        )

        assistantBlankText = "Auto assign if experiment has only one assistant"

        appointment1_date = DateField("Appointment-1 Date", validators=[Optional()])
        appointment1_special = BooleanField("Appointment-1 special", default=False)
        appointment1_assistant = QuerySelectField(
            "Appointment-1 Assistant",
            query_factory=assistantQueryFactory,
            allow_blank=True,
            blank_text=assistantBlankText,
        )

        appointment2_date = DateField("Appointment-2 Date", validators=[Optional()])
        appointment2_special = BooleanField("Appointment-2 special", default=False)
        appointment2_assistant = QuerySelectField(
            "Appointment-2 Assistant",
            query_factory=assistantQueryFactory,
            allow_blank=True,
            blank_text=assistantBlankText,
        )

    form = CreateForm

    can_edit = False
    column_list = ["group", "semester_experiment", "appointments", "experiment_marks"]
    column_filters = ["group", "semester_experiment.experiment", "appointments"]

    def create_model(self, form):
        """Create a group experiment and an appointment for every given date.

        An appointment without an explicit assistant gets the experiment's
        assistant if there is exactly one. Returns the created group
        experiment, or ``False`` if creation failed (the error is flashed and
        the session rolled back).
        """
        try:
            semesterExperiment = form.semester_experiment.data
            model = GroupExperiment.checkAndInit(semester_experiment=semesterExperiment, group=form.group.data)
            self.session.add(model)

            appointmentInputs = zip(
                [form.appointment1_date.data, form.appointment2_date.data],
                [form.appointment1_special.data, form.appointment2_special.data],
                [form.appointment1_assistant.data, form.appointment2_assistant.data],
            )
            for appointmentDate, special, assistant in appointmentInputs:
                # An appointment is only created if a date was entered.
                if not appointmentDate:
                    continue

                if assistant:
                    if assistant not in semesterExperiment.assistants:
                        raise Exception(f"{assistant} not responsible for {semesterExperiment}!")
                else:
                    candidates = semesterExperiment.assistants
                    # Auto assignment is only possible with exactly one
                    # assistant; report the empty case with its own message.
                    if not candidates:
                        raise Exception(
                            f"Experiment {semesterExperiment} has no assistant. You have to assign an assistant to the experiment first!"
                        )
                    if len(candidates) != 1:
                        raise Exception(
                            f"Experiment {semesterExperiment} has more than one assistant. You have to assign one of these assistants: {candidates}"
                        )
                    assistant = candidates[0]

                appointment = Appointment(
                    date=appointmentDate,
                    special=special,
                    group_experiment=model,
                    assistant=assistant,
                )
                self.session.add(appointment)

            self.on_model_change(form, model, True)
            self.session.commit()
        except Exception as ex:
            flash(str(ex), "error")
            self.session.rollback()
            return False

        if model.appointments:
            flash(f"Appointments {model.appointments} added.", "success")
        self.after_model_change(form, model, True)
        return model

    def get_query(self):
        """Restrict the list to group experiments of groups in the active semester."""
        activeGroupIds = [group.id for group in groupQueryFactory()]
        return super().get_query().filter(GroupExperiment.group_id.in_(activeGroupIds))

    def get_count_query(self):
        """Count only the rows that ``get_query`` returns."""
        activeGroupIds = [group.id for group in groupQueryFactory()]
        return (
            self.session.query(func.count("*"))
            .select_from(self.model)
            .filter(GroupExperiment.group_id.in_(activeGroupIds))
        )


class AppointmentView(SecureModelView):
    """Admin view for appointments."""

    column_list = ["date", "special", "group_experiment", "assistant"]


class ExperimentMarkView(SecureModelView):
    """Admin view for experiment marks."""

    class StudentIdFilter(BaseSQLAFilter):
        """Equality filter whose value must be the id of an existing student."""

        def apply(self, query, value, alias=None):
            return query.filter(self.column == value)

        def operation(self):
            return "equals"

        def validate(self, value):
            return Student.query.get(value) is not None

    # NOTE(review): the filter column is PartStudent.id while validate() looks
    # the value up as a Student id - confirm that this is intended.
    column_filters = [
        StudentIdFilter(PartStudent.id, "Student / ID"),
        "part_student.student",
        "group_experiment.semester_experiment.semester",
        "group_experiment.semester_experiment.experiment",
        "assistant",
    ]


admin.add_view(StudentView(Student, db.session))
admin.add_view(PartStudentView(PartStudent, db.session))
admin.add_view(GroupView(Group, db.session))
admin.add_view(GroupExperimentView(GroupExperiment, db.session))
admin.add_view(AppointmentView(Appointment, db.session))
admin.add_view(ExperimentMarkView(ExperimentMark, db.session))
admin.add_view(ExperimentView(Experiment, db.session))
admin.add_view(SemesterExperimentView(SemesterExperiment, db.session))
admin.add_view(AssistantView(Assistant, db.session))
admin.add_view(PartView(Part, db.session))
admin.add_view(SemesterView(Semester, db.session))
admin.add_view(UserView(User, db.session))
admin.add_view(RoleView(Role, db.session))

with app.app_context():
    # One menu link per existing semester, ordered by id.
    semesters = Semester.query.order_by(Semester.id)
    for semester in semesters:
        admin.add_link(_semesterMenuLink(semester))

    admin.add_link(MenuLink(name="Logout", url=url_for("security.logout")))