From b86f09322522f4119afe89b60dc40d96529813fd Mon Sep 17 00:00:00 2001 From: Mo8it Date: Wed, 10 Aug 2022 01:15:29 +0200 Subject: [PATCH] Add test db command --- manage.py | 264 ++++++++++++++++++++++++++++++++-- scripts/test/test_database.py | 166 --------------------- 2 files changed, 256 insertions(+), 174 deletions(-) delete mode 100644 scripts/test/test_database.py diff --git a/manage.py b/manage.py index ecbf36c..bf20333 100755 --- a/manage.py +++ b/manage.py @@ -2,8 +2,14 @@ import secrets import subprocess # nosec 404 +from datetime import date +from math import ceil from pathlib import Path +from random import randint, random from shutil import copytree, rmtree +from test.assistant_names import assistant_names +from test.experiment_titles import experiment_titles +from test.student_names import student_names import click from email_validator import validate_email @@ -12,8 +18,26 @@ from flask_security import admin_change_password, hash_password from sqlalchemy import select from advlabdb import create_app, settings, user_datastore +from advlabdb.exceptions import DatabaseException from advlabdb.model_independent_funs import randomPassword -from advlabdb.models import MAX_YEAR, MIN_YEAR, Admin, Semester, User, db +from advlabdb.models import ( + MAX_YEAR, + MIN_YEAR, + Admin, + Appointment, + Assistant, + Experiment, + Group, + GroupExperiment, + Part, + PartStudent, + Program, + Semester, + SemesterExperiment, + Student, + User, + db, +) def run(command: str, **kwargs): @@ -26,6 +50,11 @@ def box(message: str): click.echo() +def db_add(obj): + db.session.add(obj) + return obj + + # Class to validate email in click.prompt class EmailParamType(click.ParamType): def convert(self, value, param, ctx): @@ -215,13 +244,6 @@ def copy_admin_templates(): dist = Path("advlabdb/templates/admin") if dist.is_dir(): - while True: - ans = input().lower() - if ans == "s": - print("Process stopped!") - return False - elif ans == "o": - break if not click.confirm( click.style(f"The directory {dist} already exists! Do you want to overwrite it?", fg="yellow") ): @@ -259,5 +281,231 @@ _________ ) +@cli.group( + short_help="Test commands.", + help="Commands used to test AdvLabDB.", +) +def test(): + pass + + +@test.command( + short_help="Generate test database.", + help="Generate a test database if no database already exists.", +) +def generate_test_db(): + db_file = Path(settings["SQLITE_DB_PATH"]) + if db_file.is_file(): + click.echo( + click.style( + f"Generating a test database is not allowed because a database does already exist at {db_file}.", + fg="red", + ) + ) + return + + app = create_app(create_for_server=False) + + with app.app_context(): + with db.session.begin(): + db.create_all() + + bs_prog = db_add(Program(label="BS")) + ms_prog = db_add(Program(label="MS")) + be_prog = db_add(Program(label="BE")) + programs = (bs_prog, ms_prog, be_prog) + + ss = db_add(Semester(label="SS", year=23)) + ws = db_add(Semester(label="WS", year=23)) + semesters = (ss, ws) + + bs_1_ss_part = db_add(Part(semester=ss, program=bs_prog, number=1)) + bs_2_ss_part = db_add(Part(semester=ss, program=bs_prog, number=2)) + ms_1_ss_part = db_add(Part(semester=ss, program=ms_prog, number=1)) + ms_2_ss_part = db_add(Part(semester=ss, program=ms_prog, number=2)) + be_1_ss_part = db_add(Part(semester=ss, program=be_prog, number=1)) + be_2_ss_part = db_add(Part(semester=ss, program=be_prog, number=2)) + ss_parts = (bs_1_ss_part, bs_2_ss_part, ms_1_ss_part, ms_2_ss_part, be_1_ss_part, be_2_ss_part) + + bs_1_ws_part = db_add(Part(semester=ws, program=bs_prog, number=1)) + bs_2_ws_part = db_add(Part(semester=ws, program=bs_prog, number=2)) + ms_1_ws_part = db_add(Part(semester=ws, program=ms_prog, number=1)) + ms_2_ws_part = db_add(Part(semester=ws, program=ms_prog, number=2)) + be_1_ws_part = db_add(Part(semester=ws, program=be_prog, number=1)) + be_2_ws_part = db_add(Part(semester=ws, program=be_prog, number=2)) + + semester_program_part_students = { + ss: {program: [] for program in programs}, + ws: {program: [] for program in programs}, + } + + def add_part_student(student, part): + part_student = db_add(PartStudent(student=student, part=part)) + semester_program_part_students[part.semester][part.program].append(part_student) + + for ind, name in enumerate(student_names): + if randint(0, 1) == 0: + contact_email = f"{name[0]}-{name[1]}@private.de".lower() + else: + contact_email = None + + student = db_add( + Student( + student_number=ind, + first_name=name[0], + last_name=name[1], + uni_email=f"{name[0]}-{name[1]}@uni.de".lower(), + contact_email=contact_email, + ) + ) + + part = ss_parts[ind % len(ss_parts)] + add_part_student(student, part) + + if random() < 0.9: + # Transfer to the next part in the second semester + if part == bs_1_ss_part: + add_part_student(student, bs_2_ws_part) + elif part == bs_2_ss_part: + add_part_student(student, ms_1_ws_part) + elif part == ms_1_ss_part: + add_part_student(student, ms_2_ws_part) + elif part == be_1_ss_part: + add_part_student(student, be_2_ws_part) + + program_groups = {program: [] for program in programs} + for semester, program_part_students in semester_program_part_students.items(): + for program, part_students in program_part_students.items(): + if len(part_students) % 2 == 1: + # Add the first 3 students into a special group for an uneven number of students + group_part_students = [] + for i in range(3): + if len(part_students) == 0: + break + + group_part_students.append(part_students.pop(0)) + + group = db_add(Group.customInit(group_part_students)) + program_groups[program].append(group) + + while len(part_students) >= 2: + # Add the rest of the students into groups of 2 + group = db_add(Group.customInit([part_students.pop(0) for i in range(2)])) + program_groups[program].append(group) + + program_semester_experiments = {program: [] for program in programs} + all_semester_experiments = [] + for ind, title in enumerate(experiment_titles): + program = programs[ind % len(programs)] + + experiment = db_add( + Experiment( + number=ind + 1, + program=program, + title=title, + building="Not assigned", + room="Not assigned", + duration_in_days=2, + ) + ) + + for semester in semesters: + semester_experiment = db_add(SemesterExperiment(experiment=experiment, semester=semester)) + program_semester_experiments[program].append(semester_experiment) + all_semester_experiments.append(semester_experiment) + + all_group_experiments = [] + for program in programs: + semester_experiments = program_semester_experiments[program] + num_semester_experiments = len(semester_experiments) + + for ind, group in enumerate(program_groups[program]): + num_tries = 0 + while True: + try: + semester_experiment = semester_experiments[ind % num_semester_experiments] + group_experiment = db_add( + GroupExperiment(semester_experiment=semester_experiment, group=group) + ) + except DatabaseException as ex: + # Catch an error when a student becomes the same experiment for the second time. + # Try the next experiment! + ind += 1 + num_tries += 1 + if num_tries == num_semester_experiments: + raise ex + else: + break + + all_group_experiments.append(group_experiment) + + adminRole = user_datastore.create_role(name="admin") + assistantRole = user_datastore.create_role(name="assistant") + + admin_names = ( + ("Aileen", "Grant"), + ("Ben", "Huber"), + ) + for name in admin_names: + user = user_datastore.create_user( + email=f"{name[0]}-{name[1]}@uni.de".lower(), + password=hash_password("admin"), + roles=[adminRole], + first_name=name[0], + last_name=name[1], + active_semester=ws, + ) + + db_add(Admin(user=user)) + + num_assistants = len(assistant_names) + num_assistant_experiments = ceil(len(all_semester_experiments) / num_assistants) + + for name in assistant_names: + user = user_datastore.create_user( + email=f"{name[0]}-{name[1]}@uni.de".lower(), + password=hash_password("assistant"), + roles=[assistantRole], + first_name=name[0], + last_name=name[1], + mobile_phone_number="01" + "".join(str(randint(0, 9)) for i in range(10)), + active_semester=ws, + ) + + semester_experiments = [] + for i in range(num_assistant_experiments): + if len(all_semester_experiments) == 0: + break + + semester_experiments.append(all_semester_experiments.pop(0)) + + db_add(Assistant(user=user, semester_experiments=semester_experiments)) + + for group_experiment in all_group_experiments: + semester_experiment = group_experiment.semester_experiment + special = False + semester = semester_experiment.semester + if semester.label == "SS": + month = randint(3, 8) + if month <= 4: + special = True + else: + month = randint(9, 12) + if month <= 10: + special = True + + year = 2000 + semester.year + day = randint(1, 28) + for appointment_date in (date(year, month, day), date(year, month, day + 1)): + db_add( + Appointment( + date=appointment_date, + special=special, + group_experiment=group_experiment, + assistant=semester_experiment.assistants[0], + ) + ) + + if __name__ == "__main__": cli() diff --git a/scripts/test/test_database.py b/scripts/test/test_database.py deleted file mode 100644 index ead32a1..0000000 --- a/scripts/test/test_database.py +++ /dev/null @@ -1,166 +0,0 @@ -from datetime import date - -from flask_security import hash_password - -from ... import app, db, user_datastore -from ...models import ( - Admin, - Appointment, - Assistant, - Experiment, - Group, - GroupExperiment, - Part, - PartStudent, - Program, - Semester, - SemesterExperiment, - Student, -) - - -def main(): - with app.app_context(): - with db.session.begin(): - db.drop_all() - db.create_all() - - program1 = Program(label="BS") - program2 = Program(label="MS") - program3 = Program(label="BE") - - db.session.add(program1) - db.session.add(program2) - db.session.add(program3) - - sem1 = Semester(label="SS", year=22) - sem2 = Semester(label="WS", year=22) - - db.session.add(sem1) - db.session.add(sem2) - - partKwargs = [ - {"program": program1, "number": 1}, - {"program": program1, "number": 2}, - {"program": program2, "number": 1}, - {"program": program2, "number": 2}, - {"program": program3, "number": 1}, - ] - for kwargs in partKwargs: - db.session.add(Part(semester=sem1, **kwargs)) - - sem2.transferPartsFrom(sem1) - - part1 = sem2.parts[0] - part2 = sem2.parts[2] - - student1 = Student(student_number=123, first_name="Mo", last_name="Bit", uni_email="m@test.com") - student2 = Student(student_number=1232, first_name="Mo2", last_name="Bit", uni_email="m2@test.com") - student3 = Student(student_number=1233, first_name="Mo3", last_name="Bit3", uni_email="m3@test.com") - - db.session.add(student1) - db.session.add(student2) - db.session.add(student3) - - ps1 = PartStudent(student=student1, part=part1) - ps2 = PartStudent(student=student2, part=part1) - ps3 = PartStudent(student=student3, part=part2) - - db.session.add(ps1) - db.session.add(ps2) - db.session.add(ps3) - - g1 = Group.customInit(part_students=[ps1, ps2]) - g2 = Group.customInit(part_students=[ps3]) - - db.session.add(g1) - db.session.add(g2) - - ex1 = Experiment( - number=1, - program=program1, - title="exp", - room="123", - building="phy", - responsibility="none", - duration_in_days=2, - ) - - ex2 = Experiment( - number=1, - program=program2, - title="exp2", - room="123", - building="phy", - responsibility="none", - duration_in_days=2, - ) - - db.session.add(ex1) - db.session.add(ex2) - - sx1 = SemesterExperiment(experiment=ex1, semester=sem2) - sx2 = SemesterExperiment(experiment=ex2, semester=sem2) - - db.session.add(sx1) - db.session.add(sx2) - - gx1 = GroupExperiment(semester_experiment=sx1, group=g1) - gx2 = GroupExperiment(semester_experiment=sx2, group=g2) - - db.session.add(gx1) - db.session.add(gx2) - - adminRole = user_datastore.create_role(name="admin") - assistantRole = user_datastore.create_role(name="assistant") - - admin_user = user_datastore.create_user( - email="admin@advlabdb.de", - password=hash_password("admin"), - roles=[adminRole], - first_name="Peter", - last_name="Blümler", - active_semester=sem2, - ) - - admin = Admin(user=admin_user) - - db.session.add(admin) - - us1 = user_datastore.create_user( - email="test@protonmail.com", - password=hash_password("h1"), - roles=[assistantRole], - first_name="As1", - last_name="l", - phone_number="012333212", - mobile_phone_number="012334123", - active_semester=sem2, - ) - us2 = user_datastore.create_user( - email="test2@protonmail.com", - password=hash_password("h2"), - roles=[assistantRole], - first_name="As2", - last_name="l", - active_semester=sem2, - ) - - as1 = Assistant(user=us1) - as2 = Assistant(user=us2) - - as1.semester_experiments.append(sx1) - as2.semester_experiments.append(sx2) - - db.session.add(as1) - db.session.add(as2) - - ap1 = Appointment(date=date(2022, 3, 21), special=True, group_experiment=gx1, assistant=as1) - ap2 = Appointment(date=date(2022, 3, 22), special=True, group_experiment=gx2, assistant=as2) - - db.session.add(ap1) - db.session.add(ap2) - - -if __name__ == "__main__": - main()