from datetime import datetime
from pathlib import Path

from flask import flash
from sqlalchemy import select

from .actions import backup
from .exceptions import DatabaseImportException
from .models import (
    Appointment,
    Assistant,
    Experiment,
    Group,
    GroupExperiment,
    Part,
    PartStudent,
    Program,
    Semester,
    SemesterExperiment,
    Student,
    User,
    db,
    get_first,
)

# Table names that may follow a "#" marker in an import file.
_IMPORT_TABLE_NAMES = (
    "Semester",
    "Part",
    "Student",
    "Group",
    "PartStudent",
    "Experiment",
    "GroupExperiment",
    "Appointment",
)


def is_null(entry):
    """Return True if the raw cell is the literal ``"NULL"`` or an empty string.

    The comparison is made on the unstripped cell, so a cell consisting only
    of whitespace is not considered null.
    """
    return entry in {"NULL", ""}


def nullable(entry):
    """Return the stripped cell, or None if the cell is null (see is_null)."""
    if is_null(entry):
        return None

    return entry.strip()


def not_nullable(entry):
    """Return the stripped cell.

    Raises:
        DatabaseImportException: If the cell is null (see is_null).
    """
    if is_null(entry):
        raise DatabaseImportException("Unnullable entry is NULL!")

    return entry.strip()


def _read_import_tables(filePath):
    """Parse the import file into one column dictionary per table.

    Format, as implemented below: tables are separated by blank lines; a table
    starts with a line "#<TableName>", followed by a tab separated header line,
    followed by tab separated rows.

    Returns:
        A dict mapping every known table name to a dict that contains the
        header cells under "_header" and, for each header cell, the list of
        the raw values of that column. Tables that do not occur in the file
        map to an empty dict.

    Raises:
        DatabaseImportException: On a missing "#" marker, an unknown table
            name or a row whose cell count differs from the header.
    """
    tables = {name: {} for name in _IMPORT_TABLE_NAMES}

    with filePath.open() as f:
        flash("Reading file...")

        expectingTable = True
        readHeader = False
        activeDict = None

        for line in f:
            # Strip only the line terminator. Slicing off the last character
            # would truncate the final line of a file without trailing newline.
            line = line.rstrip("\n")

            if line == "":
                expectingTable = True
                continue

            if expectingTable:
                if line[0] != "#":
                    raise DatabaseImportException(f"Expected a Table name starting with # but got this line: {line}")

                expectingTable = False
                tableName = line[1:]

                activeDict = tables.get(tableName)
                if activeDict is None:
                    raise DatabaseImportException(f"{tableName} is not a valid table name!")

                readHeader = True
                continue

            cells = line.split("\t")

            if readHeader:
                readHeader = False
                activeDict["_header"] = cells
                for cell in cells:
                    activeDict[cell] = []
                continue

            header = activeDict["_header"]
            if len(cells) != len(header):
                raise DatabaseImportException(
                    f"The number of header cells is not equal to the number of row cells in row {cells}!"
                )

            for column, cell in zip(header, cells):
                activeDict[column].append(cell)

    return tables


