DEV Community

Charlotte Towell
Charlotte Towell

Posted on

Using Pub/Sub to Build a Serverless Async Processing Pipeline on GCP

Google Cloud Run functions seem like the godsend of serverless computing until you hit reach the many different limitations all ultimately related to runtime. I'm talking about the max timeout in functions themselves (3600s), or other products such as API Gateway (300s).

This is the problem I faced at work where we had a long-running computation process that fires off as a result of an API call from a user. Sure, we could set the max deadline to 1 hour and have the user wait for 60 mins just to get a response, not. Plus, as I mentioned, API Gateway caps it at 5 minutes anyway.

This is an obvious use case for asynchronous processing with the ability to start a process, return a response, and keep it running...

... which is something not supported inherently in a serverless function which ceases running once returning a response.

Pub/Sub to the rescue

Pubsub Superhero Meme
My solution: pub/sub messaging

Contrainsted to our serverless setup, one thing I've learnt about Google Cloud is that the limitations of one GCP product generally can be solved by combining with another.

Thus, my multi-step solution to a fully serverless, async processing pipeline with status updates.

Async Process Flow Diagram

The flow is as follows:

1. Client calls an API to start the job

  • This API generates a unique ID
  • Pubsub message is published with this unique ID, and any other attributes
  • Return the unique ID to the client
def publish_to_pubsub(data, attributes=None):
    publisher = pubsub_v1.PublisherClient()
    topic_name = 'projects/myproject/topics/async-messages'
    #convert to bytes
    data_bytes = json.dumps(data).encode('utf-8')
    #publish to topic
    future = publisher.publish(topic_name, data=data_bytes, **attributes)
    future.result() #wait for msg to be published
Enter fullscreen mode Exit fullscreen mode

2. The pubsub topic has push subscribers, optionally with attribute filters. I use this to re-use the same topic for different async processes.

  • Receive the message and a subscriber cloud function runs
  • Check if the long-running process has started based on provided ID
  • If not started, make a HTTPS request to start the long-running cloud function, but don't wait for a resposne
  • We update the database with our ID to mark the function has started

gcloud pubsub subscriptions create EXAMPLE_WORKFLOW --topic=async-messages --push-endpoint=https://example-workflow-cloud-function.a.run.app --ack-deadline=10 --push-auth-service-account=pubsub-push-sa@myproject.iam.gserviceaccount.com --message-filter="attributes.workflow=\"Example Workflow\"" --dead-letter-topic=dead-letter-topic --max-delivery-attempts=5

from multiprocessing import Process

def send_request(url, body):
    authenticated_post_request(url=url, json_payload=body)

process = Process(target=send_request, args=(url, body))
process.start()

process.join(5) #cancel process
if process.is_alive():
   process.terminate()
Enter fullscreen mode Exit fullscreen mode

At this moment, our long-running process has started, essentially on its own in the wide galactic space of Google's servers with no client to receive its response 🌌

3. Long running function does our actual processing

  • This is really just another cloud function now, but our only time constraint is the timeout of cloud functions themself as there is no client listening to a resposne
  • If for whatever reason you need a longer timeout (maybe assess why you're using serverless 🤔) you could chain the process of starting and abandoning cloud functions, so long as you continue to pass the id and any requried data
  • Periodically update the status of the process to provide incremental updates, via the database
    • At this point, you could build more robust re-try handling by adding deadlines for certain status changes & resumability

4. We have another API endpoint to GET the current status.
This is just a database query based on the ID we provided back to the client

UI Job Status Check GIF

Idempotency ?

By using pub/sub, a key requirement is to ensure idempotency as messages are often sent multiple times. This is why the subcriber function must update the database to indicate the job has started.

Retries ?

Pub/sub also has a retry-backoff functionality. A downside of this set-up is that our long-running function does not benefit from this as it is only called once from the subscriber. However any logic we put into the subscriber function can throw an error to result in an unacked message.

So in summary,

Could we not have just started the long-running function from the initial API call?

Yes of course.

The reason I opted to use pub-sub is primarily to allow multiple subscriber functions to fire from messages. By using subscription message filters, you can what would have been a long-running process into several smaller ones to run in parallel.

After all, with serverless, we want to scale horizontally first wherever possible :)

Top comments (0)