mirror of
https://codeberg.org/Mo8it/AdvLabDB.git
synced 2024-11-08 21:21:06 +00:00
Added database_import
This commit is contained in:
parent
57bf4013f1
commit
071a5e94b5
2 changed files with 277 additions and 1 deletions
276
advlabdb/database_import.py
Normal file
276
advlabdb/database_import.py
Normal file
|
@ -0,0 +1,276 @@
|
||||||
|
from flask import has_request_context, flash
|
||||||
|
from datetime import datetime
|
||||||
|
from shutil import copy2
|
||||||
|
|
||||||
|
from advlabdb import app, db
|
||||||
|
from advlabdb.models import (
|
||||||
|
Part,
|
||||||
|
Student,
|
||||||
|
Group,
|
||||||
|
PartStudent,
|
||||||
|
Experiment,
|
||||||
|
GroupExperiment,
|
||||||
|
Appointment,
|
||||||
|
Semester,
|
||||||
|
Program,
|
||||||
|
SemesterExperiment,
|
||||||
|
Assistant,
|
||||||
|
User,
|
||||||
|
)
|
||||||
|
from advlabdb.exceptions import DataBaseImportException
|
||||||
|
|
||||||
|
|
||||||
|
def importFromFile(filePath):
|
||||||
|
if filePath[-4:] != ".txt":
|
||||||
|
raise DataBaseImportException(
|
||||||
|
"The import file has to be a text file with txt extention (.txt at the end of the filename)!"
|
||||||
|
)
|
||||||
|
|
||||||
|
semesters = {}
|
||||||
|
parts = {}
|
||||||
|
students = {}
|
||||||
|
groups = {}
|
||||||
|
partStudents = {}
|
||||||
|
experiments = {}
|
||||||
|
groupExperiments = {}
|
||||||
|
appointments = {}
|
||||||
|
|
||||||
|
with open(filePath, "r") as f: # encoding="iso-8859-15"
|
||||||
|
if has_request_context():
|
||||||
|
show = flash
|
||||||
|
else:
|
||||||
|
show = print
|
||||||
|
|
||||||
|
show("Reading file...")
|
||||||
|
|
||||||
|
expectingTable = True
|
||||||
|
readHeader = False
|
||||||
|
|
||||||
|
activeDict = None
|
||||||
|
|
||||||
|
for line in f:
|
||||||
|
line = line[:-1]
|
||||||
|
|
||||||
|
if not line:
|
||||||
|
expectingTable = True
|
||||||
|
continue
|
||||||
|
|
||||||
|
if expectingTable:
|
||||||
|
if line[0] == "#":
|
||||||
|
expectingTable = False
|
||||||
|
tableName = line[1:]
|
||||||
|
|
||||||
|
if tableName == "Semester":
|
||||||
|
activeDict = semesters
|
||||||
|
elif tableName == "Part":
|
||||||
|
activeDict = parts
|
||||||
|
elif tableName == "Student":
|
||||||
|
activeDict = students
|
||||||
|
elif tableName == "Group":
|
||||||
|
activeDict = groups
|
||||||
|
elif tableName == "PartStudent":
|
||||||
|
activeDict = partStudents
|
||||||
|
elif tableName == "Experiment":
|
||||||
|
activeDict = experiments
|
||||||
|
elif tableName == "GroupExperiment":
|
||||||
|
activeDict = groupExperiments
|
||||||
|
elif tableName == "Appointment":
|
||||||
|
activeDict = appointments
|
||||||
|
else:
|
||||||
|
raise DataBaseImportException(f"{tableName} is not a valide table name!")
|
||||||
|
|
||||||
|
readHeader = True
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise DataBaseImportException(f"Expected a Table name starting with # but got this line: {line}")
|
||||||
|
|
||||||
|
cells = line.split("\t")
|
||||||
|
|
||||||
|
if readHeader:
|
||||||
|
readHeader = False
|
||||||
|
activeDict["_header"] = cells
|
||||||
|
|
||||||
|
for cell in cells:
|
||||||
|
activeDict[cell] = []
|
||||||
|
|
||||||
|
continue
|
||||||
|
|
||||||
|
cellsLen = len(cells)
|
||||||
|
if cellsLen == len(activeDict["_header"]):
|
||||||
|
for i in range(cellsLen):
|
||||||
|
activeDict[activeDict["_header"][i]].append(cells[i])
|
||||||
|
else:
|
||||||
|
raise DataBaseImportException(
|
||||||
|
f"The number of header cells is not equal to the number of row cells in row {cells}!"
|
||||||
|
)
|
||||||
|
|
||||||
|
db.session.rollback()
|
||||||
|
with db.session.begin():
|
||||||
|
# Semester
|
||||||
|
show("Semester...")
|
||||||
|
|
||||||
|
if len(semesters["label"]) * len(semesters["year"]) != 1:
|
||||||
|
raise DataBaseImportException("Only one semester is allowed in an import file!")
|
||||||
|
|
||||||
|
semesterLabel = semesters["label"][0]
|
||||||
|
semesterYear = int(semesters["year"][0])
|
||||||
|
dbSemester = Semester.query.filter(Semester.label == semesterLabel, Semester.year == semesterYear).first()
|
||||||
|
|
||||||
|
if not dbSemester:
|
||||||
|
raise DataBaseImportException(
|
||||||
|
f"{semesterLabel}{semesterYear} does not exist in the database! Please make sure that you create the semester from the web interface first."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Part
|
||||||
|
show("Part...")
|
||||||
|
|
||||||
|
dbParts = {}
|
||||||
|
for i, id in enumerate(parts["id"]):
|
||||||
|
id = int(id)
|
||||||
|
partNumber = int(parts["number"][i])
|
||||||
|
partProgramLabel = parts["program_label"][i]
|
||||||
|
dbPart = Part.query.filter(
|
||||||
|
Part.number == partNumber,
|
||||||
|
Part.program.has(Program.label == partProgramLabel),
|
||||||
|
Part.semester == dbSemester,
|
||||||
|
).first()
|
||||||
|
|
||||||
|
if not dbPart:
|
||||||
|
raise DataBaseImportException(
|
||||||
|
f"Part with number {partNumber} and program_label {partProgramLabel} does not exist in the database! Please make sure that you create parts from the web interface first."
|
||||||
|
)
|
||||||
|
|
||||||
|
dbParts[id] = dbPart
|
||||||
|
|
||||||
|
# Student
|
||||||
|
show("Student...")
|
||||||
|
|
||||||
|
dbStudents = {}
|
||||||
|
for i, studentNumber in enumerate(students["student_number"]):
|
||||||
|
studentNumber = int(studentNumber)
|
||||||
|
dbStudent = Student.query.filter(Student.student_number == studentNumber).first()
|
||||||
|
|
||||||
|
if not dbStudent:
|
||||||
|
dbStudent = Student(
|
||||||
|
student_number=studentNumber,
|
||||||
|
first_name=students["first_name"][i],
|
||||||
|
last_name=students["last_name"][i],
|
||||||
|
uni_email=students["uni_email"][i],
|
||||||
|
contact_email=students["contact_email"][i] or None,
|
||||||
|
bachelor_thesis=students["bachelor_thesis"][i] or None,
|
||||||
|
bachelor_thesis_work_group=students["bachelor_thesis_work_group"][i] or None,
|
||||||
|
note=students["note"][i] or None,
|
||||||
|
)
|
||||||
|
db.session.add(dbStudent)
|
||||||
|
else:
|
||||||
|
dbStudent.contact_email = students["contact_email"][i] or None
|
||||||
|
dbStudent.bachelor_thesis = students["bachelor_thesis"][i] or None
|
||||||
|
dbStudent.bachelor_thesis_work_group = students["bachelor_thesis_work_group"][i] or None
|
||||||
|
dbStudent.note = students["note"][i] or None
|
||||||
|
|
||||||
|
dbStudents[studentNumber] = dbStudent
|
||||||
|
|
||||||
|
# Group
|
||||||
|
show("Group...")
|
||||||
|
|
||||||
|
dbGroups = {}
|
||||||
|
for i, id in enumerate(groups["id"]):
|
||||||
|
id = int(id)
|
||||||
|
dbGroup = Group(
|
||||||
|
number=int(groups["number"][i]),
|
||||||
|
program=Program.query.filter(Program.label == groups["program_label"][i]).first(),
|
||||||
|
semester=dbSemester,
|
||||||
|
)
|
||||||
|
db.session.add(dbGroup)
|
||||||
|
dbGroups[id] = dbGroup
|
||||||
|
|
||||||
|
# PartStudent
|
||||||
|
show("PartStudent...")
|
||||||
|
|
||||||
|
for i, studentNumber in enumerate(partStudents["student_number"]):
|
||||||
|
studentNumber = int(studentNumber)
|
||||||
|
dbPartStudent = PartStudent.customInit(
|
||||||
|
student=dbStudents[studentNumber],
|
||||||
|
part=dbParts[int(partStudents["part_id"][i])],
|
||||||
|
group=dbGroups[int(partStudents["group_id"][i])],
|
||||||
|
)
|
||||||
|
db.session.add(dbPartStudent)
|
||||||
|
|
||||||
|
# Experiment
|
||||||
|
show("Experiment...")
|
||||||
|
|
||||||
|
dbSemesterExperiments = {}
|
||||||
|
for i, id in enumerate(experiments["id"]):
|
||||||
|
id = int(id)
|
||||||
|
experimentNumber = int(experiments["number"][i])
|
||||||
|
experimentProgram = Program.query.filter(Program.label == experiments["program_label"][i]).first()
|
||||||
|
dbExperiment = Experiment.query.filter(
|
||||||
|
Experiment.number == experimentNumber, Experiment.program == experimentProgram
|
||||||
|
).first()
|
||||||
|
|
||||||
|
if not dbExperiment:
|
||||||
|
raise DataBaseImportException(
|
||||||
|
f"Experiment with number {experimentNumber} and program {experimentProgram.repr()} does not exist in the database. Please make sure that experiments are created from the web interface."
|
||||||
|
)
|
||||||
|
|
||||||
|
dbSemesterExperiment = SemesterExperiment.query.filter(
|
||||||
|
SemesterExperiment.experiment == dbExperiment, SemesterExperiment.semester == dbSemester
|
||||||
|
).first()
|
||||||
|
|
||||||
|
if not dbSemesterExperiment:
|
||||||
|
raise DataBaseImportException(
|
||||||
|
f"No semester experiment exists in the database to the experiment with number {experimentNumber} and program {experimentProgram.repr()}. Please make sure that semester experiments are created in the web interface. The problem might be that the experiment was not active while creating a new semester"
|
||||||
|
)
|
||||||
|
|
||||||
|
dbSemesterExperiments[id] = dbSemesterExperiment
|
||||||
|
|
||||||
|
# GroupExperiment
|
||||||
|
show("GroupExperiment...")
|
||||||
|
|
||||||
|
dbGroupExperiments = {}
|
||||||
|
for i, id in enumerate(groupExperiments["id"]):
|
||||||
|
id = int(id)
|
||||||
|
dbGroupExperiment = GroupExperiment.customInit(
|
||||||
|
semester_experiment=dbSemesterExperiments[int(groupExperiments["experiment_id"][i])],
|
||||||
|
group=dbGroups[int(groupExperiments["group_id"][i])],
|
||||||
|
)
|
||||||
|
db.session.add(dbGroupExperiment)
|
||||||
|
dbGroupExperiments[id] = dbGroupExperiment
|
||||||
|
|
||||||
|
# Appointment
|
||||||
|
show("Appointment...")
|
||||||
|
|
||||||
|
for i, date in enumerate(appointments["date"]):
|
||||||
|
assistantEmail = appointments["assistant_email"][i]
|
||||||
|
assistant = Assistant.query.filter(Assistant.user.has(User.email == assistantEmail)).first()
|
||||||
|
|
||||||
|
if not assistant:
|
||||||
|
raise DataBaseImportException(
|
||||||
|
f"Assistant with email {email} does not exist in the database! Please make sure that you create assistants in the web interface."
|
||||||
|
)
|
||||||
|
|
||||||
|
dbAppointment = Appointment.customInit(
|
||||||
|
date=datetime.strptime(date, "%d.%m.%Y").date(),
|
||||||
|
special=bool(int(appointments["special"][i])),
|
||||||
|
group_experiment=dbGroupExperiments[int(appointments["group_experiment_id"][i])],
|
||||||
|
assistant=assistant,
|
||||||
|
)
|
||||||
|
db.session.add(dbAppointment)
|
||||||
|
|
||||||
|
# Backup
|
||||||
|
src = "advLab.db"
|
||||||
|
dest = f"db_backups/before_{dbSemester.repr()}_import_{datetime.now().strftime('%d_%m_%Y_%H_%M_%S')}.db"
|
||||||
|
copy2(src, dest)
|
||||||
|
|
||||||
|
show(f"Made a backup of the database before the import at {dest}")
|
||||||
|
|
||||||
|
# Auto commit from the transaction context
|
||||||
|
|
||||||
|
src = "advLab.db"
|
||||||
|
dest = f"db_backups/after_{dbSemester.repr()}_import_{datetime.now().strftime('%d_%m_%Y_%H_%M_%S')}.db"
|
||||||
|
copy2(src, dest)
|
||||||
|
|
||||||
|
show(f"Made a backup of the database after the import at {dest}")
|
||||||
|
|
||||||
|
show("\nImport done!")
|
||||||
|
show("Please check that everything worked as expected. Restore the database backup otherwise!")
|
|
@ -7,7 +7,7 @@ https://flask-sqlalchemy.palletsprojects.com/en/2.x/models/
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Imports
|
# Imports
|
||||||
from flask import flash, has_request_context
|
from flask import flash
|
||||||
from flask_security import current_user
|
from flask_security import current_user
|
||||||
from flask_security.models.fsqla_v2 import FsRoleMixin, FsUserMixin
|
from flask_security.models.fsqla_v2 import FsRoleMixin, FsUserMixin
|
||||||
from decimal import Decimal, ROUND_HALF_UP
|
from decimal import Decimal, ROUND_HALF_UP
|
||||||
|
|
Loading…
Reference in a new issue