1
0
Fork 0
mirror of https://codeberg.org/Mo8it/AdvLabDB.git synced 2024-11-08 21:21:06 +00:00
AdvLabDB/advlabdb/database_import.py
2022-05-08 21:26:25 +02:00

287 lines
10 KiB
Python

from datetime import datetime
from os import environ
from pathlib import Path
from shutil import copy2
from flask import flash, has_request_context
from . import db
from .exceptions import DataBaseImportException
from .models import (
Appointment,
Assistant,
Experiment,
Group,
GroupExperiment,
Part,
PartStudent,
Program,
Semester,
SemesterExperiment,
Student,
User,
)
# All database paths are derived from the RELATIVE_DB_DIR environment variable.
# The lookup happens at import time, so a missing variable raises KeyError here.
relative_db_dir = Path(environ["RELATIVE_DB_DIR"])
# Database file that is copied when a backup is made.
relative_db_path = relative_db_dir.joinpath("adblab.db")
# Directory receiving the backup copies; created on import if it is missing.
relative_db_bk_dir = relative_db_dir.joinpath("backups")
relative_db_bk_dir.mkdir(exist_ok=True)
def now():
    """Return the current local time as a ``DD_MM_YYYY_HH_MM_SS`` string.

    The result contains only digits and underscores and is used as part of
    backup file names.
    """
    timestampFormat = "%d_%m_%Y_%H_%M_%S"
    # format() on a datetime applies the strftime directives of the format spec.
    return format(datetime.now(), timestampFormat)
# Table names that may follow the leading "#" in an import file.
_IMPORT_TABLE_NAMES = (
    "Semester",
    "Part",
    "Student",
    "Group",
    "PartStudent",
    "Experiment",
    "GroupExperiment",
    "Appointment",
)


def _readImportTables(filePath, show):
    """Parse the import file into one dictionary of columns per table.

    Args:
        filePath: Path of the tab separated import file.
        show: Callable taking a message string; used for progress output.

    Returns:
        A dictionary mapping every name in ``_IMPORT_TABLE_NAMES`` to a
        dictionary. For a table found in the file, that dictionary maps each
        header cell to the list of the column's cells (as strings) and
        ``"_header"`` to the list of header cells. Tables that do not occur in
        the file map to an empty dictionary.

    Raises:
        DataBaseImportException: If a table name is missing or unknown or if a
            row does not have as many cells as the header of its table.
    """
    tables = {tableName: {} for tableName in _IMPORT_TABLE_NAMES}

    with open(filePath, "r") as f:  # encoding="iso-8859-15"
        show("Reading file...")

        expectingTable = True
        readHeader = False
        activeDict = None

        for line in f:
            # Remove only the line terminator. Cutting off the last character
            # unconditionally would truncate a final line that has no newline.
            line = line.rstrip("\n")

            if not line:
                # A blank line closes the current table.
                expectingTable = True
                continue

            if expectingTable:
                if line[0] != "#":
                    raise DataBaseImportException(f"Expected a Table name starting with # but got this line: {line}")

                expectingTable = False
                tableName = line[1:]

                if tableName not in tables:
                    raise DataBaseImportException(f"{tableName} is not a valid table name!")

                activeDict = tables[tableName]
                readHeader = True
                continue

            cells = line.split("\t")

            if readHeader:
                # First line after the table name: column names.
                readHeader = False
                activeDict["_header"] = cells
                for cell in cells:
                    activeDict[cell] = []
                continue

            header = activeDict["_header"]
            if len(cells) != len(header):
                raise DataBaseImportException(
                    f"The number of header cells is not equal to the number of row cells in row {cells}!"
                )

            for column, cell in zip(header, cells):
                activeDict[column].append(cell)

    return tables


def _describeProgram(program, programLabel):
    """Return ``program.repr()``, or the label from the file if no program was found."""
    if program is None:
        return programLabel
    return program.repr()


