Skip to main content

Make Real-Time AI a Reality with Weaviate + Confluent

· 10 min read
Mohd Shukri Hasan

Today, we’re excited to announce our new integration with Confluent Cloud. Weaviate users now have simple access to data streams from across their entire business to build a real-time, contextual, and trustworthy knowledge base fueling their AI applications. Confluent’s data streaming platform bridges the legacy and modern data stack, providing a continuous supply of AI-ready data for development of sophisticated customer experiences with constant awareness of what’s happening in the world and their business right now.

“With our new integration with Confluent Cloud, Weaviate is taking a giant leap forward in empowering businesses to build AI applications that are not just smart, but also real-time and context-aware. Now, you can seamlessly tap into data streams from every corner of your enterprise, creating a continuously updated knowledge base that lets your AI systems respond to the world as it happens.”

— Etienne Dilocker, Chief Technology Officer, Weaviate

High-value, trusted AI applications require real-time data

Real-time AI needs real-time data from everywhere. The promise of real-time AI is only unlocked when models have all the fresh contextual data they need to respond just in time with the most accurate, relevant, and helpful information. However, building these real-time data connections across on-prem, multicloud, public, and private cloud environments for AI use cases is non-trivial.

Traditional data integration and processing tools are batch-based and inflexible, creating an untenable number of tightly coupled point-to-point connections that are hard to scale and lack governance. As a result, data made available is stale and of low fidelity. This introduces unavoidable latency into the AI application and may outright block implementation altogether. The difficulty in gaining access to high-quality, ready-to-use, contextual and trustworthy data in real-time is hindering developer agility and the pace of AI innovation.

Confluent’s data streaming platform fuels Weaviate with real-time data

With Confluent, Weaviate users can break down data silos, promote data reusability, improve engineering agility, and foster greater trust throughout their organization. This allows more teams to securely and confidently unlock the full potential of all their data with Weaviate. Confluent enables organizations to make real-time contextual inferences on an astonishing amount of data by bringing well curated, trustworthy streaming data to hybrid search and generative AI applications.

With easy access to data streams from across their entire business, Weaviate users can now:

  • Create a real-time knowledge base: Build a shared source of real-time truth for all your operational and analytical data, no matter where it lives for sophisticated model building and fine-tuning. Think business competitive analysis dashboards that are updated with latest market news updates.
  • Bring real-time context at query time: Convert raw data into meaningful chunks with real-time enrichment and continually update your embedding databases for your GenAI use cases. Think real-time filtering based on region, demographics, personas in online shopping, etc.
  • Build governed, secured, and trusted AI: Establish data lineage, quality and traceability, providing all your teams with a clear understanding of data origin, movement, transformations and usage.
  • Experiment, scale and innovate faster: Reduce innovation friction as new AI apps and models become available. Decouple data from your data science tools and production AI apps to test and build faster.

Weaviate and Confluent enable simple development of real-time AI applications

Our new Confluent integration enables all your teams to tap into a continuously enriched real-time knowledge base, so they can quickly scale and build AI-enabled applications using trusted data streams. Let’s take a look at how it works:

Figure1Dark Figure1Light

The integration architecture is designed to be both robust and straightforward, ensuring a seamless flow of real-time data from Confluent Cloud to Weaviate Cloud Services.

  1. Kafka Topic in Confluent Cloud: The journey begins in Confluent Cloud, where you create a fully managed Kafka topic that holds the data you wish to stream into Weaviate.
  2. Spark Cluster with confluent-connector Library: Next, you'll spin up a Spark cluster loaded with our specialized Confluent-Connector library. This cluster acts as the data processing engine that facilitates the data flow between Confluent Cloud and Weaviate.
  3. Streaming DataFrame in Spark: Within the Spark environment, you'll create a streaming DataFrame. This DataFrame is configured to continuously pull the data from your designated Kafka topic in Confluent Cloud.
  4. DataFrame to Weaviate via Confluent-Connector: Finally, the streaming DataFrame is written into Weaviate using the Confluent-Connector. This ensures that your Weaviate knowledge base is always up-to-date, enriched with real-time data streaming from Confluent.

