DEV Community

Cover image for YubiKey OTP Lambda Authorizer in API Gateway

YubiKey OTP Lambda Authorizer in API Gateway

Do you have a YubiKey? It has a button (or a touch sensor), right? So, plug it in, open a text editor, focus on the text field and press your YubiKey. Then press again. You should see some random lowercase characters. Weird, right? What is that string after all? Let's go back to the features mentioned on the Yubico website. My YubiKey 5 supports the following:

  • Yubico OTP
  • OATH-TOTP and OATH-HOTP
  • FIDO and FIDO2
  • PIV/Smart Card
  • OpenPGP

The first one on this list, Yubico OTP, is the one that generates this seemingly random string of characters. Each YubiKey also has two slots for them: one for short press and one for long press. By default only short press is active and only this one produces the key, it will always use the first slot. However, let's dissect this example string a bit.

cccccbghcbbc rectdidufjjckknnudkkjjjjdhduuttf
Enter fullscreen mode Exit fullscreen mode

Repository for this article available on GitHub

Structure of the OTP key

These characters are encoded as so called ModHex - similarly to base64 it uses printable characters to encode binary data. However, ModHex uses 16 characters and correspond to the hexadecimals. You may ask why not just use hexadecimals 🤔? The idea behind is compatibility with different keyboard layouts. In ModHex alphabet there's no z, y, a, w and q - tailored specifically for QWERTY, QWERTZ and AZERTY layouts. If you use Dvorak, you are unlucky 😅.

I placed the space on position 12 to indicate the part that is always constant. No matter how many times you press the button, this 12 characters will not change. This is the public ID of your OTP key. By default it should be your YubiKey's serial number. The rest of the string is the OTP itself - an AES-128-ECB encrypted blob of data fields that contain the following:

  • Private ID - this is constant password that is the secret shared between you and the server, it has 6 bytes,
  • Usage counter - how many times YubiKey was powered up or session counter hit 256 (2 bytes),
  • Timestamp - 3 bytes incremented with 8 Hz starting from a random value,
  • Session usage counter - how many times you pressed the button since power on (1 byte),
  • Random number - 2 bytes,
  • Checksum - 2 bytes.

YubiKey OTP structure

So it totals to 16 bytes (32 ModHex characters). In general the public ID, private ID, counter are anyhow important - everything else is entropy. Usage counters should be stored in a database so that no two keys can be reused. Despite the session counter being only 8 bytes, reaching 256 causes the usage counter to increase. We can use a single column for the counter because with each OTP, we get both values and knowing that the largest session counter is 256 we can do the following: counter = usage_counter * 1000 + session_counter. This makes it easier for comparisons during authentication. Public ID can be considered as the username. Private ID is the password, similar one to what you would type on the keyboard.

However, how is this OTP key secure? If private ID is constant, how is it safer than a password? As I mentioned, the second part of the OTP is AES encrypted. Unlike SSH, this encryption is symmetric. So the encryption key is the same for the server and for you, just like Private ID. Adding all these random bytes into the string makes it harder to brute-force the private ID. In a database such as DynamoDB it would look something like this:

DynamoDB example

Obviously in this example, storing these values in DynamoDB is not the smartest idea - both AES keys and private IDs should be kept in a very secure service, for example AWS Secrets Manager and just the name of the secret can be stored in DynamoDB.

The default OTP keys

However, just as you bought the YubiKey, some values are generated when you press the button. Turns out that in the factory, YubiKeys are pre-programmed with a unique private ID and encryption key that is stored on Yubico servers. With YubiCloud you can validate each OTP password in your application flow. How to do this? First, you need to obtain a YubiCloud API key. Go to Yubico API key signup, give your email address, click on the second text field and press the key. It will type the OTP and it will let you get the API key. With them we will be able to use the YubiCloud API to authenticate with our service.

Register for Yubico API key

Copy the key into your password manager

The keys and OTP displayed above are just examples. Do not try to use them 😊.

Lambda function

Our Lambda function will be implemented in Python. We will use YubiCloud API to verify the user's OTP key. To authenticate with the cloud, we will use the client ID and the secret key that were obtained above. As yubico-client is not a standard package for Python, we need to include it with the Lambda. Let's start a new Terraform project in a new directory. In this directory create a subdirectory called auth. Open terminal in it and run the following command:

