Flask - 角色與權限
目標
- 在用戶模型中添加角色字段
- 實現權限檢查邏輯
- 限制特定端點的操作
步驟
準備環境
- 繼續使用
flask_api/
項目結構,激活虛擬環境:1 2
# Windows: flask_api_env\Scripts\activate # macOS/Linux: source flask_api_env/bin/activate
- 繼續使用
更新模型
修改 app/models.py,添加角色字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
from . import db, bcrypt from datetime import datetime class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(50), unique=True, nullable=False) password_hash = db.Column(db.String(128), nullable=False) role = db.Column(db.String(20), default='user') # 新增角色字段:user 或 admin posts = db.relationship('Post', backref='user', lazy=True) def set_password(self, password): self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8') def check_password(self, password): return bcrypt.check_password_hash(self.password_hash, password) @property def is_admin(self): return self.role == 'admin' class Post(db.Model): __tablename__ = 'posts' id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(100), nullable=False) content = db.Column(db.Text, nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False) category = db.Column(db.String(50), default='general')
代碼解釋:
role
:字符串字段,默認為 ‘user’,可設為 ‘admin’。is_admin
:屬性方法,方便檢查用戶是否為管理员。
更新序列化器
修改 app/schemas.py,包含角色字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
from . import ma from .models import User, Post class PostSchema(ma.SQLAlchemyAutoSchema): class Meta: model = Post include_fk = True fields = ('id', 'title', 'content', 'created_at', 'user_id', 'category') class UserSchema(ma.SQLAlchemyAutoSchema): class Meta: model = User fields = ('id', 'username', 'role', 'posts') # 添加 role posts = ma.Nested('PostSchema', many=True) post_schema = PostSchema() posts_schema = PostSchema(many=True) user_schema = UserSchema() users_schema = UserSchema(many=True)
實現權限檢查
修改 app/init.py,添加權限裝飾器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
from flask import Flask, jsonify, g from flask_sqlalchemy import SQLAlchemy from flask_marshmallow import Marshmallow from flask_bcrypt import Bcrypt import jwt from functools import wraps from .routes.v1.todos import todos_bp as todos_v1_bp from .routes.v1.users import users_bp as users_v1_bp from .routes.v1.posts import posts_bp as posts_v1_bp from .routes.v2.todos import todos_bp as todos_v2_bp from .routes.v2.posts import posts_bp as posts_v2_bp from .config import config_map import os db = SQLAlchemy() ma = Marshmallow() bcrypt = Bcrypt() def create_app(): app = Flask(__name__) env = os.getenv('FLASK_ENV', 'development') app.config.from_object(config_map[env]) db.init_app(app) ma.init_app(app) bcrypt.init_app(app) app.register_blueprint(todos_v1_bp, url_prefix='/api/v1') app.register_blueprint(users_v1_bp, url_prefix='/api/v1') app.register_blueprint(posts_v1_bp, url_prefix='/api/v1') app.register_blueprint(todos_v2_bp, url_prefix='/api/v2') app.register_blueprint(posts_v2_bp, url_prefix='/api/v2') @app.errorhandler(404) def not_found(error): return jsonify({'error': 'Not Found', 'message': str(error)}), 404 @app.errorhandler(400) def bad_request(error): return jsonify({'error': 'Bad Request', 'message': str(error)}), 400 @app.errorhandler(500) def internal_error(error): return jsonify({'error': 'Internal Server Error', 'message': 'Something went wrong on our end'}), 500 with app.app_context(): db.create_all() return app def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): from .models import User token = request.headers.get('Authorization') if not token: abort(401, description='Missing token') try: if token.startswith('Bearer '): token = token[7:] data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256']) user = User.query.get(data['user_id']) if not user: abort(401, description='Invalid token') g.current_user = user except jwt.ExpiredSignatureError: abort(401, description='Token has expired') except jwt.InvalidTokenError: abort(401, description='Invalid token') return f(*args, **kwargs) return decorated_function def admin_required(f): @wraps(f) @login_required # 先檢查登錄 def decorated_function(*args, **kwargs): if not g.current_user.is_admin: abort(403, description='Admin access required') return f(*args, **kwargs) return decorated_function
更新路由
修改 app/routes/v1/users.py,支持角色並限制管理操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
from flask import Blueprint, jsonify, request, abort from ...models import User from ... import db, admin_required from ...schemas import user_schema, users_schema import jwt import datetime users_bp = Blueprint('users_v1', __name__) @users_bp.route('/users', methods=['GET']) def get_users(): users = User.query.all() return jsonify({'users': users_schema.dump(users)}) @users_bp.route('/users', methods=['POST']) def create_user(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'username' not in data or 'password' not in data: abort(400, description='Missing username or password') if User.query.filter_by(username=data['username']).first(): abort(400, description='Username already exists') user = User(username=data['username']) user.set_password(data['password']) if data.get('role') in ['user', 'admin']: # 限制角色值 user.role = data['role'] db.session.add(user) db.session.commit() return jsonify(user_schema.dump(user)), 201 @users_bp.route('/login', methods=['POST']) def login(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'username' not in data or 'password' not in data: abort(400, description='Missing username or password') user = User.query.filter_by(username=data['username']).first() if not user or not user.check_password(data['password']): abort(401, description='Invalid credentials') token = jwt.encode({ 'user_id': user.id, 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24) }, users_bp.app.config['SECRET_KEY'], algorithm='HS256') return jsonify({'token': token}) @users_bp.route('/users/<int:user_id>', methods=['DELETE']) @admin_required # 僅管理员可刪除用戶 def delete_user(user_id): user = User.query.get_or_404(user_id, description='User not found') db.session.delete(user) db.session.commit() return jsonify({'message': 'User deleted'}), 200
修改 app/routes/v1/posts.py,允許管理员操作他人文章:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
from flask import Blueprint, jsonify, request, abort, g from ...models import Post, User from ... import db, login_required, admin_required from ...schemas import post_schema, posts_schema posts_bp = Blueprint('posts_v1', __name__) @posts_bp.route('/posts', methods=['GET']) def get_posts(): user_id = request.args.get('user_id', type=int) query = Post.query if user_id: query = query.filter_by(user_id=user_id) posts = query.all() return jsonify({'posts': posts_schema.dump(posts)}) @posts_bp.route('/posts/<int:post_id>', methods=['GET']) def get_post(post_id): post = Post.query.get_or_404(post_id, description='Post not found') return jsonify(post_schema.dump(post)) @posts_bp.route('/posts', methods=['POST']) @login_required def create_post(): if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'title' not in data or 'content' not in data: abort(400, description='Missing title or content') post = Post( title=data['title'], content=data['content'], user_id=g.current_user.id ) db.session.add(post) db.session.commit() return jsonify(post_schema.dump(post)), 201 @posts_bp.route('/posts/<int:post_id>', methods=['PUT']) @login_required def update_post(post_id): post = Post.query.get_or_404(post_id, description='Post not found') if post.user_id != g.current_user.id and not g.current_user.is_admin: abort(403, description='You can only edit your own posts unless you are an admin') if not request.is_json: abort(400, description='Request must be JSON') data = request.get_json() if 'title' in data: post.title = data['title'] if 'content' in data: post.content = data['content'] db.session.commit() return jsonify(post_schema.dump(post)), 200 @posts_bp.route('/posts/<int:post_id>', methods=['DELETE']) @login_required def delete_post(post_id): post = Post.query.get_or_404(post_id, description='Post not found') if post.user_id != g.current_user.id and not g.current_user.is_admin: abort(403, description='You can only delete your own posts unless you are an admin') db.session.delete(post) db.session.commit() return jsonify({'message': 'Post deleted'}), 200 @posts_bp.route('/posts/all', methods=['DELETE']) @admin_required # 僅管理员可批量刪除 def delete_all_posts(): Post.query.delete() db.session.commit() return jsonify({'message': 'All posts deleted'}), 200
運行應用
- 刪除舊的
blog.db
(因表結構改變),運行:1
python run.py
- 刪除舊的
測試 API
- 使用 Postman 測試:
- POST /api/v1/users(普通用戶):
- Body:
{"username": "alice", "password": "1234"}
- Body:
- POST /api/v1/users(管理员):
- Body:
{"username": "admin", "password": "admin123", "role": "admin"}
- Body:
- POST /api/v1/login(普通用戶):
- Body:
{"username": "alice", "password": "1234"}
- 獲取 token1。
- Body:
- POST /api/v1/login(管理员):
- Body:
{"username": "admin", "password": "admin123"}
- 獲取 token2。
- Body:
- POST /api/v1/posts(用 token1):
- Headers:
Authorization: Bearer <token1>
- Body:
{"title": "My Post", "content": "Hello"}
- Headers:
- DELETE /api/v1/users/1(用 token1):
- 預期響應:403
- DELETE /api/v1/users/1(用 token2):
- 預期響應:200
- PUT /api/v1/posts/1(用 token2,修改他人文章):
- 預期響應:200
- POST /api/v1/users(普通用戶):
- 使用 Postman 測試:
作業
- 在 v2 的 posts 路由中實現相同的角色和權限邏輯。
- 添加一個端點
GET /api/v1/users/<int:user_id>/role
,允許管理员查看和更新用戶角色。
注意事項
- 表結構改變後需重建數據庫。
- 角色值應嚴格控制,避免無效輸入。
- 實際應用中應限制誰能設置
role=admin
。
本文章以 CC BY 4.0 授權