Merge pull request #20 from TurTaskProject/feature/user-authentication

Separate Authentication and Users
This commit is contained in:
Sirin Puenggun 2023-11-04 03:54:17 +07:00 committed by GitHub
commit 79749dfd71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 215 additions and 183 deletions

View File

View File

@ -5,7 +5,7 @@ from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from .models import CustomUser
from users.models import CustomUser
def store_token(user_id, token, token_type):

View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class AuthenticationsConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'authentications'

View File

@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

View File

@ -0,0 +1,12 @@
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer
class MyTokenObtainPairSerializer(TokenObtainPairSerializer):
@classmethod
def get_token(cls, user):
"""
Get the token for the user and add custom claims, such as 'username'.
"""
token = super(MyTokenObtainPairSerializer, cls).get_token(user)
token['username'] = user.username
return token

View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

View File

@ -0,0 +1,12 @@
from django.urls import path
from rest_framework_simplejwt import views as jwt_views
from authentications.views import ObtainTokenPairWithCustomView, GreetingView, GoogleLogin, GoogleRetrieveUserInfo
urlpatterns = [
path('token/obtain/', jwt_views.TokenObtainPairView.as_view(), name='token_create'),
path('token/refresh/', jwt_views.TokenRefreshView.as_view(), name='token_refresh'),
path('token/custom_obtain/', ObtainTokenPairWithCustomView.as_view(), name='token_create_custom'),
path('hello/', GreetingView.as_view(), name='hello_world'),
path('dj-rest-auth/google/', GoogleLogin.as_view(), name="google_login"),
path('auth/google/', GoogleRetrieveUserInfo.as_view()),
]

View File

