Model deployment with Kafka

In this guide, we walk through the steps to deploy any model on scalable infrastructure using Verta and Kafka. With Verta as the serving platform, a model can take its inputs from a Kafka topic and push its outputs to a Kafka topic.

Context

  • A Kafka message consists of an arbitrary key and an arbitrary value. For more information, see the Kafka documentation.

  • A Verta endpoint is a containerized microservice serving a model that allows you to make real-time predictions. Learn more about Verta Model Deployment.

  • The inputs to a Verta endpoint need to be JSON.

If the values of the messages in a Kafka topic are valid JSON that conforms to the input structure expected by a deployed model, that topic can be used as the input topic for a Verta endpoint.
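To make the JSON requirement concrete, here is a minimal sketch of how a producer might build a message for the input topic. It uses only the Python standard library and does not talk to Kafka or Verta. The helper names `build_message` and `is_valid_json`, the request ID, and the nested-list input shape are assumptions made for illustration; your model's expected input structure may differ.

```python
import json
import uuid


def build_message(model_input, request_id=None):
    """Return a (key, value) pair of bytes suitable for a Kafka message.

    The value is the model input serialized as JSON. The key is an
    identifier used for tracing (hypothetical choice: a UUID string).
    """
    key = (request_id or str(uuid.uuid4())).encode("utf-8")
    value = json.dumps(model_input).encode("utf-8")
    return key, value


def is_valid_json(value):
    """Check that a message value parses as JSON."""
    try:
        json.loads(value)
        return True
    except ValueError:  # covers JSON and Unicode decoding errors
        return False


# Hypothetical model input: one row with four numeric features.
key, value = build_message([[5.1, 3.5, 1.4, 0.2]], request_id="req-001")
print(key, value, is_valid_json(value))
```

A check like `is_valid_json` only confirms that the value parses. Whether it matches the structure the model expects still depends on the model itself.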

Setup

Prerequisites:

To get started, you need the following:

  • A packaged model ready to be deployed as an endpoint in Verta

  • A Verta installation configured to talk to Kafka brokers. Please contact Verta at help@verta.ai about setting up the configuration and/or connecting to your existing Kafka cluster.

  • Kafka topics to be used as the model input source, and the destination for model outputs and errors.

Steps for deployment with Kafka:

Using Web UI

  • During deployment, enable Kafka config and provide the Kafka topics (input topic, output topic, and error topic) as set up in your Kafka cluster.

  • If you have a live endpoint, or you are using the one-click deploy option, go to the settings tab for the endpoint to add or update the Kafka configuration.

Using Client

Example to add Kafka settings when deploying an endpoint:

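from verta import Client
from verta.endpoint import KafkaSettings

# Assumes Verta connection details and credentials are already
# configured for the client (for example, through environment variables).
client = Client()

# Topics must already exist in your Kafka cluster.
kafka_settings = KafkaSettings(
    input_topic="my_input_topic",
    output_topic="my_output_topic",
    error_topic="my_error_topic",
)

endpoint = client.create_endpoint(path="/some-path", kafka_settings=kafka_settings)
print(endpoint.kafka_settings)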

Example to update Kafka settings for a deployed endpoint:

from verta.endpoint.update import DirectUpdateStrategy
from verta.endpoint import KafkaSettings

new_kafka_settings = KafkaSettings(
    input_topic="new_input_topic",
    output_topic="new_output_topic",
    error_topic="new_error_topic",
)

# `endpoint` is the existing endpoint and `model_version` is the model
# version it serves; both are assumed to be defined earlier.
endpoint.update(model_version, DirectUpdateStrategy(), kafka_settings=new_kafka_settings)

How does it work?

For an endpoint configured to read from and write to Kafka, messages are consumed and published as follows:

  • The Verta consumer will subscribe to the input topic.

  • Verta will only process messages sent to Kafka after this subscription starts.

  • The message key will be passed through unchanged to the output and error topics. Metadata stored in the message key may help in tracing, auditing, and debugging model behavior. For example, the message key might be a unique identifier such as a UUID, or a JSON object that contains metadata.

  • The input topic message value is sent as the HTTP request body. The message value should be JSON conforming to the expected model input schema.

  • For successful responses, the response body will be used as the value of a message written to the output topic.

  • For unsuccessful responses, e.g. any response with an HTTP error status in the 400 range, the response body will be used as the value of a message written to the error topic.
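The flow described in the list above can be sketched as a small simulation. This is not Verta's implementation and it does not connect to Kafka. The names `route_message` and `toy_model` are hypothetical, the model is a stand-in for the HTTP call to the endpoint, and treating every non-2xx status as unsuccessful is an assumption, since the guide only mentions the 400 range as an example.

```python
import json


def route_message(key, value, predict):
    """Simulate handling one input-topic message.

    The message value is passed to the model as the request body. The
    key is passed through unchanged. Returns (topic, key, response_body),
    where topic is "output" or "error".
    """
    status, body = predict(value)
    # Assumption: any non-2xx status counts as an unsuccessful response.
    topic = "output" if 200 <= status < 300 else "error"
    return topic, key, body


def toy_model(request_body):
    """Hypothetical stand-in for the HTTP call to a deployed endpoint.

    Expects a JSON list of numeric rows and returns the sum of each row.
    """
    try:
        rows = json.loads(request_body)
        result = [sum(row) for row in rows]
    except (ValueError, TypeError):
        error = {"error": "invalid model input"}
        return 400, json.dumps(error).encode("utf-8")
    return 200, json.dumps(result).encode("utf-8")


ok = route_message(b"req-1", b"[[1, 2], [3, 4]]", toy_model)
bad = route_message(b"req-2", b"not json", toy_model)
print(ok)
print(bad)
```

In both cases the key comes back untouched, which is what lets a downstream consumer match an output or error message to the input that caused it.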
