2021-09-11 18:47:48 +00:00
from datetime import datetime
2022-05-07 14:48:28 +00:00
from os import environ
from pathlib import Path
2021-09-11 18:47:48 +00:00
from shutil import copy2
2022-02-13 18:58:05 +00:00
from flask import flash , has_request_context
2021-09-11 18:47:48 +00:00
from advlabdb import app , db
2022-02-13 18:58:05 +00:00
from advlabdb . exceptions import DataBaseImportException
2021-09-11 18:47:48 +00:00
from advlabdb . models import (
2022-02-13 18:58:05 +00:00
Appointment ,
Assistant ,
2021-09-11 18:47:48 +00:00
Experiment ,
2022-02-13 18:58:05 +00:00
Group ,
2021-09-11 18:47:48 +00:00
GroupExperiment ,
2022-02-13 18:58:05 +00:00
Part ,
PartStudent ,
2021-09-11 18:47:48 +00:00
Program ,
2022-02-13 18:58:05 +00:00
Semester ,
2021-09-11 18:47:48 +00:00
SemesterExperiment ,
2022-02-13 18:58:05 +00:00
Student ,
2021-09-11 18:47:48 +00:00
User ,
)
2022-05-07 14:48:28 +00:00
# Directory that holds the database file; its location is read from the
# RELATIVE_DB_DIR environment variable (a missing variable raises KeyError
# when this module is imported).
relative_db_dir = Path(environ["RELATIVE_DB_DIR"])
relative_db_path = relative_db_dir / "adblab.db"

# Backup copies of the database file are written into this subdirectory.
# It is created on import if it does not exist yet.
relative_db_bk_dir = relative_db_dir / "backups"
relative_db_bk_dir.mkdir(exist_ok=True)
def now():
    """Return the current local time as a ``DD_MM_YYYY_HH_MM_SS`` string.

    The stamp contains only digits and underscores so that it can be embedded
    in a file name.
    """
    return datetime.now().strftime("%d_%m_%Y_%H_%M_%S")
2021-09-11 18:47:48 +00:00
def _readImportTables(filePath, show):
    """Parse the import file into one dictionary of columns per table.

    File layout as read by this parser:

    * a table starts with a line ``#<TableName>``,
    * the next line is the tab separated header row,
    * every following line is a tab separated data row,
    * an empty line ends the table; the next non-empty line has to be a new
      ``#<TableName>`` line.

    Args:
        filePath: Path of the text file to read.
        show: Callable taking one message string; used for progress output.

    Returns:
        A dict mapping each known table name to a dict. For every table that
        occurs in the file, that dict maps each header cell to the list of
        the column's values (as strings) and ``"_header"`` to the list of
        header cells. Tables missing from the file map to an empty dict.

    Raises:
        DataBaseImportException: On an unknown table name, a missing table
            line or a row whose cell count differs from the header.
    """
    tables = {
        tableName: {}
        for tableName in (
            "Semester",
            "Part",
            "Student",
            "Group",
            "PartStudent",
            "Experiment",
            "GroupExperiment",
            "Appointment",
        )
    }

    with open(filePath, "r") as f:  # encoding="iso-8859-15"
        show("Reading file...")

        expectingTable = True
        readHeader = False
        activeTable = None

        for rawLine in f:
            # Only strip the line terminator. Slicing off the last character
            # unconditionally would eat real data when the final line of the
            # file has no trailing newline.
            line = rawLine.rstrip("\n")

            if not line:
                # An empty line separates tables
                expectingTable = True
                continue

            if expectingTable:
                if not line.startswith("#"):
                    raise DataBaseImportException(f"Expected a Table name starting with # but got this line: {line}")

                tableName = line[1:]
                if tableName not in tables:
                    raise DataBaseImportException(f"{tableName} is not a valid table name!")

                activeTable = tables[tableName]
                expectingTable = False
                readHeader = True
                continue

            cells = line.split("\t")

            if readHeader:
                readHeader = False
                activeTable["_header"] = cells
                for cell in cells:
                    activeTable[cell] = []
                continue

            header = activeTable["_header"]
            if len(cells) != len(header):
                raise DataBaseImportException(
                    f"The number of header cells is not equal to the number of row cells in row {cells}!"
                )

            for column, cell in zip(header, cells):
                activeTable[column].append(cell)

    return tables


