from flask import has_request_context, flash from datetime import datetime from shutil import copy2 from advlabdb import app, db from advlabdb.models import ( Part, Student, Group, PartStudent, Experiment, GroupExperiment, Appointment, Semester, Program, SemesterExperiment, Assistant, User, ) from advlabdb.exceptions import DataBaseImportException def importFromFile(filePath): if filePath[-4:] != ".txt": raise DataBaseImportException( "The import file has to be a text file with txt extention (.txt at the end of the filename)!" ) semesters = {} parts = {} students = {} groups = {} partStudents = {} experiments = {} groupExperiments = {} appointments = {} with open(filePath, "r") as f: # encoding="iso-8859-15" if has_request_context(): show = flash else: show = print show("Reading file...") expectingTable = True readHeader = False activeDict = None for line in f: line = line[:-1] if not line: expectingTable = True continue if expectingTable: if line[0] == "#": expectingTable = False tableName = line[1:] if tableName == "Semester": activeDict = semesters elif tableName == "Part": activeDict = parts elif tableName == "Student": activeDict = students elif tableName == "Group": activeDict = groups elif tableName == "PartStudent": activeDict = partStudents elif tableName == "Experiment": activeDict = experiments elif tableName == "GroupExperiment": activeDict = groupExperiments elif tableName == "Appointment": activeDict = appointments else: raise DataBaseImportException(f"{tableName} is not a valide table name!") readHeader = True continue else: raise DataBaseImportException(f"Expected a Table name starting with # but got this line: {line}") cells = line.split("\t") if readHeader: readHeader = False activeDict["_header"] = cells for cell in cells: activeDict[cell] = [] continue cellsLen = len(cells) if cellsLen == len(activeDict["_header"]): for i in range(cellsLen): activeDict[activeDict["_header"][i]].append(cells[i]) else: raise DataBaseImportException( f"The number of header cells is not equal to the number of row cells in row {cells}!" ) db.session.rollback() with db.session.begin(): # Semester show("Semester...") if len(semesters["label"]) * len(semesters["year"]) != 1: raise DataBaseImportException("Only one semester is allowed in an import file!") semesterLabel = semesters["label"][0] semesterYear = int(semesters["year"][0]) dbSemester = Semester.query.filter(Semester.label == semesterLabel, Semester.year == semesterYear).first() if not dbSemester: raise DataBaseImportException( f"{semesterLabel}{semesterYear} does not exist in the database! Please make sure that you create the semester from the web interface first." ) # Part show("Part...") dbParts = {} for i, id in enumerate(parts["id"]): id = int(id) partNumber = int(parts["number"][i]) partProgramLabel = parts["program_label"][i] dbPart = Part.query.filter( Part.number == partNumber, Part.program.has(Program.label == partProgramLabel), Part.semester == dbSemester, ).first() if not dbPart: raise DataBaseImportException( f"Part with number {partNumber} and program_label {partProgramLabel} does not exist in the database! Please make sure that you create parts from the web interface first." ) dbParts[id] = dbPart # Student show("Student...") dbStudents = {} for i, studentNumber in enumerate(students["student_number"]): studentNumber = int(studentNumber) dbStudent = Student.query.filter(Student.student_number == studentNumber).first() if not dbStudent: dbStudent = Student( student_number=studentNumber, first_name=students["first_name"][i], last_name=students["last_name"][i], uni_email=students["uni_email"][i], contact_email=students["contact_email"][i] or None, bachelor_thesis=students["bachelor_thesis"][i] or None, bachelor_thesis_work_group=students["bachelor_thesis_work_group"][i] or None, note=students["note"][i] or None, ) db.session.add(dbStudent) else: dbStudent.contact_email = students["contact_email"][i] or None dbStudent.bachelor_thesis = students["bachelor_thesis"][i] or None dbStudent.bachelor_thesis_work_group = students["bachelor_thesis_work_group"][i] or None dbStudent.note = students["note"][i] or None dbStudents[studentNumber] = dbStudent # Group show("Group...") dbGroups = {} for i, id in enumerate(groups["id"]): id = int(id) dbGroup = Group( number=int(groups["number"][i]), program=Program.query.filter(Program.label == groups["program_label"][i]).first(), semester=dbSemester, ) db.session.add(dbGroup) dbGroups[id] = dbGroup # PartStudent show("PartStudent...") for i, studentNumber in enumerate(partStudents["student_number"]): studentNumber = int(studentNumber) dbPartStudent = PartStudent.customInit( student=dbStudents[studentNumber], part=dbParts[int(partStudents["part_id"][i])], group=dbGroups[int(partStudents["group_id"][i])], ) db.session.add(dbPartStudent) # Experiment show("Experiment...") dbSemesterExperiments = {} for i, id in enumerate(experiments["id"]): id = int(id) experimentNumber = int(experiments["number"][i]) experimentProgram = Program.query.filter(Program.label == experiments["program_label"][i]).first() dbExperiment = Experiment.query.filter( Experiment.number == experimentNumber, Experiment.program == experimentProgram ).first() if not dbExperiment: raise DataBaseImportException( f"Experiment with number {experimentNumber} and program {experimentProgram.repr()} does not exist in the database. Please make sure that experiments are created from the web interface." ) dbSemesterExperiment = SemesterExperiment.query.filter( SemesterExperiment.experiment == dbExperiment, SemesterExperiment.semester == dbSemester ).first() if not dbSemesterExperiment: raise DataBaseImportException( f"No semester experiment exists in the database to the experiment with number {experimentNumber} and program {experimentProgram.repr()}. Please make sure that semester experiments are created in the web interface. The problem might be that the experiment was not active while creating a new semester" ) dbSemesterExperiments[id] = dbSemesterExperiment # GroupExperiment show("GroupExperiment...") dbGroupExperiments = {} for i, id in enumerate(groupExperiments["id"]): id = int(id) dbGroupExperiment = GroupExperiment.customInit( semester_experiment=dbSemesterExperiments[int(groupExperiments["experiment_id"][i])], group=dbGroups[int(groupExperiments["group_id"][i])], ) db.session.add(dbGroupExperiment) dbGroupExperiments[id] = dbGroupExperiment # Appointment show("Appointment...") for i, date in enumerate(appointments["date"]): assistantEmail = appointments["assistant_email"][i] assistant = Assistant.query.filter(Assistant.user.has(User.email == assistantEmail)).first() if not assistant: raise DataBaseImportException( f"Assistant with email {email} does not exist in the database! Please make sure that you create assistants in the web interface." ) dbAppointment = Appointment.customInit( date=datetime.strptime(date, "%d.%m.%Y").date(), special=bool(int(appointments["special"][i])), group_experiment=dbGroupExperiments[int(appointments["group_experiment_id"][i])], assistant=assistant, ) db.session.add(dbAppointment) # Backup # TODO: Check existing dirs src = "db/advLab.db" dest = f"db/backups/before_{dbSemester.repr()}_import_{datetime.now().strftime('%d_%m_%Y_%H_%M_%S')}.db" copy2(src, dest) show(f"Made a backup of the database before the import at {dest}") # Auto commit from the transaction context src = "db/advLab.db" dest = f"db/backups/after_{dbSemester.repr()}_import_{datetime.now().strftime('%d_%m_%Y_%H_%M_%S')}.db" copy2(src, dest) show(f"Made a backup of the database after the import at {dest}") show("\nImport done!") show("Please check that everything worked as expected. Restore the database backup otherwise!")