Skip to content

Commit 005317f

Browse files
committed
第六章 安全、认证和授权
1 parent 95dc264 commit 005317f

File tree

3 files changed

+224
-1
lines changed

3 files changed

+224
-1
lines changed

run.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from starlette.exceptions import HTTPException as StarletteHTTPException
1010

1111
from coronavirus import application
12-
from tutorial import app03, app04, app05
12+
from tutorial import app03, app04, app05, app06
1313

1414
app = FastAPI(
1515
title='FastAPI Tutorial and Coronavirus Tracker API Docs',
@@ -43,6 +43,7 @@
4343
app.include_router(app03, prefix="/chapter03", tags=['第三章 请求参数和验证'])
4444
app.include_router(app04, prefix="/chapter04", tags=['第四章 响应处理和FastAPI配置'])
4545
app.include_router(app05, prefix="/chapter05", tags=['第五章 FastAPI的依赖注入系统'])
46+
app.include_router(app06, prefix="/chapter06", tags=['第六章 安全、认证和授权'])
4647
app.include_router(application, prefix="/coronavirus", tags=['新冠病毒疫情跟踪器API'])
4748

4849
if __name__ == '__main__':

tutorial/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
from .chapter03 import app03
66
from .chapter04 import app04
77
from .chapter05 import app05
8+
from .chapter06 import app06

tutorial/chapter06.py

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
#!/usr/bin/python3
2+
# -*- coding:utf-8 -*-
3+
# __author__ = '__Jack__'
4+
5+
from datetime import datetime, timedelta
6+
from typing import Optional
7+
8+
from fastapi import APIRouter
9+
from fastapi import Depends, HTTPException, status
10+
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
11+
from jose import JWTError, jwt
12+
from passlib.context import CryptContext
13+
from pydantic import BaseModel
14+
15+
app06 = APIRouter()
16+
17+
"""OAuth2 密码模式和 FastAPI 的 OAuth2PasswordBearer"""
18+
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="chapter06/token") # 请求Token的URL地址 http://127.0.0.1:8000/chapter06/token
19+
20+
21+
@app06.get("/oauth2_password_bearer")
22+
async def oauth2_password_bearer(token: str = Depends(oauth2_scheme)):
23+
return {"token": token}
24+
25+
26+
"""基于 Password 和 Bearer token 的 OAuth2 认证 """
27+
28+
fake_users_db = {
29+
"john snow": {
30+
"username": "john snow",
31+
"full_name": "John Snow",
32+
"email": "[email protected]",
33+
"hashed_password": "fakehashedsecret",
34+
"disabled": False,
35+
},
36+
"alice": {
37+
"username": "alice",
38+
"full_name": "Alice Wonderson",
39+
"email": "[email protected]",
40+
"hashed_password": "fakehashedsecret2",
41+
"disabled": True,
42+
},
43+
}
44+
45+
46+
def fake_hash_password(password: str):
47+
return "fakehashed" + password
48+
49+
50+
class User(BaseModel):
51+
username: str
52+
email: Optional[str] = None
53+
full_name: Optional[str] = None
54+
disabled: Optional[bool] = None
55+
56+
57+
class UserInDB(User):
58+
hashed_password: str
59+
60+
61+
def get_user(db, username: str):
62+
if username in db:
63+
user_dict = db[username]
64+
return UserInDB(**user_dict)
65+
66+
67+
def fake_decode_token(token):
68+
user = get_user(fake_users_db, token)
69+
return user
70+
71+
72+
async def get_current_user(token: str = Depends(oauth2_scheme)):
73+
user = fake_decode_token(token)
74+
if not user:
75+
raise HTTPException(
76+
status_code=status.HTTP_401_UNAUTHORIZED,
77+
detail="Invalid authentication credentials",
78+
headers={"WWW-Authenticate": "Bearer"}, # OAuth2的规范,如果认证失败,请求头中返回“WWW-Authenticate”
79+
)
80+
return user
81+
82+
83+
async def get_current_active_user(current_user: User = Depends(get_current_user)):
84+
if current_user.disabled:
85+
raise HTTPException(status_code=400, detail="Inactive user")
86+
return current_user
87+
88+
89+
@app06.post("/token")
90+
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
91+
user_dict = fake_users_db.get(form_data.username)
92+
if not user_dict:
93+
raise HTTPException(status_code=400, detail="Incorrect username or password")
94+
user = UserInDB(**user_dict)
95+
hashed_password = fake_hash_password(form_data.password)
96+
if not hashed_password == user.hashed_password:
97+
raise HTTPException(status_code=400, detail="Incorrect username or password")
98+
99+
return {"access_token": user.username, "token_type": "bearer"}
100+
101+
102+
@app06.get("/users/me")
103+
async def read_users_me(current_user: User = Depends(get_current_active_user)):
104+
return current_user
105+
106+
107+
"""OAuth2 with Password (and hashing), Bearer with JWT tokens 开发基于JSON Web Tokens的认证"""
108+
109+
fake_users_db.update({
110+
"john snow": {
111+
"username": "john snow",
112+
"full_name": "John Snow",
113+
"email": "[email protected]",
114+
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
115+
"disabled": False,
116+
}
117+
})
118+
119+
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
120+
ALGORITHM = "HS256"
121+
ACCESS_TOKEN_EXPIRE_MINUTES = 30
122+
123+
124+
class Token(BaseModel):
125+
access_token: str
126+
token_type: str
127+
128+
129+
class TokenData(BaseModel):
130+
username: Optional[str] = None
131+
132+
133+
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
134+
135+
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="chapter06/jwt/token")
136+
137+
138+
def verify_password(plain_password, hashed_password):
139+
return pwd_context.verify(plain_password, hashed_password)
140+
141+
142+
def get_password_hash(password):
143+
return pwd_context.hash(password)
144+
145+
146+
def jwt_get_user(db, username: str):
147+
if username in db:
148+
user_dict = db[username]
149+
return UserInDB(**user_dict)
150+
151+
152+
def authenticate_user(fake_db, username: str, password: str):
153+
user = get_user(fake_db, username)
154+
if not user:
155+
return False
156+
if not verify_password(password, user.hashed_password):
157+
return False
158+
return user
159+
160+
161+
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
162+
to_encode = data.copy()
163+
if expires_delta:
164+
expire = datetime.utcnow() + expires_delta
165+
else:
166+
expire = datetime.utcnow() + timedelta(minutes=15)
167+
to_encode.update({"exp": expire})
168+
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
169+
return encoded_jwt
170+
171+
172+
async def jwt_get_current_user(token: str = Depends(oauth2_scheme)):
173+
credentials_exception = HTTPException(
174+
status_code=status.HTTP_401_UNAUTHORIZED,
175+
detail="Could not validate credentials",
176+
headers={"WWW-Authenticate": "Bearer"},
177+
)
178+
try:
179+
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
180+
username: str = payload.get("sub")
181+
if username is None:
182+
raise credentials_exception
183+
token_data = TokenData(username=username)
184+
except JWTError:
185+
raise credentials_exception
186+
user = get_user(fake_users_db, username=token_data.username)
187+
if user is None:
188+
raise credentials_exception
189+
return user
190+
191+
192+
async def jwt_get_current_active_user(current_user: User = Depends(jwt_get_current_user)):
193+
if current_user.disabled:
194+
raise HTTPException(status_code=400, detail="Inactive user")
195+
return current_user
196+
197+
198+
@app06.post("/jwt/token", response_model=Token)
199+
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
200+
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
201+
if not user:
202+
raise HTTPException(
203+
status_code=status.HTTP_401_UNAUTHORIZED,
204+
detail="Incorrect username or password",
205+
headers={"WWW-Authenticate": "Bearer"},
206+
)
207+
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
208+
access_token = create_access_token(
209+
data={"sub": user.username}, expires_delta=access_token_expires
210+
)
211+
return {"access_token": access_token, "token_type": "bearer"}
212+
213+
214+
@app06.get("/jwt/users/me/", response_model=User)
215+
async def read_users_me(current_user: User = Depends(jwt_get_current_active_user)):
216+
return current_user
217+
218+
219+
@app06.get("/jwt/users/me/items/")
220+
async def read_own_items(current_user: User = Depends(jwt_get_current_active_user)):
221+
return [{"item_id": "Foo", "owner": current_user.username}]

0 commit comments

Comments
 (0)