
Flask - Password Hashing

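Goals

  • Install and configure Flask-Bcrypt
  • Store passwords as hashes
  • Update the authentication logic to verify hashed passwords
  • Implement a password policy and strength checks
  • Implement password reset
  • Implement password change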

Steps

  1. Prepare the environment

    • Continue with the flask_api/ project structure and activate the virtual environment:
      
      # Windows: flask_api_env\Scripts\activate
      # macOS/Linux: source flask_api_env/bin/activate
      
    • Install Flask-Bcrypt and the password strength library (zxcvbn):
      
      pip install flask-bcrypt zxcvbn
      
  2. Configure Flask-Bcrypt

    • Edit app/__init__.py to initialize Bcrypt:

      
      from flask import Flask, jsonify, g
      from flask_sqlalchemy import SQLAlchemy
      from flask_marshmallow import Marshmallow
      from flask_bcrypt import Bcrypt  # new
      import jwt
      from functools import wraps
      from .config import config_map
      import os
      
      db = SQLAlchemy()
      ma = Marshmallow()
      bcrypt = Bcrypt()  # created unbound; attached to the app in create_app()
      
      # Import the blueprints only after the extension objects above exist.
      # The route and model modules import names such as db and bcrypt from
      # this package, so importing them earlier would cause a circular import.
      from .routes.v1.todos import todos_bp as todos_v1_bp
      from .routes.v1.users import users_bp as users_v1_bp
      from .routes.v1.posts import posts_bp as posts_v1_bp
      from .routes.v2.todos import todos_bp as todos_v2_bp
      from .routes.v2.posts import posts_bp as posts_v2_bp
      
      def create_app():
          app = Flask(__name__)
          env = os.getenv('FLASK_ENV', 'development')
          app.config.from_object(config_map[env])
          db.init_app(app)
          ma.init_app(app)
          bcrypt.init_app(app)  # bind Bcrypt to this app instance
      
          app.register_blueprint(todos_v1_bp, url_prefix='/api/v1')
          app.register_blueprint(users_v1_bp, url_prefix='/api/v1')
          app.register_blueprint(posts_v1_bp, url_prefix='/api/v1')
          app.register_blueprint(todos_v2_bp, url_prefix='/api/v2')
          app.register_blueprint(posts_v2_bp, url_prefix='/api/v2')
      
          @app.errorhandler(404)
          def not_found(error):
              return jsonify({'error': 'Not Found', 'message': str(error)}), 404
      
          @app.errorhandler(400)
          def bad_request(error):
              return jsonify({'error': 'Bad Request', 'message': str(error)}), 400
      
          @app.errorhandler(500)
          def internal_error(error):
              return jsonify({'error': 'Internal Server Error', 'message': 'Something went wrong on our end'}), 500
      
          with app.app_context():
              db.create_all()
      
          return app
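
For intuition about what a password-hashing helper such as the Bcrypt object provides, here is a minimal sketch of salted hashing and verification. It deliberately swaps in PBKDF2 from the Python standard library instead of bcrypt so that it runs without extra packages; the function names hash_password and verify_password and the ITERATIONS value are illustrative assumptions, not part of Flask-Bcrypt.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a recommendation


def hash_password(password: str) -> str:
    """Return 'salt_hex$digest_hex' for the given password."""
    salt = os.urandom(16)  # fresh random salt for every password
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, ITERATIONS
    )
    return f"{salt.hex()}${digest.hex()}"


def verify_password(stored: str, candidate: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)
```

Because the salt is random, hashing the same password twice gives two different strings, yet both verify against the original password. In the application itself, the hashing is left to Flask-Bcrypt as configured above.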
      
  3. Update the model

    • Edit app/models.py so the password field stores a hash:

      
      from . import db, bcrypt
      from datetime import datetime, timedelta  # timedelta is needed for token expiry and lockout
      import zxcvbn
      import re
      
      class User(db.Model):
          __tablename__ = 'users'
          id = db.Column(db.Integer, primary_key=True)
          username = db.Column(db.String(50), unique=True, nullable=False)
          email = db.Column(db.String(120), unique=True, nullable=False)
          password_hash = db.Column(db.String(128), nullable=False)
          password_reset_token = db.Column(db.String(100), unique=True)
          password_reset_expires = db.Column(db.DateTime)
          last_password_change = db.Column(db.DateTime, default=datetime.utcnow)
          failed_login_attempts = db.Column(db.Integer, default=0)
          account_locked_until = db.Column(db.DateTime)
          posts = db.relationship('Post', backref='user', lazy=True)
      
          def set_password(self, password):
              """Check the password's strength, then store its hash."""
              if not self._check_password_strength(password):
                  raise ValueError('Password does not meet requirements')
              self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8')
              self.last_password_change = datetime.utcnow()
      
          def check_password(self, password):
              """Verify a password against the stored hash."""
              return bcrypt.check_password_hash(self.password_hash, password)
      
          def _check_password_strength(self, password):
              """Return True if the password satisfies the policy."""
              # Estimate strength with zxcvbn (score ranges from 0 to 4)
              result = zxcvbn.zxcvbn(password)
              if result['score'] < 3:  # reject scores below 3
                  return False
      
              # Minimum length
              if len(password) < 8:
                  return False
      
              # Must contain a digit
              if not re.search(r'\d', password):
                  return False
      
              # Must contain an uppercase letter
              if not re.search(r'[A-Z]', password):
                  return False
      
              # Must contain a lowercase letter
              if not re.search(r'[a-z]', password):
                  return False
      
              # Must contain a special character
              if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
                  return False
      
              return True
      
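          def generate_password_reset_token(self):
              """Generate a password reset token that expires in one hour."""
              import secrets
              self.password_reset_token = secrets.token_urlsafe(32)
              self.password_reset_expires = datetime.utcnow() + timedelta(hours=1)
              db.session.commit()
              return self.password_reset_token
      
          def verify_password_reset_token(self, token):
              """Check that the token matches and has not expired."""
              if not self.password_reset_token or self.password_reset_token != token:
                  return False
              # No recorded expiry means no reset is pending
              if self.password_reset_expires is None:
                  return False
              if datetime.utcnow() > self.password_reset_expires:
                  return False
              return True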
      
          def increment_failed_login_attempts(self):
              """Count a failed login and lock the account for 30 minutes after 5 failures."""
              # The column default is only applied on insert, so treat None as 0
              self.failed_login_attempts = (self.failed_login_attempts or 0) + 1
              if self.failed_login_attempts >= 5:
                  self.account_locked_until = datetime.utcnow() + timedelta(minutes=30)
              db.session.commit()
      
          def reset_failed_login_attempts(self):
              """Clear the failed login counter and any lock."""
              self.failed_login_attempts = 0
              self.account_locked_until = None
              db.session.commit()
      
          def is_account_locked(self):
              """Return True while the account lock is still in effect."""
              if self.account_locked_until and datetime.utcnow() < self.account_locked_until:
                  return True
              return False
      
      class Post(db.Model):
          __tablename__ = 'posts'
          id = db.Column(db.Integer, primary_key=True)
          title = db.Column(db.String(100), nullable=False)
          content = db.Column(db.Text, nullable=False)
          created_at = db.Column(db.DateTime, default=datetime.utcnow)
          user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
          category = db.Column(db.String(50), default='general')
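
The character-class rules in _check_password_strength return only True or False, so the caller cannot tell the user which rule failed. As a rough sketch, the same rules (without the zxcvbn score, so it runs on the standard library alone) can be expressed as a table that reports every unmet requirement. The names RULES and policy_failures are hypothetical and not part of the model above.

```python
import re
from typing import List

# Each entry pairs a check with the requirement it enforces.
# These mirror the length and character-class rules in the model;
# the zxcvbn score check is intentionally left out of this sketch.
RULES = [
    (lambda p: len(p) >= 8, "at least 8 characters"),
    (lambda p: re.search(r"\d", p) is not None, "a digit"),
    (lambda p: re.search(r"[A-Z]", p) is not None, "an uppercase letter"),
    (lambda p: re.search(r"[a-z]", p) is not None, "a lowercase letter"),
    (lambda p: re.search(r'[!@#$%^&*(),.?":{}|<>]', p) is not None,
     "a special character"),
]


def policy_failures(password: str) -> List[str]:
    """Return the requirements the password does not meet (empty if it passes)."""
    return [requirement for check, requirement in RULES if not check(password)]
```

One possible use is to raise ValueError with the joined list from set_password, so the 400 response names the missing requirements instead of a generic message.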
      
  4. Update the user routes

    • Edit app/routes/v1/users.py to add the password-related endpoints:

      
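      from flask import Blueprint, jsonify, request, abort, current_app, g
      from ...models import User
      from ... import db
      from ...schemas import user_schema, users_schema
      # change_password below uses login_required and g.current_user. login_required
      # is assumed to live in the package root next to the token helpers.
      from ... import create_access_token, create_refresh_token, login_required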
      from datetime import datetime, timedelta
      import smtplib
      from email.mime.text import MIMEText
      from email.mime.multipart import MIMEMultipart
      
      users_bp = Blueprint('users_v1', __name__)
      
      def send_password_reset_email(user, token):
          """Send the password reset email."""
          msg = MIMEMultipart()
          msg['From'] = current_app.config['MAIL_USERNAME']
          msg['To'] = user.email
          msg['Subject'] = 'Password Reset Request'
      
          reset_url = f"{current_app.config['FRONTEND_URL']}/reset-password?token={token}"
          body = f"""
          <html>
              <body>
                  <p>Hello {user.username},</p>
                  <p>You requested a password reset. Click the link below to reset your password:</p>
                  <p><a href="{reset_url}">Reset Password</a></p>
                  <p>This link will expire in 1 hour.</p>
                  <p>If you did not request this, please ignore this email.</p>
              </body>
          </html>
          """
          msg.attach(MIMEText(body, 'html'))
      
          try:
              # The context manager closes the connection even if sending fails
              with smtplib.SMTP(current_app.config['MAIL_SERVER'], current_app.config['MAIL_PORT']) as server:
                  server.starttls()
                  server.login(current_app.config['MAIL_USERNAME'], current_app.config['MAIL_PASSWORD'])
                  server.send_message(msg)
          except Exception as e:
              current_app.logger.error(f'Failed to send email: {str(e)}')
              raise
      
      @users_bp.route('/users', methods=['GET'])
      def get_users():
          users = User.query.all()
          return jsonify({'users': users_schema.dump(users)})
      
      @users_bp.route('/users', methods=['POST'])
      def create_user():
          if not request.is_json:
              abort(400, description='Request must be JSON')
          data = request.get_json()
          if 'username' not in data or 'password' not in data or 'email' not in data:
              abort(400, description='Missing username, password, or email')
          if User.query.filter_by(username=data['username']).first():
              abort(400, description='Username already exists')
          if User.query.filter_by(email=data['email']).first():
              abort(400, description='Email already exists')
               
          try:
              user = User(username=data['username'], email=data['email'])
              user.set_password(data['password'])
              db.session.add(user)
              db.session.commit()
              return jsonify(user_schema.dump(user)), 201
          except ValueError as e:
              abort(400, description=str(e))
      
      @users_bp.route('/login', methods=['POST'])
      def login():
          if not request.is_json:
              abort(400, description='Request must be JSON')
          data = request.get_json()
          if 'username' not in data or 'password' not in data:
              abort(400, description='Missing username or password')
               
          user = User.query.filter_by(username=data['username']).first()
          if not user:
              abort(401, description='Invalid credentials')
               
          if user.is_account_locked():
              abort(401, description='Account is locked. Please try again later.')
               
          if not user.check_password(data['password']):
              user.increment_failed_login_attempts()
              abort(401, description='Invalid credentials')
               
          user.reset_failed_login_attempts()
          access_token = create_access_token(user.id)
          refresh_token = create_refresh_token(user.id)
               
          return jsonify({
              'access_token': access_token,
              'refresh_token': refresh_token,
              'token_type': 'Bearer',
              'expires_in': current_app.config['JWT_ACCESS_TOKEN_EXPIRES']
          })
      
      @users_bp.route('/forgot-password', methods=['POST'])
      def forgot_password():
          if not request.is_json:
              abort(400, description='Request must be JSON')
          data = request.get_json()
          if 'email' not in data:
              abort(400, description='Missing email')
               
          user = User.query.filter_by(email=data['email']).first()
          if user:
              token = user.generate_password_reset_token()
              send_password_reset_email(user, token)
               
          return jsonify({'message': 'If an account exists with this email, a password reset link has been sent.'})
      
      @users_bp.route('/reset-password', methods=['POST'])
      def reset_password():
          if not request.is_json:
              abort(400, description='Request must be JSON')
          data = request.get_json()
          if 'token' not in data or 'password' not in data:
              abort(400, description='Missing token or password')
               
          user = User.query.filter_by(password_reset_token=data['token']).first()
          if not user or not user.verify_password_reset_token(data['token']):
              abort(400, description='Invalid or expired token')
               
          try:
              user.set_password(data['password'])
              user.password_reset_token = None
              user.password_reset_expires = None
              db.session.commit()
              return jsonify({'message': 'Password has been reset successfully'})
          except ValueError as e:
              abort(400, description=str(e))
      
      @users_bp.route('/change-password', methods=['POST'])
      @login_required
      def change_password():
          if not request.is_json:
              abort(400, description='Request must be JSON')
          data = request.get_json()
          if 'current_password' not in data or 'new_password' not in data:
              abort(400, description='Missing current or new password')
               
          if not g.current_user.check_password(data['current_password']):
              abort(401, description='Current password is incorrect')
               
          try:
              g.current_user.set_password(data['new_password'])
              db.session.commit()
              return jsonify({'message': 'Password has been changed successfully'})
          except ValueError as e:
              abort(400, description=str(e))
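
The reset flow in these routes relies on three properties of the token: it is unguessable, it expires, and it can be used only once. The sketch below isolates that lifecycle in plain Python, with an in-memory dictionary standing in for the password_reset_token and password_reset_expires columns. ResetTokenStore and its methods are hypothetical names used only for illustration.

```python
import hmac
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional


class ResetTokenStore:
    """In-memory stand-in for the reset token columns on the User model."""

    def __init__(self, lifetime: timedelta = timedelta(hours=1)):
        self.lifetime = lifetime
        self._tokens = {}  # user_id -> (token, expires_at)

    def issue(self, user_id: int, now: Optional[datetime] = None) -> str:
        """Create a new token for the user, replacing any earlier one."""
        now = now or datetime.now(timezone.utc)
        token = secrets.token_urlsafe(32)  # unguessable, URL-safe
        self._tokens[user_id] = (token, now + self.lifetime)
        return token

    def redeem(self, user_id: int, token: str, now: Optional[datetime] = None) -> bool:
        """Return True once for a matching, unexpired token, then invalidate it."""
        now = now or datetime.now(timezone.utc)
        entry = self._tokens.get(user_id)
        if entry is None:
            return False
        stored, expires_at = entry
        # Compare as bytes in constant time to avoid leaking match position
        matches = hmac.compare_digest(stored.encode("utf-8"), token.encode("utf-8"))
        if now > expires_at or not matches:
            return False
        del self._tokens[user_id]  # single use
        return True
```

In reset_password above, clearing password_reset_token and password_reset_expires after a successful change plays the same role as the deletion in redeem.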
      
  5. Update the configuration

    • Edit app/config.py to add the mail settings:

      
      import os
      from dotenv import load_dotenv
      
      load_dotenv()
      
      class Config:
          SECRET_KEY = os.getenv('SECRET_KEY', 'default-secret-key')
          JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'jwt-secret-key')
          JWT_ACCESS_TOKEN_EXPIRES = int(os.getenv('JWT_ACCESS_TOKEN_EXPIRES', 3600))
          JWT_REFRESH_TOKEN_EXPIRES = int(os.getenv('JWT_REFRESH_TOKEN_EXPIRES', 604800))
          SQLALCHEMY_TRACK_MODIFICATIONS = False
          JWT_BLACKLIST_ENABLED = True
          JWT_BLACKLIST_TOKEN_CHECKS = ['access', 'refresh']
               
          # Mail settings
          MAIL_SERVER = os.getenv('MAIL_SERVER', 'smtp.gmail.com')
          MAIL_PORT = int(os.getenv('MAIL_PORT', 587))
          MAIL_USE_TLS = True
          MAIL_USERNAME = os.getenv('MAIL_USERNAME')
          MAIL_PASSWORD = os.getenv('MAIL_PASSWORD')
          FRONTEND_URL = os.getenv('FRONTEND_URL', 'http://localhost:3000')
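
MAIL_USERNAME and MAIL_PASSWORD have no defaults, so if they are missing from the environment the first password reset email fails at send time. One way to surface this earlier is a small check at startup. The sketch below is an assumption about how that might look; missing_settings and REQUIRED_MAIL_SETTINGS are hypothetical names, not part of the project.

```python
from typing import Iterable, List, Mapping

# Settings the reset email helper reads from the app configuration
REQUIRED_MAIL_SETTINGS = ("MAIL_SERVER", "MAIL_PORT", "MAIL_USERNAME", "MAIL_PASSWORD")


def missing_settings(
    config: Mapping, required: Iterable[str] = REQUIRED_MAIL_SETTINGS
) -> List[str]:
    """Return the names of required settings that are absent or empty."""
    return [name for name in required if not config.get(name)]
```

Since app.config behaves like a dictionary, create_app could call missing_settings(app.config) after loading the configuration and log a warning if the result is not empty.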
      
  6. Run the application

    • Delete the old blog.db (the table structure has changed), then run:
      
      python run.py
      
  7. Test the API

    • Test with Postman:
      • POST /api/v1/users
        • Body: {"username": "alice", "password": "StrongP@ss123", "email": "alice@example.com"}
        • Expected response: 201
      • POST /api/v1/login
        • Body: {"username": "alice", "password": "StrongP@ss123"}
        • Expected response: contains access_token and refresh_token
      • POST /api/v1/forgot-password
        • Body: {"email": "alice@example.com"}
        • Expected response: success message
      • POST /api/v1/reset-password
        • Body: {"token": "...", "password": "NewP@ss123"}
        • Expected response: success message
      • POST /api/v1/change-password
        • Headers: Authorization: Bearer <access_token>
        • Body: {"current_password": "StrongP@ss123", "new_password": "NewP@ss123"}
        • Expected response: success message

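Security best practices

  1. Password policy
    • Enforce strong password requirements
    • Check password strength
    • Require periodic password changes
    • Prevent password reuse
  2. Account security
    • Limit failed login attempts
    • Lock accounts after repeated failures
    • Log security-related events
    • Provide a password reset flow
  3. Data protection
    • Use a secure hashing algorithm
    • Use salts to strengthen hashes
    • Protect sensitive information
    • Encrypt data
  4. Error handling
    • Provide clear error messages
    • Log security-related errors
    • Implement rate limiting
    • Guard against brute-force attacks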
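
Rate limiting is listed above but not implemented in the steps. As one possible shape for it, the sketch below is a small in-memory sliding-window limiter keyed by, for example, client IP or username. SlidingWindowLimiter is a hypothetical helper, and a single-process dictionary like this would not be shared across multiple workers.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class SlidingWindowLimiter:
    """Allow at most max_attempts per key within the last window_seconds."""

    def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0):
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._attempts = defaultdict(deque)  # key -> timestamps of recent attempts

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        """Record an attempt and return True, or return False if over the limit."""
        now = time.monotonic() if now is None else now
        recent = self._attempts[key]
        # Drop timestamps that have fallen out of the window
        while recent and now - recent[0] >= self.window_seconds:
            recent.popleft()
        if len(recent) >= self.max_attempts:
            return False
        recent.append(now)
        return True
```

A login route could call allow() with the client address before checking credentials and respond with 429 when it returns False, which complements the per-account lockout in the User model.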

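Notes

  • In production, set SECRET_KEY and JWT_SECRET_KEY to strong random values instead of the defaults in config.py
  • Periodically clean up expired reset tokens
  • Implement two-factor authentication
  • Use HTTPS to protect traffic
  • Audit security logs regularly
  • Make the password policy configurable
  • Consider OAuth2 or OpenID Connect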

This article is licensed under CC BY 4.0.