def importFromFile(filePath: Path):
    """Import the data of one semester from a tab separated text file.

    The file is parsed first (see _read_import_tables), then the tables are
    processed in the order Semester, Part, Student, Group, PartStudent,
    Experiment, GroupExperiment, Appointment:

    * The semester, parts, experiments, semester experiments and assistants
      referenced by the file are only looked up; they have to exist already.
    * Students are created if their student number is unknown, otherwise
      updated. Name and university email mismatches only produce a warning.
    * Groups, part students, group experiments and appointments are created.

    The "id" columns of the file are only used to resolve references between
    the tables of the file. Appointment dates are parsed as "%d.%m.%Y".

    backup() is called right before the commit and again after a successful
    commit. On any exception the session is rolled back and the exception is
    re-raised.

    Args:
        filePath: Path of the import file. Its name has to end with ".txt".

    Raises:
        DatabaseImportException: If the file name or content is invalid, a
            required cell is null, or a referenced database entry is missing.
        KeyError: If a required table or column is missing in the file or an
            id referenced inside the file is not defined in it.
        ValueError: If a numeric or date cell cannot be parsed.
    """
    if not filePath.name.endswith(".txt"):
        raise DatabaseImportException(
            "The import file has to be a text file with txt extension (.txt at the end of the filename)!"
        )

    tables = _read_import_tables(filePath)
    semesters = tables["Semester"]
    parts = tables["Part"]
    students = tables["Student"]
    groups = tables["Group"]
    partStudents = tables["PartStudent"]
    experiments = tables["Experiment"]
    groupExperiments = tables["GroupExperiment"]
    appointments = tables["Appointment"]

    try:
        # Semester
        flash("Semester...")

        if len(semesters["label"]) != 1:
            raise DatabaseImportException("Only one semester is allowed in an import file!")

        semesterLabel = not_nullable(semesters["label"][0])
        semesterYear = int(not_nullable(semesters["year"][0]))
        dbSemester = get_first(select(Semester).where(Semester.label == semesterLabel, Semester.year == semesterYear))

        if dbSemester is None:
            raise DatabaseImportException(
                f"{semesterLabel}{semesterYear} does not exist in the database! Please make sure that you create the semester from the web interface first."
            )

        # Part
        flash("Part...")

        # Maps the part id used in the file to the part found in the database
        dbParts = {}
        for i, rawPartId in enumerate(parts["id"]):
            partId = int(not_nullable(rawPartId))
            partNumber = int(not_nullable(parts["number"][i]))
            partProgramLabel = not_nullable(parts["program_label"][i])
            dbPart = get_first(
                select(Part)
                .join(Program)
                .where(
                    Part.number == partNumber,
                    Program.label == partProgramLabel,
                    Part.semester == dbSemester,
                )
            )

            if dbPart is None:
                raise DatabaseImportException(
                    f"Part with number {partNumber} and program label {partProgramLabel} does not exist in the database! Please make sure that you create parts from the web interface first."
                )

            dbParts[partId] = dbPart

        # Student
        flash("Student...")

        # Maps the student number to the created or updated student
        dbStudents = {}
        for i, student_number in enumerate(students["student_number"]):
            student_number = int(not_nullable(student_number))
            first_name = not_nullable(students["first_name"][i])
            last_name = not_nullable(students["last_name"][i])
            uni_email = not_nullable(students["uni_email"][i]).lower()
            contact_email = nullable(students["contact_email"][i])
            if contact_email is not None:
                contact_email = contact_email.lower()
            bachelor_thesis = nullable(students["bachelor_thesis"][i])
            bachelor_thesis_work_group = nullable(students["bachelor_thesis_work_group"][i])
            note = nullable(students["note"][i])

            dbStudent = get_first(select(Student).where(Student.student_number == student_number))

            if dbStudent is None:
                dbStudent = Student(
                    student_number=student_number,
                    first_name=first_name,
                    last_name=last_name,
                    uni_email=uni_email,
                    contact_email=contact_email,
                    bachelor_thesis=bachelor_thesis,
                    bachelor_thesis_work_group=bachelor_thesis_work_group,
                    note=note,
                )
                db.session.add(dbStudent)
            else:
                # Check if columns that should not change match
                if dbStudent.first_name != first_name:
                    flash(
                        f'First name "{dbStudent.first_name}" in the database does not match with the first name "{first_name}" provided in the import file for the student number {student_number}.',
                        "warning",
                    )
                if dbStudent.last_name != last_name:
                    flash(
                        f'Last name "{dbStudent.last_name}" in the database does not match with the last name "{last_name}" provided in the import file for the student number {student_number}.',
                        "warning",
                    )
                if dbStudent.uni_email != uni_email:
                    # Report the email of the import file (not the last name)
                    flash(
                        f'University email "{dbStudent.uni_email}" in the database does not match with the university email "{uni_email}" provided in the import file for the student number {student_number}.',
                        "warning",
                    )

                # The contact email is always taken from the import file,
                # even if it is not set there
                dbStudent.contact_email = contact_email

                # Only overwrite if set
                if bachelor_thesis is not None:
                    dbStudent.bachelor_thesis = bachelor_thesis
                if bachelor_thesis_work_group is not None:
                    dbStudent.bachelor_thesis_work_group = bachelor_thesis_work_group

                # Append to note instead of overwriting
                if note is not None:
                    if dbStudent.note is None:
                        dbStudent.note = note
                    else:
                        dbStudent.note += "\n" + note

            dbStudents[student_number] = dbStudent

        # Group
        flash("Group...")

        # Maps the group id used in the file to the new group
        dbGroups = {}
        for i, rawGroupId in enumerate(groups["id"]):
            groupId = int(not_nullable(rawGroupId))
            # NOTE(review): program is not checked for None here. Confirm
            # whether an unknown program label should be rejected explicitly.
            program = get_first(select(Program).where(Program.label == not_nullable(groups["program_label"][i])))
            dbGroup = Group(
                number=int(not_nullable(groups["number"][i])),
                program=program,
                semester=dbSemester,
            )
            db.session.add(dbGroup)
            dbGroups[groupId] = dbGroup

        # PartStudent
        flash("PartStudent...")

        for i, student_number in enumerate(partStudents["student_number"]):
            student_number = int(not_nullable(student_number))
            dbPartStudent = PartStudent(
                student=dbStudents[student_number],
                part=dbParts[int(not_nullable(partStudents["part_id"][i]))],
                group=dbGroups[int(not_nullable(partStudents["group_id"][i]))],
            )
            db.session.add(dbPartStudent)

        # Experiment
        flash("Experiment...")

        # Maps the experiment id used in the file to the semester experiment
        # found in the database
        dbSemesterExperiments = {}
        for i, rawExperimentId in enumerate(experiments["id"]):
            experimentId = int(not_nullable(rawExperimentId))
            experimentNumber = int(not_nullable(experiments["number"][i]))
            experimentProgram = get_first(
                select(Program).where(Program.label == not_nullable(experiments["program_label"][i]))
            )
            dbExperiment = get_first(
                select(Experiment).where(Experiment.number == experimentNumber, Experiment.program == experimentProgram)
            )

            if dbExperiment is None:
                # TODO: Check if experimentProgram is None
                raise DatabaseImportException(
                    f"Experiment with number {experimentNumber} and program {experimentProgram} does not exist in the database. Please make sure that experiments are created from the web interface."
                )

            dbSemesterExperiment = get_first(
                select(SemesterExperiment).where(
                    SemesterExperiment.experiment == dbExperiment, SemesterExperiment.semester == dbSemester
                )
            )

            if dbSemesterExperiment is None:
                raise DatabaseImportException(
                    f"No semester experiment exists in the database to the experiment with number {experimentNumber} and program {experimentProgram}. Please make sure that semester experiments are created in the web interface. The problem might be that the experiment was not active while creating a new semester"
                )

            dbSemesterExperiments[experimentId] = dbSemesterExperiment

        # GroupExperiment
        flash("GroupExperiment...")

        # Maps the group experiment id used in the file to the new group experiment
        dbGroupExperiments = {}
        for i, rawGroupExperimentId in enumerate(groupExperiments["id"]):
            groupExperimentId = int(not_nullable(rawGroupExperimentId))
            dbGroupExperiment = GroupExperiment(
                semester_experiment=dbSemesterExperiments[int(not_nullable(groupExperiments["experiment_id"][i]))],
                group=dbGroups[int(not_nullable(groupExperiments["group_id"][i]))],
            )
            db.session.add(dbGroupExperiment)
            dbGroupExperiments[groupExperimentId] = dbGroupExperiment

        # Appointment
        flash("Appointment...")

        for i, date in enumerate(appointments["date"]):
            date = not_nullable(date)
            assistantEmail = not_nullable(appointments["assistant_email"][i]).lower()
            assistant = get_first(select(Assistant).join(User).where(User.email == assistantEmail))

            if assistant is None:
                raise DatabaseImportException(
                    f"Assistant with email {assistantEmail} does not exist in the database! Please make sure that you create assistants in the web interface."
                )

            dbAppointment = Appointment(
                date=datetime.strptime(date, "%d.%m.%Y").date(),
                special=bool(int(not_nullable(appointments["special"][i]))),
                group_experiment=dbGroupExperiments[int(not_nullable(appointments["group_experiment_id"][i]))],
                assistant=assistant,
            )
            db.session.add(dbAppointment)

        backup(f"before_{dbSemester}_import")

        db.session.commit()
    except Exception:
        # Discard everything added to the session before passing the error on
        db.session.rollback()
        raise

    backup(f"after_{dbSemester}_import")

    flash("\nImport done!", "success")
    flash("Please check that everything worked as expected. Restore the database backup otherwise!", "warning")