from datetime import date
from math import ceil
from random import randint

import click
from flask_security.utils import hash_password

from advlabdb import create_app, data_dir, user_datastore
from advlabdb.exceptions import DatabaseException
from advlabdb.models import (
    Admin,
    Appointment,
    Assistant,
    Experiment,
    ExperimentMark,
    Group,
    GroupExperiment,
    Part,
    PartStudent,
    Program,
    Semester,
    SemesterExperiment,
    Student,
    db,
)
from cli.test.generate_test_db.assistant_names import assistant_names
from cli.test.generate_test_db.experiment_titles import experiment_titles
from cli.test.generate_test_db.student_names import student_names


def db_add(obj):
    """Add ``obj`` to the current database session and return it.

    Returning the object allows creating and registering a model instance in
    a single expression.
    """
    db.session.add(obj)
    return obj


def _email_address(name, domain):
    """Return the lowercased address ``<name[0]>-<name[1]>@<domain>``."""
    return f"{name[0]}-{name[1]}@{domain}".lower()


def _generate_test_db():
    """Create a new database file filled with generated test data.

    If a database file already exists at ``data_dir / "db/advlabdb.sqlite"``,
    a red error message is printed and nothing is changed.

    Otherwise, all tables are created and filled inside one session
    transaction with: programs, two semesters, parts, students (as part
    students), groups, experiments with their semester experiments, group
    experiments with experiment marks, admins, assistants and appointments.

    Raises:
        DatabaseException: If no semester experiment of a program could be
            assigned to a group after trying every one of them.
    """
    db_file = data_dir / "db/advlabdb.sqlite"
    if db_file.is_file():
        click.echo(
            click.style(
                f"Generating a test database is not allowed because a database already exists at {db_file}.",
                fg="red",
            )
        )
        return

    app = create_app(create_for_server=False)

    with app.app_context(), db.session.begin():
        db.create_all()

        # --- Programs and semesters ---
        bs_prog = db_add(Program(label="BS"))
        ms_prog = db_add(Program(label="MS"))
        be_prog = db_add(Program(label="BE"))
        programs = (bs_prog, ms_prog, be_prog)

        # The year is stored as an offset to 2000 (see the appointment dates below).
        ss = db_add(Semester(label="SS", year=23))
        ws = db_add(Semester(label="WS", year=23))
        semesters = (ss, ws)

        # --- Parts: two per program and semester ---
        bs_1_ss_part = db_add(Part(semester=ss, program=bs_prog, number=1))
        bs_2_ss_part = db_add(Part(semester=ss, program=bs_prog, number=2))
        ms_1_ss_part = db_add(Part(semester=ss, program=ms_prog, number=1))
        ms_2_ss_part = db_add(Part(semester=ss, program=ms_prog, number=2))
        be_1_ss_part = db_add(Part(semester=ss, program=be_prog, number=1))
        be_2_ss_part = db_add(Part(semester=ss, program=be_prog, number=2))
        ss_parts = (bs_1_ss_part, bs_2_ss_part, ms_1_ss_part, ms_2_ss_part, be_1_ss_part, be_2_ss_part)

        # The first parts of BS and BE in the second semester are created,
        # but no student is transferred into them, so they need no name.
        db_add(Part(semester=ws, program=bs_prog, number=1))
        bs_2_ws_part = db_add(Part(semester=ws, program=bs_prog, number=2))
        ms_1_ws_part = db_add(Part(semester=ws, program=ms_prog, number=1))
        ms_2_ws_part = db_add(Part(semester=ws, program=ms_prog, number=2))
        db_add(Part(semester=ws, program=be_prog, number=1))
        be_2_ws_part = db_add(Part(semester=ws, program=be_prog, number=2))

        # --- Students ---
        # Part students collected per semester and program to build groups later.
        semester_program_part_students = {
            ss: {program: [] for program in programs},
            ws: {program: [] for program in programs},
        }

        def add_part_student(student, part):
            """Enroll ``student`` in ``part`` and remember the new part student."""
            part_student = db_add(PartStudent(student=student, part=part))
            semester_program_part_students[part.semester][part.program].append(part_student)

        for ind, name in enumerate(student_names):
            # Only every second student has a private contact email.
            contact_email = _email_address(name, "private.de") if ind % 2 == 0 else None
            student = db_add(
                Student(
                    student_number=ind,
                    first_name=name[0],
                    last_name=name[1],
                    uni_email=_email_address(name, "uni.de"),
                    contact_email=contact_email,
                )
            )

            # Distribute the students evenly over the parts of the first semester.
            part = ss_parts[ind % len(ss_parts)]
            add_part_student(student, part)

            # Every tenth student does not continue in the second semester.
            if ind % 10 < 9:
                # Transfer to the next part in the second semester.
                # Students in the second MS or BE part have no next part.
                if part == bs_1_ss_part:
                    add_part_student(student, bs_2_ws_part)
                elif part == bs_2_ss_part:
                    add_part_student(student, ms_1_ws_part)
                elif part == ms_1_ss_part:
                    add_part_student(student, ms_2_ws_part)
                elif part == be_1_ss_part:
                    add_part_student(student, be_2_ws_part)

        # --- Groups ---
        program_groups = {program: [] for program in programs}

        for program_part_students in semester_program_part_students.values():
            for program, part_students in program_part_students.items():
                num_part_students = len(part_students)
                first_pair_start = 0

                if num_part_students % 2 == 1:
                    # Add the first 3 students (or the only one) into a special
                    # group for an uneven number of students.
                    group = db_add(Group.customInit(part_students[:3]))
                    program_groups[program].append(group)
                    first_pair_start = 3

                # Add the rest of the students into groups of 2.
                for pair_start in range(first_pair_start, num_part_students - 1, 2):
                    group = db_add(Group.customInit(part_students[pair_start : pair_start + 2]))
                    program_groups[program].append(group)

        # --- Experiments and semester experiments ---
        program_semester_experiments = {program: [] for program in programs}
        all_semester_experiments = []

        for ind, title in enumerate(experiment_titles):
            # Distribute the experiments evenly over the programs.
            program = programs[ind % len(programs)]
            experiment = db_add(
                Experiment(
                    number=ind + 1,
                    program=program,
                    title=title,
                    building="Not assigned",
                    room="Not assigned",
                    duration_in_days=2,
                )
            )

            # Every experiment is offered in both semesters.
            for semester in semesters:
                semester_experiment = db_add(SemesterExperiment(experiment=experiment, semester=semester))
                program_semester_experiments[program].append(semester_experiment)
                all_semester_experiments.append(semester_experiment)

        # --- Group experiments and experiment marks ---
        all_group_experiments = []

        for program in programs:
            semester_experiments = program_semester_experiments[program]
            num_semester_experiments = len(semester_experiments)

            for group_ind, group in enumerate(program_groups[program]):
                # Start with the experiment matching the group index and move
                # on to the following ones if the assignment is rejected.
                experiment_ind = group_ind
                num_tries = 0

                while True:
                    try:
                        semester_experiment = semester_experiments[experiment_ind % num_semester_experiments]
                        group_experiment = db_add(
                            GroupExperiment(semester_experiment=semester_experiment, group=group)
                        )
                        for part_student in group.part_students:
                            db.session.add(
                                ExperimentMark(part_student=part_student, group_experiment=group_experiment)
                            )
                    except DatabaseException:
                        # Catch the error when a student gets the same experiment for the second time.
                        # Try the next experiment!
                        experiment_ind += 1
                        num_tries += 1
                        if num_tries == num_semester_experiments:
                            # Every experiment was tried without success.
                            raise
                    else:
                        break

                all_group_experiments.append(group_experiment)

        # --- Admins ---
        admin_role = user_datastore.create_role(name="admin")
        assistant_role = user_datastore.create_role(name="assistant")

        admin_names = (
            ("Aileen", "Grant"),
            ("Ben", "Huber"),
        )
        for name in admin_names:
            user = user_datastore.create_user(
                email=_email_address(name, "uni.de"),
                password=hash_password("admin"),
                roles=[admin_role],
                first_name=name[0],
                last_name=name[1],
                active_semester=ws,
            )
            db_add(Admin(user=user))

        # --- Assistants ---
        num_assistants = len(assistant_names)
        # Rounding up makes sure that every semester experiment gets an assistant.
        num_assistant_experiments = ceil(len(all_semester_experiments) / num_assistants)

        for assistant_ind, name in enumerate(assistant_names):
            user = user_datastore.create_user(
                email=_email_address(name, "uni.de"),
                password=hash_password("assistant"),
                roles=[assistant_role],
                first_name=name[0],
                last_name=name[1],
                # "01" followed by 10 random digits.
                mobile_phone_number="01" + "".join(str(randint(0, 9)) for _ in range(10)),
                active_semester=ws,
            )

            # Hand out the semester experiments in consecutive chunks.
            # The last assistants might get fewer or no experiments.
            chunk_start = assistant_ind * num_assistant_experiments
            semester_experiments = all_semester_experiments[chunk_start : chunk_start + num_assistant_experiments]

            db_add(Assistant(user=user, semester_experiments=semester_experiments))

        # --- Appointments ---
        for group_experiment in all_group_experiments:
            semester_experiment = group_experiment.semester_experiment
            semester = semester_experiment.semester

            # Appointments in the first two months of the chosen range are marked as special.
            if semester.label == "SS":
                month = randint(3, 8)
                special = month <= 4
            else:
                month = randint(9, 12)
                special = month <= 10

            year = 2000 + semester.year
            # The day is limited to 28 so that the following day stays in the
            # same month (February is never chosen above).
            day = randint(1, 28)

            # Two appointments on consecutive days for every group experiment.
            for appointment_date in (date(year, month, day), date(year, month, day + 1)):
                db_add(
                    Appointment(
                        date=appointment_date,
                        special=special,
                        group_experiment=group_experiment,
                        assistant=semester_experiment.assistants[0],
                    )
                )