文章

Flask - 角色與權限

目標

  • 在用戶模型中添加角色字段
  • 實現權限檢查邏輯
  • 限制特定端點的操作

步驟

  1. 準備環境

    • 繼續使用 flask_api/ 項目結構,激活虛擬環境:
      1
      2
      
      # Windows: flask_api_env\Scripts\activate
      # macOS/Linux: source flask_api_env/bin/activate
      
  2. 更新模型

    • 修改 app/models.py,添加角色字段:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      
      from . import db, bcrypt
      from datetime import datetime
      
      class User(db.Model):
          __tablename__ = 'users'
          id = db.Column(db.Integer, primary_key=True)
          username = db.Column(db.String(50), unique=True, nullable=False)
          password_hash = db.Column(db.String(128), nullable=False)
          role = db.Column(db.String(20), default='user')  # 新增角色字段:user 或 admin
          posts = db.relationship('Post', backref='user', lazy=True)
      
          def set_password(self, password):
              self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8')
      
          def check_password(self, password):
              return bcrypt.check_password_hash(self.password_hash, password)
      
          @property
          def is_admin(self):
              return self.role == 'admin'
      
      class Post(db.Model):
          __tablename__ = 'posts'
          id = db.Column(db.Integer, primary_key=True)
          title = db.Column(db.String(100), nullable=False)
          content = db.Column(db.Text, nullable=False)
          created_at = db.Column(db.DateTime, default=datetime.utcnow)
          user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
          category = db.Column(db.String(50), default='general')
      
    • 代碼解釋

      • role:字符串字段,默認為 ‘user’,可設為 ‘admin’。
      • is_admin:屬性方法,方便檢查用戶是否為管理员。
  3. 更新序列化器

    • 修改 app/schemas.py,包含角色字段:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      
      from . import ma
      from .models import User, Post
      
      class PostSchema(ma.SQLAlchemyAutoSchema):
          class Meta:
              model = Post
              include_fk = True
              fields = ('id', 'title', 'content', 'created_at', 'user_id', 'category')
      
      class UserSchema(ma.SQLAlchemyAutoSchema):
          class Meta:
              model = User
              fields = ('id', 'username', 'role', 'posts')  # 添加 role
          posts = ma.Nested('PostSchema', many=True)
      
      post_schema = PostSchema()
      posts_schema = PostSchema(many=True)
      user_schema = UserSchema()
      users_schema = UserSchema(many=True)
      
  4. 實現權限檢查

    • 修改 app/init.py,添加權限裝飾器:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      
      from flask import Flask, jsonify, g
      from flask_sqlalchemy import SQLAlchemy
      from flask_marshmallow import Marshmallow
      from flask_bcrypt import Bcrypt
      import jwt
      from functools import wraps
      from .routes.v1.todos import todos_bp as todos_v1_bp
      from .routes.v1.users import users_bp as users_v1_bp
      from .routes.v1.posts import posts_bp as posts_v1_bp
      from .routes.v2.todos import todos_bp as todos_v2_bp
      from .routes.v2.posts import posts_bp as posts_v2_bp
      from .config import config_map
      import os
      
      db = SQLAlchemy()
      ma = Marshmallow()
      bcrypt = Bcrypt()
      
      def create_app():
          app = Flask(__name__)
          env = os.getenv('FLASK_ENV', 'development')
          app.config.from_object(config_map[env])
          db.init_app(app)
          ma.init_app(app)
          bcrypt.init_app(app)
      
          app.register_blueprint(todos_v1_bp, url_prefix='/api/v1')
          app.register_blueprint(users_v1_bp, url_prefix='/api/v1')
          app.register_blueprint(posts_v1_bp, url_prefix='/api/v1')
          app.register_blueprint(todos_v2_bp, url_prefix='/api/v2')
          app.register_blueprint(posts_v2_bp, url_prefix='/api/v2')
      
          @app.errorhandler(404)
          def not_found(error):
              return jsonify({'error': 'Not Found', 'message': str(error)}), 404
      
          @app.errorhandler(400)
          def bad_request(error):
              return jsonify({'error': 'Bad Request', 'message': str(error)}), 400
      
          @app.errorhandler(500)
          def internal_error(error):
              return jsonify({'error': 'Internal Server Error', 'message': 'Something went wrong on our end'}), 500
      
          with app.app_context():
              db.create_all()
      
          return app
      
      def login_required(f):
          @wraps(f)
          def decorated_function(*args, **kwargs):
              from .models import User
              token = request.headers.get('Authorization')
              if not token:
                  abort(401, description='Missing token')
              try:
                  if token.startswith('Bearer '):
                      token = token[7:]
                  data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
                  user = User.query.get(data['user_id'])
                  if not user:
                      abort(401, description='Invalid token')
                  g.current_user = user
              except jwt.ExpiredSignatureError:
                  abort(401, description='Token has expired')
              except jwt.InvalidTokenError:
                  abort(401, description='Invalid token')
              return f(*args, **kwargs)
          return decorated_function
      
      def admin_required(f):
          @wraps(f)
          @login_required  # 先檢查登錄
          def decorated_function(*args, **kwargs):
              if not g.current_user.is_admin:
                  abort(403, description='Admin access required')
              return f(*args, **kwargs)
          return decorated_function
      
  5. 更新路由

    • 修改 app/routes/v1/users.py,支持角色並限制管理操作:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      
      from flask import Blueprint, jsonify, request, abort
      from ...models import User
      from ... import db, admin_required
      from ...schemas import user_schema, users_schema
      import jwt
      import datetime
      
      users_bp = Blueprint('users_v1', __name__)
      
      @users_bp.route('/users', methods=['GET'])
      def get_users():
          users = User.query.all()
          return jsonify({'users': users_schema.dump(users)})
      
      @users_bp.route('/users', methods=['POST'])
      def create_user():
          if not request.is_json:
              abort(400, description='Request must be JSON')
          data = request.get_json()
          if 'username' not in data or 'password' not in data:
              abort(400, description='Missing username or password')
          if User.query.filter_by(username=data['username']).first():
              abort(400, description='Username already exists')
          user = User(username=data['username'])
          user.set_password(data['password'])
          if data.get('role') in ['user', 'admin']:  # 限制角色值
              user.role = data['role']
          db.session.add(user)
          db.session.commit()
          return jsonify(user_schema.dump(user)), 201
      
      @users_bp.route('/login', methods=['POST'])
      def login():
          if not request.is_json:
              abort(400, description='Request must be JSON')
          data = request.get_json()
          if 'username' not in data or 'password' not in data:
              abort(400, description='Missing username or password')
          user = User.query.filter_by(username=data['username']).first()
          if not user or not user.check_password(data['password']):
              abort(401, description='Invalid credentials')
          token = jwt.encode({
              'user_id': user.id,
              'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)
          }, users_bp.app.config['SECRET_KEY'], algorithm='HS256')
          return jsonify({'token': token})
      
      @users_bp.route('/users/<int:user_id>', methods=['DELETE'])
      @admin_required  # 僅管理员可刪除用戶
      def delete_user(user_id):
          user = User.query.get_or_404(user_id, description='User not found')
          db.session.delete(user)
          db.session.commit()
          return jsonify({'message': 'User deleted'}), 200
      
    • 修改 app/routes/v1/posts.py,允許管理员操作他人文章:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      
      from flask import Blueprint, jsonify, request, abort, g
      from ...models import Post, User
      from ... import db, login_required, admin_required
      from ...schemas import post_schema, posts_schema
      
      posts_bp = Blueprint('posts_v1', __name__)
      
      @posts_bp.route('/posts', methods=['GET'])
      def get_posts():
          user_id = request.args.get('user_id', type=int)
          query = Post.query
          if user_id:
              query = query.filter_by(user_id=user_id)
          posts = query.all()
          return jsonify({'posts': posts_schema.dump(posts)})
      
      @posts_bp.route('/posts/<int:post_id>', methods=['GET'])
      def get_post(post_id):
          post = Post.query.get_or_404(post_id, description='Post not found')
          return jsonify(post_schema.dump(post))
      
      @posts_bp.route('/posts', methods=['POST'])
      @login_required
      def create_post():
          if not request.is_json:
              abort(400, description='Request must be JSON')
          data = request.get_json()
          if 'title' not in data or 'content' not in data:
              abort(400, description='Missing title or content')
          post = Post(
              title=data['title'],
              content=data['content'],
              user_id=g.current_user.id
          )
          db.session.add(post)
          db.session.commit()
          return jsonify(post_schema.dump(post)), 201
      
      @posts_bp.route('/posts/<int:post_id>', methods=['PUT'])
      @login_required
      def update_post(post_id):
          post = Post.query.get_or_404(post_id, description='Post not found')
          if post.user_id != g.current_user.id and not g.current_user.is_admin:
              abort(403, description='You can only edit your own posts unless you are an admin')
          if not request.is_json:
              abort(400, description='Request must be JSON')
          data = request.get_json()
          if 'title' in data:
              post.title = data['title']
          if 'content' in data:
              post.content = data['content']
          db.session.commit()
          return jsonify(post_schema.dump(post)), 200
      
      @posts_bp.route('/posts/<int:post_id>', methods=['DELETE'])
      @login_required
      def delete_post(post_id):
          post = Post.query.get_or_404(post_id, description='Post not found')
          if post.user_id != g.current_user.id and not g.current_user.is_admin:
              abort(403, description='You can only delete your own posts unless you are an admin')
          db.session.delete(post)
          db.session.commit()
          return jsonify({'message': 'Post deleted'}), 200
      
      @posts_bp.route('/posts/all', methods=['DELETE'])
      @admin_required  # 僅管理员可批量刪除
      def delete_all_posts():
          Post.query.delete()
          db.session.commit()
          return jsonify({'message': 'All posts deleted'}), 200
      
  6. 運行應用

    • 刪除舊的 blog.db(因表結構改變),運行:
      1
      
      python run.py
      
  7. 測試 API

    • 使用 Postman 測試:
      • POST /api/v1/users(普通用戶):
        • Body:{"username": "alice", "password": "1234"}
      • POST /api/v1/users(管理员):
        • Body:{"username": "admin", "password": "admin123", "role": "admin"}
      • POST /api/v1/login(普通用戶):
        • Body:{"username": "alice", "password": "1234"}
        • 獲取 token1。
      • POST /api/v1/login(管理员):
        • Body:{"username": "admin", "password": "admin123"}
        • 獲取 token2。
      • POST /api/v1/posts(用 token1):
        • Headers:Authorization: Bearer <token1>
        • Body:{"title": "My Post", "content": "Hello"}
      • DELETE /api/v1/users/1(用 token1):
        • 預期響應:403
      • DELETE /api/v1/users/1(用 token2):
        • 預期響應:200
      • PUT /api/v1/posts/1(用 token2,修改他人文章):
        • 預期響應:200
  8. 作業

    • 在 v2 的 posts 路由中實現相同的角色和權限邏輯。
    • 添加一個端點 GET /api/v1/users/<int:user_id>/role,允許管理员查看和更新用戶角色。

注意事項

  • 表結構改變後需重建數據庫。
  • 角色值應嚴格控制,避免無效輸入。
  • 實際應用中應限制誰能設置 role=admin

本文章以 CC BY 4.0 授權