$ mkdir auth
$ cd auth
$ pip install yubico-client -t .
Enter fullscreen mode Exit fullscreen mode

This will install the yubico-client package in the current directory so that in the future we will be able to zip it and upload to AWS. Create a new file called lambda.py. This will be the logic of our authentication function. We will mount it to API Gateway, so the structure of the function should be correct. To not hit the YubiCloud API too often, we will rely on the API Gateway caching the policy for five minutes.

import re, yubico_client, os, base64

# You should use Secrets Manager, this is just an example for simplicity
client = yubico_client.Yubico(os.getenv('CLIENT_ID'), os.getenv('SECRET_KEY'))

def validate_otp(otp):
  """Validate the OTP key using YubiCloud API"""
  try:
    response = client.verify(otp)
    return True if response is True else False # Normalize in case it is None or anything unexpected
  except Exception as e:
    print(f"Error validating OTP: {e}")
    return False

def authenticate(token):
  """Authenticate the user using the OTP key. Returns tuple of public ID and if OTP is valid"""
  if not token:
    raise Exception("Missing Authorization header")

  # From "Basic base64(username:password)", get only "password"
  otp = base64.b64decode(token.split(' ')[1]).decode('utf-8').split(':')[1]

  if not re.match(r'^[cbdefghijklnrtuv]{44}$', otp):
    raise Exception("Invalid OTP format")

  return otp[:12], validate_otp(otp)

def construct_policy(public_id, event):
  _, _, _, region, accountId, apiGwPath = event['methodArn'].split(':')
  restApiId, stage = apiGwPath.split('/')[:2]
  return {
    'principalId' : public_id,
    'policyDocument' : {
      'Version' : '2012-10-17',
      'Statement' : [
        {
          'Action': 'execute-api:Invoke',
          'Effect': 'Allow',
          'Resource': [ f"arn:aws:execute-api:{region}:{accountId}:{restApiId}/{stage}/GET/*" ]
        }
      ]
    }
  }

def lambda_handler(event, context):
  try:
    username, valid = authenticate(event['authorizationToken'])
    if valid:
      policy = construct_policy(username, event)
      return policy
  except Exception as e:
    print(f"Error: {e}")
    raise Exception("Unauthorized")
  raise Exception("Unauthorized")
Enter fullscreen mode Exit fullscreen mode

The API

Using API Gateway we will create a new API that will be used to hide a hidden website that will be protected by this Lambda authorizer. First, let's build just the API with a mock hidden website. For now we will keep it open to see it.

resource "aws_api_gateway_rest_api" "api" {
  name = var.api_name
}

resource "aws_api_gateway_method" "get" {
  rest_api_id   = aws_api_gateway_rest_api.api.id
  resource_id   = aws_api_gateway_rest_api.api.root_resource_id
  http_method   = "GET"
  authorization = "NONE" # 👈 We will come back here later
}

resource "aws_api_gateway_stage" "prod" {
  rest_api_id   = aws_api_gateway_rest_api.api.id
  stage_name    = var.stage_name
  deployment_id = aws_api_gateway_deployment.prod.id
  lifecycle {
    replace_triggered_by = [aws_api_gateway_deployment.prod]
  }
}

resource "aws_api_gateway_deployment" "prod" {
  rest_api_id = aws_api_gateway_rest_api.api.id

  variables = {
    "deployed_version" = "10" # Change this to force deployment, otherwise you have to do it manually
  }
}

output "api_gateway_url" {
  value = "${aws_api_gateway_deployment.prod.invoke_url}${var.stage_name}"
}
Enter fullscreen mode Exit fullscreen mode

Now as we have the basic API Gateway, we can proceed to add a static site to it. Of course you can do your own integrations but I will just go with a simple mock integration that just returns static text. In here I will just return an empty model from the method and as integration response I will return a static HTML file that I defined in static-site.html file in the same directory as this project. You can put anything in there (for example see the comment in response_templates below). Because of some dependency conditions, we need to also add depends_on to the aws_api_gateway_deployment resource.

