# Page metadata captured together with this file (not part of the module):
#   mirror of https://codeberg.org/Mo8it/AdvLabDB.git synced 2024-11-08 21:21:06 +00:00
#   AdvLabDB/advlabdb/modelViews.py - 575 lines, 20 KiB, Python

from flask import flash, request, url_for
from flask_admin.contrib.sqla.filters import BaseSQLAFilter
from flask_admin.menu import MenuLink
from flask_admin.model.template import EndpointLinkRowAction
from flask_security import current_user, hash_password
from sqlalchemy import func
from wtforms import Form, BooleanField, SelectField, TextField, RadioField, FloatField
from wtforms.validators import DataRequired, Email, Optional
from flask_admin.contrib.sqla.fields import QuerySelectMultipleField, QuerySelectField
from flask_admin.helpers import get_form_data
from wtforms.fields.html5 import DateField
from advlabdb import admin, app, db, user_datastore
from advlabdb.configUtils import getConfig
from advlabdb.customClasses import SecureModelView
from advlabdb.models import (
Appointment,
Assistant,
Experiment,
ExperimentMark,
Group,
GroupExperiment,
Part,
SemesterExperiment,
PartStudent,
Role,
Semester,
Student,
User,
)
from advlabdb.utils import (
partFromLabelInUserActiveSemester,
randomPassword,
setUserActiveSemester,
userActiveSemester,
)
class UserView(SecureModelView):
    """Admin view for user accounts.

    A new user gets a password from ``randomPassword()``; only its hash is
    passed to the user datastore and the clear-text password is flashed once
    to whoever created the account. The model hooks refuse to delete or
    deactivate the currently logged-in user.
    """

    column_list = ["email", "active", "roles", "assistant"]
    column_searchable_list = ["email"]
    column_filters = ["active"]
    form_columns = ["email", "active", "roles"]
    column_editable_list = ["active"]

    form_args = {
        "email": {"validators": [Email()]},
        "active": {"default": True},
        "roles": {"validators": [DataRequired(message="A role is required!")]},
    }

    # Messages of the exceptions raised by the hooks below. They double as the
    # markers `handle_view_exception` uses to recognise those exceptions.
    deleteSelfException = "Tried to delete yourself as user!"
    # NOTE(review): "deactiavte" is a typo in this user-facing message; the
    # text is left unchanged here.
    deactivateSelfException = "Tried to deactiavte yourself as user!"

    def create_model(self, form):
        """Create a user with a random password from the submitted form.

        Args:
            form: Submitted form providing ``email`` and ``roles``.

        Returns:
            The created user, or ``False`` if creation failed (the error is
            flashed and the session is rolled back).
        """
        password = randomPassword()
        passwordHash = hash_password(password)

        email = form.email.data.lower()
        roles = [role.name for role in form.roles.data]

        try:
            model = user_datastore.create_user(email=email, password=passwordHash, roles=roles)

            self.on_model_change(form, model, True)
            self.session.commit()
        except Exception as ex:
            # Flash the text, not the exception object, so the message is a plain string.
            flash(str(ex), "error")
            self.session.rollback()
            return False

        # Only warn about a new admin once the account really exists.
        if "admin" in roles:
            flash("You have registered a new admin!", "danger")

        flash(f"{email} registered with roles: {', '.join(roles)}.", category="success")
        flash(f"Random password: {password}", category="warning")

        return model

    def on_model_delete(self, model):
        """Refuse to delete the currently logged-in user.

        Raises:
            Exception: With ``deleteSelfException`` as message.
        """
        if model == current_user:
            raise Exception(self.deleteSelfException)

    def on_model_change(self, form, model, is_created):
        """Refuse to deactivate the currently logged-in user.

        Raises:
            Exception: With ``deactivateSelfException`` as message.
        """
        if model == current_user and not form.active.data:
            raise Exception(self.deactivateSelfException)

    def handle_view_exception(self, exc):
        """Skip the inherited handling for the exceptions raised by this view.

        Returns:
            ``None`` for the self-delete/self-deactivate exceptions, otherwise
            whatever the parent implementation returns.
        """
        # `exc.args` may be empty for exceptions raised without a message.
        if exc.args and exc.args[0] in (self.deleteSelfException, self.deactivateSelfException):
            # NOTE(review): presumably the falsy result lets the caller flash
            # the message itself - confirm against Flask-Admin.
            return None

        return super().handle_view_exception(exc)
