文章

Flask - 安全增強

目標

  • 安裝並配置 Flask-Talisman
  • 添加安全的 HTTP 頭
  • 測試安全功能

步驟

  1. 準備環境

    • 繼續使用 flask_api/ 項目結構,激活虛擬環境:
      1
      2
      
      # Windows: flask_api_env\Scripts\activate
      # macOS/Linux: source flask_api_env/bin/activate
      
    • 安裝 Flask-Talisman:
      1
      
      pip install flask-talisman
      
  2. 配置 Flask-Talisman

    • 修改 app/init.py,集成安全增強:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      
      from flask import Flask, jsonify, g
      from flask_sqlalchemy import SQLAlchemy
      from flask_marshmallow import Marshmallow
      from flask_bcrypt import Bcrypt
      from flask_restx import Api
      from flask_caching import Cache
      from flask_limiter import Limiter
      from flask_limiter.util import get_remote_address
      from flask_cors import CORS
      from flask_talisman import Talisman  # 新增
      import jwt
      from functools import wraps
      from .routes.v1.todos import todos_bp as todos_v1_bp
      from caes.v1.users import users_bp as users_v1_bp
      from .routes.v1.posts import posts_bp as posts_v1_bp
      from .routes.v2.todos import todos_bp as todos_v2_bp
      from .routes.v2.posts import posts_bp as posts_v2_bp
      from .config import config_map
      from .celery_config import make_celery
      from .monitoring import setup_metrics
      import os
      import logging
      from logging.handlers import RotatingFileHandler
      import time
      
      db = SQLAlchemy()
      ma = Marshmallow()
      bcrypt = Bcrypt()
      cache = Cache()
      limiter = Limiter(key_func=get_remote_address)
      
      def setup_logging(app):
          if not app.debug:
              handler = RotatingFileHandler('app.log', maxBytes=10000, backupCount=3)
              handler.setLevel(logging.INFO)
              formatter = logging.Formatter(
                  '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
              )
              handler.setFormatter(formatter)
              app.logger.addHandler(handler)
          console_handler = logging.StreamHandler()
          console_handler.setLevel(logging.DEBUG)
          console_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
          app.logger.addHandler(console_handler)
          app.logger.setLevel(logging.DEBUG)
      
      def create_app():
          app = Flask(__name__)
          env = os.getenv('FLASK_ENV', 'development')
          app.config.from_object(config_map[env])
      
          if os.getenv('DATABASE_URL'):
              app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL').replace('postgres://', 'postgresql://')
      
          os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
          app.static_folder = app.root_path
      
          # 初始化 Talisman
          talisman = Talisman(
              app,
              force_https=env == 'production',  # 生產環境強制 HTTPS
              strict_transport_security=True,   # HSTS
              content_security_policy={
                  'default-src': "'self'",
                  'img-src': ["'self'", "data:"],  # 允許圖片和 data URI
                  'script-src': "'self'"           # 限制腳本來源
              }
          )
      
          db.init_app(app)
          ma.init_app(app)
          bcrypt.init_app(app)
          cache.init_app(app)
          limiter.init_app(app)
          setup_logging(app)
      
          CORS(app, resources={r"/api/*": {"origins": "*"}})
      
          api = Api(app,
                    title='Blog API',
                    version='1.0',
                    description='A simple blog API with user and post management',
                    doc='/api/docs/',
                    authorizations={
                        'jwt': {
                            'type': 'apiKey',
                            'in': 'header',
                            'name': 'Authorization',
                            'description': 'Enter "Bearer <token>"'
                        }
                    })
      
          global celery
          celery = make_celery(app)
          celery.conf.broker_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
          celery.conf.result_backend = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
      
          app.register_blueprint(todos_v1_bp, url_prefix='/api/v1')
          app.register_blueprint(users_v1_bp, url_prefix='/api/v1')
          app.register_blueprint(posts_v1_bp, url_prefix='/api/v1')
          app.register_blueprint(todos_v2_bp, url_prefix='/api/v2')
          app.register_blueprint(posts_v2_bp, url_prefix='/api/v2')
      
          @app.route('/health')
          def health_check():
              """Health check endpoint"""
              try:
                  db.session.execute('SELECT 1')
                  status = 'healthy'
              except Exception as e:
                  app.logger.error(f'Health check failed: {str(e)}')
                  status = 'unhealthy'
                  return jsonify({'status': status}), 503
              return jsonify({'status': status})
      
          setup_metrics(app)
      
          @app.errorhandler(404)
          def not_found(error):
              app.logger.error(f'404 error: {str(error)}')
              return jsonify({'error': 'Not Found', 'message': str(error)}), 404
      
          @app.errorhandler(400)
          def bad_request(error):
              app.logger.warning(f'400 error: {str(error)}')
              return jsonify({'error': 'Bad Request', 'message': str(error)}), 400
      
          @app.errorhandler(500)
          def internal_error(error):
              app.logger.critical(f'500 error: {str(error)}')
              return jsonify({'error': 'Internal Server Error', 'message': 'Something went wrong on our end'}), 500
      
          with app.app_context():
              db.create_all()
      
          return app
      
      def login_required(f):
          @wraps(f)
          def decorated_function(*args, **kwargs):
              from .models import User
              token = request.headers.get('Authorization')
              if not token:
                  abort(401, description='Missing token')
              try:
                  if token.startswith('Bearer '):
                      token = token[7:]
                  data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
                  user = User.query.get(data['user_id'])
                  if not user:
                      abort(401, description='Invalid token')
                  g.current_user = user
              except jwt.ExpiredSignatureError:
                  abort(401, description='Token has expired')
              except jwt.InvalidTokenError:
                  abort(401, description='Invalid token')
              return f(*args, **kwargs)
          return decorated_function
      
      def admin_required(f):
          @wraps(f)
          @login_required
          def decorated_function(*args, **kwargs):
              if not g.current_user.is_admin:
                  abort(403, description='Admin access required')
              return f(*args, **kwargs)
          return decorated_function
      
      celery = None
      
  3. 加強密碼安全性

    • 修改 app/models.py,增強密碼哈希:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      
      from . import db, bcrypt
      from datetime import datetime
      
      class User(db.Model):
          __tablename__ = 'users'
          id = db.Column(db.Integer, primary_key=True)
          username = db.Column(db.String(50), unique=True, nullable=False)
          password_hash = db.Column(db.String(128), nullable=False)
          role = db.Column(db.String(20), default='user')
          posts = db.relationship('Post', backref='user', lazy=True)
      
          def set_password(self, password):
              # 使用更高的工作因子(12)增強安全性
              self.password_hash = bcrypt.generate_password_hash(password, rounds=12).decode('utf-8')
      
          def check_password(self, password):
              return bcrypt.check_password_hash(self.password_hash, password)
      
          @property
          def is_admin(self):
              return self.role == 'admin'
      
      class Post(db.Model):
          __tablename__ = 'posts'
          id = db.Column(db.Integer, primary_key=True)
          title = db.Column(db.String(100), nullable=False)
          content = db.Column(db.Text, nullable=False)
          created_at = db.Column(db.DateTime, default=datetime.utcnow)
          user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
          category = db.Column(db.String(50), default='general')
          image_path = db.Column(db.String(255))
      
  4. 運行應用

    • 運行:
      1
      
      python run.py
      
  5. 測試安全功能

    • 檢查 HTTP 頭
      • 使用 Postman 或瀏覽器開發者工具,請求 http://localhost:5000/api/v1/posts
        • Content-Security-Policy:應包含 default-src 'self' 等。
        • Strict-Transport-Security:在生產模式下應出現(本地無 HTTPS 不生效)。
        • X-Content-Type-Options:應為 nosniff
    • 測試密碼哈希
      • 創建用戶(POST /api/v1/users),記錄時間,應略有延遲(因更高工作因子)。
    • 生產環境測試
      • 更新 Heroku:
        1
        2
        3
        
        git add .
        git commit -m "Add security enhancements with Talisman"
        git push heroku main
        
      • 訪問 https://my-blog-api.herokuapp.com/api/v1/posts,檢查頭部。
  6. 作業

    • 添加一個自定義 CSRF 令牌檢查(提示:在 login_required 中驗證頭部 X-CSRF-Token)。
    • 配置 Talisman 限制特定域名訪問靜態文件(提示:調整 content_security_policyimg-src)。

注意事項

  • Talisman 在本地開發時可能需要禁用 force_https(設為 False),因無 HTTPS。
  • 高工作因子的 bcrypt 會增加計算開銷,需平衡安全與性能。
  • 生產環境應始終使用 HTTPS,Heroku 默認支持。
  • CSP 可能影響前端調試,根據實際需求調整。

本文章以 CC BY 4.0 授權