Module sql_app.api.auth
Expand source code
from fastapi import Depends, APIRouter, Form
from fastapi import Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi_jwt_auth import AuthJWT
from sqlalchemy.orm import Session
from sql_app import crud
from passlib.context import CryptContext
from pydantic import BaseModel
from ..database import SessionLocal, engine
# Path to html templates used in this file
templates = Jinja2Templates(directory="../templates/auth")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# prefix used for all endpoints in this file
auth = APIRouter(prefix="")
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
class Settings(BaseModel):
authjwt_secret_key: str = "secret"
# Configure application to store and get JWT from cookies
authjwt_token_location: set = {"cookies"}
# Disable CSRF Protection for this example. default is True
authjwt_cookie_csrf_protect: bool = False
@AuthJWT.load_config
def get_config():
return Settings()
# admin username and password
fake_users_db = {
"admin": {
"username": "admin",
"password": "admin"
}
}
def verify_password(plain_password, hashed_password):
"""
Verifies plain text password with hashed password
"""
return pwd_context.verify(plain_password, hashed_password)
def get_hash_password(password):
"""
Returns hashed password
"""
return pwd_context.hash(password)
def auth_user(db, username: str, password: str):
"""
Determines if given password belongs to user with given username
"""
user = crud.find_user(db, username)
if not user:
return None
if not verify_password(password, user.password):
return None
return user
@auth.get("/signup", response_class=HTMLResponse)
async def signup_get(request: Request):
"""
return html template for signup
"""
return templates.TemplateResponse("signup.html", {"request": request})
@auth.post("/signup", response_class=HTMLResponse)
async def signup(username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db)):
"""
Endpoint called form signup template. Creates new user with role guest that can be changed by admin user
"""
users = crud.get_users(db, 0, 100)
users_names = []
for u in users:
users_names.append(u.username)
if username not in users_names:
new_user = crud.create_user(db, username, get_hash_password(password), "guest")
if new_user is None:
print("something went wrong")
return """
<html>
<head>
<title>Signup</title>
</head>
<body>
<h1>New user created. You can go back to previous page.</h1>
<form action="/logs-web" method="get">
<input type="submit" value="Home Page" />
</form>
</body>
</html>
"""
else:
return """
<html>
<head>
<title>Signup</title>
</head>
<body>
<h1>Username taken. Try to choose different username.</h1>
<form action="/logs-web" method="get">
<input type="submit" value="Home Page" />
</form>
</body>
</html>
"""
@auth.get("/login", response_class=HTMLResponse)
async def login_get(request: Request):
"""
return html template for login
"""
return templates.TemplateResponse("login.html", {"request": request})
@auth.post("/login", response_class=HTMLResponse)
async def login(username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db),
Authorize: AuthJWT = Depends()):
"""
Endpoint called from login template. Checks if given username and password aligns with admin
username and password and returns token for browser according to given username and password
"""
user = auth_user(db, username, password)
if user != None:
if user.role == "admin":
access_token = Authorize.create_access_token(subject="admin", expires_time=False)
refresh_token = Authorize.create_refresh_token(subject="admin", expires_time=False)
else:
access_token = Authorize.create_access_token(subject="guest", expires_time=False)
refresh_token = Authorize.create_refresh_token(subject="guest", expires_time=False)
else:
usr = fake_users_db.get(username)
if usr != None:
if usr["username"] == username and usr["password"] == password:
access_token = Authorize.create_access_token(subject="admin", expires_time=False)
refresh_token = Authorize.create_refresh_token(subject="admin", expires_time=False)
else:
return """
<html>
<head>
<title>Login</title>
</head>
<body>
<h1>Wrong Username or Password</h1>
<form action="/login" method="get">
<input type="submit" value="Log again" />
</form>
<form action="/login" method="get">
<input type="submit" value="Home Page" />
</form>
</body>
</html>
"""
# Set the JWT cookies in the response
Authorize.set_access_cookies(access_token)
Authorize.set_refresh_cookies(refresh_token)
return """
<html>
<head>
<title>Login</title>
</head>
<body>
<h1>Now you are logged in, you can continue to previous page.</h1>
<form action="/logs-web" method="get">
<input type="submit" value="Home Page" />
</form>
</body>
</html>
"""
@auth.post('/refresh')
def refresh(Authorize: AuthJWT = Depends()):
"""
endpoint for refreshing browser token. Not used at the moment since lifetime of given tokens are
unlimited.
"""
Authorize.jwt_refresh_token_required()
current_user = Authorize.get_jwt_subject()
new_access_token = Authorize.create_access_token(subject=current_user)
# Set the JWT cookies in the response
Authorize.set_access_cookies(new_access_token)
return {"msg": "The token has been refresh"}
@auth.get('/logout', response_class=HTMLResponse)
def logout(Authorize: AuthJWT = Depends()):
"""
Endpoint for deleting cookie token with acces role.
"""
Authorize.jwt_optional()
Authorize.unset_jwt_cookies()
return """
<html>
<head>
<title>Logout</title>
</head>
<body>
<h1>Logged Out</h1>
<form action="/logs-web" method="get">
<input type="submit" value="Back" />
</form>
</body>
</html>
"""
Functions
def auth_user(db, username: str, password: str)
-
Determines if given password belongs to user with given username
Expand source code
def auth_user(db, username: str, password: str): """ Determines if given password belongs to user with given username """ user = crud.find_user(db, username) if not user: return None if not verify_password(password, user.password): return None return user
def get_db()
-
Expand source code
def get_db(): db = SessionLocal() try: yield db finally: db.close()
def get_hash_password(password)
-
Returns hashed password
Expand source code
def get_hash_password(password): """ Returns hashed password """ return pwd_context.hash(password)
async def login(username: str = Form(Ellipsis), password: str = Form(Ellipsis), db: sqlalchemy.orm.session.Session = Depends(get_db), Authorize: fastapi_jwt_auth.auth_jwt.AuthJWT = Depends(NoneType))
-
Endpoint called from login template. Checks if given username and password aligns with admin username and password and returns token for browser according to given username and password
Expand source code
@auth.post("/login", response_class=HTMLResponse) async def login(username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db), Authorize: AuthJWT = Depends()): """ Endpoint called from login template. Checks if given username and password aligns with admin username and password and returns token for browser according to given username and password """ user = auth_user(db, username, password) if user != None: if user.role == "admin": access_token = Authorize.create_access_token(subject="admin", expires_time=False) refresh_token = Authorize.create_refresh_token(subject="admin", expires_time=False) else: access_token = Authorize.create_access_token(subject="guest", expires_time=False) refresh_token = Authorize.create_refresh_token(subject="guest", expires_time=False) else: usr = fake_users_db.get(username) if usr != None: if usr["username"] == username and usr["password"] == password: access_token = Authorize.create_access_token(subject="admin", expires_time=False) refresh_token = Authorize.create_refresh_token(subject="admin", expires_time=False) else: return """ <html> <head> <title>Login</title> </head> <body> <h1>Wrong Username or Password</h1> <form action="/login" method="get"> <input type="submit" value="Log again" /> </form> <form action="/login" method="get"> <input type="submit" value="Home Page" /> </form> </body> </html> """ # Set the JWT cookies in the response Authorize.set_access_cookies(access_token) Authorize.set_refresh_cookies(refresh_token) return """ <html> <head> <title>Login</title> </head> <body> <h1>Now you are logged in, you can continue to previous page.</h1> <form action="/logs-web" method="get"> <input type="submit" value="Home Page" /> </form> </body> </html> """
async def login_get(request: starlette.requests.Request)
-
return html template for login
Expand source code
@auth.get("/login", response_class=HTMLResponse) async def login_get(request: Request): """ return html template for login """ return templates.TemplateResponse("login.html", {"request": request})
def logout(Authorize: fastapi_jwt_auth.auth_jwt.AuthJWT = Depends(NoneType))
-
Endpoint for deleting cookie token with acces role.
Expand source code
@auth.get('/logout', response_class=HTMLResponse) def logout(Authorize: AuthJWT = Depends()): """ Endpoint for deleting cookie token with acces role. """ Authorize.jwt_optional() Authorize.unset_jwt_cookies() return """ <html> <head> <title>Logout</title> </head> <body> <h1>Logged Out</h1> <form action="/logs-web" method="get"> <input type="submit" value="Back" /> </form> </body> </html> """
def refresh(Authorize: fastapi_jwt_auth.auth_jwt.AuthJWT = Depends(NoneType))
-
endpoint for refreshing browser token. Not used at the moment since lifetime of given tokens are unlimited.
Expand source code
@auth.post('/refresh') def refresh(Authorize: AuthJWT = Depends()): """ endpoint for refreshing browser token. Not used at the moment since lifetime of given tokens are unlimited. """ Authorize.jwt_refresh_token_required() current_user = Authorize.get_jwt_subject() new_access_token = Authorize.create_access_token(subject=current_user) # Set the JWT cookies in the response Authorize.set_access_cookies(new_access_token) return {"msg": "The token has been refresh"}
async def signup(username: str = Form(Ellipsis), password: str = Form(Ellipsis), db: sqlalchemy.orm.session.Session = Depends(get_db))
-
Endpoint called form signup template. Creates new user with role guest that can be changed by admin user
Expand source code
@auth.post("/signup", response_class=HTMLResponse) async def signup(username: str = Form(...), password: str = Form(...), db: Session = Depends(get_db)): """ Endpoint called form signup template. Creates new user with role guest that can be changed by admin user """ users = crud.get_users(db, 0, 100) users_names = [] for u in users: users_names.append(u.username) if username not in users_names: new_user = crud.create_user(db, username, get_hash_password(password), "guest") if new_user is None: print("something went wrong") return """ <html> <head> <title>Signup</title> </head> <body> <h1>New user created. You can go back to previous page.</h1> <form action="/logs-web" method="get"> <input type="submit" value="Home Page" /> </form> </body> </html> """ else: return """ <html> <head> <title>Signup</title> </head> <body> <h1>Username taken. Try to choose different username.</h1> <form action="/logs-web" method="get"> <input type="submit" value="Home Page" /> </form> </body> </html> """
async def signup_get(request: starlette.requests.Request)
-
return html template for signup
Expand source code
@auth.get("/signup", response_class=HTMLResponse) async def signup_get(request: Request): """ return html template for signup """ return templates.TemplateResponse("signup.html", {"request": request})
def verify_password(plain_password, hashed_password)
-
Verifies plain text password with hashed password
Expand source code
def verify_password(plain_password, hashed_password): """ Verifies plain text password with hashed password """ return pwd_context.verify(plain_password, hashed_password)
Classes
class Settings (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class Settings(BaseModel): authjwt_secret_key: str = "secret" # Configure application to store and get JWT from cookies authjwt_token_location: set = {"cookies"} # Disable CSRF Protection for this example. default is True authjwt_cookie_csrf_protect: bool = False
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var authjwt_secret_key : str
var authjwt_token_location : set