By connecting these components, you create a powerful, real-time data pipeline that enables your teams to build AI-enabled applications with a continuously enriched and trustworthy knowledge base.

Getting Started

We'll be using code snippets from a Python notebook, which assumes you're running a local Spark cluster in standalone mode. For a comprehensive, end-to-end walkthrough, you can access the full Python notebook here.

tip

A version of the notebook designed to run on databricks and Weaviate Cloud Services (WCS) can be found here.

Configure The Spark Session

jar_packages = [
"org.apache.spark:spark-avro_2.12:3.4.1",
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
]

CONFLUENT_WEAVIATE_JAR = "../target/scala-2.12/confluent-connector_2.12-3.4.0_0.0.1.jar"

spark = (
SparkSession.builder.appName("demo-confluent-weaviate-integration")
.config("spark.jars.packages", ",".join(jar_packages))
.config("spark.jars", CONFLUENT_WEAVIATE_JAR)
.getOrCreate()
)

This code snippet sets up the Spark environment required for the Confluent Cloud-Weaviate integration. It specifies the necessary JAR packages, including Avro and Kafka SQL libraries, to ensure compatibility and functionality with Confluent Cloud. Additionally, a custom JAR file, confluent-connector_2.12-3.4.0_0.0.1.jar, is included to establish the connection between Spark and Weaviate. By running this script, you initialize a Spark session with all the required configurations, laying the groundwork for streaming data from Confluent Cloud to Weaviate Cloud Services.

Create A Schema In Weaviate

Before you can start streaming data into Weaviate, you'll need to create a schema to define the structure of the Kafka messages you'll be storing. Here's how to do it:

with open("../src/it/resources/schema.json", "r") as f:
weaviate_schema = json.load(f)

client.schema.create_class(weaviate_schema)

This code snippet reads a JSON file containing the schema definition and uses the Weaviate client's create_class method to create the schema in Weaviate. This sets the stage for the data you'll be streaming from Confluent Cloud.

Create A Streaming DataFrame To Stream A Topic From Confluent Cloud

After initializing your Spark environment, the next step is to configure the Kafka data stream that you'll be pulling from Confluent Cloud. Here's how to set it up:

caution

In this example, we've already set up a topic on Confluent Cloud that receives data from the DataGen source, specifically utilizing the clickstream user template.

clickstreamDF = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", confluentBootstrapServers)
.option("subscribe", confluentTopicName)
.option("startingOffsets", "latest")
.option("kafka.security.protocol", "SASL_SSL")
.option(
"kafka.sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(
confluentApiKey, confluentSecret
),
)
.option("kafka.ssl.endpoint.identification.algorithm", "https")
.option("kafka.sasl.mechanism", "PLAIN")
.option("failOnDataLoss", "false")
.option("name", "clickStreamReadFromConfluent")
.load()
)

This code snippet creates a streaming DataFrame named clickstreamDF in Spark, configured to read from your Kafka topic in Confluent Cloud. It specifies various options like the bootstrap servers, topic name, and security protocols, ensuring a secure and efficient data stream. The confluentApiKey and confluentSecret are used for authentication, and the stream is set to start reading from the latest available data. By running this code, you establish a real-time data stream from Confluent Cloud to your Spark environment, setting the stage for the next step: writing this data into Weaviate.

Define A Function To Write The Kafka Messages Into Weaviate

Once you've established a real-time data stream from Confluent Cloud to your Spark environment, the final step is to write this data into Weaviate. Here's the Python code snippet that accomplishes this:

def f(df, batch_id):
global total_rows_processed
row_count = df.count()
total_rows_processed += row_count