class RoleView(SecureModelView):
    """Listing of roles with creating, editing, deleting and row actions switched off."""

    column_list = ["name", "description"]

    # Every modifying capability and the per-row action column are disabled.
    can_create = can_edit = can_delete = column_display_actions = False
class SemesterView(SecureModelView):
    """Admin view for creating semesters.

    Semesters can only be created (editing, deleting and row actions are
    disabled). The label is built from the chosen semester label ("WS" or
    "SS") followed by the entered year. After creation, parts and assistants
    can optionally be transferred from the user's previously active semester.
    """

    can_edit = False
    can_delete = False
    column_display_actions = False

    column_list = ["label", "parts"]
    form_columns = ["semester_label", "year", "transfer_parts", "transfer_assistants"]

    semesterLabels = ["WS", "SS"]

    form_extra_fields = {
        "semester_label": RadioField(
            "Semester", choices=list(zip(semesterLabels, semesterLabels)), validators=[DataRequired()]
        ),
        "year": TextField("Year", validators=[DataRequired()]),
        "transfer_parts": BooleanField(
            "Transfer parts",
            description="This option transfers the parts you have in your current active semester. Make sure that your semester is the last semester before creating a new one (recommended)!",
            default=True,
        ),
        "transfer_assistants": BooleanField(
            "Transfer Assistants",
            description="This option transfers assistants of your active semester to active experiments in the new semester. Make sure that your semester is the last semester before creating a new one (recommended)! Active experiments are transfered anyway. If you do not want an experiment to be transfered, set it to inactive before creating the new semester. Experiments which are switched to active before creating the new semester will be created in the new semester without assistants. It is important to check the assistants of all experiments after creating a new semester.",
            default=True,
        ),
    }

    def create_model(self, form):
        """Create a semester labelled ``<semester_label><year>``.

        Returns:
            The created semester, or ``False`` if creation failed (the error
            is flashed and the session is rolled back).
        """
        try:
            model = Semester(label=form.semester_label.data + form.year.data)

            self.session.add(model)
            self.on_model_change(form, model, True)
            self.session.commit()
        except Exception as ex:
            # Flash the text, not the exception object, so the message is a plain string.
            flash(str(ex), "error")
            self.session.rollback()
            return False

        self.after_model_change(form, model, True)
        return model

    def after_model_change(self, form, model, is_created):
        """Set up a freshly created semester.

        Adds a menu link for the semester, switches the user's active semester
        to it and creates a ``SemesterExperiment`` for every active experiment.
        Depending on the form, parts and assistants are taken over from the
        semester that was active before the switch. Errors are flashed and the
        session is rolled back.
        """
        admin.add_link(
            MenuLink(
                name=model.label,
                url=url_for("set_semester") + "?semester_id=" + str(model.id),
                category="Active semester",
            )
        )

        # Remember the previously active semester before switching away from it.
        oldSemester = userActiveSemester()
        setUserActiveSemester(model.id)

        try:
            if form.transfer_parts.data:
                model.transferPartsFrom(oldSemester)

            transferAssistants = form.transfer_assistants.data

            # `== True` is intentional: it builds an SQL expression.
            for experiment in Experiment.query.filter(Experiment.active == True):  # noqa: E712
                newSemesterExperiment = SemesterExperiment(experiment=experiment, semester=model)

                if transferAssistants:
                    for oldSemesterExperiment in oldSemester.semester_experiments:
                        if oldSemesterExperiment.experiment == experiment:
                            newSemesterExperiment.assistants = oldSemesterExperiment.assistants

                self.session.add(newSemesterExperiment)

            self.session.commit()
        except Exception as ex:
            flash(str(ex), "error")
            self.session.rollback()
class PartView(SecureModelView):
    """Admin view for parts, restricted to the parts of the user's active semester."""

    can_view_details = True

    column_details_list = ["label", "semester", "part_students", "groups"]
    form_columns = ["label", "semester"]

    def get_query(self):
        """Return the list query limited to parts of the active semester."""
        baseQuery = super().get_query()
        activePartIds = [part.id for part in userActiveSemester().parts]
        return baseQuery.filter(Part.id.in_(activePartIds))

    def get_count_query(self):
        """Return the count query with the same restriction as `get_query`."""
        countQuery = self.session.query(func.count("*")).select_from(self.model)
        activePartIds = [part.id for part in userActiveSemester().parts]
        return countQuery.filter(Part.id.in_(activePartIds))
