589d79d3创建于 2023年2月14日历史提交
import bcrypt
import jwt

from nameko.extensions import DependencyProvider


JWT_SECRET = "secret"


class Unauthenticated(Exception):
    pass


class Unauthorized(Exception):
    pass


class Auth(DependencyProvider):

    class Api:
        def __init__(self, users, worker_ctx):
            self.users = users
            self.worker_ctx = worker_ctx

        def authenticate(self, username, password):
            user = self.users.get(username)
            if not user:
                raise Unauthenticated("User does not exist")
            if not bcrypt.checkpw(password.encode('utf-8'), user['password']):
                raise Unauthenticated("Incorrect password")

            payload = {
                'username': username,
                'roles': self.users[username]['roles']
            }
            token = jwt.encode(payload, key=JWT_SECRET, algorithm="HS256")
            self.worker_ctx.context_data['auth'] = token
            return token

        def has_role(self, role):
            token = self.worker_ctx.context_data.get('auth')
            if not token:
                raise Unauthenticated()

            try:
                payload = jwt.decode(
                    token, key=JWT_SECRET, verify=True, algorithms=["HS256"]
                )
                if role in payload['roles']:
                    return True
            except Exception:
                pass

            return False

        def check_role(self, role):
            if self.has_role(role):
                return
            raise Unauthorized()

    def setup(self):
        self.db = {
            'matt': {
                'password': (
                    b'$2b$12$fZXR7Z1Eoyn0pfym8.'
                    b'LyRuIFabYj00ZzhdaJ0qoTLZs9w4fg3pKlK'
                ),
                'roles': [
                    'developer',
                ]
            },
            'susie': {
                'password': (
                    b'$2b$12$k4MVi9PcbSsOqONoj5vW9.'
                    b'pcQpB0xSjYkZcc6Ogr5nQ4MD8DRDiUK'
                ),
                'roles': [
                    'developer',
                    'admin'
                ]
            }
        }

    def get_dependency(self, worker_ctx):
        return Auth.Api(self.db, worker_ctx)