Flask - 安全增強
目標
- 安裝並配置 Flask-Talisman
- 添加安全的 HTTP 頭
- 測試安全功能
步驟
準備環境
- 繼續使用
flask_api/
項目結構,激活虛擬環境:1 2
# Windows: flask_api_env\Scripts\activate # macOS/Linux: source flask_api_env/bin/activate
- 安裝 Flask-Talisman:
1
pip install flask-talisman
- 繼續使用
配置 Flask-Talisman
修改 app/init.py,集成安全增強:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
from flask import Flask, jsonify, g from flask_sqlalchemy import SQLAlchemy from flask_marshmallow import Marshmallow from flask_bcrypt import Bcrypt from flask_restx import Api from flask_caching import Cache from flask_limiter import Limiter from flask_limiter.util import get_remote_address from flask_cors import CORS from flask_talisman import Talisman # 新增 import jwt from functools import wraps from .routes.v1.todos import todos_bp as todos_v1_bp from caes.v1.users import users_bp as users_v1_bp from .routes.v1.posts import posts_bp as posts_v1_bp from .routes.v2.todos import todos_bp as todos_v2_bp from .routes.v2.posts import posts_bp as posts_v2_bp from .config import config_map from .celery_config import make_celery from .monitoring import setup_metrics import os import logging from logging.handlers import RotatingFileHandler import time db = SQLAlchemy() ma = Marshmallow() bcrypt = Bcrypt() cache = Cache() limiter = Limiter(key_func=get_remote_address) def setup_logging(app): if not app.debug: handler = RotatingFileHandler('app.log', maxBytes=10000, backupCount=3) handler.setLevel(logging.INFO) formatter = logging.Formatter( '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]' ) handler.setFormatter(formatter) app.logger.addHandler(handler) console_handler = logging.StreamHandler() console_handler.setLevel(logging.DEBUG) console_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s')) app.logger.addHandler(console_handler) app.logger.setLevel(logging.DEBUG) def create_app(): app = Flask(__name__) env = os.getenv('FLASK_ENV', 'development') app.config.from_object(config_map[env]) if os.getenv('DATABASE_URL'): app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL').replace('postgres://', 'postgresql://') os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) app.static_folder = app.root_path # 初始化 Talisman talisman = Talisman( app, force_https=env == 'production', # 生產環境強制 HTTPS strict_transport_security=True, # HSTS content_security_policy={ 'default-src': "'self'", 'img-src': ["'self'", "data:"], # 允許圖片和 data URI 'script-src': "'self'" # 限制腳本來源 } ) db.init_app(app) ma.init_app(app) bcrypt.init_app(app) cache.init_app(app) limiter.init_app(app) setup_logging(app) CORS(app, resources={r"/api/*": {"origins": "*"}}) api = Api(app, title='Blog API', version='1.0', description='A simple blog API with user and post management', doc='/api/docs/', authorizations={ 'jwt': { 'type': 'apiKey', 'in': 'header', 'name': 'Authorization', 'description': 'Enter "Bearer <token>"' } }) global celery celery = make_celery(app) celery.conf.broker_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0') celery.conf.result_backend = os.getenv('REDIS_URL', 'redis://localhost:6379/0') app.register_blueprint(todos_v1_bp, url_prefix='/api/v1') app.register_blueprint(users_v1_bp, url_prefix='/api/v1') app.register_blueprint(posts_v1_bp, url_prefix='/api/v1') app.register_blueprint(todos_v2_bp, url_prefix='/api/v2') app.register_blueprint(posts_v2_bp, url_prefix='/api/v2') @app.route('/health') def health_check(): """Health check endpoint""" try: db.session.execute('SELECT 1') status = 'healthy' except Exception as e: app.logger.error(f'Health check failed: {str(e)}') status = 'unhealthy' return jsonify({'status': status}), 503 return jsonify({'status': status}) setup_metrics(app) @app.errorhandler(404) def not_found(error): app.logger.error(f'404 error: {str(error)}') return jsonify({'error': 'Not Found', 'message': str(error)}), 404 @app.errorhandler(400) def bad_request(error): app.logger.warning(f'400 error: {str(error)}') return jsonify({'error': 'Bad Request', 'message': str(error)}), 400 @app.errorhandler(500) def internal_error(error): app.logger.critical(f'500 error: {str(error)}') return jsonify({'error': 'Internal Server Error', 'message': 'Something went wrong on our end'}), 500 with app.app_context(): db.create_all() return app def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): from .models import User token = request.headers.get('Authorization') if not token: abort(401, description='Missing token') try: if token.startswith('Bearer '): token = token[7:] data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256']) user = User.query.get(data['user_id']) if not user: abort(401, description='Invalid token') g.current_user = user except jwt.ExpiredSignatureError: abort(401, description='Token has expired') except jwt.InvalidTokenError: abort(401, description='Invalid token') return f(*args, **kwargs) return decorated_function def admin_required(f): @wraps(f) @login_required def decorated_function(*args, **kwargs): if not g.current_user.is_admin: abort(403, description='Admin access required') return f(*args, **kwargs) return decorated_function celery = None
加強密碼安全性
修改 app/models.py,增強密碼哈希:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
from . import db, bcrypt from datetime import datetime class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(50), unique=True, nullable=False) password_hash = db.Column(db.String(128), nullable=False) role = db.Column(db.String(20), default='user') posts = db.relationship('Post', backref='user', lazy=True) def set_password(self, password): # 使用更高的工作因子(12)增強安全性 self.password_hash = bcrypt.generate_password_hash(password, rounds=12).decode('utf-8') def check_password(self, password): return bcrypt.check_password_hash(self.password_hash, password) @property def is_admin(self): return self.role == 'admin' class Post(db.Model): __tablename__ = 'posts' id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(100), nullable=False) content = db.Column(db.Text, nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False) category = db.Column(db.String(50), default='general') image_path = db.Column(db.String(255))
運行應用
- 運行:
1
python run.py
- 運行:
測試安全功能
- 檢查 HTTP 頭:
- 使用 Postman 或瀏覽器開發者工具,請求
http://localhost:5000/api/v1/posts
:Content-Security-Policy
:應包含default-src 'self'
等。Strict-Transport-Security
:在生產模式下應出現(本地無 HTTPS 不生效)。X-Content-Type-Options
:應為nosniff
。
- 使用 Postman 或瀏覽器開發者工具,請求
- 測試密碼哈希:
- 創建用戶(
POST /api/v1/users
),記錄時間,應略有延遲(因更高工作因子)。
- 創建用戶(
- 生產環境測試:
- 更新 Heroku:
1 2 3
git add . git commit -m "Add security enhancements with Talisman" git push heroku main
- 訪問
https://my-blog-api.herokuapp.com/api/v1/posts
,檢查頭部。
- 更新 Heroku:
- 檢查 HTTP 頭:
作業
- 添加一個自定義 CSRF 令牌檢查(提示:在
login_required
中驗證頭部X-CSRF-Token
)。 - 配置 Talisman 限制特定域名訪問靜態文件(提示:調整
content_security_policy
的img-src
)。
- 添加一個自定義 CSRF 令牌檢查(提示:在
注意事項
- Talisman 在本地開發時可能需要禁用
force_https
(設為False
),因無 HTTPS。 - 高工作因子的 bcrypt 會增加計算開銷,需平衡安全與性能。
- 生產環境應始終使用 HTTPS,Heroku 默認支持。
- CSP 可能影響前端調試,根據實際需求調整。
本文章以 CC BY 4.0 授權