class StudentView(SecureModelView):
    """Admin view for students with a details page and a link to their experiment marks."""

    can_view_details = True

    column_sortable_list = ["student_number", "first_name", "last_name"]
    column_searchable_list = [*column_sortable_list, "uni_email", "contact_email"]

    column_list = ["student_number", "first_name", "last_name", "uni_email", "contact_email", "part_students"]
    column_details_list = [*column_list, "bachelor_thesis", "bachelor_thesis_work_group", "note"]

    form_excluded_columns = ["part_students"]

    # Both email fields get their own Email validator instance.
    form_args = {fieldName: {"validators": [Email()]} for fieldName in ("uni_email", "contact_email")}

    # Extra row action linking to the experiment mark list; the row id is
    # passed as the query argument "flt1_0".
    # NOTE(review): presumably "flt1_0" addresses a filter of the target view - confirm.
    column_extra_row_actions = [
        EndpointLinkRowAction(
            icon_class="glyphicon glyphicon-time",
            endpoint="experimentmark.index_view",
            title="Experiments history",
            id_arg="flt1_0",
        )
    ]
def partQueryFactory():
    """Return a query for the parts of the user's active semester."""
    partQuery = Part.query
    activePartIds = [part.id for part in userActiveSemester().parts]
    return partQuery.filter(Part.id.in_(activePartIds))
def groupQueryFactory():
    """Return a query for the groups whose part belongs to the user's active semester."""
    groupQuery = Group.query
    activePartIds = [part.id for part in userActiveSemester().parts]
    return groupQuery.filter(Group.part_id.in_(activePartIds))
class PartStudentView(SecureModelView):
    """Admin view for part students, restricted to parts of the user's active semester.

    Creation uses `CreateForm` (student, part and optional group); editing
    uses `EditForm`, which drops student and part and adds the final part
    mark. A part student's group must belong to the same part.
    """

    class CreateForm(Form):
        def studentQueryFactory():
            return Student.query

        student = QuerySelectField(
            "Student", query_factory=studentQueryFactory, validators=[DataRequired()], allow_blank=True, blank_text="-"
        )
        part = QuerySelectField(
            "Part", query_factory=partQueryFactory, validators=[DataRequired()], allow_blank=True, blank_text="-"
        )
        group = QuerySelectField("Group", query_factory=groupQueryFactory, allow_blank=True, blank_text="-")

    class EditForm(CreateForm):
        # Setting inherited fields to None removes them from the edit form.
        student = None
        part = None
        final_part_mark = FloatField("Final Part Mark", validators=[Optional()])

    form = EditForm

    column_filters = ["part", "student", "group"]

    # Message of the exception raised by `on_model_change`; also used to
    # recognise that exception in `handle_view_exception`.
    partGroupPartMismatchException = "Student's part and group's part do not match!"

    def create_form(self, obj=None):
        """Build the creation form (`CreateForm`) from the submitted form data."""
        form = self.CreateForm
        return form(get_form_data(), obj=obj)

    def on_model_change(self, form, model, is_created):
        """Reject a group that belongs to a different part than the part student.

        Raises:
            Exception: With ``partGroupPartMismatchException`` as message.
        """
        if model.group and model.part != model.group.part:
            raise Exception(self.partGroupPartMismatchException)

    def handle_view_exception(self, exc):
        """Skip the inherited handling for the part/group mismatch exception.

        Returns:
            ``None`` for the mismatch exception, otherwise whatever the parent
            implementation returns.
        """
        # Compare for equality: `in (message)` without a trailing comma would
        # be a substring test against the message string.
        if exc.args and exc.args[0] == self.partGroupPartMismatchException:
            return None

        return super().handle_view_exception(exc)

    def get_query(self):
        """Return the list query limited to parts of the active semester."""
        return super().get_query().filter(PartStudent.part_id.in_([part.id for part in userActiveSemester().parts]))

    def get_count_query(self):
        """Return the count query with the same restriction as `get_query`."""
        return (
            self.session.query(func.count("*"))
            .select_from(self.model)
            .filter(PartStudent.part_id.in_([part.id for part in userActiveSemester().parts]))
        )
