from datetime import datetime from pathlib import Path from shutil import copy2 from flask import flash from sqlalchemy import select from . import settings from .exceptions import DataBaseImportException from .models import ( Appointment, Assistant, Experiment, Group, GroupExperiment, Part, PartStudent, Program, Semester, SemesterExperiment, Student, User, db, get_first, ) def now(): return datetime.now().strftime("%d_%m_%Y_%H_%M_%S") def is_null(entry): return entry == "NULL" or entry == "" def nullable(entry): if is_null(entry): return None return entry def not_nullable(entry): if is_null(entry): raise DataBaseImportException("Unnullable entry is NULL!") return entry def importFromFile(filePath): db_path = Path(settings["SQLITE_DB_PATH"]) db_bk_dir = db_path.parent / "backups" db_bk_dir.mkdir(exist_ok=True) if filePath[-4:] != ".txt": raise DataBaseImportException( "The import file has to be a text file with txt extension (.txt at the end of the filename)!" ) semesters = {} parts = {} students = {} groups = {} partStudents = {} experiments = {} groupExperiments = {} appointments = {} with open(filePath) as f: flash("Reading file...") expectingTable = True readHeader = False for line in f: line = line[:-1] if line == "": expectingTable = True continue if expectingTable: if line[0] != "#": raise DataBaseImportException(f"Expected a Table name starting with # but got this line: {line}") expectingTable = False tableName = line[1:] if tableName == "Semester": activeDict = semesters elif tableName == "Part": activeDict = parts elif tableName == "Student": activeDict = students elif tableName == "Group": activeDict = groups elif tableName == "PartStudent": activeDict = partStudents elif tableName == "Experiment": activeDict = experiments elif tableName == "GroupExperiment": activeDict = groupExperiments elif tableName == "Appointment": activeDict = appointments else: raise DataBaseImportException(f"{tableName} is not a valid table name!") readHeader = True continue cells = line.split("\t") if readHeader: readHeader = False activeDict["_header"] = cells for cell in cells: activeDict[cell] = [] continue cellsLen = len(cells) if cellsLen != len(activeDict["_header"]): raise DataBaseImportException( f"The number of header cells is not equal to the number of row cells in row {cells}!" ) for i in range(cellsLen): activeDict[activeDict["_header"][i]].append(cells[i]) try: # Semester flash("Semester...") if len(semesters["label"]) != 1: raise DataBaseImportException("Only one semester is allowed in an import file!") semesterLabel = not_nullable(semesters["label"][0]) semesterYear = int(not_nullable(semesters["year"][0])) dbSemester = get_first(select(Semester).where(Semester.label == semesterLabel, Semester.year == semesterYear)) if dbSemester is None: raise DataBaseImportException( f"{semesterLabel}{semesterYear} does not exist in the database! Please make sure that you create the semester from the web interface first." ) # Part flash("Part...") dbParts = {} for i, id in enumerate(parts["id"]): id = int(not_nullable(id)) partNumber = int(not_nullable(parts["number"][i])) partProgramLabel = not_nullable(parts["program_label"][i]) dbPart = get_first( select(Part) .join(Program) .where( Part.number == partNumber, Program.label == partProgramLabel, Part.semester == dbSemester, ) ) if dbPart is None: raise DataBaseImportException( f"Part with number {partNumber} and program label {partProgramLabel} does not exist in the database! Please make sure that you create parts from the web interface first." ) dbParts[id] = dbPart # Student flash("Student...") dbStudents = {} for i, studentNumber in enumerate(students["student_number"]): studentNumber = int(not_nullable(studentNumber)) dbStudent = get_first(select(Student).where(Student.student_number == studentNumber)) if dbStudent is None: dbStudent = Student( student_number=studentNumber, first_name=not_nullable(students["first_name"][i]), last_name=not_nullable(students["last_name"][i]), uni_email=not_nullable(students["uni_email"][i]), contact_email=nullable(students["contact_email"][i]), bachelor_thesis=nullable(students["bachelor_thesis"][i]), bachelor_thesis_work_group=nullable(students["bachelor_thesis_work_group"][i]), note=nullable(students["note"][i]), ) db.session.add(dbStudent) else: dbStudent.contact_email = nullable(students["contact_email"][i]) dbStudent.bachelor_thesis = nullable(students["bachelor_thesis"][i]) dbStudent.bachelor_thesis_work_group = nullable(students["bachelor_thesis_work_group"][i]) dbStudent.note = nullable(students["note"][i]) dbStudents[studentNumber] = dbStudent # Group flash("Group...") dbGroups = {} for i, id in enumerate(groups["id"]): id = int(not_nullable(id)) program = get_first(select(Program).where(Program.label == not_nullable(groups["program_label"][i]))) dbGroup = Group( number=int(not_nullable(groups["number"][i])), program=program, semester=dbSemester, ) db.session.add(dbGroup) dbGroups[id] = dbGroup # PartStudent flash("PartStudent...") for i, studentNumber in enumerate(partStudents["student_number"]): studentNumber = int(not_nullable(studentNumber)) dbPartStudent = PartStudent( student=dbStudents[studentNumber], part=dbParts[int(not_nullable(partStudents["part_id"][i]))], group=dbGroups[int(not_nullable(partStudents["group_id"][i]))], ) db.session.add(dbPartStudent) # Experiment flash("Experiment...") dbSemesterExperiments = {} for i, id in enumerate(experiments["id"]): id = int(not_nullable(id)) experimentNumber = int(not_nullable(experiments["number"][i])) experimentProgram = get_first( select(Program).where(Program.label == not_nullable(experiments["program_label"][i])) ) dbExperiment = get_first( select(Experiment).where(Experiment.number == experimentNumber, Experiment.program == experimentProgram) ) if dbExperiment is None: # TODO: Check if experimentProgram is None raise DataBaseImportException( f"Experiment with number {experimentNumber} and program {experimentProgram} does not exist in the database. Please make sure that experiments are created from the web interface." ) dbSemesterExperiment = get_first( select(SemesterExperiment).where( SemesterExperiment.experiment == dbExperiment, SemesterExperiment.semester == dbSemester ) ) if dbSemesterExperiment is None: raise DataBaseImportException( f"No semester experiment exists in the database to the experiment with number {experimentNumber} and program {experimentProgram}. Please make sure that semester experiments are created in the web interface. The problem might be that the experiment was not active while creating a new semester" ) dbSemesterExperiments[id] = dbSemesterExperiment # GroupExperiment flash("GroupExperiment...") dbGroupExperiments = {} for i, id in enumerate(groupExperiments["id"]): id = int(not_nullable(id)) dbGroupExperiment = GroupExperiment( semester_experiment=dbSemesterExperiments[int(not_nullable(groupExperiments["experiment_id"][i]))], group=dbGroups[int(not_nullable(groupExperiments["group_id"][i]))], ) db.session.add(dbGroupExperiment) dbGroupExperiments[id] = dbGroupExperiment # Appointment flash("Appointment...") for i, date in enumerate(appointments["date"]): date = not_nullable(date) assistantEmail = not_nullable(appointments["assistant_email"][i]) assistant = get_first(select(Assistant).join(User).where(User.email == assistantEmail)) if assistant is None: raise DataBaseImportException( f"Assistant with email {assistantEmail} does not exist in the database! Please make sure that you create assistants in the web interface." ) dbAppointment = Appointment( date=datetime.strptime(date, "%d.%m.%Y").date(), special=bool(int(not_nullable(appointments["special"][i]))), group_experiment=dbGroupExperiments[int(not_nullable(appointments["group_experiment_id"][i]))], assistant=assistant, ) db.session.add(dbAppointment) # Backup dest = db_bk_dir / f"before_{dbSemester}_import_{now()}.db" copy2(db_path, dest) flash(f"Made a backup of the database before committing the import at {dest}") db.session.commit() except Exception as ex: db.session.rollback() raise ex dest = db_bk_dir / f"after_{dbSemester}_import_{now()}.db" copy2(db_path, dest) flash(f"Made a backup of the database after the import at {dest}") flash("\nImport done!") flash("Please check that everything worked as expected. Restore the database backup otherwise!")