resource "aws_api_gateway_integration" "static-site" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  resource_id = aws_api_gateway_rest_api.api.root_resource_id
  http_method = aws_api_gateway_method.get.http_method
  type        = "MOCK"

  request_templates = {
    "text/html"        = jsonencode({ statusCode = 200 })
    "application/json" = jsonencode({ statusCode = 200 })
  }
}

resource "aws_api_gateway_method_response" "static" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  resource_id = aws_api_gateway_rest_api.api.root_resource_id
  http_method = aws_api_gateway_method.get.http_method
  status_code = "200"
  response_models = {
    "text/html" = "Empty"
  }
}

resource "aws_api_gateway_integration_response" "static" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  resource_id = aws_api_gateway_rest_api.api.root_resource_id
  http_method = aws_api_gateway_method.get.http_method
  status_code = "200"

  response_templates = {
    "text/html" = file("${path.module}/static-site.html")
    # You can use this instead 👇
    # "text/html" = "<html><body><h1>Welcome to the hidden site!</h1></body></html>"
  }

  depends_on = [aws_api_gateway_integration.static-site]
}

...

# Changed from the part above
resource "aws_api_gateway_deployment" "prod" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  depends_on  = [aws_api_gateway_integration.static-site]

  variables = {
    "deployed_version" = "10" # Change this to force deployment, otherwise you have to do it manually
  }
}
Enter fullscreen mode Exit fullscreen mode

You got the API Gateway output URL that you can use to access the static site in the browser. For now it's open to everyone (who know the link). However, effectively we want it to be behind the authorizer of YubiKey OTP.

Site open to everyone

Creating the function

First we need to deploy the Lambda. Using hashicorp/archive and hashicorp/null providers we can create a zip file but also install all the dependencies at once. First the null_resource will run pip install in the auth directory where we hold the lambda.py file. Then an archive_file will be created with all the contents of the auth directory. What is very unfortunate is that we have to assign IAM role to Lambda even if we don't need any permissions. But as we have to do it, I will either way assign LambdaBasicExecutionRole so that we can also see the logs in CloudWatch. Lastly we create our Lambda function. Because it's cheaper, I will use ARM64 runtime.

resource "null_resource" "pip_install" {
  provisioner "local-exec" {
    command = "pip install -r ${path.module}/auth/requirements.txt -t ${path.module}/auth/"
    when    = create
  }
}

resource "archive_file" "authorizer_lambda" {
  type        = "zip"
  source_dir  = "${path.module}/auth"
  output_path = "${path.module}/authorizer.zip"
  depends_on  = [null_resource.pip_install]
}

resource "aws_iam_role" "lambda_otp_auth_role" {
  name = "LambdaOtpAuthRole"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_basic_execution" {
  role       = aws_iam_role.lambda_otp_auth_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_lambda_function" "otp_auth" {
  function_name    = "otp_auth"
  role             = aws_iam_role.lambda_otp_auth_role.arn
  handler          = "lambda.lambda_handler"
  runtime          = "python3.12"
  architectures    = ["arm64"]
  filename         = archive_file.authorizer_lambda.output_path
  source_code_hash = archive_file.authorizer_lambda.output_base64sha256
  environment {
    variables = {
      CLIENT_ID  = var.client_id
      SECRET_KEY = var.secret_key
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

You have to also provide the CLIENT_ID and SECRET_KEY environment variables into the Lambda function. I suggest reading the SECRET_KEY using the following commands in Bash. You will need to run it every time you want to use Terraform after you have closed the terminal. However, on GitHub you will find even better solution where I added Secrets Manager to store the secret key and client ID. I also read it on the startup of the Lambda function.

read -s TF_VAR_yubicloud_secret_key # After running, paste the key, it won't echo
export TF_VAR_yubicloud_secret_key
Enter fullscreen mode Exit fullscreen mode

Creating and connecting the authorizer

So in order to make the Lambda usable as an authorizer for API Gateway, we need two things: create the authorizer but also an IAM role that will allow API Gateway service to invoke this function.

resource "aws_iam_role" "api_gateway_otp_auth_role" {
  name = "ApiGatewayOtpAuthRole"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "apigateway.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy" "api_gateway_otp_auth_policy" {
  name = "ApiGatewayOtpAuthPolicy"
  role = aws_iam_role.api_gateway_otp_auth_role.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action   = "lambda:InvokeFunction"
      Effect   = "Allow"
      Resource = aws_lambda_function.otp_auth.arn
    }]
  })
}