class GroupView(SecureModelView):
    """Admin view for groups, restricted to parts of the user's active semester.

    A new group gets the next free number within its part. All part students
    of a group must belong to the group's part. The edit form does not allow
    changing the part.
    """

    class CreateForm(Form):
        def partStudentsQueryFactory():
            return PartStudent.query.filter(PartStudent.part_id.in_([part.id for part in userActiveSemester().parts]))

        part = QuerySelectField(
            "Part", query_factory=partQueryFactory, validators=[DataRequired()], allow_blank=True, blank_text="-"
        )
        part_students = QuerySelectMultipleField("Part Students", query_factory=partStudentsQueryFactory)

    class EditForm(CreateForm):
        # Setting the inherited field to None removes it from the edit form.
        part = None

    form = EditForm

    column_list = ["number", "part", "part_students", "group_experiments"]
    column_filters = ["number", "part"]

    # Message of the exception raised by `on_model_change`; also used to
    # recognise that exception in `handle_view_exception`.
    partStudentPartPartMismatchException = "Group's part and student's part do not match!"

    def create_model(self, form):
        """Create a group numbered one above the highest group number of its part.

        Returns:
            The created group, or ``False`` if creation failed (the error is
            flashed and the session is rolled back).
        """
        try:
            # Let the database compute the maximum instead of loading every
            # group and indexing the query with -1. `scalar()` yields None
            # when the part has no groups yet.
            lastTakenGroupNumber = (
                self.session.query(func.max(Group.number)).filter(Group.part == form.part.data).scalar() or 0
            )

            model = Group(
                number=lastTakenGroupNumber + 1,
                part_students=form.part_students.data,
                part=form.part.data,
            )

            self.session.add(model)
            self.on_model_change(form, model, True)
            self.session.commit()
        except Exception as ex:
            # Flash the text, not the exception object, so the message is a plain string.
            flash(str(ex), "error")
            self.session.rollback()
            return False

        self.after_model_change(form, model, True)
        return model

    def on_model_change(self, form, model, is_created):
        """Reject part students that belong to a different part than the group.

        Raises:
            Exception: With ``partStudentPartPartMismatchException`` as message.
        """
        for partStudent in model.part_students:
            if model.part != partStudent.part:
                raise Exception(self.partStudentPartPartMismatchException)

    def handle_view_exception(self, exc):
        """Skip the inherited handling for the part mismatch exception.

        Returns:
            ``None`` for the mismatch exception, otherwise whatever the parent
            implementation returns.
        """
        # Compare for equality: `in (message)` without a trailing comma would
        # be a substring test against the message string.
        if exc.args and exc.args[0] == self.partStudentPartPartMismatchException:
            return None

        return super().handle_view_exception(exc)

    def create_form(self, obj=None):
        """Build the creation form (`CreateForm`) from the submitted form data."""
        form = self.CreateForm
        return form(get_form_data(), obj=obj)

    def get_query(self):
        """Return the list query limited to parts of the active semester."""
        return super().get_query().filter(Group.part_id.in_([part.id for part in userActiveSemester().parts]))

    def get_count_query(self):
        """Return the count query with the same restriction as `get_query`."""
        return (
            self.session.query(func.count("*"))
            .select_from(self.model)
            .filter(Group.part_id.in_([part.id for part in userActiveSemester().parts]))
        )
class ExperimentView(SecureModelView):
    """Admin view for experiments with a details page and an inline-editable "active" flag."""

    can_view_details = True

    column_list = ["label", "title", "active"]
    column_filters = ["active"]
    column_editable_list = ["active"]

    # The details page shows the list columns followed by the remaining ones.
    # NOTE(review): "desciption" looks like a misspelling of "description" -
    # verify against the Experiment model before changing it.
    column_details_list = [
        *column_list,
        "desciption",
        "wiki_link",
        "room",
        "building",
        "responsibility",
        "duration_in_days",
        "oral_weighting",
        "protocol_weighting",
        "final_weighting",
        "semester_experiments",
    ]