@ -0,0 +1,168 @@
from django.shortcuts import render
# Create your views here.
"""This module defines API views for authentication, user creation, and a simple hello message."""
import json
import requests
from django.conf import settings
from django.contrib.auth.hashers import make_password
from rest_framework import status
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.tokens import RefreshToken
from allauth.socialaccount.providers.google.views import GoogleOAuth2Adapter
from dj_rest_auth.registration.views import SocialLoginView
from google_auth_oauthlib.flow import InstalledAppFlow
from authentications.access_token_cache import store_token
from authentications.serializers import MyTokenObtainPairSerializer
from users.managers import CustomAccountManager
from users.models import CustomUser
class ObtainTokenPairWithCustomView(APIView):
"""
Custom Token Obtain Pair View.
Allows users to obtain access and refresh tokens by providing credentials.
"""
permission_classes = (AllowAny,)
def post(self, request):
"""
Issue access and refresh tokens in response to a valid login request.
"""
serializer = MyTokenObtainPairSerializer(data=request.data)
if serializer.is_valid():
token = serializer.validated_data
return Response(token, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class GreetingView(APIView):
"""
Hello World View.
Returns a greeting and user information for authenticated users.
"""
permission_classes = (IsAuthenticated,)
def get(self, request):
"""
Retrieve a greeting message and user information.
"""
user = request.user
user_info = {
"username": user.username,
}
response_data = {
"message": "Hello, world!",
"user_info": user_info,
}
return Response(response_data, status=status.HTTP_200_OK)
class GoogleLogin(SocialLoginView):
"""
Google Login View.
Handles Google OAuth2 authentication.
"""
# permission_classes = (AllowAny,)
adapter_class = GoogleOAuth2Adapter
# client_class = OAuth2Client
# callback_url = 'http://localhost:8000/accounts/google/login/callback/'
class GoogleRetrieveUserInfo(APIView):
"""
Retrieve user information from Google and create a user if not exists.
"""
permission_classes = (AllowAny,)
client_config = {"web":{"client_id": settings.GOOGLE_CLIENT_ID,
"project_id":"turtask","auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_secret": settings.GOOGLE_CLIENT_SECRET,
}
}
scopes = [
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/userinfo.profile',
'https://www.googleapis.com/auth/calendar.readonly',
]
def post(self, request):
code = request.data.get("code")
payload = self.exchange_authorization_code(code=code)
if 'error' in payload:
return Response({'error': payload['error']})
user_info = self.call_google_api(api_url='https://www.googleapis.com/oauth2/v2/userinfo?alt=json',
access_token=payload['access_token'])
payload['email'] = user_info['email']
user = self.get_or_create_user(payload)
token = RefreshToken.for_user(user)
response = {
'username': user.username,
'access_token': str(token.access_token),
'refresh_token': str(token),
}
return Response(response)
def get(self, request):
"""Get authorization url."""
flow = InstalledAppFlow.from_client_config(client_config=self.client_config,
scopes=self.scopes)
flow.redirect_uri = 'http://localhost:5173/'
authorization_url, state = flow.authorization_url(
access_type='offline',
# include_granted_scopes='true',
)
return Response({'url': authorization_url})
def exchange_authorization_code(self, code):
"""Exchange authorization code for access, id, refresh token."""
url = 'https://oauth2.googleapis.com/token'
payload = {
'code': code,
'client_id': settings.GOOGLE_CLIENT_ID,
'client_secret': settings.GOOGLE_CLIENT_SECRET,
'redirect_uri': 'postmessage',
'grant_type': 'authorization_code',
}
response = requests.post(url, data=payload)
return json.loads(response.text)
def get_or_create_user(self, user_info):
"""Get or create a user based on email."""
try:
user = CustomUser.objects.get(email=user_info['email'])
user.refresh_token = user_info['refresh_token']
user.save()
except CustomUser.DoesNotExist:
user = CustomUser()
user.username = user_info['email']
user.password = make_password(CustomAccountManager().make_random_password())
user.email = user_info['email']
user.refresh_token = user_info['refresh_token']
user.save()
store_token(user.id, user_info['access_token'], 'access')
store_token(user.id, user_info['id_token'], 'id')
return user
def call_google_api(self, api_url, access_token):
"""Call Google API with access token."""
headers = {
'Authorization': f'Bearer {access_token}'
}
response = requests.get(api_url, headers=headers)
if response.status_code == 200:
return response.json()
raise Exception('Google API Error', response)

View File

@ -50,6 +50,7 @@ INSTALLED_APPS = [
'tasks',
'users',
'authentications',
'corsheaders',

View File

@ -21,5 +21,6 @@ urlpatterns = [
path('admin/', admin.site.urls),
path('api/', include('users.urls')),
path('api/', include('tasks.urls')),
path('api/', include('authentications.urls')),
path('accounts/', include('allauth.urls')),
]

View File

@ -1,6 +1,6 @@
from googleapiclient.discovery import build
from users.access_token_cache import get_credential_from_cache_token
from authentications.access_token_cache import get_credential_from_cache_token
def get_service(request):

View File

@ -1,19 +1,7 @@
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer
from rest_framework import serializers
from .models import CustomUser
class MyTokenObtainPairSerializer(TokenObtainPairSerializer):
@classmethod
def get_token(cls, user):
"""
Get the token for the user and add custom claims, such as 'username'.
"""
token = super(MyTokenObtainPairSerializer, cls).get_token(user)
token['username'] = user.username
return token
class CustomUserSerializer(serializers.ModelSerializer):
"""
Serializer for CustomUser model.

View File

@ -1,13 +1,6 @@
from django.urls import path
from rest_framework_simplejwt import views as jwt_views
from .views import ObtainTokenPairWithCustomView, CustomUserCreate, GreetingView, GoogleLogin, GoogleRetrieveUserInfo
from users.views import CustomUserCreate
urlpatterns = [
path('user/create/', CustomUserCreate.as_view(), name="create_user"),
path('token/obtain/', jwt_views.TokenObtainPairView.as_view(), name='token_create'),
path('token/refresh/', jwt_views.TokenRefreshView.as_view(), name='token_refresh'),
path('token/custom_obtain/', ObtainTokenPairWithCustomView.as_view(), name='token_create_custom'),
path('hello/', GreetingView.as_view(), name='hello_world'),
path('dj-rest-auth/google/', GoogleLogin.as_view(), name="google_login"),
path('auth/google/', GoogleRetrieveUserInfo.as_view()),
]

View File

@ -1,46 +1,11 @@
"""This module defines API views for authentication, user creation, and a simple hello message."""
import json
import requests
from django.conf import settings
from django.contrib.auth.hashers import make_password
"""This module defines API views for user creation"""
from rest_framework import status
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.tokens import RefreshToken
from allauth.socialaccount.providers.google.views import GoogleOAuth2Adapter
from allauth.socialaccount.providers.oauth2.client import OAuth2Client
from dj_rest_auth.registration.views import SocialLoginView
from google_auth_oauthlib.flow import InstalledAppFlow
from .access_token_cache import store_token
from .serializers import MyTokenObtainPairSerializer, CustomUserSerializer
from .managers import CustomAccountManager
from .models import CustomUser
class ObtainTokenPairWithCustomView(APIView):
"""
Custom Token Obtain Pair View.
Allows users to obtain access and refresh tokens by providing credentials.
"""
permission_classes = (AllowAny,)
def post(self, request):
"""
Issue access and refresh tokens in response to a valid login request.
"""
serializer = MyTokenObtainPairSerializer(data=request.data)
if serializer.is_valid():
token = serializer.validated_data
return Response(token, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
from .serializers import CustomUserSerializer
class CustomUserCreate(APIView):
@ -60,126 +25,3 @@ class CustomUserCreate(APIView):
if user:
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class GreetingView(APIView):
"""
Hello World View.
Returns a greeting and user information for authenticated users.
"""
permission_classes = (IsAuthenticated,)
def get(self, request):
"""
Retrieve a greeting message and user information.
"""
user = request.user
user_info = {
"username": user.username,
}
response_data = {
"message": "Hello, world!",
"user_info": user_info,
}
return Response(response_data, status=status.HTTP_200_OK)
class GoogleLogin(SocialLoginView):
"""
Google Login View.
Handles Google OAuth2 authentication.
"""
# permission_classes = (AllowAny,)
adapter_class = GoogleOAuth2Adapter
# client_class = OAuth2Client
# callback_url = 'http://localhost:8000/accounts/google/login/callback/'
class GoogleRetrieveUserInfo(APIView):
"""
Retrieve user information from Google and create a user if not exists.
"""
permission_classes = (AllowAny,)
client_config = {"web":{"client_id": settings.GOOGLE_CLIENT_ID,
"project_id":"turtask","auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_secret": settings.GOOGLE_CLIENT_SECRET,
}
}
scopes = [
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/userinfo.profile',
'https://www.googleapis.com/auth/calendar.readonly',
]
def post(self, request):
code = request.data.get("code")
payload = self.exchange_authorization_code(code=code)
if 'error' in payload:
return Response({'error': payload['error']})
user_info = self.call_google_api(api_url='https://www.googleapis.com/oauth2/v2/userinfo?alt=json',
access_token=payload['access_token'])
payload['email'] = user_info['email']
user = self.get_or_create_user(payload)
token = RefreshToken.for_user(user)
response = {
'username': user.username,
'access_token': str(token.access_token),
'refresh_token': str(token),
}
return Response(response)
def get(self, request):
"""Get authorization url."""
flow = InstalledAppFlow.from_client_config(client_config=self.client_config,
scopes=self.scopes)
flow.redirect_uri = 'http://localhost:5173/'
authorization_url, state = flow.authorization_url(
access_type='offline',
# include_granted_scopes='true',
)
return Response({'url': authorization_url})
def exchange_authorization_code(self, code):
"""Exchange authorization code for access, id, refresh token."""
url = 'https://oauth2.googleapis.com/token'
payload = {
'code': code,
'client_id': settings.GOOGLE_CLIENT_ID,
'client_secret': settings.GOOGLE_CLIENT_SECRET,
'redirect_uri': 'postmessage',
'grant_type': 'authorization_code',
}
response = requests.post(url, data=payload)
return json.loads(response.text)
def get_or_create_user(self, user_info):
"""Get or create a user based on email."""
try:
user = CustomUser.objects.get(email=user_info['email'])
user.refresh_token = user_info['refresh_token']
user.save()
except CustomUser.DoesNotExist:
user = CustomUser()
user.username = user_info['email']
user.password = make_password(CustomAccountManager().make_random_password())
user.email = user_info['email']
user.refresh_token = user_info['refresh_token']
user.save()
store_token(user.id, user_info['access_token'], 'access')
store_token(user.id, user_info['id_token'], 'id')
return user
def call_google_api(self, api_url, access_token):
"""Call Google API with access token."""
headers = {
'Authorization': f'Bearer {access_token}'
}
response = requests.get(api_url, headers=headers)
if response.status_code == 200:
return response.json()
raise Exception('Google API Error', response)