def importFromFile(filePath):
    """Import the data of one semester from a tab separated text file.

    The file consists of tables separated by blank lines. Every table starts
    with a line ``#<TableName>``, followed by a header line with the column
    names and one line per row; cells are separated by tabs.

    The semester, parts, experiments, semester experiments and assistants
    referenced in the file are looked up in the database and have to exist.
    Students are created or, if their student number is already known,
    updated. Groups, part students, group experiments and appointments are
    created.

    The database file is copied into the backup directory at the end of the
    transaction block and once more after the block has been left.

    Progress messages are sent with ``flash`` inside a request context and
    with ``print`` otherwise.

    Args:
        filePath: Path of the import file as ``str`` or path object. Its name
            has to end with ``.txt``.

    Raises:
        DataBaseImportException: If the file does not have a ``.txt``
            extension, is malformed, contains more or less than one semester
            or references database entries that do not exist.
    """
    # str() so that path objects are accepted in addition to plain strings.
    if not str(filePath).endswith(".txt"):
        raise DataBaseImportException(
            "The import file has to be a text file with txt extension (.txt at the end of the filename)!"
        )

    show = flash if has_request_context() else print

    tables = _readImportTables(filePath, show)
    semesters = tables["Semester"]
    parts = tables["Part"]
    students = tables["Student"]
    groups = tables["Group"]
    partStudents = tables["PartStudent"]
    experiments = tables["Experiment"]
    groupExperiments = tables["GroupExperiment"]
    appointments = tables["Appointment"]

    # Discard whatever is pending in the session before starting the import transaction.
    db.session.rollback()

    with db.session.begin():
        # Semester
        show("Semester...")

        if len(semesters["label"]) * len(semesters["year"]) != 1:
            raise DataBaseImportException("Only one semester is allowed in an import file!")

        semesterLabel = semesters["label"][0]
        semesterYear = int(semesters["year"][0])
        dbSemester = Semester.query.filter(Semester.label == semesterLabel, Semester.year == semesterYear).first()

        if not dbSemester:
            raise DataBaseImportException(
                f"{semesterLabel}{semesterYear} does not exist in the database! Please make sure that you create the semester from the web interface first."
            )

        # Part
        show("Part...")

        # Maps the part ids used in the file to the parts found in the database.
        dbParts = {}
        for i, partId in enumerate(parts["id"]):
            partNumber = int(parts["number"][i])
            partProgramLabel = parts["program_label"][i]
            dbPart = Part.query.filter(
                Part.number == partNumber,
                Part.program.has(Program.label == partProgramLabel),
                Part.semester == dbSemester,
            ).first()

            if not dbPart:
                raise DataBaseImportException(
                    f"Part with number {partNumber} and program_label {partProgramLabel} does not exist in the database! Please make sure that you create parts from the web interface first."
                )

            dbParts[int(partId)] = dbPart

        # Student
        show("Student...")

        # Maps student numbers to the created or updated students.
        dbStudents = {}
        for i, studentNumber in enumerate(students["student_number"]):
            studentNumber = int(studentNumber)
            dbStudent = Student.query.filter(Student.student_number == studentNumber).first()

            # Empty cells of optional columns are stored as None.
            contactEmail = students["contact_email"][i] or None
            bachelorThesis = students["bachelor_thesis"][i] or None
            bachelorThesisWorkGroup = students["bachelor_thesis_work_group"][i] or None
            note = students["note"][i] or None

            if not dbStudent:
                dbStudent = Student(
                    student_number=studentNumber,
                    first_name=students["first_name"][i],
                    last_name=students["last_name"][i],
                    uni_email=students["uni_email"][i],
                    contact_email=contactEmail,
                    bachelor_thesis=bachelorThesis,
                    bachelor_thesis_work_group=bachelorThesisWorkGroup,
                    note=note,
                )
                db.session.add(dbStudent)
            else:
                # Names and university email of a known student are left untouched.
                dbStudent.contact_email = contactEmail
                dbStudent.bachelor_thesis = bachelorThesis
                dbStudent.bachelor_thesis_work_group = bachelorThesisWorkGroup
                dbStudent.note = note

            dbStudents[studentNumber] = dbStudent

        # Group
        show("Group...")

        # Maps the group ids used in the file to the newly created groups.
        dbGroups = {}
        for i, groupId in enumerate(groups["id"]):
            dbGroup = Group(
                number=int(groups["number"][i]),
                program=Program.query.filter(Program.label == groups["program_label"][i]).first(),
                semester=dbSemester,
            )
            db.session.add(dbGroup)
            dbGroups[int(groupId)] = dbGroup

        # PartStudent
        show("PartStudent...")

        for i, studentNumber in enumerate(partStudents["student_number"]):
            dbPartStudent = PartStudent.customInit(
                student=dbStudents[int(studentNumber)],
                part=dbParts[int(partStudents["part_id"][i])],
                group=dbGroups[int(partStudents["group_id"][i])],
            )
            db.session.add(dbPartStudent)

        # Experiment
        show("Experiment...")

        # Maps the experiment ids used in the file to semester experiments from the database.
        dbSemesterExperiments = {}
        for i, experimentId in enumerate(experiments["id"]):
            experimentNumber = int(experiments["number"][i])
            experimentProgramLabel = experiments["program_label"][i]
            experimentProgram = Program.query.filter(Program.label == experimentProgramLabel).first()

            dbExperiment = Experiment.query.filter(
                Experiment.number == experimentNumber, Experiment.program == experimentProgram
            ).first()

            if not dbExperiment:
                # The program lookup may have returned None, therefore the
                # description must not call a method on it unconditionally.
                raise DataBaseImportException(
                    f"Experiment with number {experimentNumber} and program {_describeProgram(experimentProgram, experimentProgramLabel)} does not exist in the database. Please make sure that experiments are created from the web interface."
                )

            dbSemesterExperiment = SemesterExperiment.query.filter(
                SemesterExperiment.experiment == dbExperiment, SemesterExperiment.semester == dbSemester
            ).first()

            if not dbSemesterExperiment:
                raise DataBaseImportException(
                    f"No semester experiment exists in the database to the experiment with number {experimentNumber} and program {_describeProgram(experimentProgram, experimentProgramLabel)}. Please make sure that semester experiments are created in the web interface. The problem might be that the experiment was not active while creating a new semester"
                )

            dbSemesterExperiments[int(experimentId)] = dbSemesterExperiment

        # GroupExperiment
        show("GroupExperiment...")

        # Maps the group experiment ids used in the file to the newly created group experiments.
        dbGroupExperiments = {}
        for i, groupExperimentId in enumerate(groupExperiments["id"]):
            dbGroupExperiment = GroupExperiment.customInit(
                semester_experiment=dbSemesterExperiments[int(groupExperiments["experiment_id"][i])],
                group=dbGroups[int(groupExperiments["group_id"][i])],
            )
            db.session.add(dbGroupExperiment)
            dbGroupExperiments[int(groupExperimentId)] = dbGroupExperiment

        # Appointment
        show("Appointment...")

        for i, dateText in enumerate(appointments["date"]):
            assistantEmail = appointments["assistant_email"][i]
            assistant = Assistant.query.filter(Assistant.user.has(User.email == assistantEmail)).first()

            if not assistant:
                raise DataBaseImportException(
                    f"Assistant with email {assistantEmail} does not exist in the database! Please make sure that you create assistants in the web interface."
                )

            dbAppointment = Appointment.customInit(
                date=datetime.strptime(dateText, "%d.%m.%Y").date(),
                # The file stores the flag as an integer (e.g. "0" or "1").
                special=bool(int(appointments["special"][i])),
                group_experiment=dbGroupExperiments[int(appointments["group_experiment_id"][i])],
                assistant=assistant,
            )
            db.session.add(dbAppointment)

        # Backup
        dest = relative_db_bk_dir / f"before_{dbSemester.repr()}_import_{now()}.db"
        copy2(relative_db_path, dest)
        show(f"Made a backup of the database before the import at {dest}")

        # Auto commit from the transaction context

    dest = relative_db_bk_dir / f"after_{dbSemester.repr()}_import_{now()}.db"
    copy2(relative_db_path, dest)
    show(f"Made a backup of the database after the import at {dest}")

    show("\nImport done!")
    show("Please check that everything worked as expected. Restore the database backup otherwise!")