class SemesterExperimentView(SecureModelView):
    """Admin view for semester experiments of the user's active semester."""

    column_list = ["experiment", "semester", "assistants"]

    def get_query(self):
        """Return the list query limited to the active semester."""
        baseQuery = super().get_query()
        return baseQuery.filter(SemesterExperiment.semester == userActiveSemester())

    def get_count_query(self):
        """Return the count query with the same restriction as `get_query`."""
        countQuery = self.session.query(func.count("*")).select_from(self.model)
        return countQuery.filter(SemesterExperiment.semester == userActiveSemester())
class AssistantView(SecureModelView):
    """Admin view for assistants with a details page and a filter on the user's active flag."""

    can_view_details = True

    column_filters = ["user.active"]
    form_excluded_columns = ["experiment_marks"]

    column_list = ["first_name", "last_name", "user", "semester_experiments"]

    # The details page shows the list columns followed by the remaining ones.
    column_details_list = [
        *column_list,
        "phone_number",
        "mobile_phone_number",
        "room",
        "building",
        "appointments",
        "experiment_marks",
    ]
class GroupExperimentView(SecureModelView):
    """Admin view for group experiments of the user's active semester.

    Group experiments can be created but not edited. The creation form
    optionally takes up to two appointments; an appointment is only created
    when its date is filled in. If no assistant is chosen for an appointment,
    the experiment's assistant is used, provided it has exactly one.
    """

    class CreateForm(Form):
        def semesterExperimentQueryFactory():
            return SemesterExperiment.query.filter(SemesterExperiment.semester == userActiveSemester())

        def assistantQueryFactory():
            # Only assistants whose user is active; `== True` is intentional,
            # it builds an SQL expression.
            return Assistant.query.filter(
                Assistant.user_id.in_([user.id for user in User.query.filter(User.active == True)])  # noqa: E712
            )

        group = QuerySelectField(
            "Group", query_factory=groupQueryFactory, validators=[DataRequired()], allow_blank=True, blank_text="-"
        )
        semester_experiment = QuerySelectField(
            "Semester Experiment",
            query_factory=semesterExperimentQueryFactory,
            validators=[DataRequired()],
            allow_blank=True,
            blank_text="-",
        )

        assistantBlankText = "Auto assign if experiment has only one assistant"

        appointment1_date = DateField("Appointment-1 Date", validators=[Optional()])
        appointment1_special = BooleanField("Appointment-1 special", default=False)
        appointment1_assistant = QuerySelectField(
            "Appointment-1 Assistant",
            query_factory=assistantQueryFactory,
            allow_blank=True,
            blank_text=assistantBlankText,
        )

        appointment2_date = DateField("Appointment-2 Date", validators=[Optional()])
        appointment2_special = BooleanField("Appointment-2 special", default=False)
        appointment2_assistant = QuerySelectField(
            "Appointment-2 Assistant",
            query_factory=assistantQueryFactory,
            allow_blank=True,
            blank_text=assistantBlankText,
        )

    form = CreateForm

    can_edit = False

    column_list = ["group", "semester_experiment", "appointments", "experiment_marks"]
    column_filters = ["group", "semester_experiment.experiment", "appointments"]

    @staticmethod
    def _appointmentAssistant(semesterExperiment, assistant):
        """Return the assistant for an appointment, auto-assigning if none was chosen.

        Raises:
            Exception: If the chosen assistant is not one of the experiment's
                assistants, or if none was chosen and the experiment does not
                have exactly one assistant.
        """
        responsibleAssistants = semesterExperiment.assistants

        if assistant:
            if assistant not in responsibleAssistants:
                raise Exception(f"{assistant} not responsible for {semesterExperiment}!")
            return assistant

        assistantCount = len(responsibleAssistants)
        if assistantCount == 0:
            # Previously reported as "more than one assistant", which was misleading.
            raise Exception(
                f"Experiment {semesterExperiment} has no assistant. Assign an assistant to the experiment first!"
            )
        if assistantCount != 1:
            raise Exception(
                f"Experiment {semesterExperiment} has more than one assistant. You have to assign one of these assistants: {responsibleAssistants}"
            )

        return responsibleAssistants[0]

    @staticmethod
    def _activeSemesterGroupIds():
        """Return the ids of all groups whose part belongs to the user's active semester."""
        activePartIds = [part.id for part in userActiveSemester().parts]
        return [group.id for group in Group.query.filter(Group.part_id.in_(activePartIds))]

    def create_model(self, form):
        """Create a group experiment together with its optional appointments.

        Returns:
            The created group experiment, or ``False`` if creation failed (the
            error is flashed and the session is rolled back).
        """
        semesterExperiment = form.semester_experiment.data

        try:
            model = GroupExperiment.checkAndInit(semester_experiment=semesterExperiment, group=form.group.data)
            self.session.add(model)

            appointmentInputs = (
                (form.appointment1_date.data, form.appointment1_special.data, form.appointment1_assistant.data),
                (form.appointment2_date.data, form.appointment2_special.data, form.appointment2_assistant.data),
            )

            for appointmentDate, special, assistant in appointmentInputs:
                # An appointment without a date is treated as not requested.
                if not appointmentDate:
                    continue

                appointment = Appointment(
                    date=appointmentDate,
                    special=special,
                    group_experiment=model,
                    assistant=self._appointmentAssistant(semesterExperiment, assistant),
                )
                self.session.add(appointment)

            self.on_model_change(form, model, True)
            self.session.commit()
        except Exception as ex:
            # Flash the text, not the exception object, so the message is a plain string.
            flash(str(ex), "error")
            self.session.rollback()
            return False

        if model.appointments:
            flash(f"Appointments {model.appointments} added.", "success")

        self.after_model_change(form, model, True)
        return model

    def get_query(self):
        """Return the list query limited to groups of the active semester."""
        return super().get_query().filter(GroupExperiment.group_id.in_(self._activeSemesterGroupIds()))

    def get_count_query(self):
        """Return the count query with the same restriction as `get_query`."""
        return (
            self.session.query(func.count("*"))
            .select_from(self.model)
            .filter(GroupExperiment.group_id.in_(self._activeSemesterGroupIds()))
        )
