Flask - 密碼哈希
目標
- 安裝並配置 Flask-Bcrypt
- 將密碼存儲為哈希值
- 更新認證邏輯以驗證哈希密碼
- 實現密碼策略和強度檢查
- 實現密碼重置功能
- 實現密碼更改功能
步驟
準備環境
- 繼續使用
flask_api/
項目結構,激活虛擬環境:1 2
# Windows: flask_api_env\Scripts\activate # macOS/Linux: source flask_api_env/bin/activate
- 安裝 Flask-Bcrypt 和密碼強度檢查庫:
1
pip install flask-bcrypt zxcvbn
- 繼續使用
配置 Flask-Bcrypt
修改 app/init.py,初始化 Bcrypt:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
from flask import Flask, jsonify, g from flask_sqlalchemy import SQLAlchemy from flask_marshmallow import Marshmallow from flask_bcrypt import Bcrypt # 新增 import jwt from functools import wraps from .routes.v1.todos import todos_bp as todos_v1_bp from .routes.v1.users import users_bp as users_v1_bp from .routes.v1.posts import posts_bp as posts_v1_bp from .routes.v2.todos import todos_bp as todos_v2_bp from .routes.v2.posts import posts_bp as posts_v2_bp from .config import config_map import os db = SQLAlchemy() ma = Marshmallow() bcrypt = Bcrypt() # 初始化 Bcrypt def create_app(): app = Flask(__name__) env = os.getenv('FLASK_ENV', 'development') app.config.from_object(config_map[env]) db.init_app(app) ma.init_app(app) bcrypt.init_app(app) # 初始化 Bcrypt app.register_blueprint(todos_v1_bp, url_prefix='/api/v1') app.register_blueprint(users_v1_bp, url_prefix='/api/v1') app.register_blueprint(posts_v1_bp, url_prefix='/api/v1') app.register_blueprint(todos_v2_bp, url_prefix='/api/v2') app.register_blueprint(posts_v2_bp, url_prefix='/api/v2') @app.errorhandler(404) def not_found(error): return jsonify({'error': 'Not Found', 'message': str(error)}), 404 @app.errorhandler(400) def bad_request(error): return jsonify({'error': 'Bad Request', 'message': str(error)}), 400 @app.errorhandler(500) def internal_error(error): return jsonify({'error': 'Internal Server Error', 'message': 'Something went wrong on our end'}), 500 with app.app_context(): db.create_all() return app
更新模型
修改 app/models.py,調整密碼字段為哈希值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
from . import db, bcrypt from datetime import datetime import zxcvbn import re class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(50), unique=True, nullable=False) email = db.Column(db.String(120), unique=True, nullable=False) password_hash = db.Column(db.String(128), nullable=False) password_reset_token = db.Column(db.String(100), unique=True) password_reset_expires = db.Column(db.DateTime) last_password_change = db.Column(db.DateTime, default=datetime.utcnow) failed_login_attempts = db.Column(db.Integer, default=0) account_locked_until = db.Column(db.DateTime) posts = db.relationship('Post', backref='user', lazy=True) def set_password(self, password): """設置密碼並進行強度檢查""" if not self._check_password_strength(password): raise ValueError('Password does not meet requirements') self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8') self.last_password_change = datetime.utcnow() def check_password(self, password): """驗證密碼""" return bcrypt.check_password_hash(self.password_hash, password) def _check_password_strength(self, password): """檢查密碼強度""" # 使用 zxcvbn 檢查密碼強度 result = zxcvbn.zxcvbn(password) if result['score'] < 3: # 密碼強度分數低於 3 return False # 檢查密碼長度 if len(password) < 8: return False # 檢查是否包含數字 if not re.search(r'\d', password): return False # 檢查是否包含大寫字母 if not re.search(r'[A-Z]', password): return False # 檢查是否包含小寫字母 if not re.search(r'[a-z]', password): return False # 檢查是否包含特殊字符 if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): return False return True def generate_password_reset_token(self): """生成密碼重置令牌""" import secrets self.password_reset_token = secrets.token_urlsafe(32) self.password_reset_expires = datetime.utcnow() + timedelta(hours=1) db.session.commit() return self.password_reset_token def verify_password_reset_token(self, token): """驗證密碼重置令牌""" if self.password_reset_token != token: return False if datetime.utcnow() > self.password_reset_expires: return False return True def increment_failed_login_attempts(self): """增加登錄失敗次數""" self.failed_login_attempts += 1 if self.failed_login_attempts >= 5: self.account_locked_until = datetime.utcnow() + timedelta(minutes=30) db.session.commit() def reset_failed_login_attempts(self): """重置登錄失敗次數""" self.failed_login_attempts = 0 self.account_locked_until = None db.session.commit() def is_account_locked(self): """檢查賬戶是否被鎖定""" if self.account_locked_until and datetime.utcnow() < self.account_locked_until: return True return False class Post(db.Model): __tablename__ = 'posts' id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(100), nullable=False) content = db.Column(db.Text, nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False) category = db.Column(db.String(50), default='general')
更新用戶路由
修改 app/routes/v1/users.py,添加密碼相關功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
from flask import Blueprint, jsonify, request, abort, current_app from ...models import User from ... import db from ...schemas import user_schema, users_schema from ... import create_access_token, create_refresh_token from datetime import datetime, timedelta import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart users_bp = Blueprint('users_v1', __name__) def send_password_reset_email(user, token): """發送密碼重置郵件""" msg = MIMEMultipart() msg['From'] = current_app.config['MAIL_USERNAME'] msg['To'] = user.email msg['Subject'] = 'Password Reset Request' reset_url = f"{current_app.config['FRONTEND_URL']}/reset-password?token={token}" body = f""" <html> <body> <p>Hello {user.username},</p> <p>You requested a password reset. Click the link below to reset your password:</p> <p><a href="{reset_url}">Reset Password</a></p> <p>This link will expire in 1 hour.</p> <p>If you did not request this, please ignore this email.</p> </body> </html> """ msg.attach(MIMEText(body, 'html')) try: server = smtplib.SMTP(current_app.config['MAIL_SERVER'], current_app.config['MAIL_PORT']) server.starttls() server.login(current_app.config['MAIL_USERNAME'], current_app.config['MAIL_PASSWORD']) server.send_message(msg) server.quit() except Exception as e: current_app.logger.error(f'Failed to send email: {str(e)}') raise @users_bp.route('/users', methods=['GET']) def get_users(): users = User.query.all() return jsonify({'users': users_schema.dump(users)}) @users_bp.route('/users', methods=['POST']) def create_user(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'username' not in data or 'password' not in data or 'email' not in data: abort(400, description='Missing username, password, or email') if User.query.filter_by(username=data['username']).first(): abort(400, description='Username already exists') if User.query.filter_by(email=data['email']).first(): abort(400, description='Email already exists') try: user = User(username=data['username'], email=data['email']) user.set_password(data['password']) db.session.add(user) db.session.commit() return jsonify(user_schema.dump(user)), 201 except ValueError as e: abort(400, description=str(e)) @users_bp.route('/login', methods=['POST']) def login(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'username' not in data or 'password' not in data: abort(400, description='Missing username or password') user = User.query.filter_by(username=data['username']).first() if not user: abort(401, description='Invalid credentials') if user.is_account_locked(): abort(401, description='Account is locked. Please try again later.') if not user.check_password(data['password']): user.increment_failed_login_attempts() abort(401, description='Invalid credentials') user.reset_failed_login_attempts() access_token = create_access_token(user.id) refresh_token = create_refresh_token(user.id) return jsonify({ 'access_token': access_token, 'refresh_token': refresh_token, 'token_type': 'Bearer', 'expires_in': current_app.config['JWT_ACCESS_TOKEN_EXPIRES'] }) @users_bp.route('/forgot-password', methods=['POST']) def forgot_password(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'email' not in data: abort(400, description='Missing email') user = User.query.filter_by(email=data['email']).first() if user: token = user.generate_password_reset_token() send_password_reset_email(user, token) return jsonify({'message': 'If an account exists with this email, a password reset link has been sent.'}) @users_bp.route('/reset-password', methods=['POST']) def reset_password(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'token' not in data or 'password' not in data: abort(400, description='Missing token or password') user = User.query.filter_by(password_reset_token=data['token']).first() if not user or not user.verify_password_reset_token(data['token']): abort(400, description='Invalid or expired token') try: user.set_password(data['password']) user.password_reset_token = None user.password_reset_expires = None db.session.commit() return jsonify({'message': 'Password has been reset successfully'}) except ValueError as e: abort(400, description=str(e)) @users_bp.route('/change-password', methods=['POST']) @login_required def change_password(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'current_password' not in data or 'new_password' not in data: abort(400, description='Missing current or new password') if not g.current_user.check_password(data['current_password']): abort(401, description='Current password is incorrect') try: g.current_user.set_password(data['new_password']) db.session.commit() return jsonify({'message': 'Password has been changed successfully'}) except ValueError as e: abort(400, description=str(e))
更新配置
修改 app/config.py,添加郵件配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
import os from dotenv import load_dotenv load_dotenv() class Config: SECRET_KEY = os.getenv('SECRET_KEY', 'default-secret-key') JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'jwt-secret-key') JWT_ACCESS_TOKEN_EXPIRES = int(os.getenv('JWT_ACCESS_TOKEN_EXPIRES', 3600)) JWT_REFRESH_TOKEN_EXPIRES = int(os.getenv('JWT_REFRESH_TOKEN_EXPIRES', 604800)) SQLALCHEMY_TRACK_MODIFICATIONS = False JWT_BLACKLIST_ENABLED = True JWT_BLACKLIST_TOKEN_CHECKS = ['access', 'refresh'] # 郵件配置 MAIL_SERVER = os.getenv('MAIL_SERVER', 'smtp.gmail.com') MAIL_PORT = int(os.getenv('MAIL_PORT', 587)) MAIL_USE_TLS = True MAIL_USERNAME = os.getenv('MAIL_USERNAME') MAIL_PASSWORD = os.getenv('MAIL_PASSWORD') FRONTEND_URL = os.getenv('FRONTEND_URL', 'http://localhost:3000')
運行應用
- 刪除舊的
blog.db
(因表結構改變),運行:1
python run.py
- 刪除舊的
測試 API
- 使用 Postman 測試:
- POST /api/v1/users:
- Body:
{"username": "alice", "password": "StrongP@ss123", "email": "alice@example.com"}
- 預期響應:201
- Body:
- POST /api/v1/login:
- Body:
{"username": "alice", "password": "StrongP@ss123"}
- 預期響應:包含 access_token 和 refresh_token
- Body:
- POST /api/v1/forgot-password:
- Body:
{"email": "alice@example.com"}
- 預期響應:成功消息
- Body:
- POST /api/v1/reset-password:
- Body:
{"token": "...", "password": "NewP@ss123"}
- 預期響應:成功消息
- Body:
- POST /api/v1/change-password:
- Headers:
Authorization: Bearer <access_token>
- Body:
{"current_password": "StrongP@ss123", "new_password": "NewP@ss123"}
- 預期響應:成功消息
- Headers:
- POST /api/v1/users:
- 使用 Postman 測試:
安全最佳實踐
- 密碼策略
- 使用強密碼要求
- 實現密碼強度檢查
- 定期強制更改密碼
- 防止密碼重用
- 賬戶安全
- 實現登錄失敗限制
- 實現賬戶鎖定機制
- 記錄安全相關事件
- 實現密碼重置流程
- 數據保護
- 使用安全的哈希算法
- 使用鹽值增加安全性
- 保護敏感信息
- 實現數據加密
- 錯誤處理
- 提供清晰的錯誤消息
- 記錄安全相關錯誤
- 實現速率限制
- 防止暴力破解
注意事項
- 生產環境應使用更強的密鑰
- 定期清理過期的重置令牌
- 實現雙因素認證
- 使用 HTTPS 保護通信
- 定期審計安全日誌
- 實現密碼策略配置
- 考慮使用 OAuth2 或 OpenID Connect
本文章以 CC BY 4.0 授權