-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
136 lines (110 loc) · 3.65 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from datetime import datetime, timezone

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

import database
import schemas
import security
# Application instance; the @app.post / @app.put decorators in this file
# register their handlers on it.
app = FastAPI()
def get_db():
    """Yield a session from ``database.SessionLocal`` and close it afterwards.

    Written as a generator so it can be used with ``Depends``: the code
    after ``yield`` runs once the consumer is done with the session, and
    the ``finally`` clause guarantees ``close()`` is called even when the
    consumer raised.
    """
    session = database.SessionLocal()
    try:
        yield session
    finally:
        session.close()
def get_current_user(
    token: str = Depends(OAuth2PasswordBearer(tokenUrl="login")),
    db: Session = Depends(get_db),
):
    """Resolve the request's bearer token to the matching ``database.User``.

    The token is decoded with ``security.SECRET_KEY`` and
    ``security.ALGORITHM``; the value of its ``sub`` claim is then used as
    the username for the database lookup.

    Args:
        token: Bearer token extracted by ``OAuth2PasswordBearer``.
        db: Database session supplied by ``get_db``.

    Returns:
        The first ``database.User`` row whose username equals the ``sub``
        claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, has no ``sub`` claim, or names a
            user that is not in the database.
    """
    # A 401 response is expected to carry a WWW-Authenticate challenge so
    # clients know which authentication scheme to retry with.
    bearer_challenge = {"WWW-Authenticate": "Bearer"}
    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers=bearer_challenge,
    )

    # Keep the try body down to the one call that can raise JWTError.
    try:
        payload = jwt.decode(
            token, security.SECRET_KEY, algorithms=[security.ALGORITHM]
        )
    except JWTError as err:
        raise credentials_error from err

    username = payload.get("sub")
    if username is None:
        raise credentials_error

    user = (
        db.query(database.User)
        .filter(database.User.username == username)
        .first()
    )
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers=bearer_challenge,
        )
    return user
def _find_user_by_username_or_email(db, username, email):
    """Return the first user matching *username* or *email*, or ``None``."""
    return (
        db.query(database.User)
        .filter(
            (database.User.username == username)
            | (database.User.email == email)
        )
        .first()
    )


@app.post("/register", response_model=schemas.UserResponse)
def register_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Create a new user account from the submitted registration data.

    Args:
        user: Registration payload; ``username``, ``email``, ``password``
            and ``full_name`` are read from it.
        db: Database session supplied by ``get_db``.

    Returns:
        The newly stored ``database.User``, refreshed after the commit.

    Raises:
        HTTPException: 400 when a user with the same username or the same
            email already exists.
    """
    duplicate_error = HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Username or email already registered",
    )

    # Check if username or email already exists
    if _find_user_by_username_or_email(db, user.username, user.email):
        raise duplicate_error

    # Create new user; only the hash from security.get_password_hash is stored
    db_user = database.User(
        username=user.username,
        email=user.email,
        hashed_password=security.get_password_hash(user.password),
        full_name=user.full_name,
    )
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError:
        # A concurrent request can insert the same username/email between
        # the check above and this commit. Roll back so the session is
        # usable again, then report the duplicate as the same 400 instead
        # of letting it surface as a server error.
        # NOTE(review): only reachable if the User table enforces
        # uniqueness at the database level -- confirm against the model.
        db.rollback()
        if _find_user_by_username_or_email(db, user.username, user.email):
            raise duplicate_error from None
        # Some other integrity failure: do not mislabel it as a duplicate.
        raise
    db.refresh(db_user)
    return db_user
@app.post("/login")
def login_user(user_login: schemas.UserLogin, db: Session = Depends(get_db)):
    """Authenticate a user and issue an access token.

    Args:
        user_login: Login payload; ``username`` and ``password`` are read
            from it.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with the keys ``access_token`` (built by
        ``security.create_access_token`` with the username as ``sub``) and
        ``token_type`` (always ``"bearer"``).

    Raises:
        HTTPException: 401 when no user has the given username or
            ``security.verify_password`` rejects the password. The same
            message is used for both cases.
    """
    # Find user by username
    user = (
        db.query(database.User)
        .filter(database.User.username == user_login.username)
        .first()
    )
    if not user or not security.verify_password(
        user_login.password, user.hashed_password
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            # A 401 response is expected to carry an authentication challenge.
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Update last login time. datetime.utcnow() is deprecated since
    # Python 3.12; this is the documented replacement, with tzinfo stripped
    # so the stored value stays a naive UTC timestamp exactly as before.
    user.last_login = datetime.now(timezone.utc).replace(tzinfo=None)
    db.commit()

    # Generate access token
    access_token = security.create_access_token(data={"sub": user.username})
    return {"access_token": access_token, "token_type": "bearer"}
@app.put("/update", response_model=schemas.UserResponse)
def update_user(
    user_update: schemas.UserUpdate,
    current_user: database.User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Apply the supplied profile changes to the authenticated user.

    Only fields that are not ``None`` in ``user_update`` are written;
    ``full_name`` and ``email`` are the two fields handled here.

    Returns:
        ``current_user`` after the commit and a refresh from the database.

    Raises:
        HTTPException: 400 when the requested email already belongs to a
            user with a different id.
    """
    new_full_name = user_update.full_name
    if new_full_name is not None:
        current_user.full_name = new_full_name

    new_email = user_update.email
    if new_email is not None:
        # Look for a row holding this address under any id other than the
        # caller's own, so re-submitting one's current email is accepted.
        other_owner = (
            db.query(database.User)
            .filter(
                database.User.email == new_email,
                database.User.id != current_user.id,
            )
            .first()
        )
        if other_owner:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Email already in use"
            )
        current_user.email = new_email

    db.commit()
    db.refresh(current_user)
    return current_user