class AppointmentView(SecureModelView):
    """Admin view for appointments."""

    # Columns of the list page, in display order.
    column_list = [
        "date",
        "special",
        "group_experiment",
        "assistant",
    ]
class ExperimentMarkView(SecureModelView):
    """Admin view for experiment marks with a custom "equals" filter on an id."""

    class StudentIdFilter(BaseSQLAFilter):
        """Filter keeping the rows whose filter column equals the given value."""

        def apply(self, query, value, alias=None):
            condition = self.column == value
            return query.filter(condition)

        def operation(self):
            return "equals"

        def validate(self, value):
            # The value is valid only if a student with this id exists.
            return bool(Student.query.get(value))

    # NOTE(review): the first filter is labelled "Student / ID" and validated
    # against Student ids, but it compares PartStudent.id - confirm that this
    # is the intended column.
    column_filters = [
        StudentIdFilter(column=PartStudent.id, name="Student / ID"),
        "part_student.student",
        "group_experiment.semester_experiment.semester",
        "group_experiment.semester_experiment.experiment",
        "assistant",
    ]
# Register one admin view per model, in this order, all sharing `db.session`.
for _viewClass, _viewModel in (
    (StudentView, Student),
    (PartStudentView, PartStudent),
    (GroupView, Group),
    (GroupExperimentView, GroupExperiment),
    (AppointmentView, Appointment),
    (ExperimentMarkView, ExperimentMark),
    (ExperimentView, Experiment),
    (SemesterExperimentView, SemesterExperiment),
    (AssistantView, Assistant),
    (PartView, Part),
    (SemesterView, Semester),
    (UserView, User),
    (RoleView, Role),
):
    admin.add_view(_viewClass(_viewModel, db.session))

# The links are built with `url_for`, so they are created inside an application context.
with app.app_context():
    semesters = Semester.query.order_by(Semester.id)

    # One "Active semester" menu entry per existing semester, ordered by id.
    for semester in semesters:
        semesterUrl = url_for("set_semester") + "?semester_id=" + str(semester.id)
        admin.add_link(MenuLink(name=semester.label, url=semesterUrl, category="Active semester"))

    logoutUrl = url_for("security.logout")
    admin.add_link(MenuLink(name="Logout", url=logoutUrl))