def _programRepr(dbProgram, programLabel):
    """Return a printable name of a program for error messages.

    Falls back to the raw label from the import file when the program lookup
    returned ``None``, so that building the error message can not fail itself.
    """
    if dbProgram is None:
        return programLabel
    return dbProgram.repr()


def importFromFile(filePath):
    """Import the data of one semester from a text file into the database.

    The file is parsed first (see ``_readImportTables``). Afterwards all
    entries are added to the database inside one transaction. Semester,
    parts, experiments, semester experiments and assistants have to exist in
    the database already; students are created or updated; groups, part
    students, group experiments and appointments are created.

    A copy of the database file is written to the backup directory right
    before the transaction is committed and a second one after the commit.
    Progress messages are sent with ``flash`` inside of a request context and
    with ``print`` otherwise.

    Args:
        filePath: Path of the import file. Its name has to end with ``.txt``.

    Raises:
        DataBaseImportException: If the file name or the file content is
            invalid or if a required entry does not exist in the database.
    """
    # str() so that pathlib paths are accepted as well as plain strings
    if not str(filePath).endswith(".txt"):
        raise DataBaseImportException(
            "The import file has to be a text file with txt extension (.txt at the end of the filename)!"
        )

    show = flash if has_request_context() else print

    tables = _readImportTables(filePath, show)
    semesters = tables["Semester"]
    parts = tables["Part"]
    students = tables["Student"]
    groups = tables["Group"]
    partStudents = tables["PartStudent"]
    experiments = tables["Experiment"]
    groupExperiments = tables["GroupExperiment"]
    appointments = tables["Appointment"]

    # NOTE(review): presumably discards pending session state so that begin()
    # below can open a new transaction - confirm.
    db.session.rollback()

    with db.session.begin():
        # Semester
        show("Semester...")

        if len(semesters["label"]) * len(semesters["year"]) != 1:
            raise DataBaseImportException("Only one semester is allowed in an import file!")

        semesterLabel = semesters["label"][0]
        semesterYear = int(semesters["year"][0])
        dbSemester = Semester.query.filter(Semester.label == semesterLabel, Semester.year == semesterYear).first()

        if not dbSemester:
            raise DataBaseImportException(
                f"{semesterLabel}{semesterYear} does not exist in the database! Please make sure that you create the semester from the web interface first."
            )

        # Part
        show("Part...")

        # Maps the part id used in the file to the database part
        dbParts = {}
        for i, partId in enumerate(parts["id"]):
            partNumber = int(parts["number"][i])
            partProgramLabel = parts["program_label"][i]
            dbPart = Part.query.filter(
                Part.number == partNumber,
                Part.program.has(Program.label == partProgramLabel),
                Part.semester == dbSemester,
            ).first()

            if not dbPart:
                raise DataBaseImportException(
                    f"Part with number {partNumber} and program_label {partProgramLabel} does not exist in the database! Please make sure that you create parts from the web interface first."
                )

            dbParts[int(partId)] = dbPart

        # Student
        show("Student...")

        # Maps the student number to the database student
        dbStudents = {}
        for i, studentNumber in enumerate(students["student_number"]):
            studentNumber = int(studentNumber)

            # Empty cells of optional columns are stored as NULL
            contactEmail = students["contact_email"][i] or None
            bachelorThesis = students["bachelor_thesis"][i] or None
            bachelorThesisWorkGroup = students["bachelor_thesis_work_group"][i] or None
            note = students["note"][i] or None

            dbStudent = Student.query.filter(Student.student_number == studentNumber).first()

            if not dbStudent:
                dbStudent = Student(
                    student_number=studentNumber,
                    first_name=students["first_name"][i],
                    last_name=students["last_name"][i],
                    uni_email=students["uni_email"][i],
                    contact_email=contactEmail,
                    bachelor_thesis=bachelorThesis,
                    bachelor_thesis_work_group=bachelorThesisWorkGroup,
                    note=note,
                )
                db.session.add(dbStudent)
            else:
                # Only the optional fields of an existing student are updated
                dbStudent.contact_email = contactEmail
                dbStudent.bachelor_thesis = bachelorThesis
                dbStudent.bachelor_thesis_work_group = bachelorThesisWorkGroup
                dbStudent.note = note

            dbStudents[studentNumber] = dbStudent

        # Group
        show("Group...")

        # Maps the group id used in the file to the new database group
        dbGroups = {}
        for i, groupId in enumerate(groups["id"]):
            dbGroup = Group(
                number=int(groups["number"][i]),
                program=Program.query.filter(Program.label == groups["program_label"][i]).first(),
                semester=dbSemester,
            )
            db.session.add(dbGroup)
            dbGroups[int(groupId)] = dbGroup

        # PartStudent
        show("PartStudent...")

        for i, studentNumber in enumerate(partStudents["student_number"]):
            dbPartStudent = PartStudent.customInit(
                student=dbStudents[int(studentNumber)],
                part=dbParts[int(partStudents["part_id"][i])],
                group=dbGroups[int(partStudents["group_id"][i])],
            )
            db.session.add(dbPartStudent)

        # Experiment
        show("Experiment...")

        # Maps the experiment id used in the file to the database semester experiment
        dbSemesterExperiments = {}
        for i, experimentId in enumerate(experiments["id"]):
            experimentNumber = int(experiments["number"][i])
            experimentProgramLabel = experiments["program_label"][i]
            experimentProgram = Program.query.filter(Program.label == experimentProgramLabel).first()

            dbExperiment = Experiment.query.filter(
                Experiment.number == experimentNumber, Experiment.program == experimentProgram
            ).first()

            if not dbExperiment:
                raise DataBaseImportException(
                    f"Experiment with number {experimentNumber} and program {_programRepr(experimentProgram, experimentProgramLabel)} does not exist in the database. Please make sure that experiments are created from the web interface."
                )

            dbSemesterExperiment = SemesterExperiment.query.filter(
                SemesterExperiment.experiment == dbExperiment, SemesterExperiment.semester == dbSemester
            ).first()

            if not dbSemesterExperiment:
                raise DataBaseImportException(
                    f"No semester experiment exists in the database to the experiment with number {experimentNumber} and program {_programRepr(experimentProgram, experimentProgramLabel)}. Please make sure that semester experiments are created in the web interface. The problem might be that the experiment was not active while creating a new semester"
                )

            dbSemesterExperiments[int(experimentId)] = dbSemesterExperiment

        # GroupExperiment
        show("GroupExperiment...")

        # Maps the group experiment id used in the file to the new database group experiment
        dbGroupExperiments = {}
        for i, groupExperimentId in enumerate(groupExperiments["id"]):
            dbGroupExperiment = GroupExperiment.customInit(
                semester_experiment=dbSemesterExperiments[int(groupExperiments["experiment_id"][i])],
                group=dbGroups[int(groupExperiments["group_id"][i])],
            )
            db.session.add(dbGroupExperiment)
            dbGroupExperiments[int(groupExperimentId)] = dbGroupExperiment

        # Appointment
        show("Appointment...")

        for i, dateString in enumerate(appointments["date"]):
            assistantEmail = appointments["assistant_email"][i]
            assistant = Assistant.query.filter(Assistant.user.has(User.email == assistantEmail)).first()

            if not assistant:
                raise DataBaseImportException(
                    f"Assistant with email {assistantEmail} does not exist in the database! Please make sure that you create assistants in the web interface."
                )

            dbAppointment = Appointment.customInit(
                date=datetime.strptime(dateString, "%d.%m.%Y").date(),
                # The file stores the flag as an integer (e.g. 0 or 1)
                special=bool(int(appointments["special"][i])),
                group_experiment=dbGroupExperiments[int(appointments["group_experiment_id"][i])],
                assistant=assistant,
            )
            db.session.add(dbAppointment)

        # Backup
        dest = relative_db_bk_dir / f"before_{dbSemester.repr()}_import_{now()}.db"
        copy2(relative_db_path, dest)
        show(f"Made a backup of the database before the import at {dest}")

        # Auto commit from the transaction context

    dest = relative_db_bk_dir / f"after_{dbSemester.repr()}_import_{now()}.db"
    copy2(relative_db_path, dest)
    show(f"Made a backup of the database after the import at {dest}")

    show("\nImport done!")
    show("Please check that everything worked as expected. Restore the database backup otherwise!")