Implementing a Serverless Authentication System with JWT, DynamoDB, Secrets Manager, and KMS
In this section, we will build a serverless authentication system based on JSON Web Tokens (JWT). The system receives credentials from the client, issues a signed token, and uses that token to authorize access to endpoints containing private information.
A JWT is a compact, signed JSON object used to transmit information between parties. The signature lets the receiver verify that the token was issued by a trusted party and has not been altered. To gain a deeper understanding of JWT tokens and their functionality, you can refer to the article JSON Web Tokens.
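To make the token structure concrete, here is a minimal sketch of how an HS256 token could be signed and verified using only the Python standard library. The helper names (`sign_token`, `verify_token`, `b64url_encode`, `b64url_decode`) are illustrative assumptions; the functions later in this section rely on the pyjwt library rather than on code like this.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(data: bytes) -> str:
    """Base64url-encode without padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the padding that JWT strips."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(payload: dict, secret: str) -> str:
    """Build header.payload.signature, signed with HMAC-SHA256 (HS256)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode(), signing_input.encode(), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


def verify_token(token: str, secret: str) -> dict:
    """Return the payload if the signature matches, otherwise raise ValueError."""
    if token.count(".") != 2:
        raise ValueError("a JWT has exactly three dot-separated segments")
    signing_input, signature = token.rsplit(".", 1)
    expected = hmac.new(
        secret.encode(), signing_input.encode(), hashlib.sha256
    ).digest()
    # compare_digest avoids leaking information through timing differences.
    if not hmac.compare_digest(expected, b64url_decode(signature)):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(signing_input.split(".")[1]))
```

A token signed with one secret fails verification with any other secret, which is why the secret has to stay private to the backend.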
The design for the Lambda functions we intend to create will look like this:
Setting Up the DynamoDB Tables
To get started, we must create tables to store user credentials securely. To keep the environments fully decoupled, go to your AWS console and create three separate tables, one per stage: Dev-Auth, Staging-Auth, and Prod-Auth. Each table needs a partition key named PK, because the functions below look users up by that key.
Once you have obtained the ARNs for these tables, add them to the cdk.json file under the corresponding environment.
Next, we'll add a new class variable to the DynamoDB class in infra/services/dynamodb.py to reference our auth tables.
Implementing Password Encryption with KMS
As we're dealing with sensitive data such as passwords, storing them in plain text poses a significant security risk. To mitigate this risk, we'll use KMS (Key Management Service), an AWS service for creating and managing cryptographic keys, to encrypt passwords before they are stored.
To create a new KMS service, execute the following command:
This command creates a new file within the infra/services directory specifically for managing KMS keys.
infra
└── services
    ├── __init__.py
    ├── api_gateway.py
    ├── aws_lambda.py
    ├── dynamodb.py
    ├── kms.py
    ├── layers.py
    ├── s3.py
    └── secrets_manager.py
Next, navigate to the KMS console on AWS and create a new key. Then, update the KMS class with the ARN of the newly generated key.
from aws_cdk import aws_kms as kms


class KMS:
    def __init__(self, scope, context) -> None:
        self.auth_key = kms.Key.from_key_arn(
            scope,
            "AuthKey",
            key_arn="$AUTH-KEY-ARN",
        )
Creating a JWT Secret on Secrets Manager
To sign and validate JWT tokens securely, a secret is essential. This secret, usually a long random string, is the key used to sign each token and later to verify that the token was generated by a trusted source. Only parties that hold the secret can generate and verify tokens, which prevents forged tokens from granting access to protected resources.
By storing the secret securely, you safeguard the integrity of your authentication system and reduce the risk of unauthorized access and data breaches. With that said, navigate to AWS Secrets Manager, create a new secret, and save your random string there.
After obtaining the secret ARN from AWS Secrets Manager, integrate it into the Secrets Manager class.
Using the PyJWT Public Layer
To sign and verify our JWT tokens, we'll use the widely used Python library pyjwt. Due to its popularity, it is available as a public Lambda layer through the Klayers project, which streamlines our authentication implementation.
- PyJWT: arn:aws:lambda:us-east-2:770693421928:layer:Klayers-p39-PyJWT:3
Let's now create a new class variable referencing the pyjwt layer.
Don't forget to add pyjwt to the requirements.txt file.
Implementing the SignUp Function
Now that we have all the necessary components set up, it's time to develop our authentication logic. We'll begin with the signup function, which receives an email and a password from the user. The function stores them in the database, ensuring that the user is unique and saving an encrypted version of the password for security purposes.
forge function signup --method "POST" --description "Securely handle user registration with unique credentials." --public --belongs-to auth --no-tests --endpoint signup
This command generates a new function within the auth directory.
The signup functionality can be implemented as follows:
import json
import os
from dataclasses import dataclass

import boto3


@dataclass
class Input:
    email: str
    password: str


@dataclass
class Output:
    pass


def encrypt_with_kms(plaintext: str, kms_key_id: str) -> bytes:
    # KMS returns the ciphertext as raw bytes under "CiphertextBlob".
    kms_client = boto3.client("kms")
    response = kms_client.encrypt(KeyId=kms_key_id, Plaintext=plaintext.encode())
    return response["CiphertextBlob"]


def lambda_handler(event, context):
    # Retrieve the DynamoDB table name and KMS key ID from environment variables.
    AUTH_TABLE_NAME = os.environ.get("AUTH_TABLE_NAME")
    KMS_KEY_ID = os.environ.get("KMS_KEY_ID")

    # Initialize a DynamoDB resource.
    dynamodb = boto3.resource("dynamodb")

    # Reference the DynamoDB table.
    auth_table = dynamodb.Table(AUTH_TABLE_NAME)

    # Parse the request body to get user data.
    body = json.loads(event["body"])

    # Verify if the user already exists.
    user = auth_table.get_item(Key={"PK": body["email"]})
    if user.get("Item"):
        return {
            "statusCode": 400,
            "body": json.dumps({"message": "User already exists"}),
        }

    # Encrypt the password using KMS.
    encrypted_password = encrypt_with_kms(body["password"], KMS_KEY_ID)

    # Insert the new user into the DynamoDB table.
    auth_table.put_item(Item={"PK": body["email"], "password": encrypted_password})

    # Return a 201 Created response with no body.
    return {"statusCode": 201}
This Lambda function handles user signup: it rejects emails that are already registered, encrypts the password with KMS, and stores the result in DynamoDB.
With our implementation ready, let's configure it to utilize AWS resources for seamless functionality.
from infra.services import Services


class SignUpConfig:
    def __init__(self, services: Services) -> None:

        function = services.aws_lambda.create_function(
            name="SignUp",
            path="./functions/auth",
            description="Securely handle user registration with unique credentials.",
            directory="signup",
            environment={
                "AUTH_TABLE_NAME": services.dynamodb.auth_table.table_name,
                "KMS_KEY_ID": services.kms.auth_key.key_id,
            },
        )

        services.api_gateway.create_endpoint("POST", "/signup", function, public=True)

        services.dynamodb.auth_table.grant_read_write_data(function)
        services.kms.auth_key.grant_encrypt(function)
Implementing the SignIn Functionality
Now that the signup functionality is in place, let's implement the signin function. This function receives an email and a password, looks up the stored credentials in the database, and decrypts the stored password to check it against the one supplied.
forge function signin --method "POST" --description "Authenticate user login by verifying email and password against stored credentials" --public --belongs-to auth --no-tests --endpoint signin
Here's our updated folder structure:
functions
└── auth
    ├── signin
    │   ├── __init__.py
    │   ├── config.py
    │   └── main.py
    └── signup
        ├── __init__.py
        ├── config.py
        └── main.py
And now, its implementation.
Note that once the input password matches the decrypted stored password, the email is encoded within the JWT token and returned to the client. This step is crucial because it lets the authorizer recover the email on later requests.
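As a rough illustration of the flow described above, the core of a signin handler might be sketched as follows. This is not the tutorial's actual listing. The function names (`check_password`, `signin`), the 401 response, and passing the table, KMS client, and token factory in as arguments are assumptions made so the sketch can run on its own. The calls it relies on, `get_item` on a DynamoDB table and `decrypt(CiphertextBlob=...)` on a KMS client, are standard boto3 operations.

```python
import hmac
import json


def check_password(kms_client, stored_ciphertext, candidate: str) -> bool:
    """Decrypt the stored password with KMS and compare it to the candidate."""
    # boto3's DynamoDB resource wraps binary attributes in a Binary object;
    # bytes() is assumed here to unwrap it into the raw ciphertext.
    response = kms_client.decrypt(CiphertextBlob=bytes(stored_ciphertext))
    # Constant-time comparison of the decrypted bytes with the supplied password.
    return hmac.compare_digest(response["Plaintext"], candidate.encode())


def signin(body: dict, auth_table, kms_client, make_token) -> dict:
    """Return an API Gateway style response for a signin attempt."""
    item = auth_table.get_item(Key={"PK": body["email"]}).get("Item")

    # Use the same answer for unknown users and wrong passwords.
    if not item or not check_password(kms_client, item["password"], body["password"]):
        return {
            "statusCode": 401,
            "body": json.dumps({"message": "Invalid credentials"}),
        }

    # Encode the email in the token so the authorizer can recover it later.
    # With pyjwt, make_token could be:
    #   lambda claims: jwt.encode(claims, JWT_SECRET, algorithm="HS256")
    token = make_token({"email": body["email"]})
    return {"statusCode": 200, "body": json.dumps({"token": token})}
```

In a real handler, `auth_table` would come from `boto3.resource("dynamodb").Table(...)`, `kms_client` from `boto3.client("kms")`, and the JWT secret from Secrets Manager, matching the environment variables configured below.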
Now, let's move on to configuring the signin function.
from infra.services import Services


class SigninConfig:
    def __init__(self, services: Services) -> None:

        function = services.aws_lambda.create_function(
            name="Signin",
            path="./functions/auth",
            description="Authenticate user login by verifying email and password against stored credentials",
            directory="signin",
            layers=[services.layers.sm_utils_layer, services.layers.pyjwt_layer],
            environment={
                "AUTH_TABLE_NAME": services.dynamodb.auth_table.table_name,
                "KMS_KEY_ID": services.kms.auth_key.key_id,
                "JWT_SECRET_NAME": services.secrets_manager.jwt_secret.secret_name,
            },
        )

        services.api_gateway.create_endpoint("POST", "/signin", function, public=True)

        services.dynamodb.auth_table.grant_read_data(function)
        services.kms.auth_key.grant_decrypt(function)
        services.secrets_manager.jwt_secret.grant_read(function)
Creating the JWT Authorizer
The signin function returns a token to the client, typically a frontend application, which must include this token in the headers of subsequent requests to endpoints protected by the JWT authorizer. The authorizer's role is to verify that the token was signed with the same secret used at its creation and, if so, decode the token and pass the email to the protected functions.
With that being said, let's proceed with its implementation.
This command creates a new jwt authorizer under the authorizers folder.
Now, let's proceed with the implementation.
import os

import jwt
import sm_utils


def lambda_handler(event, context):
    # Extract the JWT token from the event
    token = event["headers"].get("authorization")

    # Retrieve the JWT secret from Secrets Manager
    JWT_SECRET_NAME = os.environ.get("JWT_SECRET_NAME")
    JWT_SECRET = sm_utils.get_secret(JWT_SECRET_NAME)

    try:
        # Decode the JWT token, which also verifies its signature
        decoded_token = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        effect = "allow"
        email = decoded_token.get("email")
    except Exception:
        # A missing, malformed, or wrongly signed token results in a deny
        effect = "deny"
        email = None

    # Set the decoded email as the authorizer context
    # (named auth_context to avoid shadowing the handler's context argument)
    auth_context = {"email": email}

    # Return the policy together with the user's email
    return {
        "context": auth_context,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "execute-api:Invoke",
                    "Effect": effect,
                    "Resource": event["methodArn"],
                }
            ],
        },
    }
This function attempts to decode the token received in the headers under the key authorization, using the same JWT secret stored in Secrets Manager that was used to sign it. If successful, it retrieves the email that the signin function encoded in the token and passes it as context.
Now, let's set up our new JWT authorizer.
from infra.services import Services


class JwtAuthorizerConfig:
    def __init__(self, services: Services) -> None:

        function = services.aws_lambda.create_function(
            name="JwtAuthorizer",
            path="./authorizers/jwt",
            description="A jwt authorizer for private lambda functions",
            layers=[services.layers.sm_utils_layer, services.layers.pyjwt_layer],
            environment={
                "JWT_SECRET_NAME": services.secrets_manager.jwt_secret.secret_name
            },
        )

        services.api_gateway.create_authorizer(function, name="jwt", default=False)

        services.secrets_manager.jwt_secret.grant_read(function)
Creating a Private Function
Now it's time to create a simple private function that is only accessible to requests that pass the authorizer's validation.
This command creates a standalone function in the root of the functions folder.
Now, let's implement a very straightforward function that should simply retrieve the email decoded by the authorizer and return it to the user.
import json
from dataclasses import dataclass


@dataclass
class Input:
    pass


@dataclass
class Output:
    message: str


def lambda_handler(event, context):
    # The authorizer's context is exposed under requestContext.authorizer.
    email = event["requestContext"]["authorizer"]["email"]

    return {"statusCode": 200, "body": json.dumps({"message": f"Hello, {email}!"})}
Finally, its configuration.
from infra.services import Services


class HelloConfig:
    def __init__(self, services: Services) -> None:

        function = services.aws_lambda.create_function(
            name="Hello",
            path="./functions/hello",
            description="A private function",
        )

        services.api_gateway.create_endpoint("GET", "/hello", function, authorizer="jwt")
Note that because we didn't specify the JWT authorizer as default, and this function isn't marked as public, we need to explicitly pass the authorizer's name to the create_endpoint method.
Deploying the Functions
Next, we'll commit our code and push it to GitHub, following these steps:
# Stage your changes
git add .
# Commit with a descriptive message
git commit -m "JWT Authentication System"
# Push changes to the 'dev' branch
git push origin dev
# Merge 'dev' into 'staging' and push
git checkout staging
git merge dev
git push origin staging
# Finally, merge 'staging' into 'main' and push
git checkout main
git merge staging
git push origin main
This sequence ensures our code passes through development, staging, and finally, production environments, activating our three distinct deployment pipelines.
After the pipelines complete, the authentication system should be available in the development, staging, and production stages.
Testing the Functions
Let's start by testing the signup function with the credentials below:
- Email: tutorial@lambdaforge.com
- Password: 12345678
curl --request POST \
  --url https://api.lambda-forge.com/signup \
  --header 'Content-Type: application/json' \
  --data '{
    "email": "tutorial@lambdaforge.com",
    "password": "12345678"
  }'
The endpoint returns a status code 201.
However, if we navigate to the Prod-Auth table in the DynamoDB console, we'll notice that the stored password isn't simply 12345678, but rather a lengthy encrypted string:
AQICAHinYrMBzzQKgEowcHc4llDo3C5gg+cRawehAsWTMZ24iwEvX3NrQs9oYi0hD2YnB28hAAAAZjBkBgkqhkiG9w0BBwagVzBVAgEAMFAGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQMEeMCuyCVk4C+Nr4OAgEQgCOEKlx01+tGfqKTNXSktApuxUI31EnwzLt7GdW0wdXrT+Yu+A==
This value is the KMS-encrypted password, so the plain-text password is never stored in the table.
Now, let's use the same credentials to log in:
curl --request POST \
  --url https://api.lambda-forge.com/signin \
  --header 'Content-Type: application/json' \
  --data '{
    "email": "tutorial@lambdaforge.com",
    "password": "12345678"
  }'
The signin endpoint returns a token:
{
  "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6InR1dG9yaWFsQGxhbWJkYWZvcmdlLmNvbSJ9.ppQLiYZ-6AtHdwaCb-H-vJnjTCle9ppULqq5-TqVPjk"
}
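The header and payload of a JWT are only base64url-encoded, not encrypted, so anyone holding the token can read them. The following sketch uses only the standard library to decode the token returned above without verifying its signature; the helper name `decode_segment` is an illustrative assumption.

```python
import base64
import json


def decode_segment(segment: str) -> dict:
    """Decode one base64url JWT segment, restoring the stripped padding."""
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


token = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
    ".eyJlbWFpbCI6InR1dG9yaWFsQGxhbWJkYWZvcmdlLmNvbSJ9"
    ".ppQLiYZ-6AtHdwaCb-H-vJnjTCle9ppULqq5-TqVPjk"
)

# A JWT is header.payload.signature; only the first two parts are JSON.
header_b64, payload_b64, _signature = token.split(".")
header = decode_segment(header_b64)
payload = decode_segment(payload_b64)

print(header)   # {'alg': 'HS256', 'typ': 'JWT'}
print(payload)  # {'email': 'tutorial@lambdaforge.com'}
```

Because the payload is readable by the client, it should carry only non-sensitive claims such as the email; the signature is what prevents the client from changing them.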
Next, let's attempt a GET request to the hello function without headers:
This returns the message:
However, if we pass the token generated by the signin function:
curl --request GET \
  --url https://api.lambda-forge.com/hello \
  --header 'authorization: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6InR1dG9yaWFsQGxhbWJkYWZvcmdlLmNvbSJ9.ppQLiYZ-6AtHdwaCb-H-vJnjTCle9ppULqq5-TqVPjk'
We receive the desired output:
{
  "message": "Hello, tutorial@lambdaforge.com!"
}
🎉 Congratulations! You've successfully implemented a JWT authentication system, securing your endpoints.🔒