Async API
The async Python client is available in weaviate-client versions 4.7.0 and higher.
The Python client library provides a synchronous API by default, but an asynchronous API is also available for concurrent applications.
For asynchronous operations, use the WeaviateAsyncClient async client, available in weaviate-client v4.7.0 and up.
The WeaviateAsyncClient async client largely supports the same functions and methods as the WeaviateClient synchronous client, with the key difference that the async client is designed to be used in an async function running in an asyncio event loop.
Installation
The async client is already included in the weaviate-client package. Follow the installation instructions in the Python client library documentation.
Instantiation
A WeaviateAsyncClient async client object can be instantiated using a helper function or by explicitly creating an instance of the class.
Instantiation helper functions
These instantiation helper functions are similar to the synchronous client helper functions, and return an equivalent async client object:
- use_async_with_local
- use_async_with_weaviate_cloud
- use_async_with_custom
However, unlike their synchronous counterparts, the async helper functions do not connect to the server.
When using the async helper functions, you must call the async .connect() method to connect to the server, and call .close() before exiting to clean up. The exception is when you use a context manager, which handles both steps.
The async helper functions take the same parameters as their synchronous counterparts for external API keys, connection timeout values and authentication details.
WCD:
import weaviate
from weaviate.classes.init import Auth
import os

# Best practice: store your credentials in environment variables
wcd_url = os.environ["WCD_DEMO_URL"]
wcd_api_key = os.environ["WCD_DEMO_RO_KEY"]

async_client = weaviate.use_async_with_weaviate_cloud(
    cluster_url=wcd_url,  # Replace with your Weaviate Cloud URL
    auth_credentials=Auth.api_key(wcd_api_key),  # Replace with your Weaviate Cloud key
)

Local:
import weaviate

async_client = weaviate.use_async_with_local()

Custom:
import weaviate
import os

async_client = weaviate.use_async_with_custom(
    http_host="localhost",
    http_port=8080,
    http_secure=False,
    grpc_host="localhost",
    grpc_port=50051,
    grpc_secure=False,
)
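Because the helper functions return an unconnected client, the connect-then-close sequence can be wrapped in a small helper so that .close() runs even if the work in between raises. The following is a minimal sketch, not part of the library: run_with_connection and check_ready are hypothetical names, and the helper assumes only that the client exposes awaitable connect() and close() methods.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_connection(
    client: Any, work: Callable[[Any], Awaitable[Any]]
) -> Any:
    """Connect, run `work(client)`, and always close the client afterwards.

    Hypothetical helper for illustration; not a weaviate-client API.
    """
    await client.connect()
    try:
        return await work(client)
    finally:
        # Runs on success and on error, so the connection is not leaked
        await client.close()


async def check_ready(client: Any) -> bool:
    # is_ready() sends a request to Weaviate, so it must be awaited
    return await client.is_ready()


# Usage with a local Weaviate instance (requires a running server):
#
#   import weaviate
#   async_client = weaviate.use_async_with_local()
#   ready = asyncio.run(run_with_connection(async_client, check_ready))
```

An async context manager achieves the same result with less code; the explicit form is mainly useful when the client must outlive a single block.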
Explicit instantiation
If you need to pass custom parameters, use the weaviate.WeaviateAsyncClient class to instantiate a client. This is the most flexible way to instantiate the client object.
import weaviate
from weaviate.connect import ConnectionParams
from weaviate.classes.init import AdditionalConfig, Timeout, Auth
import os

async_client = weaviate.WeaviateAsyncClient(
    connection_params=ConnectionParams.from_params(
        http_host="localhost",
        http_port=8099,
        http_secure=False,
        grpc_host="localhost",
        grpc_port=50052,
        grpc_secure=False,
    ),
    auth_client_secret=Auth.api_key("secr3tk3y"),
    additional_headers={
        "X-OpenAI-Api-Key": os.getenv("OPENAI_APIKEY")
    },
    additional_config=AdditionalConfig(
        timeout=Timeout(init=30, query=60, insert=120),  # Values in seconds
    ),
    skip_init_checks=False
)
When you instantiate a client directly, you must call the async .connect() method to connect to the server.
import weaviate
from weaviate.connect import ConnectionParams
from weaviate import WeaviateAsyncClient
import os

async def instantiate_and_connect() -> WeaviateAsyncClient:
    client = weaviate.WeaviateAsyncClient(
        connection_params=ConnectionParams.from_params(
            http_host="localhost",
            http_port=8099,
            http_secure=False,
            grpc_host="localhost",
            grpc_port=50052,
            grpc_secure=False,
        ),
        # Additional settings not shown
    )
    await client.connect()
    return client
Sync and async methods
The async client object is designed to be used in an async function running in an asyncio event loop.
Accordingly, a majority of the client methods are async functions that return Coroutine objects. However, some methods are synchronous and can be used in a synchronous context.
As a rule of thumb, a method that involves a request to Weaviate will be an async function, while a method that executes in a local context will be synchronous.
How to identify async methods
Async methods are identified by their method signatures. Async methods are defined with the async keyword, and they return Coroutine objects.
To see a method signature, you can use the help() function in Python, or use an IDE that supports code completion such as Visual Studio Code or PyCharm.
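Besides reading signatures, the standard library can check programmatically whether a method is defined with async. The sketch below uses inspect.iscoroutinefunction on a hypothetical stand-in class (StandInAsyncClient is not the real client) so that it runs without a Weaviate server. Note that this check may not recognize an async method that a library has wrapped in a plain function, so treat the result as a hint rather than a guarantee.

```python
import inspect


class StandInAsyncClient:
    """Hypothetical stand-in that mimics the shape of the async client."""

    def __init__(self) -> None:
        self._connected = False

    async def connect(self) -> None:
        # Would send a request to the server, so it is async
        self._connected = True

    def is_connected(self) -> bool:
        # Only reads local state, so it is synchronous
        return self._connected


def is_async_method(obj: object, name: str) -> bool:
    """Return True if `obj.<name>` is defined with `async def`."""
    return inspect.iscoroutinefunction(getattr(obj, name))


client = StandInAsyncClient()
print(is_async_method(client, "connect"))       # True
print(is_async_method(client, "is_connected"))  # False
```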
Example async methods
Methods that involve sending requests to Weaviate will be async functions. For example, each of the following operations is an async function:
- async_client.connect(): Connect to a Weaviate server
- async_client.collections.create(): Create a new collection
- <collection_object>.data.insert_many(): Insert a list of objects into a collection
Example sync methods
Methods that execute in a local context are likely to be synchronous. For example, each of the following operations is a sync function:
- async_client.collections.get("<COLLECTION_NAME>"): Create a Python object to interact with an existing collection (this does not create a collection)
- async_client.is_connected(): Check the last known connection status to the Weaviate server
Context manager
The async client can be used in an asynchronous context manager, in a pattern similar to:
import asyncio
import weaviate

async def context_manager_example() -> bool:
    async with weaviate.use_async_with_local() as async_client:
        # The async context manager automatically connects and disconnects
        # Use the async client - for example, check if it's ready
        readiness = await async_client.is_ready()
    return readiness

loop = asyncio.new_event_loop()
try:
    readiness = loop.run_until_complete(context_manager_example())
finally:
    loop.close()
When using the async client in a context manager, you do not need to call .connect() or .close() explicitly. The client handles the connection and disconnection automatically.
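To make the automatic connect and disconnect concrete, here is a rough sketch of the async context manager protocol using a hypothetical stand-in class. It shows the general Python mechanism (__aenter__ and __aexit__) and is not the library's actual implementation.

```python
import asyncio


class StandInClient:
    """Hypothetical stand-in; not the library's implementation."""

    def __init__(self) -> None:
        self.connected = False

    async def connect(self) -> None:
        self.connected = True

    async def close(self) -> None:
        self.connected = False

    async def __aenter__(self) -> "StandInClient":
        await self.connect()  # runs when the `async with` block is entered
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()  # runs on exit, even if the block raised


async def demo() -> tuple:
    client = StandInClient()
    async with client:
        inside = client.connected
    # Returns the connection state inside and after the block
    return inside, client.connected


print(asyncio.run(demo()))  # (True, False)
```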
Async usage examples
The async client object largely provides the same functionality as the synchronous Python client, with one key difference: the async client is designed to be used in an async function running in an asyncio event loop. Accordingly, many of the client methods are async functions that return Coroutine objects.
To execute an async client method, you must await it in another async function. To execute an async function in a Python script, you can use asyncio.run(my_async_function()) or the event loop directly:
loop = asyncio.new_event_loop()
loop.run_until_complete(my_async_function())
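The following self-contained sketch illustrates both points: awaiting coroutines inside another async function, and starting that function from a script with asyncio.run. The fetch_result function is a hypothetical stand-in for an awaited client call (it only sleeps), so the example runs without a Weaviate server.

```python
import asyncio


async def fetch_result(name: str, delay: float) -> str:
    # Stand-in for an awaited client call, such as a query
    await asyncio.sleep(delay)
    return f"{name} done"


async def main() -> list:
    # Awaiting inside another async function; gather runs both concurrently
    # and returns results in the order the awaitables were passed
    return await asyncio.gather(
        fetch_result("first", 0.02),
        fetch_result("second", 0.01),
    )


# Pass the coroutine object to asyncio.run, i.e. call main()
results = asyncio.run(main())
print(results)  # ['first done', 'second done']
```

Running independent requests concurrently like this is where an async client tends to pay off compared with issuing the same requests one after another.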
Data insertion
In this example, we create a new collection and insert a list of objects into the collection using the async client.
Note the use of a context manager in the async function. The context manager is used to ensure that the client is connected to the server during the data insertion operation.
import weaviate
from weaviate.collections.classes.batch import BatchObjectReturn
import asyncio
import os

cohere_api_key = os.getenv("COHERE_API_KEY")  # Best practice: store your API keys in environment variables

async_client = weaviate.use_async_with_local(
    headers={
        "X-Cohere-Api-Key": cohere_api_key  # Replace with your Cohere API key
    }
)

async def async_insert(async_client) -> BatchObjectReturn:
    from weaviate.classes.config import Configure, Property, DataType

    # This example uses an async context manager
    # The client will automatically connect and disconnect as it enters and exits the context manager
    async with async_client:
        collection = await async_client.collections.create(
            name="Movie",
            vectorizer_config=[
                Configure.NamedVectors.text2vec_cohere(
                    "overview_vector",
                    source_properties=["overview"]
                )
            ],
            generative_config=Configure.Generative.cohere(),
            properties=[
                Property(name="title", data_type=DataType.TEXT),
                Property(name="overview", data_type=DataType.TEXT),
            ],
        )

        # Build objects to insert
        # (illustrative placeholder data; replace with your own objects)
        objects = [
            {"title": "Example Movie 1", "overview": "Placeholder overview for the first movie."},
            {"title": "Example Movie 2", "overview": "Placeholder overview for the second movie."},
        ]

        response = await collection.data.insert_many(objects)

    return response

loop = asyncio.new_event_loop()
try:
    response = loop.run_until_complete(async_insert(async_client))
finally:
    loop.close()
Search & RAG
In this example, we perform retrieval augmented generation (RAG) with hybrid search results using the async client.
Note the use of a context manager in the async function. The context manager is used to ensure that the client is connected to the server during the query.
import weaviate
from weaviate.collections.classes.internal import GenerativeSearchReturnType
import asyncio
import os

cohere_api_key = os.getenv("COHERE_API_KEY")  # Best practice: store your API keys in environment variables

async_client = weaviate.use_async_with_local(
    headers={
        "X-Cohere-Api-Key": cohere_api_key  # Replace with your Cohere API key
    }
)

async def async_query(async_client) -> GenerativeSearchReturnType:
    async with async_client:
        # Note `collections.get()` is not an async method
        collection = async_client.collections.get(name="Movie")
        response = await collection.generate.hybrid(
            "romantic comedy set in Europe",
            target_vector="overview_vector",
            grouped_task="Write an ad, selling a bundle of these movies together",
            limit=3,
        )
    return response

loop = asyncio.new_event_loop()
try:
    response = loop.run_until_complete(async_query(async_client))
finally:
    loop.close()

print(response.generated)
for o in response.objects:
    print(o.properties["title"])
Bulk data insertion
For server-side batch operations, we recommend using the synchronous client and its batch operations. These batch operations already handle large amounts of data efficiently through concurrent requests.
The async client still offers insert and insert_many methods for data insertion, which can be used in an async context.
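If you do insert a moderately sized list from an async context, one option is to split it into chunks and await insert_many once per chunk. This is a minimal sketch under stated assumptions: chunked and insert_in_chunks are hypothetical helper names, the default chunk size of 100 is an arbitrary illustration rather than a recommended value, and the collection argument is assumed to be a collection object obtained from the async client.

```python
from typing import Any, Iterator, List, Sequence


def chunked(items: Sequence[Any], size: int) -> Iterator[Sequence[Any]]:
    """Yield consecutive slices of `items` with at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def insert_in_chunks(
    collection: Any, objects: Sequence[dict], chunk_size: int = 100
) -> List[Any]:
    """Await `insert_many` once per chunk and collect the responses.

    Hypothetical helper; the chunk size is an assumption, not a tuned value.
    """
    responses = []
    for chunk in chunked(objects, chunk_size):
        # insert_many sends a request to Weaviate, so it must be awaited
        responses.append(await collection.data.insert_many(chunk))
    return responses
```

The chunks are sent one after another here, which keeps the load on the server predictable. For large imports, the synchronous batch operations remain the recommended route.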
Application-level example
A common use case for the async client is in web applications, where multiple requests are handled concurrently. Here is an indicative, minimal example integrating the async client with FastAPI, a popular web framework for creating modular web API microservices:
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
import os
import weaviate
from weaviate.collections.classes.internal import GenerativeSearchReturnType

cohere_api_key = os.getenv("COHERE_API_KEY")  # Best practice: store your API keys in environment variables

# Initialize the Weaviate client
async_client = weaviate.use_async_with_local(
    headers={
        "X-Cohere-Api-Key": cohere_api_key  # Replace with your Cohere API key
    }
)

@asynccontextmanager
async def lifespan(app: FastAPI):  # See https://fastapi.tiangolo.com/advanced/events/#lifespan-function
    # Connect the client to Weaviate
    await async_client.connect()
    yield
    # Close the connection to Weaviate
    await async_client.close()

app = FastAPI(lifespan=lifespan)

@app.get("/")
async def read_root():
    collection = async_client.collections.get("Movie")
    obj_count = await collection.aggregate.over_all(total_count=True)
    return {"object_count": obj_count.total_count}

@app.get("/search")
async def search(query: str) -> dict:
    if not await async_client.is_ready():
        raise HTTPException(status_code=503, detail="Weaviate is not ready")

    collection = async_client.collections.get("Movie")
    print(query)
    response: GenerativeSearchReturnType = await collection.generate.hybrid(
        query=query,
        target_vector="overview_vector",
        single_prompt=f"""
        The user searched for query: {query}.
        Based on the following overview, simply recommend or not whether the
        user might want to watch the movie. Do not offer any follow-ups or additional info.
        MOVIE TITLE: {{title}}. OVERVIEW: {{overview}}
        """,
        limit=3,
    )

    # `obj` avoids shadowing the built-in name `object`
    return {
        "responses": [
            {
                "title": obj.properties["title"],
                "overview": obj.properties["overview"],
                "recommendation": obj.generated,
            }
            for obj in response.objects
        ]
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
If you run this example, you will see the FastAPI server running on http://localhost:8000. You can interact with the server using the / and /search endpoints.
Note that this example is minimal and does not include collection creation or object insertion. It assumes that the collection Movie already exists.
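To try the /search endpoint from a script, the query string needs to be URL-encoded. The sketch below uses only the standard library. The helper names build_search_url and search_movies are hypothetical, and the default base URL assumes the server from the example above is running locally on port 8000.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def build_search_url(query: str, base_url: str = "http://localhost:8000") -> str:
    """Build the URL for the /search endpoint with an encoded `query` parameter."""
    return f"{base_url}/search?{urlencode({'query': query})}"


def search_movies(query: str) -> dict:
    """Call the /search endpoint and decode its JSON body (needs a running server)."""
    with urlopen(build_search_url(query)) as response:
        return json.load(response)


print(build_search_url("romantic comedy set in Europe"))
# http://localhost:8000/search?query=romantic+comedy+set+in+Europe
```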
Questions and feedback
If you have any questions or feedback, let us know in the user forum.