In the previous post, we created a Lambda Authorizer for API Gateway that used the factory-default YubiKey OTP and YubiCloud for validation. Today, we will program the second OTP slot of the YubiKey (long press) with a custom AES key and IDs. We will use the official YubiKey CLI tool for this purpose. This post builds on the previous one, so be sure to go through it first. If you want the finished code, check out the GitHub repository at tag v2.
Previous post: YubiKey OTP Lambda Authorizer in API Gateway
You will need:
- Terraform/OpenTofu and basic knowledge of it
- Python3 and basic knowledge of it
- Boto3 (AWS) SDK and, you guessed it, basic knowledge of it
- Possibly Docker or any other container software
Setting up the new OTP
ykman (YubiKey Manager) is a tool that allows you to configure your YubiKey. You can install it, for example, with Homebrew on macOS: brew install ykman. The tool is written in Python, so you can also use pip install --user yubikey-manager. On this website you can find the manual for the tool.
Once you have the tool installed, list the OTP slots on your YubiKey:
$ ykman otp info
Slot 1: programmed
Slot 2: empty
As you can see, the second slot is empty by default. It can be reprogrammed as many times as you like, so don't worry about using it for now. Be aware, though, that if you already have something stored there, you can't retrieve the original configuration: the slot is write-only. I used the commands below to program the second slot with a custom OTP. Do not use the same values as I did; they are just an example. The public ID must be in ModHex format, while the private ID (6 bytes) and the AES key (16 bytes) are plain hexadecimal. I suggest using OpenSSL to generate random values. Store the values you get somewhere safe.
$ PRIVATE_ID=$(openssl rand -hex 6)
$ AES_KEY=$(openssl rand -hex 16)
$ ykman otp yubiotp 2 --public-id vvccccvblhlu \
    --private-id "$PRIVATE_ID" \
    --key "$AES_KEY"
$ echo "Private ID: $PRIVATE_ID ; Key: $AES_KEY"
Private ID: a4b67dc931a1 ; Key: c157d96a6b551f8b9414ab6d94b6a54c
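If ModHex is new to you, the following minimal sketch may help. It assumes the commonly documented ModHex alphabet `cbdefghijklnrtuv`, where each letter stands for one hexadecimal digit; the helper names `hex_to_modhex` and `modhex_to_hex` are hypothetical and not part of ykman or any library used in this post.

```python
# ModHex replaces each hex digit with a letter from a fixed 16-letter alphabet.
HEX_ALPHABET = "0123456789abcdef"
MODHEX_ALPHABET = "cbdefghijklnrtuv"

_TO_MODHEX = str.maketrans(HEX_ALPHABET, MODHEX_ALPHABET)
_TO_HEX = str.maketrans(MODHEX_ALPHABET, HEX_ALPHABET)


def hex_to_modhex(value: str) -> str:
    """Convert a hexadecimal string to ModHex (hypothetical helper)."""
    value = value.lower()
    if any(ch not in HEX_ALPHABET for ch in value):
        raise ValueError("not a hexadecimal string")
    return value.translate(_TO_MODHEX)


def modhex_to_hex(value: str) -> str:
    """Convert a ModHex string back to hexadecimal (hypothetical helper)."""
    value = value.lower()
    if any(ch not in MODHEX_ALPHABET for ch in value):
        raise ValueError("not a ModHex string")
    return value.translate(_TO_HEX)


# The example public ID from this post, expressed as plain hex.
print(modhex_to_hex("vvccccvblhlu"))  # ff0000f1a6ae
```

With helpers like these you could generate six random bytes as hex and convert them to a 12-character ModHex public ID, instead of typing one by hand.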
Dissecting the OTP for science
Now we can use these values in our authorization function. But first, let's prototype a bit. I wrote a simple Python script that prints information about the OTP. It uses the pycryptodomex and yubico-client libraries to decode the OTP, decrypt it and extract the values. These two resources were also helpful for a correct implementation:
For the time being, I hardcoded the public ID, the private ID and the encryption key. First, we split the OTP into its public and private parts. Then we decrypt the private part using AES in ECB mode. Lastly, we check that the private ID matches and that the CRC16 checksum is correct. In practice, we would also check that the counters are higher than the last values saved in our database.
import yubico_client.modhex as modhex
from Cryptodome.Cipher import AES
TEST_PUBLIC_ID = "vvccccvblhlu"
TEST_PRIVATE_ID = bytes.fromhex("a4b67dc931a1")
TEST_AES_KEY = bytes.fromhex("c157d96a6b551f8b9414ab6d94b6a54c")

def validate(otp: bytes, expected_private: bytes):
    # CRC16 over all 16 decrypted bytes; a valid block leaves the residue 0xf0b8
    m_crc = 0xffff
    for this in otp:
        m_crc ^= this
        for _ in range(8):
            j = m_crc & 1
            m_crc >>= 1
            if j:
                m_crc ^= 0x8408
    return m_crc == 0xf0b8 and expected_private == otp[:6]


def decode_key(otp):
    # The first 12 characters are the public ID, the rest is the encrypted part
    private_part = otp[12:]
    hex_private = list(modhex.translate(private_part, modhex.HEX))[0]
    private = bytes.fromhex(hex_private)
    decrypted = AES.new(TEST_AES_KEY, AES.MODE_ECB).decrypt(private)
    is_valid = validate(decrypted, TEST_PRIVATE_ID)
    return {
        "public_id": otp[:12],
        "private_id": decrypted[:6].hex(":"),
        "usage_counter": int.from_bytes(decrypted[6:8], byteorder='little', signed=False),
        "timestamp": int.from_bytes(decrypted[8:11], byteorder='little', signed=False),
        "session_counter": int.from_bytes(decrypted[11:12], byteorder='little', signed=False),
        "random": decrypted[12:14].hex(":"),
        "checksum": decrypted[14:16].hex(":"),
        "valid": is_valid,
        "public_id_valid": otp[:12] == TEST_PUBLIC_ID
    }
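To experiment with the checksum without a physical key, here is a small sketch that builds a synthetic 16-byte payload with the same field layout as the script above and confirms the CRC residue. The `build_block` helper and all field values are made up for illustration; this is test data, not a real OTP, and it assumes the checksum field holds the one's complement of the CRC over the first 14 bytes.

```python
import struct


def crc16(data: bytes) -> int:
    """CRC16 with initial value 0xFFFF and reflected polynomial 0x8408."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            lsb = crc & 1
            crc >>= 1
            if lsb:
                crc ^= 0x8408
    return crc


def build_block(private_id: bytes, usage: int, timestamp: int,
                session: int, rand: int) -> bytes:
    """Pack a synthetic decrypted payload (hypothetical helper)."""
    # 6-byte private ID, 2-byte usage counter, 3-byte timestamp (low 16 bits,
    # then high 8 bits), 1-byte session counter, 2 random bytes: 14 bytes.
    body = struct.pack("<6sHHBBH", private_id, usage,
                       timestamp & 0xFFFF, timestamp >> 16, session, rand)
    # Assumed: the checksum is the inverted CRC of the body, little-endian.
    checksum = ~crc16(body) & 0xFFFF
    return body + struct.pack("<H", checksum)


block = build_block(bytes.fromhex("a4b67dc931a1"), usage=5,
                    timestamp=0x012345, session=2, rand=0xBEEF)
corrupted = bytes([block[0] ^ 0x01]) + block[1:]

print(len(block))                   # 16
print(crc16(block) == 0xF0B8)       # True: intact block leaves the residue
print(crc16(corrupted) == 0xF0B8)   # False: a flipped bit is detected
```

Checking for the fixed residue over all 16 bytes is what lets the validation code skip extracting and comparing the checksum field separately.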
I wrote some extra functions to test the code and print the results. I also tried a different OTP, resent the same OTP multiple times to check that the counters behave as expected, and used an OTP from slot 1 to confirm that it is not validated.
Creating the database of users and secrets
Next, we define a DynamoDB table that will hold our users. Only the primary key needs to be declared, because DynamoDB does not enforce a schema for the remaining attributes. Eventually, each item will hold three values: the public ID, the ARN of the secret in Secrets Manager and the usage counter (both counters combined, for simplicity), plus optionally a human-readable description such as a name.
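The counter check can be sketched as follows. This is only one possible scheme and an assumption on my side: the 16-bit usage counter is shifted left by 8 bits and merged with the 8-bit session counter, so a single integer grows with every OTP. The function names are hypothetical.

```python
def combine_counters(usage_counter: int, session_counter: int) -> int:
    """Merge both counters into one monotonically increasing integer."""
    if not 0 <= usage_counter <= 0xFFFF:
        raise ValueError("usage counter must fit in 16 bits")
    if not 0 <= session_counter <= 0xFF:
        raise ValueError("session counter must fit in 8 bits")
    return (usage_counter << 8) | session_counter


def is_fresh(stored_counter: int, usage_counter: int, session_counter: int) -> bool:
    """An OTP is fresh only if its combined counter is strictly higher."""
    return combine_counters(usage_counter, session_counter) > stored_counter


stored = combine_counters(3, 7)   # last accepted OTP
print(is_fresh(stored, 3, 7))     # False: same OTP sent again
print(is_fresh(stored, 3, 8))     # True: next press in the same session
print(is_fresh(stored, 4, 0))     # True: key was replugged, session restarted
```

Because the usage counter occupies the higher bits, a restarted session counter never makes a newer OTP look older than a previous one.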
resource "aws_dynamodb_table" "yubikey_otp_auth" {
  name         = "yubikey_otp_auth"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "public_id"

  attribute {
    name = "public_id"
    type = "S"
  }
}
The table is currently empty, so it is worth creating a user for our test scenario. Instead of doing it by hand, we can write a script (run locally with AWS credentials) that stores the private ID and encryption key in Secrets Manager and creates an item in DynamoDB.
import boto3, json

dynamodb = boto3.resource("dynamodb")
secretsmanager = boto3.client("secretsmanager")
table = dynamodb.Table("yubikey_otp_auth")


def create_user_secret(public_id: str, private_id: str, key: str):
    secret_name = f"yubikey-otp-{public_id}"
    secret_string = json.dumps({
        "private_id": private_id,
        "key": key
    })
    response = secretsmanager.create_secret(Name=secret_name, SecretString=secret_string)
    return response["ARN"]


def create_user(public_id: str, private_id: str, key: str, name: str):
    secret_arn = create_user_secret(public_id, private_id, key)
    table.put_item(Item={"public_id": public_id, "secret_arn": secret_arn, "usage_counter": 0, "name": name})


if __name__ == "__main__":
    public_id = input("Enter the public ID: ")
    private_id = input("Enter the private ID: ")
    key = input("Enter the key: ")
    name = input("Enter the user's name: ")
    create_user(public_id, private_id, key, name)
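Since a typo in any of the three values only shows up later as a failed login, it may be worth validating the input before storing it. The sketch below is an optional addition, not part of the repository; it assumes the formats used earlier in this post (12 ModHex characters, 12 hex characters and 32 hex characters), and `check_credentials` is a hypothetical helper.

```python
import re
from typing import List

MODHEX_12 = re.compile(r"[cbdefghijklnrtuv]{12}")
HEX_12 = re.compile(r"[0-9a-fA-F]{12}")
HEX_32 = re.compile(r"[0-9a-fA-F]{32}")


def check_credentials(public_id: str, private_id: str, key: str) -> List[str]:
    """Return a list of human-readable problems; empty means the input looks fine."""
    problems = []
    if not MODHEX_12.fullmatch(public_id):
        problems.append("public ID must be 12 ModHex characters")
    if not HEX_12.fullmatch(private_id):
        problems.append("private ID must be 12 hex characters (6 bytes)")
    if not HEX_32.fullmatch(key):
        problems.append("key must be 32 hex characters (16 bytes)")
    return problems


# Example values from this post pass; a truncated key does not.
print(check_credentials("vvccccvblhlu", "a4b67dc931a1",
                        "c157d96a6b551f8b9414ab6d94b6a54c"))  # []
print(check_credentials("vvccccvblhlu", "a4b67dc931a1", "c157"))
```

In the script above you could call this right after the `input()` prompts and stop before touching Secrets Manager if the list is not empty.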
Validating the user in Lambda
We have to completely redo the authorizer. First, we can get rid of any references to the YubiCloud client. Then we will create AWS clients for DynamoDB and Secrets Manager. I have created a separate class that parses, decrypts and validates the given OTP. As it has a lot of properties, you can check it out here on GitHub. I will import it into the Lambda function. After the user gives us the OTP, we also need to check whether its public ID exists in the database. If it doesn't, we don't even have a key to decrypt with, so we can simply reject the request.
Let's start then! I will remove all the code that is not needed and keep only the construct_policy function, which does not change from the previous post. Then I will import the necessary libraries and the OTP class from the local process_otp.py file. Next, I will construct the clients used to contact DynamoDB and Secrets Manager. I will also create a helper function that extracts the password part of the HTTP basic authentication header, which is where we put the OTP. The exceptions will help us with the logic.
import base64, json, boto3
from typing import Tuple
from process_otp import OTP

dynamodb = boto3.resource("dynamodb")
secretsmanager = boto3.client("secretsmanager")
table = dynamodb.Table("yubikey_otp_auth")


def construct_policy(username, event):
    _, _, _, region, accountId, apiGwPath = event['methodArn'].split(':')
    restApiId, stage = apiGwPath.split('/')[:2]
    return {
        'principalId' : username,
        'policyDocument' : {
            'Version' : '2012-10-17',
            'Statement' : [ {
                'Action': 'execute-api:Invoke',
                'Effect': 'Allow',
                'Resource': [ f"arn:aws:execute-api:{region}:{accountId}:{restApiId}/{stage}/GET/*" ]
            } ]
        }
    }


def get_token_from_event(event):
    # .get() so that a missing header reaches the explicit check below
    token = event.get('authorizationToken')
    if not token:
        raise Exception("Missing Authorization header")
    # From "Basic base64(username:password)", get only "password"
    return base64.b64decode(token.split(' ')[1]).decode('utf-8').split(':')[1]
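The one-line parser above is enough for the happy path. If you prefer clearer error messages for malformed headers, a slightly more defensive variant could look like the sketch below. It is an alternative I am suggesting, not the code from the repository, and `extract_password` is a hypothetical name.

```python
import base64
import binascii


def extract_password(authorization: str) -> str:
    """Return the password part of an HTTP Basic Authorization header."""
    scheme, _, encoded = authorization.partition(" ")
    if scheme.lower() != "basic" or not encoded:
        raise ValueError("expected a Basic Authorization header")
    try:
        decoded = base64.b64decode(encoded, validate=True).decode("utf-8")
    except (binascii.Error, UnicodeDecodeError) as exc:
        raise ValueError("malformed Basic credentials") from exc
    # Split only on the first colon: everything after it is the password.
    _username, separator, password = decoded.partition(":")
    if not separator or not password:
        raise ValueError("missing password")
    return password


# Made-up credentials: any username, with a placeholder string as the password.
header = "Basic " + base64.b64encode(b"anyone:vvccccvblhluexampleotp").decode("ascii")
print(extract_password(header))  # vvccccvblhluexampleotp
```

Because every failure is raised as an exception, it fits the same try/except flow that the handler uses to answer with "Unauthorized".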
Now we need a function that retrieves the user's data from DynamoDB. If the user exists, we also fetch the associated secret from Secrets Manager and return the private ID, the encryption key and the usage counter. If any of these steps fails, for example because a field is missing, the function raises an exception.
def get_user_from_database(otp: OTP) -> Tuple[bytes, bytes, int]:
    item = table.get_item(Key={"public_id": otp.public_id})
    if 'Item' not in item:
        raise Exception(f"User {otp.public_id} not found!")
    response = secretsmanager.get_secret_value(SecretId=item['Item']['secret_arn'])
    secret = json.loads(response['SecretString'])
    private_id = bytes.fromhex(secret['private_id'])
    key = bytes.fromhex(secret['key'])
    # The boto3 resource API returns numbers as Decimal, so convert to int
    return private_id, key, int(item['Item']['usage_counter'])
The next function is very simple: it updates the usage counter in DynamoDB so that the same OTP can't be used multiple times (a replay attack). For simplicity, I assume that the function succeeds every time and that the write is consistent, but for real-world secure usage you should verify that the new value is actually in place. We just take the values from the constructed OTP object and put them directly into the database.
def update_usage_counter(otp: OTP):
    table.update_item(
        Key={"public_id": otp.public_id},
        UpdateExpression="SET usage_counter = :c",
        ExpressionAttributeValues={':c': otp.combined_counter}
    )
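One way to tighten this up is to let DynamoDB enforce the "counter must grow" rule itself with a condition expression, so two concurrent requests carrying the same OTP cannot both succeed. The sketch below is a suggestion rather than the code from the repository; `update_usage_counter_if_newer` is a hypothetical name, and the table object is passed in as a parameter so the function can be exercised without AWS.

```python
def update_usage_counter_if_newer(table, public_id: str, new_counter: int) -> bool:
    """Store new_counter only if it is higher than the stored one.

    `table` is expected to behave like a boto3 DynamoDB Table resource.
    Returns True when the write happened, False when the condition failed.
    """
    try:
        table.update_item(
            Key={"public_id": public_id},
            UpdateExpression="SET usage_counter = :c",
            ConditionExpression="usage_counter < :c",
            ExpressionAttributeValues={":c": new_counter},
        )
    except Exception as exc:
        # boto3 raises botocore's ClientError here; its `response` dict
        # carries the error code, which we inspect without importing botocore.
        code = getattr(exc, "response", {}).get("Error", {}).get("Code")
        if code == "ConditionalCheckFailedException":
            return False
        raise
    return True
```

In the handler you would then grant access only when this function returns True, which makes the counter check and the counter update a single atomic step.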
The last function is the main handler. Be sure to remove the old handler from the previous post if you still have it. It simply combines the logic: extract the OTP from the event, construct the OTP object, find the user in the database, decrypt the OTP to fill in all the fields, validate that it is correct and not replayed, update the database and finally give API Gateway a policy that grants access to the website. If anything fails, we catch the exception and raise "Unauthorized", which API Gateway turns into a 401.
def lambda_handler(event, context):
    try:
        token = get_token_from_event(event)
        otp = OTP(token)
        private_id, key, usage_counter = get_user_from_database(otp)
        otp.decrypt(key)
        if otp.validate(private_id, usage_counter):
            policy = construct_policy(otp.public_id, event)
            update_usage_counter(otp)
            return policy
    except Exception as e:
        print(f"Error: {e}")
        raise Exception("Unauthorized")
    raise Exception("Unauthorized")
Before deploying, make sure that your requirements.txt file contains the pycryptodomex and yubico-client libraries. It is also worth noting that you need to install them on Amazon Linux 2023 so that the compiled encryption modules work as expected in Lambda. You can use Docker for that. The following Terraform resource should help:
resource "null_resource" "pip_install" {
  provisioner "local-exec" {
    command = <<-EOF
      docker run --rm \
        -v ${path.module}/auth:/build \
        amazonlinux:2023 \
        /bin/sh -c 'yum -y install python3-pip; \
        cd /build; \
        pip install -t . -r requirements.txt'
    EOF
    when    = create
  }

  # I disabled the following command because it looks scary, but it did not wipe
  # my drive. If you feel confident, use it. If not, delete all directories
  # inside `auth` by yourself.
  #provisioner "local-exec" {
  #  command = "sh -c 'find ${path.module}/auth -mindepth 1 -maxdepth 1 -type d -exec rm -rf {} +'"
  #  when    = destroy
  #}
}
Lambda's permissions
Even though we have updated the Lambda function, it still won't work. We need to grant it permissions for DynamoDB and Secrets Manager so that it can read and write the table and read the secrets. The following code creates a new IAM policy for exactly that. It also allows reading any secret whose name starts with yubikey-otp-. Attach the policy to the Lambda function's role.
data "aws_region" "current" {}
data "aws_caller_identity" "current" {}

data "aws_iam_policy_document" "lambda_otp_auth_policy" {
  statement {
    actions = [
      "dynamodb:UpdateItem",
      "dynamodb:GetItem",
      "dynamodb:PutItem"
    ]
    resources = [aws_dynamodb_table.yubikey_otp_auth.arn]
  }

  statement {
    actions = ["secretsmanager:GetSecretValue"]
    resources = [
      "arn:aws:secretsmanager:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:secret:yubikey-otp-*"
    ]
  }
}

resource "aws_iam_policy" "lambda_otp_auth_policy" {
  name   = "LambdaOtpAuthPolicy"
  policy = data.aws_iam_policy_document.lambda_otp_auth_policy.json
}

resource "aws_iam_role_policy_attachment" "lambda_otp_auth_policy" {
  role       = aws_iam_role.lambda_otp_auth_role.name
  policy_arn = aws_iam_policy.lambda_otp_auth_policy.arn
}
Testing and summary
Now you should be able to deploy all the infrastructure and use the new system. Make sure that you have created a user in the database using the script we wrote earlier. To speed up testing, I suggest lowering the TTL of the authorizer cache to 60 seconds.
resource "aws_api_gateway_authorizer" "otp_auth" {
  name = "otp_auth"
  ...
  authorizer_result_ttl_in_seconds = 60
}
If something is not working, especially if you still have the previous implementation or created some resources along the way, the best approach is to destroy everything and rebuild it using tofu destroy and tofu apply. This way you can ensure that everything is cleaned up and all the resources fit together. During development, I had some issues with the API Gateway setup when the Gateway/Integration Request/Response resources were not recreated when they should have been.
Of course, this setup is not perfectly secure. I guarantee you that Yubico's deprecated Yubikey-VAL and Yubikey-KSM servers were much more thought through, but, as mentioned, they are deprecated and serve only as a reference (like this post). You can find them under the following link: YubiCloud Validation Servers
Bonus!
Under tag v2.1 you can find extra code that gets the name of the user from the database, adds it to the policy context and greets the user in the response. In the get_user_from_database function, add the following return field, which falls back to the public ID if the name is not set:
def get_user_from_database(otp: OTP) -> Tuple[bytes, bytes, int, str]:
    item = table.get_item(Key={"public_id": otp.public_id})
    if 'Item' not in item:
        raise Exception(f"User {otp.public_id} not found!")
    response = secretsmanager.get_secret_value(SecretId=item['Item']['secret_arn'])
    secret = json.loads(response['SecretString'])
    private_id = bytes.fromhex(secret['private_id'])
    key = bytes.fromhex(secret['key'])
    # Fall back to the public ID when no name is stored for the user
    name = item['Item']['name'] if 'name' in item['Item'] else f"Anonymous {otp.public_id}"
    return private_id, key, item['Item']['usage_counter'], name
Then change the construct_policy function to add the following argument and context structure to the policy:
def construct_policy(username, event, name = None):
    _, _, _, region, accountId, apiGwPath = event['methodArn'].split(':')
    restApiId, stage = apiGwPath.split('/')[:2]
    return {
        'principalId' : username,
        'policyDocument' : {
            'Version' : '2012-10-17',
            'Statement' : [
                {
                    'Action': 'execute-api:Invoke',
                    'Effect': 'Allow',
                    'Resource': [ f"arn:aws:execute-api:{region}:{accountId}:{restApiId}/{stage}/GET/*" ]
                }
            ]
        },
        'context' : {
            'name' : name if name else username
        }
    }
Finally, update the lambda_handler function to pass the name to the policy:
def lambda_handler(event, context):
    try:
        token = get_token_from_event(event)
        otp = OTP(token)
        private_id, key, usage_counter, name = get_user_from_database(otp)
        otp.decrypt(key)
        if otp.validate(private_id, usage_counter):
            policy = construct_policy(otp.public_id, event, name)
            update_usage_counter(otp)
            return policy
    except Exception as e:
        print(f"Error: {e}")
        raise Exception("Unauthorized")
    raise Exception("Unauthorized")
Now, in the static site HTML, you can use the context field with the $context.authorizer.name expression:
<div>
  <h1>Hidden site</h1>
  <p>This is a hidden site. Only secrets here!</p>
  <p>Hello, <span id="name">$context.authorizer.name</span>!</p>
</div>
That way you can greet the user by the name stored for their public ID. This can later be extended to control which API resources a user is allowed to access.