resource "aws_api_gateway_authorizer" "otp_auth" {
  name                   = "otp_auth"
  rest_api_id            = aws_api_gateway_rest_api.api.id
  type                   = "TOKEN"
  authorizer_uri         = aws_lambda_function.otp_auth.invoke_arn
  authorizer_credentials = aws_iam_role.api_gateway_otp_auth_role.arn
}
Enter fullscreen mode Exit fullscreen mode

But that's not all. Despite we lock down the site with authorizer (that is not yet mounted to the API Gateway), we still need to have a way to provide the OTP from the browser. To ask for it we will use the basic HTTP authentication form where the user has to put in the OTP to the password field. We will need to define a custom exception page for 401 HTTP status code. Although I included an instruction in the realm part of the header on how to use it, modern browsers unfortunately stopped showing this message. Instead they show a generic HTTP authentication form.

resource "aws_api_gateway_gateway_response" "unauthorized" {
  rest_api_id   = aws_api_gateway_rest_api.api.id
  response_type = "UNAUTHORIZED"
  status_code   = "401"

  response_parameters = {
    "gatewayresponse.header.WWW-Authenticate" = "'Basic realm=\"Place OTP in password field\"'"
  }

  response_templates = {
    "text/html"        = "<html><body><h1>Unauthorized</h1></body></html>"
  }
}
Enter fullscreen mode Exit fullscreen mode

As the last step, we will connect the authorizer to the API Gateway. This is a straightforward step, let's go back to aws_api_gateway_method.get resource. There we need to change the authorization field to use the authorizer we just created and add an authorizer_id field.

resource "aws_api_gateway_method" "get" {
  rest_api_id   = aws_api_gateway_rest_api.api.id
  resource_id   = aws_api_gateway_rest_api.api.root_resource_id
  http_method   = "GET"
  authorization = "CUSTOM" # 👈 Changed
  authorizer_id = aws_api_gateway_authorizer.otp_auth.id
}
Enter fullscreen mode Exit fullscreen mode

After applying the changes, our API Gateway will now need us to provide a valid OTP from a YubiKey. What is important to note is that the YubiCloud secret key that you created at the beginning is not tied to your YubiKey. ⚠️ Any YubiKey with a factory OTP key will be able to authenticate! You need to both check the public ID and the signature part of the OTP to only allow your own key (this is not implemented in this example). I will now test the website in action.

Website protected by authorizer

You might ask now, why can you still refresh the page and the API Gateway doesn't ask you for the password again? HTTP Basic authentication is specific as the browser stores the credentials for the domain you logged in to for the lifetime of a session (or whatever they implement). If you inspect the network tab using developer tools, you will see that the header with Authorization is sent every time you refresh the page or even navigate on the same domain.

Network tab with Authorization header

But this is still weird, you know why? Because OTP keys are one-time use and the same token shouldn't be accepted by YubiCloud twice. What's going on here? If you peek to the logs of the Lambda function you will see that it's not executed as often as you refresh the page. There's a property in resource aws_api_gateway_authorizer.otp_auth called authorizer_result_ttl_in_seconds. By default it's set to 300 seconds. Turns out you get session handling (kind of) for free with API Gateway. It will store the contents of the Authorization header in a cache and the policy associated with it. Of course you shouldn't rely on this mechanism. In ideal scenario you should use some JWT or similar as the authorization token and issue them using another page with a login form that will accept YubiKey OTP. Or you can implement session handling using cookies if you don't like the modern SPAs.

Either way, if you wait five minutes and refresh the page, in the developer tools you will see that the Authorization header was sent but you got 401 and you need to provide a fresh OTP key.

Retry after five minutes

Summary

We have successfully implemented a basic authorizer for API Gateway that can validate YubiKey OTP keys. However, as noted, any YubiKey can be used to authenticate, so you need to create some more business logic to allow only your keys. What is more, currently it uses default factory keys that are also kept on Yubico servers. Although, I don't expect that they will be compromised, it's a smarter idea to create your own keys and IDs. It is a bit more complicated process, which I briefly drafted in "Structure of the OTP key" section, in the DynamoDB example.

Top comments (0)