1
0
Fork 0
mirror of https://codeberg.org/Mo8it/AdvLabDB.git synced 2024-09-17 18:31:15 +00:00
AdvLabDB/manage.py
2022-08-10 01:15:29 +02:00

511 lines
18 KiB
Python
Executable file

#!/usr/bin/python3
import secrets
import subprocess # nosec 404
from datetime import date
from math import ceil
from pathlib import Path
from random import randint, random
from shutil import copytree, rmtree
from test.assistant_names import assistant_names
from test.experiment_titles import experiment_titles
from test.student_names import student_names
import click
from email_validator import validate_email
from flask_admin import __file__ as flask_admin_path
from flask_security import admin_change_password, hash_password
from sqlalchemy import select
from advlabdb import create_app, settings, user_datastore
from advlabdb.exceptions import DatabaseException
from advlabdb.model_independent_funs import randomPassword
from advlabdb.models import (
MAX_YEAR,
MIN_YEAR,
Admin,
Appointment,
Assistant,
Experiment,
Group,
GroupExperiment,
Part,
PartStudent,
Program,
Semester,
SemesterExperiment,
Student,
User,
db,
)
def run(command: str, **kwargs):
return subprocess.run(command, shell=True, **kwargs) # nosec B602
def box(message: str):
click.echo()
click.echo(click.style(f" {message} ", bg="white", fg="black"))
click.echo()
def db_add(obj):
db.session.add(obj)
return obj
# Class to validate email in click.prompt
class EmailParamType(click.ParamType):
def convert(self, value, param, ctx):
try:
return validate_email(value).email
except Exception:
self.fail(f"{value} is not a valid email!", param, ctx)
@click.group(
help="Command line tool to manage AdvLabDB.",
)
def cli():
pass
@cli.group(
short_help="Setup commands.",
help="Commands used to setup an AdvLabDB instance.",
)
def setup():
pass
@setup.command(
short_help="Generate required secrets.",
help="Generate the file secrets.ini if it does not already exist.",
)
def generate_secrets():
file = Path("secrets.ini")
if file.is_file():
click.echo(f"Skipping secrets generation because the secrets file does already exist at {file}.")
return
with open(file, "w") as f:
f.write("[Secrets]\n")
key = secrets.SystemRandom().getrandbits(128)
f.write(f"SECRET_KEY = {key}\n")
salt = secrets.token_hex()
f.write(f"SECURITY_PASSWORD_SALT = {salt}\n")
@setup.command(
short_help="Initialize the database.",
help="Initialize the database if it does not already exist.",
)
def init_db():
db_file = Path(settings["SQLITE_DB_PATH"])
if db_file.is_file():
click.echo(f"Skipping database initialization because the database does already exist at {db_file}.")
return
app = create_app(create_for_server=False)
with app.app_context():
with db.session.begin():
# Create new database
db.create_all()
semester_label = click.prompt(
"Enter the label of the current semester",
type=click.Choice(("SS", "WS")),
)
semester_year = click.prompt(
f"Enter the year of the current semester (between {MIN_YEAR} and {MAX_YEAR})",
type=click.IntRange(MIN_YEAR, MAX_YEAR),
)
semester = Semester(label=semester_label, year=semester_year)
db.session.add(semester)
adminRole = user_datastore.create_role(name="admin")
user_datastore.create_role(name="assistant")
box("The first admin account will be created now.")
admin_email = click.prompt(
"Enter the admin's email address",
type=EmailParamType(),
)
admin_first_name = click.prompt("Enter the admin's first name")
admin_last_name = click.prompt("Enter the admin's last name")
admin_phone_number = click.prompt(
"Enter the admin's phone number (optional)", default="", show_default=False
)
admin_mobile_phone_number = click.prompt(
"Enter the admin's mobile phone number (optional)", default="", show_default=False
)
admin_building = click.prompt("Enter the admin's building (optional)", default="", show_default=False)
admin_room = click.prompt("Enter the admin's room (optional)", default="", show_default=False)
admin_password = randomPassword()
admin_hashed_password = hash_password(admin_password)
admin_user = user_datastore.create_user(
email=admin_email,
password=admin_hashed_password,
roles=[adminRole],
first_name=admin_first_name.strip(),
last_name=admin_last_name.strip(),
phone_number=admin_phone_number.strip() or None,
mobile_phone_number=admin_mobile_phone_number.strip() or None,
building=admin_building.strip() or None,
room=admin_room.strip() or None,
active_semester=semester,
)
admin = Admin(user=admin_user)
db.session.add(admin)
box(f"Admin password: {admin_password}")
click.echo(click.style("Done database initialization!", fg="green"))
@cli.group(
short_help="Maintenance commands.",
help="Commands used to maintain an AdvLabDB instance.",
)
def maintain():
pass
@maintain.command(
short_help="Reset the password of an admin.",
help="Reset the password of a chosen active admin. A random password will be generated. If no admins are active, a chosen admin will be activated.",
)
def reset_admin_password():
click.echo("This script will generate a new random password for a chosen admin.\n")
app = create_app(create_for_server=False)
with app.app_context():
with db.session.begin():
admins = db.session.execute(select(Admin).join(User).where(User.active == True)).scalars().all()
activate_user = False
if len(admins) == 0:
click.echo("There is no admin with an active user. The user of the chosen admin will be activated.")
admins = db.session.execute(select(Admin)).scalars().all()
activate_user = True
num_admins = len(admins)
prompt = "Admins:\n"
for ind, admin in enumerate(admins):
user = admin.user
prompt += f"[{ind}] {user.first_name} {user.last_name}: {user.email}\n"
prompt += f"Enter number [0-{num_admins - 1}]"
admin_index = click.prompt(
prompt,
type=click.IntRange(0, num_admins - 1),
)
chosen_admin_user = admins[admin_index].user
new_password = randomPassword()
admin_change_password(
chosen_admin_user, new_password, notify=False
) # Password is automatically hashed with this function
if activate_user:
chosen_admin_user.active = True
box(f"New password: {new_password}")
click.echo(click.style("Done!", fg="green"))
@maintain.command(
short_help="Copy admin templates",
help="Copy the templates from the Flask-Admin package. This is only needed if the templates should be updated to a new version after a new release of Flask-Admin.",
)
def copy_admin_templates():
src = Path(flask_admin_path).parent / "templates/bootstrap4/admin"
if not src.is_dir():
click.echo(click.style(f"Templates could not be found at {src}", fg="red"))
return
dist = Path("advlabdb/templates/admin")
if dist.is_dir():
if not click.confirm(
click.style(f"The directory {dist} already exists! Do you want to overwrite it?", fg="yellow")
):
return
rmtree(dist)
click.echo(click.style("Old templates deleted!", fg="yellow"))
copytree(src, dist)
click.echo(click.style(f"Copied {src} -> {dist}", fg="green"))
click.echo(
click.style(
f"""
_________
| WARNING
| -------
| You might have to edit the file {dist}/base.html
| by adding nav in the following way:
| This line:\t<ul class="navbar-nav mr-auto">
| Becomes:\t<ul class="nav navbar-nav mr-auto">
|
| This will prevent the navigation bar from expanding
| such that some elements can not be seen.
| Refer to this pull request:
| https://github.com/flask-admin/flask-admin/pull/2233
|
| If the above pull request is merged and flask-admin
| is on a new release after the merge,
| then this step is not needed.
_________
""",
fg="yellow",
)
)
@cli.group(
short_help="Test commands.",
help="Commands used to test AdvLabDB.",
)
def test():
pass
@test.command(
short_help="Generate test database.",
help="Generate a test database if no database already exists.",
)
def generate_test_db():
db_file = Path(settings["SQLITE_DB_PATH"])
if db_file.is_file():
click.echo(
click.style(
f"Generating a test database is not allowed because a database does already exist at {db_file}.",
fg="red",
)
)
return
app = create_app(create_for_server=False)
with app.app_context():
with db.session.begin():
db.create_all()
bs_prog = db_add(Program(label="BS"))
ms_prog = db_add(Program(label="MS"))
be_prog = db_add(Program(label="BE"))
programs = (bs_prog, ms_prog, be_prog)
ss = db_add(Semester(label="SS", year=23))
ws = db_add(Semester(label="WS", year=23))
semesters = (ss, ws)
bs_1_ss_part = db_add(Part(semester=ss, program=bs_prog, number=1))
bs_2_ss_part = db_add(Part(semester=ss, program=bs_prog, number=2))
ms_1_ss_part = db_add(Part(semester=ss, program=ms_prog, number=1))
ms_2_ss_part = db_add(Part(semester=ss, program=ms_prog, number=2))
be_1_ss_part = db_add(Part(semester=ss, program=be_prog, number=1))
be_2_ss_part = db_add(Part(semester=ss, program=be_prog, number=2))
ss_parts = (bs_1_ss_part, bs_2_ss_part, ms_1_ss_part, ms_2_ss_part, be_1_ss_part, be_2_ss_part)
bs_1_ws_part = db_add(Part(semester=ws, program=bs_prog, number=1))
bs_2_ws_part = db_add(Part(semester=ws, program=bs_prog, number=2))
ms_1_ws_part = db_add(Part(semester=ws, program=ms_prog, number=1))
ms_2_ws_part = db_add(Part(semester=ws, program=ms_prog, number=2))
be_1_ws_part = db_add(Part(semester=ws, program=be_prog, number=1))
be_2_ws_part = db_add(Part(semester=ws, program=be_prog, number=2))
semester_program_part_students = {
ss: {program: [] for program in programs},
ws: {program: [] for program in programs},
}
def add_part_student(student, part):
part_student = db_add(PartStudent(student=student, part=part))
semester_program_part_students[part.semester][part.program].append(part_student)
for ind, name in enumerate(student_names):
if randint(0, 1) == 0:
contact_email = f"{name[0]}-{name[1]}@private.de".lower()
else:
contact_email = None
student = db_add(
Student(
student_number=ind,
first_name=name[0],
last_name=name[1],
uni_email=f"{name[0]}-{name[1]}@uni.de".lower(),
contact_email=contact_email,
)
)
part = ss_parts[ind % len(ss_parts)]
add_part_student(student, part)
if random() < 0.9:
# Transfer to the next part in the second semester
if part == bs_1_ss_part:
add_part_student(student, bs_2_ws_part)
elif part == bs_2_ss_part:
add_part_student(student, ms_1_ws_part)
elif part == ms_1_ss_part:
add_part_student(student, ms_2_ws_part)
elif part == be_1_ss_part:
add_part_student(student, be_2_ws_part)
program_groups = {program: [] for program in programs}
for semester, program_part_students in semester_program_part_students.items():
for program, part_students in program_part_students.items():
if len(part_students) % 2 == 1:
# Add the first 3 students into a special group for an uneven number of students
group_part_students = []
for i in range(3):
if len(part_students) == 0:
break
group_part_students.append(part_students.pop(0))
group = db_add(Group.customInit(group_part_students))
program_groups[program].append(group)
while len(part_students) >= 2:
# Add the rest of the students into groups of 2
group = db_add(Group.customInit([part_students.pop(0) for i in range(2)]))
program_groups[program].append(group)
program_semester_experiments = {program: [] for program in programs}
all_semester_experiments = []
for ind, title in enumerate(experiment_titles):
program = programs[ind % len(programs)]
experiment = db_add(
Experiment(
number=ind + 1,
program=program,
title=title,
building="Not assigned",
room="Not assigned",
duration_in_days=2,
)
)
for semester in semesters:
semester_experiment = db_add(SemesterExperiment(experiment=experiment, semester=semester))
program_semester_experiments[program].append(semester_experiment)
all_semester_experiments.append(semester_experiment)
all_group_experiments = []
for program in programs:
semester_experiments = program_semester_experiments[program]
num_semester_experiments = len(semester_experiments)
for ind, group in enumerate(program_groups[program]):
num_tries = 0
while True:
try:
semester_experiment = semester_experiments[ind % num_semester_experiments]
group_experiment = db_add(
GroupExperiment(semester_experiment=semester_experiment, group=group)
)
except DatabaseException as ex:
# Catch an error when a student becomes the same experiment for the second time.
# Try the next experiment!
ind += 1
num_tries += 1
if num_tries == num_semester_experiments:
raise ex
else:
break
all_group_experiments.append(group_experiment)
adminRole = user_datastore.create_role(name="admin")
assistantRole = user_datastore.create_role(name="assistant")
admin_names = (
("Aileen", "Grant"),
("Ben", "Huber"),
)
for name in admin_names:
user = user_datastore.create_user(
email=f"{name[0]}-{name[1]}@uni.de".lower(),
password=hash_password("admin"),
roles=[adminRole],
first_name=name[0],
last_name=name[1],
active_semester=ws,
)
db_add(Admin(user=user))
num_assistants = len(assistant_names)
num_assistant_experiments = ceil(len(all_semester_experiments) / num_assistants)
for name in assistant_names:
user = user_datastore.create_user(
email=f"{name[0]}-{name[1]}@uni.de".lower(),
password=hash_password("assistant"),
roles=[assistantRole],
first_name=name[0],
last_name=name[1],
mobile_phone_number="01" + "".join(str(randint(0, 9)) for i in range(10)),
active_semester=ws,
)
semester_experiments = []
for i in range(num_assistant_experiments):
if len(all_semester_experiments) == 0:
break
semester_experiments.append(all_semester_experiments.pop(0))
db_add(Assistant(user=user, semester_experiments=semester_experiments))
for group_experiment in all_group_experiments:
semester_experiment = group_experiment.semester_experiment
special = False
semester = semester_experiment.semester
if semester.label == "SS":
month = randint(3, 8)
if month <= 4:
special = True
else:
month = randint(9, 12)
if month <= 10:
special = True
year = 2000 + semester.year
day = randint(1, 28)
for appointment_date in (date(year, month, day), date(year, month, day + 1)):
db_add(
Appointment(
date=appointment_date,
special=special,
group_experiment=group_experiment,
assistant=semester_experiment.assistants[0],
)
)
if __name__ == "__main__":
cli()