print(f"Number of rows in the batch with batch id {batch_id}: {row_count}")
df.write.format("io.weaviate.confluent.Weaviate"
).option(
"batchsize", 200
).option(
"scheme", "http"
).option(
"host", weaviate_host
).option(
"apiKey", weaviate_api_key
).option(
"className", weaviate_schema["class"]
).option(
"schemaRegistryUrl", schemaRegistryUrl
).option(
"schemaRegistryApiKey", confluentRegistryApiKey
).option(
"schemaRegistryApiSecret", confluentRegistrySecret
).mode(
"append"
).save()

This code snippet defines a function f that takes a DataFrame df and a batch_id as inputs. The function writes the DataFrame into Weaviate using the confluent-connector library. It specifies various options, such as the batch size, Weaviate host, and API key for authentication.

One of the key features of this integration is its seamless interaction with Confluent's Schema Registry. The integration automatically extracts the schema ID from the message's value and queries the Schema Registry to retrieve the associated schema. This schema is then used to deserialize the rest of the data, ensuring that the data is accurately streamed into Weaviate.

Write The Streaming DataFrame

With your Spark environment initialized and your data stream configured, you're now ready to start the actual data streaming process from Confluent Cloud to Weaviate. Here's the Python code snippet to kick things off:

clickstreamDF.writeStream.foreachBatch(f)
.queryName("write_stream_to_weaviate")
.start()

This code snippet uses the writeStream method on the clickstreamDF DataFrame, which contains the real-time data stream from Confluent Cloud in microbatches. The foreachBatch(f) function call specifies that for each batch of data, the function f defined earlier will be executed. This function writes the data into Weaviate, as configured.

The queryName("write_stream_to_weaviate") sets a name for the query, making it easier to monitor and manage. Finally, the start() method initiates the data streaming process, effectively sending your real-time data from Confluent Cloud into Weaviate Cloud Services.

Here's a sample of what the data looks like in Weaviate:

{
'class': 'Clickstream',
'creationTimeUnix': 1694786286944,
'id': '00fa91d1-453d-4d6f-8b05-5ea56fd81a75',
'lastUpdateTimeUnix': 1694786286944,
'properties': {
'_kafka_key': '202882',
'_kafka_offset': 34052,
'_kafka_partition': 2,
'_kafka_schemaId': 100002,
'_kafka_timestamp': '2023-09-15T13:58:01.245Z',
'_kafka_timestampType': 0,
'_kafka_topic': 'clickstreams-users',
'city': 'Palo Alto',
'first_name': 'Elwyn',
'last_name': 'Vanyard',
'level': 'Platinum',
'registered_at': 1502262440871,
'user_id': 202882,
'username': 'AbdelKable_86'
},
'vectorWeights': None
}

Key Points to Note:

  1. Kafka Message Fields in Weaviate: All the fields that form a Kafka message have been ingested into Weaviate. These fields are prefixed with _kafka, such as _kafka_key, _kafka_offset, and so on. This prefixing is incredibly useful for data lineage, allowing you to easily trace the origin of each object back to its Kafka source.
  2. Automatic Deserialization: Thanks to the integration with Confluent's Schema Registry, the data in the Kafka message has been automatically deserialized. Each key in the message's data becomes its own property in the corresponding Weaviate object. For example, city, first_name, and last_name are all individual properties in Weaviate, making the data immediately usable for your AI applications.

Closing CTAs

Not yet a Weaviate customer? Kickstart your AI journey with a free sandbox environment in Weaviate Cloud Services today - no credit card required.

Not yet a Confluent customer? Start your free trial of Confluent Cloud today. New signups receive $400 to spend during their first 30 days—no credit card required.

Join the Open-Source Movement: This integration is open-source, and we welcome your participation in its development. Whether it's sharing feedback, filing bug reports, or contributing code, your collaboration can help make this integration even better. Learn how to contribute.

Ready to start building?

Check out the Quickstart tutorial, and begin building amazing apps with the free trial of Weaviate Cloud Services (WCS).

Don't want to miss another blog post?

Sign up for our bi-weekly newsletter to stay updated!
By submitting, I agree to the Terms of Service and Privacy Policy.