mirror of
https://github.com/TurTaskProject/TurTaskWeb.git
synced 2025-12-19 22:14:07 +01:00
154 lines
6.1 KiB
Python
154 lines
6.1 KiB
Python
"""This module defines API views for authentication, user creation, and a simple hello message."""
|
|
|
|
import json
|
|
import requests
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.hashers import make_password
|
|
|
|
from rest_framework import status
|
|
from rest_framework.permissions import AllowAny
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
from rest_framework_simplejwt.authentication import JWTAuthentication
|
|
|
|
from google_auth_oauthlib.flow import InstalledAppFlow
|
|
|
|
from authentications.access_token_cache import store_token
|
|
from authentications.serializers import MyTokenObtainPairSerializer
|
|
from users.managers import CustomAccountManager
|
|
from users.models import CustomUser
|
|
|
|
|
|
class CheckAccessTokenAndRefreshToken(APIView):
|
|
permission_classes = (AllowAny,)
|
|
JWT_authenticator = JWTAuthentication()
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
access_token = request.data.get('access_token')
|
|
refresh_token = request.data.get('refresh_token')
|
|
# Check if the access token is valid
|
|
if access_token:
|
|
response = self.JWT_authenticator.authenticate(request)
|
|
if response is not None:
|
|
return Response({'status': 'true'}, status=status.HTTP_200_OK)
|
|
|
|
# Check if the refresh token is valid
|
|
if refresh_token:
|
|
try:
|
|
refresh = RefreshToken(refresh_token)
|
|
access_token = str(refresh.access_token)
|
|
return Response({'access_token': access_token}, status=status.HTTP_200_OK)
|
|
except Exception as e:
|
|
return Response({'status': 'false'}, status=status.HTTP_401_UNAUTHORIZED)
|
|
|
|
return Response({'status': 'false'}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
class ObtainTokenPairWithCustomView(APIView):
|
|
"""
|
|
Custom Token Obtain Pair View.
|
|
Allows users to obtain access and refresh tokens by providing credentials.
|
|
"""
|
|
permission_classes = (AllowAny,)
|
|
|
|
def post(self, request):
|
|
"""
|
|
Issue access and refresh tokens in response to a valid login request.
|
|
"""
|
|
serializer = MyTokenObtainPairSerializer(data=request.data)
|
|
if serializer.is_valid():
|
|
token = serializer.validated_data
|
|
return Response(token, status=status.HTTP_200_OK)
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
class GoogleRetrieveUserInfo(APIView):
|
|
"""
|
|
Retrieve user information from Google and create a user if not exists.
|
|
"""
|
|
permission_classes = (AllowAny,)
|
|
client_config = {"web":{"client_id": settings.GOOGLE_CLIENT_ID,
|
|
"project_id":"turtask","auth_uri": "https://accounts.google.com/o/oauth2/auth",
|
|
"token_uri": "https://oauth2.googleapis.com/token",
|
|
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
|
|
"client_secret": settings.GOOGLE_CLIENT_SECRET,
|
|
}
|
|
}
|
|
scopes = [
|
|
'https://www.googleapis.com/auth/userinfo.email',
|
|
'https://www.googleapis.com/auth/userinfo.profile',
|
|
'https://www.googleapis.com/auth/calendar.readonly',
|
|
]
|
|
|
|
def post(self, request):
|
|
code = request.data.get("code")
|
|
payload = self.exchange_authorization_code(code=code)
|
|
if 'error' in payload:
|
|
return Response({'error': payload['error']})
|
|
user_info = self.call_google_api(api_url='https://www.googleapis.com/oauth2/v2/userinfo?alt=json',
|
|
access_token=payload['access_token'])
|
|
payload['email'] = user_info['email']
|
|
user = self.get_or_create_user(payload)
|
|
token = RefreshToken.for_user(user)
|
|
|
|
response = {
|
|
'username': user.username,
|
|
'access_token': str(token.access_token),
|
|
'refresh_token': str(token),
|
|
}
|
|
|
|
return Response(response)
|
|
|
|
def get(self, request):
|
|
"""Get authorization url."""
|
|
flow = InstalledAppFlow.from_client_config(client_config=self.client_config,
|
|
scopes=self.scopes)
|
|
flow.redirect_uri = 'http://localhost:5173/'
|
|
authorization_url, state = flow.authorization_url(
|
|
access_type='offline',
|
|
# include_granted_scopes='true',
|
|
)
|
|
return Response({'url': authorization_url})
|
|
|
|
def exchange_authorization_code(self, code):
|
|
"""Exchange authorization code for access, id, refresh token."""
|
|
url = 'https://oauth2.googleapis.com/token'
|
|
payload = {
|
|
'code': code,
|
|
'client_id': settings.GOOGLE_CLIENT_ID,
|
|
'client_secret': settings.GOOGLE_CLIENT_SECRET,
|
|
'redirect_uri': 'postmessage',
|
|
'grant_type': 'authorization_code',
|
|
}
|
|
response = requests.post(url, data=payload)
|
|
return json.loads(response.text)
|
|
|
|
def get_or_create_user(self, user_info):
|
|
"""Get or create a user based on email."""
|
|
try:
|
|
user = CustomUser.objects.get(email=user_info['email'])
|
|
user.refresh_token = user_info['refresh_token']
|
|
user.save()
|
|
except CustomUser.DoesNotExist:
|
|
user = CustomUser()
|
|
user.username = user_info['email']
|
|
user.password = make_password(CustomAccountManager().make_random_password())
|
|
user.email = user_info['email']
|
|
user.refresh_token = user_info['refresh_token']
|
|
user.save()
|
|
store_token(user.id, user_info['access_token'], 'access')
|
|
store_token(user.id, user_info['id_token'], 'id')
|
|
return user
|
|
|
|
def call_google_api(self, api_url, access_token):
|
|
"""Call Google API with access token."""
|
|
headers = {
|
|
'Authorization': f'Bearer {access_token}'
|
|
}
|
|
|
|
response = requests.get(api_url, headers=headers)
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
raise Exception('Google API Error', response) |