mirror of
https://codeberg.org/Mo8it/AdvLabDB.git
synced 2024-11-08 21:21:06 +00:00
Move scripts
This commit is contained in:
parent
d2cd1ae9a1
commit
d362820c3c
15 changed files with 37 additions and 348 deletions
|
@ -1,15 +1,18 @@
|
||||||
|
from os import environ
|
||||||
|
|
||||||
from flask import Flask
|
from flask import Flask
|
||||||
from flask_admin import Admin
|
from flask_admin import Admin
|
||||||
from flask_security import Security, SQLAlchemyUserDatastore
|
from flask_security import Security, SQLAlchemyUserDatastore
|
||||||
from flask_security.models import fsqla_v2 as fsqla
|
from flask_security.models import fsqla_v2 as fsqla
|
||||||
from flask_sqlalchemy import SQLAlchemy
|
from flask_sqlalchemy import SQLAlchemy
|
||||||
from flask_migrate import Migrate
|
from flask_migrate import Migrate
|
||||||
|
|
||||||
from .config import set_config
|
from .config import set_config
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
|
|
||||||
set_config(app)
|
# Config
|
||||||
|
settings = set_config(app)
|
||||||
|
|
||||||
# Setup Flask-SQLAlchemy
|
# Setup Flask-SQLAlchemy
|
||||||
db = SQLAlchemy(app)
|
db = SQLAlchemy(app)
|
||||||
|
@ -41,10 +44,4 @@ from . import models
|
||||||
user_datastore = SQLAlchemyUserDatastore(db, models.User, models.Role)
|
user_datastore = SQLAlchemyUserDatastore(db, models.User, models.Role)
|
||||||
Security(app, user_datastore)
|
Security(app, user_datastore)
|
||||||
|
|
||||||
try:
|
from . import routes, adminModelViews, assistantModelViews
|
||||||
from . import routes, adminModelViews, assistantModelViews
|
|
||||||
except Exception as ex:
|
|
||||||
print(
|
|
||||||
"\nYou are probably initializing the database with a script. If not, then you have to worry about not being able to import in __init__.py!\n"
|
|
||||||
)
|
|
||||||
print(str(ex))
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ from configparser import ConfigParser
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
def load_config(*files):
|
def load_config(app, *files):
|
||||||
config = ConfigParser()
|
config = ConfigParser()
|
||||||
|
|
||||||
for file in files:
|
for file in files:
|
||||||
|
@ -19,7 +19,7 @@ def load_config(*files):
|
||||||
|
|
||||||
|
|
||||||
def set_config(app):
|
def set_config(app):
|
||||||
config = load_config("secrets.ini", "settings.ini")
|
config = load_config(app, "secrets.ini", "settings.ini")
|
||||||
secrets = config["Secrets"]
|
secrets = config["Secrets"]
|
||||||
settings = config["Settings"]
|
settings = config["Settings"]
|
||||||
|
|
||||||
|
@ -53,5 +53,7 @@ def set_config(app):
|
||||||
"check_deliverability": check_email_deliverability,
|
"check_deliverability": check_email_deliverability,
|
||||||
}
|
}
|
||||||
app.config["SECURITY_PASSWORD_SALT"] = secrets["SECURITY_PASSWORD_SALT"]
|
app.config["SECURITY_PASSWORD_SALT"] = secrets["SECURITY_PASSWORD_SALT"]
|
||||||
app.config["SECURITY_PASSWORD_LENGTH_MIN"] = settings.getint(SECURITY_PASSWORD_LENGTH_MIN, 15)
|
app.config["SECURITY_PASSWORD_LENGTH_MIN"] = settings.getint("SECURITY_PASSWORD_LENGTH_MIN", 15)
|
||||||
# TODO: app.config["SECURITY_LOGIN_USER_TEMPLATE"] =
|
# TODO: app.config["SECURITY_LOGIN_USER_TEMPLATE"] =
|
||||||
|
|
||||||
|
return settings
|
||||||
|
|
|
@ -1,12 +1,11 @@
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from os import environ
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from shutil import copy2
|
from shutil import copy2
|
||||||
|
|
||||||
from flask import flash, has_request_context
|
from flask import flash, has_request_context
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from . import db
|
from . import db, settings
|
||||||
from .exceptions import DataBaseImportException
|
from .exceptions import DataBaseImportException
|
||||||
from .model_independent_funs import get_first
|
from .model_independent_funs import get_first
|
||||||
from .models import (
|
from .models import (
|
||||||
|
@ -24,12 +23,6 @@ from .models import (
|
||||||
User,
|
User,
|
||||||
)
|
)
|
||||||
|
|
||||||
relative_db_dir = Path(environ["RELATIVE_DB_DIR"])
|
|
||||||
relative_db_path = relative_db_dir / "advlab.db"
|
|
||||||
|
|
||||||
relative_db_bk_dir = relative_db_dir / "backups"
|
|
||||||
relative_db_bk_dir.mkdir(exist_ok=True)
|
|
||||||
|
|
||||||
|
|
||||||
def now():
|
def now():
|
||||||
return datetime.now().strftime("%d_%m_%Y_%H_%M_%S")
|
return datetime.now().strftime("%d_%m_%Y_%H_%M_%S")
|
||||||
|
@ -54,16 +47,16 @@ def not_nullable(entry):
|
||||||
|
|
||||||
|
|
||||||
def importFromFile(filePath):
|
def importFromFile(filePath):
|
||||||
|
db_path = Path(settings["SQLITE_DB_PATH"])
|
||||||
|
|
||||||
|
db_bk_dir = db_path.parent / "backups"
|
||||||
|
db_bk_dir.mkdir(exist_ok=True)
|
||||||
|
|
||||||
if filePath[-4:] != ".txt":
|
if filePath[-4:] != ".txt":
|
||||||
raise DataBaseImportException(
|
raise DataBaseImportException(
|
||||||
"The import file has to be a text file with txt extension (.txt at the end of the filename)!"
|
"The import file has to be a text file with txt extension (.txt at the end of the filename)!"
|
||||||
)
|
)
|
||||||
|
|
||||||
if has_request_context():
|
|
||||||
show = flash
|
|
||||||
else:
|
|
||||||
show = print
|
|
||||||
|
|
||||||
semesters = {}
|
semesters = {}
|
||||||
parts = {}
|
parts = {}
|
||||||
students = {}
|
students = {}
|
||||||
|
@ -74,7 +67,7 @@ def importFromFile(filePath):
|
||||||
appointments = {}
|
appointments = {}
|
||||||
|
|
||||||
with open(filePath) as f:
|
with open(filePath) as f:
|
||||||
show("Reading file...")
|
flash("Reading file...")
|
||||||
|
|
||||||
expectingTable = True
|
expectingTable = True
|
||||||
readHeader = False
|
readHeader = False
|
||||||
|
@ -139,7 +132,7 @@ def importFromFile(filePath):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Semester
|
# Semester
|
||||||
show("Semester...")
|
flash("Semester...")
|
||||||
|
|
||||||
if len(semesters["label"]) != 1:
|
if len(semesters["label"]) != 1:
|
||||||
raise DataBaseImportException("Only one semester is allowed in an import file!")
|
raise DataBaseImportException("Only one semester is allowed in an import file!")
|
||||||
|
@ -154,7 +147,7 @@ def importFromFile(filePath):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Part
|
# Part
|
||||||
show("Part...")
|
flash("Part...")
|
||||||
|
|
||||||
dbParts = {}
|
dbParts = {}
|
||||||
for i, id in enumerate(parts["id"]):
|
for i, id in enumerate(parts["id"]):
|
||||||
|
@ -179,7 +172,7 @@ def importFromFile(filePath):
|
||||||
dbParts[id] = dbPart
|
dbParts[id] = dbPart
|
||||||
|
|
||||||
# Student
|
# Student
|
||||||
show("Student...")
|
flash("Student...")
|
||||||
|
|
||||||
dbStudents = {}
|
dbStudents = {}
|
||||||
for i, studentNumber in enumerate(students["student_number"]):
|
for i, studentNumber in enumerate(students["student_number"]):
|
||||||
|
@ -207,7 +200,7 @@ def importFromFile(filePath):
|
||||||
dbStudents[studentNumber] = dbStudent
|
dbStudents[studentNumber] = dbStudent
|
||||||
|
|
||||||
# Group
|
# Group
|
||||||
show("Group...")
|
flash("Group...")
|
||||||
|
|
||||||
dbGroups = {}
|
dbGroups = {}
|
||||||
for i, id in enumerate(groups["id"]):
|
for i, id in enumerate(groups["id"]):
|
||||||
|
@ -222,7 +215,7 @@ def importFromFile(filePath):
|
||||||
dbGroups[id] = dbGroup
|
dbGroups[id] = dbGroup
|
||||||
|
|
||||||
# PartStudent
|
# PartStudent
|
||||||
show("PartStudent...")
|
flash("PartStudent...")
|
||||||
|
|
||||||
for i, studentNumber in enumerate(partStudents["student_number"]):
|
for i, studentNumber in enumerate(partStudents["student_number"]):
|
||||||
studentNumber = int(not_nullable(studentNumber))
|
studentNumber = int(not_nullable(studentNumber))
|
||||||
|
@ -234,7 +227,7 @@ def importFromFile(filePath):
|
||||||
db.session.add(dbPartStudent)
|
db.session.add(dbPartStudent)
|
||||||
|
|
||||||
# Experiment
|
# Experiment
|
||||||
show("Experiment...")
|
flash("Experiment...")
|
||||||
|
|
||||||
dbSemesterExperiments = {}
|
dbSemesterExperiments = {}
|
||||||
for i, id in enumerate(experiments["id"]):
|
for i, id in enumerate(experiments["id"]):
|
||||||
|
@ -268,7 +261,7 @@ def importFromFile(filePath):
|
||||||
dbSemesterExperiments[id] = dbSemesterExperiment
|
dbSemesterExperiments[id] = dbSemesterExperiment
|
||||||
|
|
||||||
# GroupExperiment
|
# GroupExperiment
|
||||||
show("GroupExperiment...")
|
flash("GroupExperiment...")
|
||||||
|
|
||||||
dbGroupExperiments = {}
|
dbGroupExperiments = {}
|
||||||
for i, id in enumerate(groupExperiments["id"]):
|
for i, id in enumerate(groupExperiments["id"]):
|
||||||
|
@ -281,7 +274,7 @@ def importFromFile(filePath):
|
||||||
dbGroupExperiments[id] = dbGroupExperiment
|
dbGroupExperiments[id] = dbGroupExperiment
|
||||||
|
|
||||||
# Appointment
|
# Appointment
|
||||||
show("Appointment...")
|
flash("Appointment...")
|
||||||
|
|
||||||
for i, date in enumerate(appointments["date"]):
|
for i, date in enumerate(appointments["date"]):
|
||||||
date = not_nullable(date)
|
date = not_nullable(date)
|
||||||
|
@ -302,10 +295,10 @@ def importFromFile(filePath):
|
||||||
db.session.add(dbAppointment)
|
db.session.add(dbAppointment)
|
||||||
|
|
||||||
# Backup
|
# Backup
|
||||||
dest = relative_db_bk_dir / f"before_{dbSemester}_import_{now()}.db"
|
dest = db_bk_dir / f"before_{dbSemester}_import_{now()}.db"
|
||||||
copy2(relative_db_path, dest)
|
copy2(db_path, dest)
|
||||||
|
|
||||||
show(f"Made a backup of the database before committing the import at {dest}")
|
flash(f"Made a backup of the database before committing the import at {dest}")
|
||||||
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
|
@ -313,10 +306,10 @@ def importFromFile(filePath):
|
||||||
|
|
||||||
raise ex
|
raise ex
|
||||||
|
|
||||||
dest = relative_db_bk_dir / f"after_{dbSemester}_import_{now()}.db"
|
dest = db_bk_dir / f"after_{dbSemester}_import_{now()}.db"
|
||||||
copy2(relative_db_path, dest)
|
copy2(db_path, dest)
|
||||||
|
|
||||||
show(f"Made a backup of the database after the import at {dest}")
|
flash(f"Made a backup of the database after the import at {dest}")
|
||||||
|
|
||||||
show("\nImport done!")
|
flash("\nImport done!")
|
||||||
show("Please check that everything worked as expected. Restore the database backup otherwise!")
|
flash("Please check that everything worked as expected. Restore the database backup otherwise!")
|
||||||
|
|
|
@ -3,7 +3,6 @@ Functions dependent on advlabdb.models.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from functools import cache
|
from functools import cache
|
||||||
from os import environ
|
|
||||||
|
|
||||||
from flask import flash
|
from flask import flash
|
||||||
from flask_admin.menu import MenuLink
|
from flask_admin.menu import MenuLink
|
||||||
|
@ -11,7 +10,7 @@ from flask_security import current_user
|
||||||
from wtforms.fields import BooleanField, IntegerField, SelectField, StringField
|
from wtforms.fields import BooleanField, IntegerField, SelectField, StringField
|
||||||
from wtforms.validators import DataRequired, NumberRange, Optional
|
from wtforms.validators import DataRequired, NumberRange, Optional
|
||||||
|
|
||||||
from . import app
|
from . import app, settings
|
||||||
from .models import MAX_MARK, MIN_MARK, Semester
|
from .models import MAX_MARK, MIN_MARK, Semester
|
||||||
|
|
||||||
|
|
||||||
|
@ -24,7 +23,7 @@ def initActiveSemesterMenuLinks(space):
|
||||||
MenuLink(
|
MenuLink(
|
||||||
name=str(semester),
|
name=str(semester),
|
||||||
# Using SERVER_NAME because of missing request context
|
# Using SERVER_NAME because of missing request context
|
||||||
url=environ["SERVER_NAME"] + "/set_semester" + "?semester_id=" + str(semester.id),
|
url=settings["SERVER_NAME"] + "/set_semester" + "?semester_id=" + str(semester.id),
|
||||||
category="Active semester",
|
category="Active semester",
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -33,7 +32,7 @@ def initActiveSemesterMenuLinks(space):
|
||||||
"ERROR: The Semester table does not exist yet! Therefore, menu links could not be generated. You can ignore this error if you are just initializing the database."
|
"ERROR: The Semester table does not exist yet! Therefore, menu links could not be generated. You can ignore this error if you are just initializing the database."
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
space.add_link(MenuLink(name="Logout", url=environ["SERVER_NAME"] + "/logout"))
|
space.add_link(MenuLink(name="Logout", url=settings["SERVER_NAME"] + "/logout"))
|
||||||
|
|
||||||
|
|
||||||
def active_semester_str():
|
def active_semester_str():
|
||||||
|
|
|
@ -1,55 +0,0 @@
|
||||||
from os.path import exists
|
|
||||||
from shutil import copytree, rmtree
|
|
||||||
|
|
||||||
from flask_admin import __file__ as flaskAdminPath
|
|
||||||
|
|
||||||
|
|
||||||
def copyAdminTemplates():
|
|
||||||
src = flaskAdminPath.removesuffix("__init__.py") + "templates/bootstrap4/admin"
|
|
||||||
if not exists(src):
|
|
||||||
print("Templates could not be found in", src)
|
|
||||||
print("You can also copy them manually.")
|
|
||||||
return False
|
|
||||||
|
|
||||||
dist = "advlabdb/templates/admin"
|
|
||||||
if exists(dist):
|
|
||||||
while True:
|
|
||||||
ans = input(
|
|
||||||
f"The directory {dist} already exists. Enter 'o' to override it and update the templates or enter 's' to stop the process: "
|
|
||||||
).lower()
|
|
||||||
if ans == "s":
|
|
||||||
print("Process stopped!")
|
|
||||||
return False
|
|
||||||
elif ans == "o":
|
|
||||||
break
|
|
||||||
rmtree(dist)
|
|
||||||
print("Old templates deleted!")
|
|
||||||
|
|
||||||
copytree(src, dist)
|
|
||||||
print("Copied", src, "->", dist)
|
|
||||||
|
|
||||||
print(
|
|
||||||
f"""
|
|
||||||
_________
|
|
||||||
| WARNING
|
|
||||||
| -------
|
|
||||||
|
|
|
||||||
| You might have to edit the file {dist}/base.html by adding nav in the following way:
|
|
||||||
| This line:\t<ul class="navbar-nav mr-auto">
|
|
||||||
| Becomes:\t<ul class="nav navbar-nav mr-auto">
|
|
||||||
|
|
|
||||||
| This will prevent the navigation bar from expanding such that some elements can not be seen.
|
|
||||||
| Refer to this pull request: https://github.com/flask-admin/flask-admin/pull/2233
|
|
||||||
|
|
|
||||||
| If the above pull request is merged and flask-admin is on a new release after the merge,
|
|
||||||
| then this step is not needed.
|
|
||||||
_________
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
if not copyAdminTemplates():
|
|
||||||
print("Did not copy!")
|
|
|
@ -1,55 +0,0 @@
|
||||||
from flask_security import admin_change_password
|
|
||||||
from sqlalchemy import select
|
|
||||||
|
|
||||||
from ... import app, db
|
|
||||||
from ...model_independent_funs import randomPassword
|
|
||||||
from ...models import Admin, User
|
|
||||||
from ..terminal_utils import box, validating_input
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
print("This script will generate a new random password for a chosen admin.")
|
|
||||||
print()
|
|
||||||
|
|
||||||
with app.app_context():
|
|
||||||
with db.session.begin():
|
|
||||||
admins = db.session.execute(select(Admin).join(User).where(User.active == True)).scalars().all()
|
|
||||||
activate_user = False
|
|
||||||
|
|
||||||
if len(admins) == 0:
|
|
||||||
print("There is no admin with an active user. The user of the chosen admin will be activated")
|
|
||||||
admins = db.session.execute(select(Admin)).scalars().all()
|
|
||||||
activate_user = True
|
|
||||||
|
|
||||||
num_admins = len(admins)
|
|
||||||
|
|
||||||
prompt = "Admins:\n"
|
|
||||||
for ind, admin in enumerate(admins):
|
|
||||||
user = admin.user
|
|
||||||
prompt += f"[{ind}] {user.first_name} {user.last_name}: {user.email}\n"
|
|
||||||
prompt += "Enter number"
|
|
||||||
|
|
||||||
admin_index = validating_input(
|
|
||||||
prompt,
|
|
||||||
options=[str(ind) for ind in range(num_admins)],
|
|
||||||
format_function=lambda ans: int(ans),
|
|
||||||
)
|
|
||||||
|
|
||||||
chosen_admin_user = admins[admin_index].user
|
|
||||||
|
|
||||||
new_password = randomPassword()
|
|
||||||
|
|
||||||
admin_change_password(
|
|
||||||
chosen_admin_user, new_password, notify=False
|
|
||||||
) # Password is automatically hashed with this function
|
|
||||||
|
|
||||||
if activate_user:
|
|
||||||
chosen_admin_user.active = True
|
|
||||||
|
|
||||||
box(new_password, "New password")
|
|
||||||
|
|
||||||
print("Done!")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
|
@ -1,4 +0,0 @@
|
||||||
import secrets
|
|
||||||
|
|
||||||
print(f"SECRET_KEY={secrets.SystemRandom().getrandbits(128)}")
|
|
||||||
print(f"SECURITY_PASSWORD_SALT={secrets.token_hex()}")
|
|
|
@ -1,85 +0,0 @@
|
||||||
from email_validator import validate_email
|
|
||||||
from flask_security import hash_password
|
|
||||||
|
|
||||||
from ... import app, db, user_datastore
|
|
||||||
from ...model_independent_funs import randomPassword
|
|
||||||
from ...models import MAX_YEAR, MIN_YEAR, Admin, Semester
|
|
||||||
from ..terminal_utils import box, confirm, validating_input
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
print("\n")
|
|
||||||
print("This script should only be used to initialize the database after setting up a server")
|
|
||||||
print("The old database will be DELETED and a new database will be created.")
|
|
||||||
|
|
||||||
if not confirm("Are you sure that you want to continue?"):
|
|
||||||
print("Aborted!")
|
|
||||||
return
|
|
||||||
|
|
||||||
with app.app_context():
|
|
||||||
with db.session.begin():
|
|
||||||
# Delete old database
|
|
||||||
db.drop_all()
|
|
||||||
# Create new database
|
|
||||||
db.create_all()
|
|
||||||
|
|
||||||
semester_label = validating_input(
|
|
||||||
"Enter the label of the current semester",
|
|
||||||
options=("SS", "WS"),
|
|
||||||
format_function=lambda ans: ans.upper(),
|
|
||||||
)
|
|
||||||
|
|
||||||
semester_year = validating_input(
|
|
||||||
f"Enter the year of the current semester (between {MIN_YEAR} and {MAX_YEAR})",
|
|
||||||
format_function=lambda n_str: int(n_str),
|
|
||||||
check_constraints_function=lambda n: MIN_YEAR <= n <= MAX_YEAR,
|
|
||||||
)
|
|
||||||
|
|
||||||
semester = Semester(label=semester_label, year=semester_year)
|
|
||||||
|
|
||||||
db.session.add(semester)
|
|
||||||
|
|
||||||
adminRole = user_datastore.create_role(name="admin")
|
|
||||||
user_datastore.create_role(name="assistant")
|
|
||||||
|
|
||||||
box("The first admin account will now be created")
|
|
||||||
|
|
||||||
admin_email = validating_input(
|
|
||||||
"Enter the admin's email address",
|
|
||||||
format_function=lambda ans: validate_email(ans).email,
|
|
||||||
)
|
|
||||||
|
|
||||||
admin_first_name = input("Enter the admin's first name: ")
|
|
||||||
admin_last_name = input("Enter the admin's last name: ")
|
|
||||||
admin_phone_number = input("Enter the admin's phone number (optional): ")
|
|
||||||
admin_mobile_phone_number = input("Enter the admin's mobile phone number (optional): ")
|
|
||||||
admin_building = input("Enter the admin's building (optional): ")
|
|
||||||
admin_room = input("Enter the admin's room (optional): ")
|
|
||||||
|
|
||||||
admin_password = randomPassword()
|
|
||||||
admin_hashed_password = hash_password(admin_password)
|
|
||||||
|
|
||||||
admin_user = user_datastore.create_user(
|
|
||||||
email=admin_email.strip(),
|
|
||||||
password=admin_hashed_password,
|
|
||||||
roles=[adminRole],
|
|
||||||
first_name=admin_first_name.strip(),
|
|
||||||
last_name=admin_last_name.strip(),
|
|
||||||
phone_number=admin_phone_number.strip() or None,
|
|
||||||
mobile_phone_number=admin_mobile_phone_number.strip() or None,
|
|
||||||
building=admin_building.strip() or None,
|
|
||||||
room=admin_room.strip() or None,
|
|
||||||
active_semester=semester,
|
|
||||||
)
|
|
||||||
|
|
||||||
admin = Admin(user=admin_user)
|
|
||||||
|
|
||||||
db.session.add(admin)
|
|
||||||
|
|
||||||
box(admin_password, "Admin password")
|
|
||||||
|
|
||||||
print("Done database initialization!")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
|
@ -1,103 +0,0 @@
|
||||||
"""
|
|
||||||
No relative imports allowed in this file to be able to run server_setup.py without packages.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import subprocess # nosec 404
|
|
||||||
from getpass import getpass
|
|
||||||
|
|
||||||
|
|
||||||
def run(command, **kwargs):
|
|
||||||
return subprocess.run(command, shell=True, **kwargs) # nosec B602
|
|
||||||
|
|
||||||
|
|
||||||
def box(message, context=None):
|
|
||||||
text_line = "| "
|
|
||||||
|
|
||||||
if context is not None:
|
|
||||||
text_line += context + ": "
|
|
||||||
|
|
||||||
text_line += message + " |"
|
|
||||||
|
|
||||||
separator = "=" * len(text_line)
|
|
||||||
|
|
||||||
print()
|
|
||||||
print(separator)
|
|
||||||
print(text_line)
|
|
||||||
print(separator)
|
|
||||||
print()
|
|
||||||
|
|
||||||
|
|
||||||
def step(message):
|
|
||||||
continue_message = "-> Press ENTER to continue or Ctrl+C to interrupt the script <-"
|
|
||||||
upper_separator = "_" * len(continue_message)
|
|
||||||
|
|
||||||
print()
|
|
||||||
print(upper_separator)
|
|
||||||
|
|
||||||
box(message, "Next step")
|
|
||||||
|
|
||||||
print(continue_message)
|
|
||||||
getpass("")
|
|
||||||
print()
|
|
||||||
|
|
||||||
|
|
||||||
def spaced_hl():
|
|
||||||
print("\n\n" + "_" * 20 + "\n\n")
|
|
||||||
|
|
||||||
|
|
||||||
def validating_input(
|
|
||||||
prompt,
|
|
||||||
options=None,
|
|
||||||
format_function=lambda ans: ans,
|
|
||||||
check_constraints_function=lambda ans: True,
|
|
||||||
):
|
|
||||||
if options is not None:
|
|
||||||
prompt += " ["
|
|
||||||
|
|
||||||
for opt in options[:-1]:
|
|
||||||
prompt += opt + "/"
|
|
||||||
|
|
||||||
prompt += options[-1] + "]: "
|
|
||||||
|
|
||||||
lowered_options = [opt.lower() for opt in options]
|
|
||||||
|
|
||||||
def adj_check_constraints_function(ans):
|
|
||||||
return ans.lower() in lowered_options and check_constraints_function(ans)
|
|
||||||
|
|
||||||
else:
|
|
||||||
prompt += ": "
|
|
||||||
adj_check_constraints_function = check_constraints_function
|
|
||||||
|
|
||||||
ans = None
|
|
||||||
first_run = True
|
|
||||||
done_validation = False
|
|
||||||
|
|
||||||
while not done_validation or not adj_check_constraints_function(ans):
|
|
||||||
if not first_run:
|
|
||||||
done_validation = False
|
|
||||||
print("Invalid input!\n")
|
|
||||||
else:
|
|
||||||
first_run = False
|
|
||||||
|
|
||||||
ans = input(prompt)
|
|
||||||
try:
|
|
||||||
ans = format_function(ans)
|
|
||||||
except Exception: # nosec B112
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
done_validation = True
|
|
||||||
|
|
||||||
return ans
|
|
||||||
|
|
||||||
|
|
||||||
def confirm(prompt):
|
|
||||||
ans = validating_input(
|
|
||||||
prompt,
|
|
||||||
options=("y", "n"),
|
|
||||||
format_function=lambda ans: ans.lower(),
|
|
||||||
)
|
|
||||||
|
|
||||||
if ans == "y":
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
return False
|
|
Loading…
Reference in a new issue