Flask - JWT 認證
目標
- 安裝並配置 PyJWT
- 實現基於 JWT 的認證
- 更新Blog API 使用 token 進行身份驗證
步驟
準備環境
- 繼續使用
flask_api/
項目結構,激活虛擬環境:1 2
# Windows: flask_api_env\Scripts\activate # macOS/Linux: source flask_api_env/bin/activate
- 安裝 PyJWT:
1
pip install pyjwt
- 繼續使用
移除 Flask-HTTPAuth
- 我們將替換 Flask-HTTPAuth,因此先移除相關依賴,但保留密碼字段。
更新應用配置
修改 app/config.py,確保
SECRET_KEY
可用於 JWT 簽名:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
import os from dotenv import load_dotenv load_dotenv() class Config: SECRET_KEY = os.getenv('SECRET_KEY', 'default-secret-key') SQLALCHEMY_TRACK_MODIFICATIONS = False class DevelopmentConfig(Config): DEBUG = True SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL', 'sqlite:///blog_dev.db') class TestingConfig(Config): TESTING = True SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL', 'sqlite:///blog_test.db') WTF_CSRF_ENABLED = False class ProductionConfig(Config): DEBUG = False SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL', 'sqlite:///blog_prod.db') config_map = { 'development': DevelopmentConfig, 'testing': TestingConfig, 'production': ProductionConfig }
確保
.env
中有SECRET_KEY
:1 2 3
FLASK_ENV=development SECRET_KEY=your-very-secret-key DATABASE_URL=sqlite:///blog.db
實現 JWT 認證
修改 app/init.py,添加 JWT 工具函數:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
from flask import Flask, jsonify, g from flask_sqlalchemy import SQLAlchemy from flask_marshmallow import Marshmallow import jwt from functools import wraps from .routes.v1.todos import todos_bp as todos_v1_bp from .routes.v1.users import users_bp as users_v1_bp from .routes.v1.posts import posts_bp as posts_v1_bp from .routes.v2.todos import todos_bp as todos_v2_bp from .routes.v2.posts import posts_bp as posts_v2_bp from .config import config_map import os db = SQLAlchemy() ma = Marshmallow() def create_app(): app = Flask(__name__) env = os.getenv('FLASK_ENV', 'development') app.config.from_object(config_map[env]) db.init_app(app) ma.init_app(app) app.register_blueprint(todos_v1_bp, url_prefix='/api/v1') app.register_blueprint(users_v1_bp, url_prefix='/api/v1') app.register_blueprint(posts_v1_bp, url_prefix='/api/v1') app.register_blueprint(todos_v2_bp, url_prefix='/api/v2') app.register_blueprint(posts_v2_bp, url_prefix='/api/v2') @app.errorhandler(404) def not_found(error): return jsonify({'error': 'Not Found', 'message': str(error)}), 404 @app.errorhandler(400) def bad_request(error): return jsonify({'error': 'Bad Request', 'message': str(error)}), 400 @app.errorhandler(500) def internal_error(error): return jsonify({'error': 'Internal Server Error', 'message': 'Something went wrong on our end'}), 500 with app.app_context(): db.create_all() return app # JWT 認證裝飾器 def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): from .models import User # 避免循環導入 token = request.headers.get('Authorization') if not token: abort(401, description='Missing token') try: if token.startswith('Bearer '): token = token[7:] # 移除 "Bearer " 前綴 data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256']) user = User.query.get(data['user_id']) if not user: abort(401, description='Invalid token') g.current_user = user # 存儲當前用戶 except jwt.ExpiredSignatureError: abort(401, description='Token has expired') except jwt.InvalidTokenError: abort(401, description='Invalid token') return f(*args, **kwargs) return decorated_function
添加登錄端點
修改 app/routes/v1/users.py,添加生成 JWT 的登錄路由:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
from flask import Blueprint, jsonify, request, abort from ...models import User from ... import db from ...schemas import user_schema, users_schema import jwt import datetime users_bp = Blueprint('users_v1', __name__) @users_bp.route('/users', methods=['GET']) def get_users(): users = User.query.all() return jsonify({'users': users_schema.dump(users)}) @users_bp.route('/users', methods=['POST']) def create_user(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'username' not in data or 'password' not in data: abort(400, description='Missing username or password') if User.query.filter_by(username=data['username']).first(): abort(400, description='Username already exists') user = User(username=data['username'], password=data['password']) db.session.add(user) db.session.commit() return jsonify(user_schema.dump(user)), 201 @users_bp.route('/login', methods=['POST']) def login(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'username' not in data or 'password' not in data: abort(400, description='Missing username or password') user = User.query.filter_by(username=data['username']).first() if not user or user.password != data['password']: abort(401, description='Invalid credentials') token = jwt.encode({ 'user_id': user.id, 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24) # Token 有效期 24 小時 }, users_bp.app.config['SECRET_KEY'], algorithm='HS256') return jsonify({'token': token})
保護路由
修改 app/routes/v1/posts.py,使用 JWT 認證:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
from flask import Blueprint, jsonify, request, abort, g from ...models import Post, User from ... import db, login_required from ...schemas import post_schema, posts_schema posts_bp = Blueprint('posts_v1', __name__) @posts_bp.route('/posts', methods=['GET']) def get_posts(): user_id = request.args.get('user_id', type=int) query = Post.query if user_id: query = query.filter_by(user_id=user_id) posts = query.all() return jsonify({'posts': posts_schema.dump(posts)}) @posts_bp.route('/posts/<int:post_id>', methods=['GET']) def get_post(post_id): post = Post.query.get_or_404(post_id, description='Post not found') return jsonify(post_schema.dump(post)) @posts_bp.route('/posts', methods=['POST']) @login_required def create_post(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'title' not in data or 'content' not in data: abort(400, description='Missing title or content') post = Post( title=data['title'], content=data['content'], user_id=g.current_user.id # 使用當前認證用戶 ) db.session.add(post) db.session.commit() return jsonify(post_schema.dump(post)), 201 @posts_bp.route('/posts/<int:post_id>', methods=['PUT']) @login_required def update_post(post_id): post = Post.query.get_or_404(post_id, description='Post not found') if post.user_id != g.current_user.id: abort(403, description='You can only edit your own posts') if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'title' in data: post.title = data['title'] if 'content' in data: post.content = data['content'] db.session.commit() return jsonify(post_schema.dump(post)), 200 @posts_bp.route('/posts/<int:post_id>', methods=['DELETE']) @login_required def delete_post(post_id): post = Post.query.get_or_404(post_id, description='Post not found') if post.user_id != g.current_user.id: abort(403, description='You can only delete your own posts') db.session.delete(post) db.session.commit() return jsonify({'message': 'Post deleted'}), 200
運行應用
- 運行:
1
python run.py
- 運行:
測試 API
- 使用 Postman 測試:
- POST /api/v1/users:
- Body:
{"username": "alice", "password": "1234"}
- Body:
- POST /api/v1/login:
- Body:
{"username": "alice", "password": "1234"}
- 預期響應:
{"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."}
- Body:
- POST /api/v1/posts(未認證):
- Body:
{"title": "My Post", "content": "Hello"}
- 預期響應:401
- Body:
- POST /api/v1/posts(認證):
- Headers:
Authorization: Bearer <token>
- Body:
{"title": "My Post", "content": "Hello"}
- 預期響應:201
- Headers:
- PUT /api/v1/posts/1(錯誤用戶):
- 用另一用戶的 token,應返回 403。
- POST /api/v1/users:
- 使用 Postman 測試:
作業
- 在 v2 的 posts 路由中實現相同的 JWT 認證。
- 添加一個端點
GET /api/v1/me
,返回當前用戶的信息(使用g.current_user
)。
注意事項
- Token 過期後需重新登錄獲取新 token。
- 目前密碼仍是明文,下一天會加密。
- 在 Postman 中,將 token 添加到 Headers 的
Authorization
字段,格式為Bearer <token>
。
本文章以 CC BY 4.0 授權