1
0
Fork 0
mirror of https://codeberg.org/Mo8it/AdvLabDB.git synced 2024-09-19 18:31:16 +00:00
AdvLabDB/cli/test/generate_test_db.py

248 lines
10 KiB
Python
Raw Normal View History

2022-08-14 00:54:22 +00:00
from datetime import date
from math import ceil
from pathlib import Path
from random import randint, random
from test.assistant_names import assistant_names
from test.experiment_titles import experiment_titles
from test.student_names import student_names
import click
from flask_security import hash_password
from advlabdb import create_app, settings, user_datastore
from advlabdb.exceptions import DatabaseException
from advlabdb.models import (
Admin,
Appointment,
Assistant,
Experiment,
Group,
GroupExperiment,
Part,
PartStudent,
Program,
Semester,
SemesterExperiment,
Student,
db,
)
def db_add(obj):
db.session.add(obj)
return obj
def _generate_test_db():
db_file = Path(settings["SQLITE_DB_PATH"])
if db_file.is_file():
click.echo(
click.style(
f"Generating a test database is not allowed because a database does already exist at {db_file}.",
fg="red",
)
)
return
app = create_app(create_for_server=False)
with app.app_context():
with db.session.begin():
db.create_all()
bs_prog = db_add(Program(label="BS"))
ms_prog = db_add(Program(label="MS"))
be_prog = db_add(Program(label="BE"))
programs = (bs_prog, ms_prog, be_prog)
ss = db_add(Semester(label="SS", year=23))
ws = db_add(Semester(label="WS", year=23))
semesters = (ss, ws)
bs_1_ss_part = db_add(Part(semester=ss, program=bs_prog, number=1))
bs_2_ss_part = db_add(Part(semester=ss, program=bs_prog, number=2))
ms_1_ss_part = db_add(Part(semester=ss, program=ms_prog, number=1))
ms_2_ss_part = db_add(Part(semester=ss, program=ms_prog, number=2))
be_1_ss_part = db_add(Part(semester=ss, program=be_prog, number=1))
be_2_ss_part = db_add(Part(semester=ss, program=be_prog, number=2))
ss_parts = (bs_1_ss_part, bs_2_ss_part, ms_1_ss_part, ms_2_ss_part, be_1_ss_part, be_2_ss_part)
db_add(Part(semester=ws, program=bs_prog, number=1))
bs_2_ws_part = db_add(Part(semester=ws, program=bs_prog, number=2))
ms_1_ws_part = db_add(Part(semester=ws, program=ms_prog, number=1))
ms_2_ws_part = db_add(Part(semester=ws, program=ms_prog, number=2))
db_add(Part(semester=ws, program=be_prog, number=1))
be_2_ws_part = db_add(Part(semester=ws, program=be_prog, number=2))
semester_program_part_students = {
ss: {program: [] for program in programs},
ws: {program: [] for program in programs},
}
def add_part_student(student, part):
part_student = db_add(PartStudent(student=student, part=part))
semester_program_part_students[part.semester][part.program].append(part_student)
for ind, name in enumerate(student_names):
if randint(0, 1) == 0: # nosec: B311
contact_email = f"{name[0]}-{name[1]}@private.de".lower()
else:
contact_email = None
student = db_add(
Student(
student_number=ind,
first_name=name[0],
last_name=name[1],
uni_email=f"{name[0]}-{name[1]}@uni.de".lower(),
contact_email=contact_email,
)
)
part = ss_parts[ind % len(ss_parts)]
add_part_student(student, part)
if random() < 0.9: # nosec: B311
# Transfer to the next part in the second semester
if part == bs_1_ss_part:
add_part_student(student, bs_2_ws_part)
elif part == bs_2_ss_part:
add_part_student(student, ms_1_ws_part)
elif part == ms_1_ss_part:
add_part_student(student, ms_2_ws_part)
elif part == be_1_ss_part:
add_part_student(student, be_2_ws_part)
program_groups = {program: [] for program in programs}
for semester, program_part_students in semester_program_part_students.items():
for program, part_students in program_part_students.items():
if len(part_students) % 2 == 1:
# Add the first 3 students into a special group for an uneven number of students
group_part_students = []
for i in range(3):
if len(part_students) == 0:
break
group_part_students.append(part_students.pop(0))
group = db_add(Group.customInit(group_part_students))
program_groups[program].append(group)
while len(part_students) >= 2:
# Add the rest of the students into groups of 2
group = db_add(Group.customInit([part_students.pop(0) for i in range(2)]))
program_groups[program].append(group)
program_semester_experiments = {program: [] for program in programs}
all_semester_experiments = []
for ind, title in enumerate(experiment_titles):
program = programs[ind % len(programs)]
experiment = db_add(
Experiment(
number=ind + 1,
program=program,
title=title,
building="Not assigned",
room="Not assigned",
duration_in_days=2,
)
)
for semester in semesters:
semester_experiment = db_add(SemesterExperiment(experiment=experiment, semester=semester))
program_semester_experiments[program].append(semester_experiment)
all_semester_experiments.append(semester_experiment)
all_group_experiments = []
for program in programs:
semester_experiments = program_semester_experiments[program]
num_semester_experiments = len(semester_experiments)
for ind, group in enumerate(program_groups[program]):
num_tries = 0
while True:
try:
semester_experiment = semester_experiments[ind % num_semester_experiments]
group_experiment = db_add(
GroupExperiment(semester_experiment=semester_experiment, group=group)
)
except DatabaseException as ex:
# Catch an error when a student becomes the same experiment for the second time.
# Try the next experiment!
ind += 1
num_tries += 1
if num_tries == num_semester_experiments:
raise ex
else:
break
all_group_experiments.append(group_experiment)
adminRole = user_datastore.create_role(name="admin")
assistantRole = user_datastore.create_role(name="assistant")
admin_names = (
("Aileen", "Grant"),
("Ben", "Huber"),
)
for name in admin_names:
user = user_datastore.create_user(
email=f"{name[0]}-{name[1]}@uni.de".lower(),
password=hash_password("admin"),
roles=[adminRole],
first_name=name[0],
last_name=name[1],
active_semester=ws,
)
db_add(Admin(user=user))
num_assistants = len(assistant_names)
num_assistant_experiments = ceil(len(all_semester_experiments) / num_assistants)
for name in assistant_names:
user = user_datastore.create_user(
email=f"{name[0]}-{name[1]}@uni.de".lower(),
password=hash_password("assistant"),
roles=[assistantRole],
first_name=name[0],
last_name=name[1],
mobile_phone_number="01" + "".join(str(randint(0, 9)) for i in range(10)), # nosec: B311
active_semester=ws,
)
semester_experiments = []
for i in range(num_assistant_experiments):
if len(all_semester_experiments) == 0:
break
semester_experiments.append(all_semester_experiments.pop(0))
db_add(Assistant(user=user, semester_experiments=semester_experiments))
for group_experiment in all_group_experiments:
semester_experiment = group_experiment.semester_experiment
special = False
semester = semester_experiment.semester
if semester.label == "SS":
month = randint(3, 8) # nosec: B311
if month <= 4:
special = True
else:
month = randint(9, 12) # nosec: B311
if month <= 10:
special = True
year = 2000 + semester.year
day = randint(1, 28) # nosec: B311
for appointment_date in (date(year, month, day), date(year, month, day + 1)):
db_add(
Appointment(
date=appointment_date,
special=special,
group_experiment=group_experiment,
assistant=semester_experiment.assistants[0],
)
)