Mocking a user authentication with cached bearer token to reduce latencies and enhance end-user experience and reduce response time from application(s).
Image courtesy to Reddit
TL;DR
Giving a very simple authentication application to mocking caching the authentication bearer to overcome latencies, related to a recent case I’m working on. For sure this is a basic over-simplfied solution to show the methodology.
Implementation
The users database is the following ⬇️😄
{
"john": {
"password": "password123",
"role": "admin"
},
"jane": {
"password": "password456",
"role": "user"
},
"alice": {
"password": "securePass",
"role": "user"
}
}
The sample FastAPI server implementation;
# server.py (FastAPI with authentication and caching)
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Dict, Optional
import time
import secrets
import hashlib
import json
app = FastAPI()
# In-memory token cache (replace with Redis or Memcached in production)
token_cache: Dict[str, dict] = {}
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
class User(BaseModel):
username: str
password: str
class Item(BaseModel):
name: str
description: Optional[str] = None
def load_credentials(filepath="credentials.json"):
try:
with open(filepath, "r") as f:
return json.load(f)
except FileNotFoundError:
print(f"Error: Credentials file '{filepath}' not found.")
return {}
except json.JSONDecodeError:
print(f"Error: Invalid JSON in '{filepath}'.")
return {}
def get_user(username: str):
credentials = load_credentials()
if username in credentials:
return credentials[username]
return None
def generate_token(username: str):
token = secrets.token_hex(32)
return token
def verify_password(password: str, hashed_password: str):
return hashlib.sha256(password.encode()).hexdigest() == hashed_password
def authenticate_user(user: User):
user_data = get_user(user.username)
if not user_data:
return None
hashed_password = hashlib.sha256(user.password.encode()).hexdigest()
if verify_password(user.password, hashlib.sha256(user_data["password"].encode()).hexdigest()):
return user_data
return None
def get_current_user(token: str = Depends(oauth2_scheme)):
if token in token_cache:
return token_cache[token]
credentials = load_credentials()
for username, user_data in credentials.items():
if generate_token(username) == token:
token_cache[token] = user_data
return user_data
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
@app.post("/token", response_model=Token)
async def login_for_access_token(user: User):
authenticated_user = authenticate_user(user)
if not authenticated_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = generate_token(user.username)
token_cache[access_token] = authenticated_user
return Token(access_token=access_token, token_type="bearer")
@app.get("/items/")
async def read_items(current_user: dict = Depends(get_current_user)):
if current_user["role"] == "admin":
return {"items": [{"name": "Foo", "description": "Admin Items"}]}
else:
return {"items": [{"name": "Foo", "description": "User Items"}]}
@app.post("/items/")
async def create_item(item: Item, current_user: dict = Depends(get_current_user)):
if current_user["role"] != "admin":
raise HTTPException(status_code=403, detail="Insufficient privileges")
return item
@app.get("/user/")
async def get_user_info(current_user: dict = Depends(get_current_user)):
return current_user
And finally the client application!
import requests
import json
import getpass
BASE_URL = "http://127.0.0.1:8000"
def load_credentials(filepath="credentials.json"):
try:
with open(filepath, "r") as f:
return json.load(f)
except FileNotFoundError:
print(f"Error: Credentials file '{filepath}' not found.")
return {}
except json.JSONDecodeError:
print(f"Error: Invalid JSON in '{filepath}'.")
return {}
def get_token(username):
credentials = load_credentials()
if username not in credentials:
print(f"Error: Username '{username}' not found in credentials.")
return None
password = getpass.getpass(f"Enter password for {username}: ")
if credentials[username]["password"] != password:
print("Error: Incorrect password.")
return None
data = {"username": username, "password": password}
response = requests.post(f"{BASE_URL}/token", json=data)
response.raise_for_status()
return response.json()["access_token"]
def get_items(token):
headers = {"Authorization": f"Bearer {token}"}
try:
response = requests.get(f"{BASE_URL}/items/", headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 403:
try:
error_message = e.response.json().get('detail')
print(error_message)
except json.JSONDecodeError:
print("Error: Server returned a 403 but no valid JSON in the response.")
else:
raise e
def create_item(token, item_name, item_description):
headers = {"Authorization": f"Bearer {token}"}
data = {"name": item_name, "description": item_description} # moved data out of the try block
try:
response = requests.post(f"{BASE_URL}/items/", headers=headers, json=data)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 403:
try:
error_message = e.response.json().get('detail')
print(error_message)
except json.JSONDecodeError:
print("Error: Server returned a 403 but no valid JSON in the response.")
else:
raise e
return None # Added return None to handle errors.
def get_user_data(token):
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{BASE_URL}/user/", headers=headers)
response.raise_for_status()
return response.json()
if __name__ == "__main__":
credentials = load_credentials()
while True:
username = input("Enter username (or 'quit'/'exit' to end): ").lower()
if username in ("quit", "exit"):
break
try:
token = get_token(username)
if token:
print(f"Token: {token}")
items = get_items(token)
if items: #only print items if there are any, and if there are no errors.
print(f"Items: {items}")
new_item = create_item(token, "New Item", "Description of new item")
if new_item: #only print the new item if there are no errors
print(f"Created item: {new_item}")
user_data = get_user_data(token)
print(f"User Data: {user_data}")
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
And the output of the application…
Enter username (or 'quit'/'exit' to end): john
Enter password for john:
Token: b049545df8dcb70db93b1e331d8449341c42abc5a7736eae9abb6d0e9a2fadde
Items: {'items': [{'name': 'Foo', 'description': 'Admin Items'}]}
Created item: {'name': 'New Item', 'description': 'Description of new item'}
User Data: {'password': 'password123', 'role': 'admin'}
Enter username (or 'quit'/'exit' to end): jane
Enter password for jane:
Token: 2f9fca445e6e3a50a8d3bfcadcf6cdaf04ae8b469f38aee63df25508856c6f00
Items: {'items': [{'name': 'Foo', 'description': 'User Items'}]}
Insufficient privileges
User Data: {'password': 'password456', 'role': 'user'}
Enter username (or 'quit'/'exit' to end): hanz
Error: Username 'hanz' not found in credentials.
Enter username (or 'quit'/'exit' to end): end
Error: Username 'end' not found in credentials.
Conclusion
In this application, caching the bearer token offers a significant performance boost and reduces server load. By storing the validated bearer tokens along with their associated user data in an in-memory cache (simulated here, but ideally a dedicated caching system like Redis or others…), subsequent requests from the same user can bypass the expensive authentication process. Instead of re-verifying credentials, re-calculating hashes, or querying a database, the server simply retrieves the user’s information from the cache, which is much faster. This reduces latency, improves response times, and allows the server to handle more concurrent requests, leading to better scalability. Furthermore, by minimizing the number of database interactions or authentication calculations, the server consumes fewer resources, ultimately enhancing the application’s overall efficiency.
Thanks for reading 🕴️!