Objectif: implémenter une API FastAPI sécurisée par JWT avec un niveau de qualité exploitable en équipe.
Fonctionnement: exécute les routes une par une, valide les statuts HTTP et gère les cas invalides.
api-python/ ├── app/ │ ├── __init__.py │ ├── main.py # Point d'entrée FastAPI │ ├── config.py # Configuration │ ├── database.py # Connexion DB │ ├── models/ │ │ ├── __init__.py │ │ └── user.py # Modèle SQLAlchemy │ ├── schemas/ │ │ ├── __init__.py │ │ ├── user.py # Schémas Pydantic │ │ └── token.py │ ├── routers/ │ │ ├── __init__.py │ │ ├── auth.py # Routes d'authentification │ │ └── users.py # Routes utilisateurs │ ├── services/ │ │ ├── __init__.py │ │ ├── auth_service.py # Logique métier auth │ │ └── user_service.py │ └── utils/ │ ├── __init__.py │ ├── jwt.py # Gestion JWT │ └── security.py # Hachage de mots de passe ├── requirements.txt ├── .env └── alembic/ # Migrations DB (optionnel)
# Créer un environnement virtuel python -m venv venv source venv/bin/activate # Linux/Mac venv\Scripts\activate # Windows # Installer les dépendances pip install fastapi uvicorn sqlalchemy psycopg2-binary python-jose[cryptography] passlib[bcrypt] python-multipart pydantic-settings
fastapi==0.104.1 uvicorn[standard]==0.24.0 sqlalchemy==2.0.23 psycopg2-binary==2.9.9 python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4 python-multipart==0.0.6 pydantic-settings==2.1.0 python-dotenv==1.0.0
# Base de données PostgreSQL DATABASE_URL=postgresql://postgres:secret@localhost:5432/api_rest # JWT JWT_SECRET_KEY=3F2504E0-4F89-11D3-9A0C-0305E82C3301-8A8F6B9C2D3E4F5A6B7C8D9E0F1A2B3C JWT_ALGORITHM=HS256 ACCESS_TOKEN_EXPIRE_MINUTES=15 REFRESH_TOKEN_EXPIRE_DAYS=7 # Application APP_NAME=API REST JWT APP_VERSION=1.0.0 DEBUG=True
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# Base de données
database_url: str
# JWT
jwt_secret_key: str
jwt_algorithm: str = "HS256"
access_token_expire_minutes: int = 15
refresh_token_expire_days: int = 7
# Application
app_name: str = "API REST JWT"
app_version: str = "1.0.0"
debug: bool = False
class Config:
env_file = ".env"
case_sensitive = False
@lru_cache()
def get_settings() -> Settings:
return Settings()
settings = get_settings()
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.config import settings
# Créer le moteur de base de données
engine = create_engine(
settings.database_url,
pool_pre_ping=True,
echo=settings.debug
)
# Session locale
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base pour les modèles
Base = declarative_base()
# Dépendance pour obtenir la session DB
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
first_name = Column(String(50), nullable=True)
last_name = Column(String(50), nullable=True)
role = Column(String(20), default="user", nullable=False)
is_active = Column(Boolean, default=True, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relation avec les refresh tokens
refresh_tokens = relationship("RefreshToken", back_populates="user", cascade="all, delete-orphan")
class RefreshToken(Base):
__tablename__ = "refresh_tokens"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
token = Column(String(255), unique=True, index=True, nullable=False)
expires_at = Column(DateTime(timezone=True), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relation avec l'utilisateur
user = relationship("User", back_populates="refresh_tokens")
# Script de création (create_tables.py)
from app.database import engine, Base
from app.models.user import User, RefreshToken
# Créer toutes les tables
Base.metadata.create_all(bind=engine)
print("Tables créées avec succès !")
# Exécuter
python create_tables.py
from pydantic import BaseModel, EmailStr, Field, ConfigDict
from datetime import datetime
from typing import Optional
# Schéma de base
class UserBase(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
first_name: Optional[str] = Field(None, max_length=50)
last_name: Optional[str] = Field(None, max_length=50)
# Création d'utilisateur
class UserCreate(UserBase):
password: str = Field(..., min_length=8)
# Mise à jour d'utilisateur
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
first_name: Optional[str] = Field(None, max_length=50)
last_name: Optional[str] = Field(None, max_length=50)
# Réponse utilisateur (sans mot de passe)
class UserResponse(UserBase):
id: int
role: str
is_active: bool
created_at: datetime
updated_at: Optional[datetime]
model_config = ConfigDict(from_attributes=True)
# Connexion
class UserLogin(BaseModel):
username: str
password: str
from pydantic import BaseModel
from typing import Optional
from app.schemas.user import UserResponse
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in: int
user: UserResponse
class TokenRefresh(BaseModel):
refresh_token: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
expires_in: int
from passlib.context import CryptContext
# Contexte pour hasher les mots de passe
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
"""Hasher un mot de passe"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Vérifier un mot de passe"""
return pwd_context.verify(plain_password, hashed_password)
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import JWTError, jwt
from app.config import settings
import secrets
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
"""Créer un access token JWT"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes)
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
})
encoded_jwt = jwt.encode(
to_encode,
settings.jwt_secret_key,
algorithm=settings.jwt_algorithm
)
return encoded_jwt
def decode_access_token(token: str) -> Dict[str, Any]:
"""Décoder et valider un access token"""
try:
payload = jwt.decode(
token,
settings.jwt_secret_key,
algorithms=[settings.jwt_algorithm]
)
return payload
except JWTError as e:
raise ValueError(f"Token invalide: {str(e)}")
def generate_refresh_token() -> str:
"""Générer un refresh token aléatoire"""
return secrets.token_urlsafe(32)
from sqlalchemy.orm import Session
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.utils.security import hash_password
from typing import Optional
class UserService:
@staticmethod
def create_user(db: Session, user_data: UserCreate) -> User:
"""Créer un nouvel utilisateur"""
db_user = User(
username=user_data.username,
email=user_data.email,
hashed_password=hash_password(user_data.password),
first_name=user_data.first_name,
last_name=user_data.last_name
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@staticmethod
def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
"""Récupérer un utilisateur par ID"""
return db.query(User).filter(
User.id == user_id,
User.is_active == True
).first()
@staticmethod
def get_user_by_username(db: Session, username: str) -> Optional[User]:
"""Récupérer un utilisateur par username"""
return db.query(User).filter(
User.username == username,
User.is_active == True
).first()
@staticmethod
def username_exists(db: Session, username: str) -> bool:
"""Vérifier si un username existe"""
return db.query(User).filter(User.username == username).first() is not None
@staticmethod
def email_exists(db: Session, email: str) -> bool:
"""Vérifier si un email existe"""
return db.query(User).filter(User.email == email).first() is not None
@staticmethod
def update_user(db: Session, user_id: int, user_data: UserUpdate) -> Optional[User]:
"""Mettre à jour un utilisateur"""
db_user = UserService.get_user_by_id(db, user_id)
if not db_user:
return None
update_data = user_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_user, field, value)
db.commit()
db.refresh(db_user)
return db_user
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from app.models.user import User, RefreshToken
from app.utils.security import verify_password
from app.utils.jwt import create_access_token, generate_refresh_token
from app.config import settings
from typing import Optional, Tuple
class AuthService:
@staticmethod
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
"""Authentifier un utilisateur"""
user = db.query(User).filter(
User.username == username,
User.is_active == True
).first()
if not user or not verify_password(password, user.hashed_password):
return None
return user
@staticmethod
def create_tokens(db: Session, user: User) -> Tuple[str, str]:
"""Créer access token et refresh token"""
# Access token
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
data={
"sub": str(user.id),
"username": user.username,
"email": user.email,
"role": user.role
},
expires_delta=access_token_expires
)
# Refresh token
refresh_token_str = generate_refresh_token()
refresh_token_expires = datetime.utcnow() + timedelta(days=settings.refresh_token_expire_days)
# Sauvegarder le refresh token en DB
db_refresh_token = RefreshToken(
user_id=user.id,
token=refresh_token_str,
expires_at=refresh_token_expires
)
db.add(db_refresh_token)
db.commit()
return access_token, refresh_token_str
@staticmethod
def verify_refresh_token(db: Session, token: str) -> Optional[User]:
"""Vérifier un refresh token"""
db_token = db.query(RefreshToken).filter(
RefreshToken.token == token,
RefreshToken.expires_at > datetime.utcnow()
).first()
if not db_token:
return None
return db_token.user
@staticmethod
def revoke_refresh_token(db: Session, token: str) -> bool:
"""Révoquer un refresh token"""
db_token = db.query(RefreshToken).filter(RefreshToken.token == token).first()
if db_token:
db.delete(db_token)
db.commit()
return True
return False
@staticmethod
def revoke_all_user_tokens(db: Session, user_id: int) -> None:
"""Révoquer tous les refresh tokens d'un utilisateur"""
db.query(RefreshToken).filter(RefreshToken.user_id == user_id).delete()
db.commit()
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.database import get_db
from app.utils.jwt import decode_access_token
from app.services.user_service import UserService
from app.models.user import User
from typing import Optional
# Schéma de sécurité Bearer
security = HTTPBearer()
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
"""Obtenir l'utilisateur courant à partir du token JWT"""
token = credentials.credentials
try:
payload = decode_access_token(token)
user_id: int = int(payload.get("sub"))
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide"
)
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide ou expiré"
)
user = UserService.get_user_by_id(db, user_id)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Utilisateur non trouvé"
)
return user
def require_admin(current_user: User = Depends(get_current_user)) -> User:
"""Vérifier que l'utilisateur est admin"""
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Accès refusé : droits administrateur requis"
)
return current_user
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.schemas.user import UserCreate, UserLogin, UserResponse
from app.schemas.token import Token, TokenRefresh, TokenResponse
from app.services.user_service import UserService
from app.services.auth_service import AuthService
from app.dependencies import get_current_user
from app.models.user import User
from app.config import settings
router = APIRouter(prefix="/api/auth", tags=["Authentication"])
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
"""Créer un nouveau compte utilisateur"""
# Validation
if UserService.username_exists(db, user_data.username):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Ce username est déjà utilisé"
)
if UserService.email_exists(db, user_data.email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cet email est déjà utilisé"
)
# Créer l'utilisateur
user = UserService.create_user(db, user_data)
return user
@router.post("/login", response_model=Token)
def login(credentials: UserLogin, db: Session = Depends(get_db)):
"""Se connecter et obtenir les tokens"""
# Authentifier l'utilisateur
user = AuthService.authenticate_user(db, credentials.username, credentials.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Identifiants incorrects"
)
# Créer les tokens
access_token, refresh_token = AuthService.create_tokens(db, user)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"expires_in": settings.access_token_expire_minutes * 60,
"user": user
}
@router.post("/refresh", response_model=TokenResponse)
def refresh(token_data: TokenRefresh, db: Session = Depends(get_db)):
"""Renouveler l'access token"""
# Vérifier le refresh token
user = AuthService.verify_refresh_token(db, token_data.refresh_token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token invalide ou expiré"
)
# Rotation : révoquer l'ancien et créer un nouveau
AuthService.revoke_refresh_token(db, token_data.refresh_token)
access_token, new_refresh_token = AuthService.create_tokens(db, user)
return {
"access_token": access_token,
"token_type": "bearer",
"expires_in": settings.access_token_expire_minutes * 60
}
@router.post("/logout", status_code=status.HTTP_204_NO_CONTENT)
def logout(token_data: TokenRefresh, db: Session = Depends(get_db)):
"""Se déconnecter (révoquer le refresh token)"""
AuthService.revoke_refresh_token(db, token_data.refresh_token)
return None
@router.get("/me", response_model=UserResponse)
def get_me(current_user: User = Depends(get_current_user)):
"""Obtenir les informations de l'utilisateur connecté"""
return current_user
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.schemas.user import UserResponse
from app.services.user_service import UserService
from app.dependencies import get_current_user, require_admin
from app.models.user import User
router = APIRouter(prefix="/api/users", tags=["Users"])
@router.get("/{user_id}", response_model=UserResponse)
def get_user(
user_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Récupérer un utilisateur par ID"""
# Seul l'utilisateur lui-même ou un admin peut voir les détails
if current_user.id != user_id and current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Accès refusé"
)
user = UserService.get_user_by_id(db, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Utilisateur non trouvé"
)
return user
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.routers import auth, users
from app.config import settings
# Créer l'application FastAPI
app = FastAPI(
title=settings.app_name,
version=settings.app_version,
description="API REST avec authentification JWT",
docs_url="/docs",
redoc_url="/redoc"
)
# Configuration CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # À adapter en production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Enregistrer les routers
app.include_router(auth.router)
app.include_router(users.router)
# Route de test
@app.get("/")
def root():
return {
"message": "API REST avec JWT",
"version": settings.app_version,
"docs": "/docs"
}
@app.get("/health")
def health_check():
return {"status": "healthy"}
# Lancer avec : uvicorn app.main:app --reload
# Activer l'environnement virtuel source venv/bin/activate # Linux/Mac venv\Scripts\activate # Windows # Créer les tables python create_tables.py # Démarrer le serveur uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 # Accéder à la documentation interactive http://localhost:8000/docs # Swagger UI http://localhost:8000/redoc # ReDoc
curl -X POST http://localhost:8000/api/auth/register \
-H "Content-Type: application/json" \
-d '{
"username": "testuser",
"email": "test@example.com",
"password": "password123",
"first_name": "Test",
"last_name": "User"
}'
# Réponse
{
"id": 1,
"username": "testuser",
"email": "test@example.com",
"first_name": "Test",
"last_name": "User",
"role": "user",
"is_active": true,
"created_at": "2025-11-30T12:00:00.000000+00:00",
"updated_at": null
}
curl -X POST http://localhost:8000/api/auth/login \
-H "Content-Type: application/json" \
-d '{
"username": "testuser",
"password": "password123"
}'
# Réponse
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "x9y8z7w6v5u4t3s2r1q0...",
"token_type": "bearer",
"expires_in": 900,
"user": {
"id": 1,
"username": "testuser",
"email": "test@example.com",
"first_name": "Test",
"last_name": "User",
"role": "user",
"is_active": true,
"created_at": "2025-11-30T12:00:00.000000+00:00",
"updated_at": null
}
}
TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
curl -X GET http://localhost:8000/api/auth/me \
-H "Authorization: Bearer $TOKEN"
# Réponse
{
"id": 1,
"username": "testuser",
"email": "test@example.com",
"first_name": "Test",
"last_name": "User",
"role": "user",
"is_active": true,
"created_at": "2025-11-30T12:00:00.000000+00:00",
"updated_at": null
}
curl -X POST http://localhost:8000/api/auth/refresh \
-H "Content-Type: application/json" \
-d '{
"refresh_token": "x9y8z7w6v5u4t3s2r1q0..."
}'
# Réponse
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9... (nouveau)",
"token_type": "bearer",
"expires_in": 900
}
curl -X POST http://localhost:8000/api/auth/logout \
-H "Content-Type: application/json" \
-d '{
"refresh_token": "x9y8z7w6v5u4t3s2r1q0..."
}'
# Réponse : 204 No Content
| Aspect | PHP | Python (FastAPI) |
|---|---|---|
| Framework | Custom (vanilla PHP) | FastAPI (moderne, async) |
| Type hints | Optionnel (PHP 7.4+) | Obligatoire (Pydantic) |
| Validation | Manuelle | Automatique (Pydantic) |
| Documentation | Manuelle | Auto-générée (Swagger/ReDoc) |
| ORM | PDO (SQL brut) | SQLAlchemy (ORM complet) |
| Async | Non (PHP 8.1+ expérimental) | Natif (async/await) |
| Performance | Bonne | Excellente (async + uvicorn) |
| Typage | Dynamique (optionnel) | Statique (runtime validation) |
| Syntaxe | Verbeuse | Concise et moderne |
| Gestion erreurs | Manuelle (try/catch) | Automatique (HTTPException) |
| Dépendances | Composer | pip + requirements.txt |
| Tests | PHPUnit | pytest + TestClient intégré |
$errors = [];
if (empty($data['username'])) {
$errors[] = ['field' => 'username', 'message' => 'Username requis'];
}
if (empty($data['email'])) {
$errors[] = ['field' => 'email', 'message' => 'Email requis'];
} elseif (!filter_var($data['email'], FILTER_VALIDATE_EMAIL)) {
$errors[] = ['field' => 'email', 'message' => 'Email invalide'];
}
if (!empty($errors)) {
Response::error('Erreur de validation', 400, $errors);
}
# Pydantic fait tout automatiquement !
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr # Validation email automatique
password: str = Field(..., min_length=8)
# Si les données sont invalides, FastAPI renvoie automatiquement
# une erreur 422 avec les détails
Avec FastAPI, vous obtenez gratuitement une interface interactive pour tester votre API :
http://localhost:8000/docs - Swagger UIhttp://localhost:8000/redoc - ReDocMission: industrialiser l'API FastAPI JWT avec qualité logicielle et sécurité opérationnelle.