I am build a system include server, mobile app and desktop app.
server.py
from flask import Flask, jsonify, request, g, send_file
from flask_cors import CORS
import mysql.connector
import data.data as dt
import database as db
import hashlib
import json
from subprocess import Popen
import os
from datetime import datetime
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
app = Flask(__name__)
CORS(app)
limiter = Limiter(get_remote_address, app=app, default_limits=["5 per second"])
def is_high_permission(user):
try:
with open("highpermission.dat", "r", encoding="utf-8") as f:
allowed_accounts = [line.strip() for line in f if line.strip()]
except Exception as e:
print("Error loading highpermission.dat:", e)
allowed_accounts = []
return user in allowed_accounts
def get_current_session():
now = datetime.now()
date_str = now.strftime("%d%m%Y")
morning_start = now.replace(hour=6, minute=0, second=0, microsecond=0)
morning_end = now.replace(hour=11, minute=50, second=0, microsecond=0)
afternoon_end = now.replace(hour=18, minute=0, second=0, microsecond=0)
if morning_start <= now <= morning_end:
session = "sang"
elif morning_end < now <= afternoon_end:
session = "chieu"
else:
return None
return f"{date_str}-{session}.session"
def log_record_session(record_id):
session_file = get_current_session()
if session_file:
log_dir = os.path.join("logs", "sessions")
os.makedirs(log_dir, exist_ok=True)
file_path = os.path.join(log_dir, session_file)
with open(file_path, "a", encoding="utf-8") as f:
f.write(str(record_id) + "\n")
else:
print("Ngoài giờ làm việc. Không ghi nhận phiên.")
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in {"zip", "smuc", "plgs"}
@app.before_request
def before_request():
g.db_accounts = mysql.connector.connect(
host="127.0.0.1",
port=3306,
user="main",
password="mainserverlogin",
database="ACCOUNTS",
autocommit=True,
connection_timeout=31536000
)
g.cursor_accounts = g.db_accounts.cursor(dictionary=True)
g.db_data = mysql.connector.connect(
host="127.0.0.1",
port=3306,
user="main",
password="mainserverlogin",
database="DATA",
autocommit=True,
connection_timeout=31536000
)
g.cursor_data = g.db_data.cursor()
@app.teardown_request
def teardown_request(exception):
if hasattr(g, 'cursor_accounts'):
g.cursor_accounts.close()
if hasattr(g, 'db_accounts'):
g.db_accounts.close()
if hasattr(g, 'cursor_data'):
g.cursor_data.close()
if hasattr(g, 'db_data'):
g.db_data.close()
svr = db.DB(role_config_path="role_cfg.json")
ast = dt.Data()
@app.route("/infomation")
@limiter.limit("1 per 200 milliseconds")
def info():
return jsonify({"version": "1.0.0-res", "compact": "1xx"}), 200
@app.route("/login", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def Login():
inf = request.json
pswd = hashlib.sha256(inf["passwd"].encode()).hexdigest()
scr = svr.userLogin(usr=inf["user"], passwd=pswd)
if scr == -1:
return jsonify({"status": "fail"}), 200
elif scr == 1:
return jsonify({"status": "success", "usrst": "disabled"}), 200
else:
return jsonify({"status": "success", "accesskey": scr}), 200
@app.route("/logincheck", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def LoginCheck():
inf = request.json
scr = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if scr == 1:
return jsonify({"status": "success"}), 200
else:
return jsonify({"status": "fail"}), 200
@app.route("/userprofile", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def UsrProfile():
inf = request.json
try:
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1:
return jsonify({"status": "fail"}), 200
scr = svr.getUserProfile(inf['usr'])
if scr:
return jsonify({"status": "success", "data": scr}), 200
else:
return jsonify({"status": "fail"}), 200
except:
return jsonify({"status": "fail"}), 200
@app.route("/getmistakes", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def ReadProfile():
inf = request.json
try:
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1:
return jsonify({"status": "fail"}), 200
scr = ast.getRecordsByUser(inf["user"])
if scr:
return jsonify({"status": "success", "data": scr}), 200
else:
return jsonify({"status": "fail"}), 200
except:
return jsonify({"status": "fail"}), 200
@app.route("/getclassmtk", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def GetClassMistake():
inf = request.json
try:
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1:
return jsonify({"status": "fail"}), 200
tmp = svr.getClassInfo(inf["user"], inf["class"])
if tmp and tmp != "denied":
REC = {"status": "success", "data": tmp}
else:
return jsonify({"status": "fail"}), 200
scr = ast.getRecordsByClass(REC)
if scr:
return jsonify({"status": "success", "data": scr}), 200
else:
return jsonify({"status": "fail"}), 200
except:
return jsonify({"status": "fail"}), 200
@app.route('/getclass', methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def getclass():
inf = request.json
try:
scr = svr.getClassInfo(inf["user"], inf["class"])
if scr and scr != "denied":
return jsonify({"status": "success", "data": scr})
else:
return jsonify({"status": "fail"}), 200
except:
return jsonify({"status": "fail"}), 200
@app.route('/getallprf', methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def getallprf():
inf = request.json
try:
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1:
return jsonify({"status": "fail"}), 200
scr = svr.getAllProfile(inf["user"])
if scr and scr != "denied":
return jsonify({"status": "success", "data": scr})
else:
return jsonify({"status": "fail"}), 200
except:
return jsonify({"status": "fail"}), 200
@app.route('/writeRecord', methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def writeRecord():
try:
inf = request.json
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1:
return jsonify({"status": "fail"}), 200
new_id = ast.writeRecord(inf["order"], inf["mistake"], inf["user"])
log_record_session(new_id)
return jsonify({"status": "success", "record_id": new_id}), 200
except Exception as e:
print("Error in /writeRecord:", e)
return jsonify({"status": "fail"}), 200
@app.route('/image/<userid>', methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def getImage(userid):
try:
inf = request.json
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1:
return jsonify({"status": "fail"}), 200
scr = ast.getImage(inf["id"])
if scr is None:
return jsonify({"status": "fail"}), 200
return jsonify({"status": "success", "data": scr})
except:
return jsonify({"status": "fail"}), 200
@app.route('/account/passwd_change', methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def passwdChange():
try:
inf = request.json
pswd = hashlib.sha256(inf["passwd"].encode()).hexdigest()
newpswd = hashlib.sha256(inf["newpasswd"].encode()).hexdigest()
scr = svr.changeUsrPassword(user=inf["user"], curpasswd=pswd, newpasswd=newpswd)
if scr == 1:
return jsonify({"status": "fail", "reason": "INVALID_INFO"}), 200
elif scr == -1:
return jsonify({"status": "fail", "reason": "NNOS_RETURN_ERROR"}), 200
return jsonify({"status": "success"})
except:
return jsonify({"status": "fail", "reason": "ERROR"}), 200
@app.route('/chg_profile', methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def changeProfile():
inf = request.json
login_user = inf.get("username")
token = inf.get("token")
target_user = inf.get("user")
update_data = inf.get("data", {})
if not login_user or not token or not target_user or not update_data:
return jsonify({"status": "fail", "reason": "missing required fields"}), 200
chk = svr.userLoginCheck(usr=login_user, accesskey=token)
if chk != 1 or not is_high_permission(login_user):
return jsonify({"status": "fail", "reason": "not authorized"}), 200
result = svr.update_user_profile(target_user, update_data)
if result:
return jsonify({"status": "success"}), 200
else:
return jsonify({"status": "fail"}), 200
@app.route("/gradeup", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def gradeUp():
inf = request.json
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1 or not is_high_permission(inf["user"]):
return jsonify({"status": "fail", "reason": "not authorized"}), 200
result = svr.gradeup_all_profiles()
if result:
return jsonify({"status": "success"}), 200
else:
return jsonify({"status": "fail"}), 200
@app.route("/gradedown", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def gradeDown():
inf = request.json
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1 or not is_high_permission(inf["user"]):
return jsonify({"status": "fail", "reason": "not authorized"}), 200
result = svr.gradedown_all_profiles()
if result:
return jsonify({"status": "success"}), 200
else:
return jsonify({"status": "fail"}), 200
@app.route("/updaterecord/<recordid>", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def updateRecord(recordid):
inf = request.json
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1 or not is_high_permission(inf["user"]):
return jsonify({"status": "fail", "reason": "not authorized"}), 200
update_fields = {k: v for k, v in inf.items() if k not in ["user", "token"]}
if not update_fields:
return jsonify({"status": "fail", "reason": "no fields to update"}), 200
result = ast.update_record(recordid, update_fields)
if result:
return jsonify({"status": "success"}), 200
else:
return jsonify({"status": "fail"}), 200
@app.route("/removerecord/<recordid>", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def removeRecord(recordid):
try:
inf = request.json
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1 or not is_high_permission(inf["user"]):
return jsonify({"status": "fail"}), 200
result = ast.remove_record(recordid)
if result:
return jsonify({"status": "success"}), 200
else:
return jsonify({"status": "fail"}), 200
except:
print(0)
return jsonify({"status": "fail"}), 200
@app.route("/createmistakename", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def createMistakeName():
inf = request.json
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1 or not is_high_permission(inf["user"]):
return jsonify({"status": "fail", "reason": "not authorized"}), 200
if not all(k in inf for k in ["mistake", "name", "minus"]):
return jsonify({"status": "fail", "reason": "missing parameters"}), 200
try:
ast.setMistake(inf["mistake"], inf["name"], inf["minus"])
return jsonify({"status": "success"}), 200
except Exception as e:
print("Error in /createmistakename:", e)
return jsonify({"status": "fail"}), 200
@app.route("/getmistakeslist", methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def getMistakesList():
inf = request.json
chk = svr.userLoginCheck(usr=inf["user"], accesskey=inf["token"])
if chk != 1:
return jsonify({"status": "fail"}), 200
cursor = g.get('cursor_data')
try:
cursor.execute("SELECT * FROM MISTAKES")
mistakes = cursor.fetchall()
return jsonify({"status": "success", "data": mistakes}), 200
except Exception as e:
print("Error in /getmistakeslist:", e)
return jsonify({"status": "fail"}), 200
@app.route("/checkuserperm", methods=['GET'])
@limiter.limit("1 per 200 milliseconds")
def checkUserPerm():
username = request.args.get("username")
if username and is_high_permission(username):
return jsonify({"role": "admin"}), 200
else:
return jsonify({"role": "user"}), 200
@app.route('/createUsers', methods=['POST'])
@limiter.limit("1 per 200 milliseconds")
def users_create():
auth = json.loads(request.headers.get("auth"))
chk = svr.userLoginCheck(usr=auth.get("user"), accesskey=auth.get("token"))
if chk != 1:
return jsonify({"status": "fail", "err": "auth user fail"}), 200
if 'file' not in request.files:
return jsonify({"status": "fail", "err": "no file upload"}), 200
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'no file upload'}), 200
if file and allowed_file(file.filename):
filename = "user_create.zip"
filepath = os.path.join(".", "upload", filename)
file.save(filepath)
print(os.getcwd())
Popen(["python3", "handle_data.py"], cwd=os.getcwd())
return jsonify({"state": "success"}), 200
else:
return jsonify({"state": "fail", "err": "Not allowed filetype"}), 200
@app.errorhandler(404)
def page_not_found(e):
return jsonify({"code": 404}), 404
if __name__ == "__main__":
app.run(host="0.0.0.0", port=2391, debug=True)
database.py
import json
import uuid
from datetime import datetime, timedelta
from flask import g
class DB:
def __init__(self, role_config_path="role_cfg.json"):
try:
with open(role_config_path, "r", encoding="utf-8") as f:
self.role_cfg = json.load(f)
except Exception as e:
print("Error loading role configuration:", e)
self.role_cfg = {}
def userLogin(self, usr: str, passwd: str):
cursor = g.get('cursor_accounts')
conn = g.get('db_accounts')
if not cursor:
return -1
query = "SELECT password, state FROM USERS WHERE username = %s"
try:
print("CHECK")
print(f"Autocommit is {'enabled' if conn.autocommit else 'disabled'}")
cursor.execute(query, (usr,))
result = cursor.fetchone()
if result and result['password'] == passwd:
if result['state'].lower() == "enable":
access_key = str(uuid.uuid4())
expiry_date = (datetime.now() + timedelta(days=7)).strftime("%d%m%Y")
insert_query = "INSERT INTO ACCESS (username, accesskey, exprise_date) VALUES (%s, %s, %s)"
cursor.execute(insert_query, (usr, access_key, expiry_date))
conn.commit()
print(f"Rows affected: {cursor.rowcount}")
delete_query = """
DELETE FROM ACCESS
WHERE accesskey NOT IN (
SELECT accesskey FROM (
SELECT accesskey, ROW_NUMBER() OVER (PARTITION BY username ORDER BY exprise_date DESC, accesskey DESC) AS row_num
FROM ACCESS
) AS temp
WHERE row_num <= 3
)
"""
cursor.execute(delete_query)
print(f"Rows affected: {cursor.rowcount}")
conn.commit()
return access_key
else:
return 1
else:
return -1
except Exception as err:
print("Error in userLogin:", err)
return -1
def userLoginCheck(self, usr: str, accesskey: str):
cursor = g.get('cursor_accounts')
if not cursor:
return 0
query = "SELECT exprise_date FROM ACCESS WHERE username = %s AND accesskey = %s"
try:
cursor.execute(query, (usr, accesskey))
result = cursor.fetchone()
if result:
expiry_date_str = result["exprise_date"]
expiry_date = datetime.strptime(expiry_date_str, "%d%m%Y")
if datetime.now() <= expiry_date:
return 1
else:
return 0
else:
return 0
except Exception as err:
print("Error in userLoginCheck:", err)
return 0
def changeUsrPassword(self, user: str, curpasswd: str, newpasswd: str):
cursor = g.get('cursor_accounts')
if not cursor:
return 1
try:
cursor.execute("SELECT password FROM USERS WHERE username = %s", (user,))
dat = cursor.fetchone()
if dat and curpasswd == dat["password"]:
cursor.execute("UPDATE USERS SET password = %s WHERE username = %s", (newpasswd, user))
return 0
else:
return 1
except Exception as e:
print(e)
return -1
def getUserProfile(self, usr: str) -> dict:
cursor = g.get('cursor_accounts')
if not cursor:
return {}
try:
cursor.execute("SELECT * FROM PROFILES WHERE username = %s", (usr,))
result = cursor.fetchone()
return result if result else {}
except Exception as err:
print("Error in getUserProfile:", err)
return {}
def _get_allowed_patterns(self, usr: str) -> list:
profile = self.getUserProfile(usr)
if not profile or "role" not in profile:
return []
roles = [r.strip().upper() for r in profile["role"].split(",") if r.strip()]
allowed_patterns = []
for role in roles:
pattern = self.role_cfg.get(role)
if pattern:
allowed_patterns.append(pattern)
return allowed_patterns
def _is_allowed_class(self, caller: str, target_class: str) -> bool:
caller_profile = self.getUserProfile(caller)
if not caller_profile or "class" not in caller_profile:
return False
caller_class = str(caller_profile["class"])
allowed_patterns = self._get_allowed_patterns(caller)
if "*" in allowed_patterns:
return True
for pattern in allowed_patterns:
if pattern == "%sclass":
if target_class == caller_class:
return True
elif pattern.endswith("*"):
prefix = pattern[:-1]
if target_class.startswith(prefix):
return True
else:
if target_class == pattern:
return True
return False
def getClassInfo(self, usr: str, class_: str):
allowed_patterns = self._get_allowed_patterns(usr)
if not allowed_patterns:
return "denied"
cursor = g.get('cursor_accounts')
if not cursor:
return []
try:
if class_:
cursor.execute("SELECT * FROM PROFILES WHERE `class` = %s", (class_,))
candidates = cursor.fetchall()
else:
cursor.execute("SELECT * FROM PROFILES")
candidates = cursor.fetchall()
except Exception as err:
print("Error in getClassInfo (query):", err)
return []
if "*" in allowed_patterns:
return candidates
filtered = []
for candidate in candidates:
target_class = str(candidate.get("class", ""))
if self._is_allowed_class(usr, target_class):
filtered.append(candidate)
return filtered
def getAccountProfile(self, usr: str, acc: str):
allowed_patterns = self._get_allowed_patterns(usr)
if not allowed_patterns:
return "denied"
cursor = g.get('cursor_accounts')
if not cursor:
return {}
try:
cursor.execute("SELECT * FROM PROFILES WHERE username = %s", (acc,))
target_profile = cursor.fetchone()
if not target_profile:
return {}
except Exception as err:
print("Error in getAccountProfile (query):", err)
return {}
if "*" in allowed_patterns:
return target_profile
target_class = str(target_profile.get("class", ""))
if self._is_allowed_class(usr, target_class):
return target_profile
else:
return "denied"
def getAllProfile(self, usr: str):
allowed_patterns = self._get_allowed_patterns(usr)
if not allowed_patterns:
return "denied"
cursor = g.get('cursor_accounts')
if not cursor:
return []
try:
cursor.execute("SELECT * FROM PROFILES")
candidates = cursor.fetchall()
except Exception as err:
print("Error in getClassInfo (query):", err)
return []
if "*" in allowed_patterns:
return candidates
filtered = []
for candidate in candidates:
target_class = str(candidate.get("class", ""))
if self._is_allowed_class(usr, target_class):
filtered.append(candidate)
return filtered
def update_user_profile(self, username: str, update_fields: dict) -> bool:
cursor = g.get('cursor_accounts')
if not cursor:
return False
set_clause = ", ".join([f"{k} = %s" for k in update_fields.keys()])
values = list(update_fields.values())
values.append(username)
try:
query = f"UPDATE PROFILES SET {set_clause} WHERE username = %s"
cursor.execute(query, tuple(values))
return True
except Exception as e:
print("Error in update_user_profile:", e)
return False
def gradeup_all_profiles(self) -> bool:
cursor_accounts = g.get('cursor_accounts')
cursor_data = g.get('cursor_data')
if not cursor_accounts or not cursor_data:
return False
try:
cursor_accounts.execute("SELECT username, class FROM PROFILES")
profiles = cursor_accounts.fetchall()
for profile in profiles:
current_class = profile["class"]
current_username = profile["username"]
try:
class_int = int(current_class)
new_class_int = class_int + 100
new_class = str(new_class_int)
except:
new_class = current_class
if current_username.endswith(str(current_class)):
new_username = current_username[:-len(str(current_class))] + new_class
else:
new_username = current_username
if isinstance(new_class, str):
try:
new_class_int = int(new_class)
except:
new_class_int = None
else:
new_class_int = None
if new_class_int is None or new_class_int > 9999 or new_class_int <= 0 or new_username is None:
cursor_accounts.execute("DELETE FROM PROFILES WHERE username = %s", (current_username,))
cursor_accounts.execute("DELETE FROM USERS WHERE username = %s", (current_username,))
cursor_accounts.execute("DELETE FROM ACCESS WHERE username = %s", (current_username,))
cursor_data.execute("DELETE FROM RECORDS WHERE username = %s", (current_username,))
cursor_data.execute("DELETE FROM IMAGES WHERE id = %s", (current_username,))
else:
cursor_accounts.execute("UPDATE PROFILES SET class = %s, username = %s WHERE username = %s",
(new_class, new_username, current_username))
cursor_accounts.execute("UPDATE USERS SET username = %s WHERE username = %s", (new_username, current_username))
cursor_accounts.execute("UPDATE ACCESS SET username = %s WHERE username = %s", (new_username, current_username))
cursor_data.execute("UPDATE RECORDS SET username = %s WHERE username = %s", (new_username, current_username))
cursor_data.execute("UPDATE IMAGES SET id = %s WHERE id = %s", (new_username, current_username))
return True
except Exception as e:
print("Error in gradeup_all_profiles:", e)
return False
def gradedown_all_profiles(self) -> bool:
cursor_accounts = g.get('cursor_accounts')
cursor_data = g.get('cursor_data')
if not cursor_accounts or not cursor_data:
return False
try:
cursor_accounts.execute("SELECT username, class FROM PROFILES")
profiles = cursor_accounts.fetchall()
for profile in profiles:
current_class = profile["class"]
current_username = profile["username"]
try:
class_int = int(current_class)
new_class_int = class_int - 100
new_class = str(new_class_int)
except:
new_class = current_class
if current_username.endswith(str(current_class)):
new_username = current_username[:-len(str(current_class))] + new_class
else:
new_username = current_username
if isinstance(new_class, str):
try:
new_class_int = int(new_class)
except:
new_class_int = None
else:
new_class_int = None
if new_class_int is None or new_class_int <= 0 or new_username is None:
# Xóa người dùng
cursor_accounts.execute("DELETE FROM PROFILES WHERE username = %s", (current_username,))
cursor_accounts.execute("DELETE FROM USERS WHERE username = %s", (current_username,))
cursor_accounts.execute("DELETE FROM ACCESS WHERE username = %s", (current_username,))
cursor_data.execute("DELETE FROM RECORDS WHERE username = %s", (current_username,))
cursor_data.execute("DELETE FROM IMAGES WHERE id = %s", (current_username,))
else:
cursor_accounts.execute("UPDATE PROFILES SET class = %s, username = %s WHERE username = %s",
(new_class, new_username, current_username))
cursor_accounts.execute("UPDATE USERS SET username = %s WHERE username = %s", (new_username, current_username))
cursor_accounts.execute("UPDATE ACCESS SET username = %s WHERE username = %s", (new_username, current_username))
cursor_data.execute("UPDATE RECORDS SET username = %s WHERE username = %s", (new_username, current_username))
cursor_data.execute("UPDATE IMAGES SET id = %s WHERE id = %s", (new_username, current_username))
return True
except Exception as e:
print("Error in gradedown_all_profiles:", e)
return False
While I'm using it, there are times when I call login but it doesn't save to the database (still returns status: 'success') and no errors occur. Sometimes I restart it and it works then it goes back to normal and now it almost never writes any more data to the database causing all login attempts to fail. Is there a way to fix it? Thanks a lot
I tried testing the database connection, tried running the command manually (works as expected), tried different accounts, restarted linux, restarted the services. But the application is still the same.
I use MariaDB