Batch import
Batch imports are an efficient way to add multiple data objects and cross-references.
Additional information
To create a bulk import job, follow these steps:
- Initialize a batch object.
- Add items to the batch object.
- Ensure that the last batch is sent (flushed).
Basic import
The following example adds objects to the `YourName` collection.
- Python (v4)
- Python (v3)
- JS/TS (Beta)
- JS/TS
- Java
- Go
data_rows = [
{"title": f"Object {i+1}"} for i in range(5)
]
collection = client.collections.get("YourCollection")
with collection.batch.dynamic() as batch:
for data_row in data_rows:
batch.add_object(
properties=data_row,
)
class_name = "YourName" # Replace with your class name
data_objs = [
{"title": f"Object {i+1}"} for i in range(5)
]
client.batch.configure(batch_size=100) # Configure batch
with client.batch as batch:
for data_obj in data_objs:
batch.add_data_object(
data_obj,
class_name,
# tenant="tenantA" # If multi-tenancy is enabled, specify the tenant to which the object will be added.
)
const collection = 'myCollectionName'
let dataObject = []
for (let i = 1; i <= 10; i++) {
dataObject.push({ title: `Object ${i}`})
}
const myCollection = client.collections.get(collection)
const response = await myCollection.data.insertMany(dataObject);
console.log(response);
let className = 'YourName'; // Replace with your class name
let dataObjs = [];
for (let i = 1; i <= 5; i++)
dataObjs.push({ title: `Object ${i}` }); // Replace with your actual objects
let batcher5 = client.batch.objectsBatcher();
for (const dataObj of dataObjs)
batcher5 = batcher5.withObject({
class: className,
properties: dataObj,
// tenant: 'tenantA' // If multi-tenancy is enabled, specify the tenant to which the object will be added.
});
// Flush
await batcher5.do();
String className = "YourName"; // Replace with your class name
List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, Object> properties = new HashMap<>();
properties.put("title", String.format("Object %s", i)); // Replace with your actual objects
dataObjs.add(properties);
}
ObjectsBatcher batcher = client.batch().objectsBatcher();
for (Map<String, Object> properties : dataObjs) {
batcher.withObject(WeaviateObject.builder()
.className(className)
.properties(properties)
// .tenant("tenantA") // If multi-tenancy is enabled, specify the tenant to which the object will be added.
.build()
);
}
// Flush
batcher.run();
className := "YourName" // Replace with your class name
dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
dataObjs = append(dataObjs, map[string]interface{}{
"title": fmt.Sprintf("Object %v", i), // Replace with your actual objects
})
}
batcher := client.Batch().ObjectsBatcher()
for _, dataObj := range dataObjs {
batcher.WithObjects(&models.Object{
Class: className,
Properties: dataObj,
// Tenant: "tenantA", // If multi-tenancy is enabled, specify the tenant to which the object will be added.
})
}
// Flush
batcher.Do(ctx)
Specify an ID value
Weaviate generates a UUID for each object. Object IDs must be unique. If you set object IDs, use one of these deterministic UUID methods to prevent duplicate IDs:
- `generate_uuid5` (Python)
- `generateUuid5` (TypeScript)
- Python (v4)
- Python (v3)
- JS/TS (Beta)
- JS/TS
- Java
- Go
from weaviate.util import generate_uuid5 # Generate a deterministic ID
data_rows = [{"title": f"Object {i+1}"} for i in range(5)]
collection = client.collections.get("YourCollection")
with collection.batch.dynamic() as batch:
for data_row in data_rows:
obj_uuid = generate_uuid5(data_row)
batch.add_object(
properties=data_row,
uuid=obj_uuid
)
from weaviate.util import generate_uuid5 # Generate a deterministic ID
class_name = "YourName" # Replace with your class name
data_objs = [
{"title": f"Object {i+1}"} for i in range(5) # Replace with your actual objects
]
client.batch.configure(batch_size=100) # Configure batch
with client.batch as batch:
for data_obj in data_objs:
batch.add_data_object(
data_obj,
class_name,
uuid=generate_uuid5(data_obj) # Optional: Specify an object ID
)
import { generateUuid5 } from 'weaviate-client'; // requires v1.3.2+
const questions = client.collections.get("JeopardyQuestion")
let dataObject = []
for (let i = 1; i <= 10; i++) {
dataObject.push({
id: generateUuid5(questions.name, `Object ${i}`),
properties: {
title: `Object ${i}`
}
})
}
await questions.data.insertMany(dataObject)
import { generateUuid5 } from 'weaviate-ts-client'; // requires v1.3.2+
className = 'YourName'; // Replace with your class name
dataObjs = [];
for (let i = 1; i <= 5; i++)
dataObjs.push({ title: `Object ${i}` }); // Replace with your actual objects
let batcherId = client.batch.objectsBatcher();
for (const dataObj of dataObjs)
batcherId = batcherId.withObject({
class: className,
properties: dataObj,
id: generateUuid5(dataObj.title),
});
// Flush
await batcherId.do();
String className = "YourName"; // Replace with your class name
List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, Object> properties = new HashMap<>();
properties.put("title", String.format("Object %s", i)); // Replace with your actual objects
dataObjs.add(properties);
}
ObjectsBatcher batcher = client.batch().objectsBatcher();
for (Map<String, Object> properties : dataObjs) {
batcher.withObject(WeaviateObject.builder()
.className(className)
.properties(properties)
.id(UUID.nameUUIDFromBytes(((String) properties.get("title")).getBytes()).toString())
.build()
);
}
// Flush
batcher.run();
generateUUID := func(input string) strfmt.UUID {
input = strings.ToLower(input)
hash := md5.Sum([]byte(input))
uuid := fmt.Sprintf("%x-%x-%x-%x-%x", hash[0:4], hash[4:6], hash[6:8], hash[8:10], hash[10:])
return strfmt.UUID(uuid)
}
className := "YourName" // Replace with your class name
dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
dataObjs = append(dataObjs, map[string]interface{}{
"title": fmt.Sprintf("Object %v", i), // Replace with your actual objects
})
}
batcher := client.Batch().ObjectsBatcher()
for _, dataObj := range dataObjs {
batcher.WithObjects(&models.Object{
Class: className,
Properties: dataObj,
ID: generateUUID((dataObj.(map[string]interface{}))["title"].(string)),
})
}
// Flush
batcher.Do(ctx)
Specify a vector
Use the `vector` property to specify a vector for each object.
- Python (v4)
- Python (v3)
- JS/TS (Beta)
- JS/TS
- Java
- Go
data_rows = [{"title": f"Object {i+1}"} for i in range(5)]
vectors = [[0.1] * 1536 for i in range(5)]
collection = client.collections.get("YourCollection")
with collection.batch.dynamic() as batch:
for i, data_row in enumerate(data_rows):
batch.add_object(
properties=data_row,
vector=vectors[i]
)
class_name = "YourName" # Replace with your class name
data_objs = [
{"title": f"Object {i+1}"} for i in range(5) # Replace with your actual objects
]
vectors = [
[0.25 + i/100] * 10 for i in range(5) # Replace with your actual vectors
]
client.batch.configure(batch_size=100) # Configure batch
with client.batch as batch:
for i, data_obj in enumerate(data_objs):
batch.add_data_object(
data_obj,
class_name,
vector=vectors[i] # Optional: Specify an object vector
)
const questions = client.collections.get("JeopardyQuestion")
let dataObject = []
for (let i = 1; i <= 10; i++) {
dataObject.push({
properties: {
title: `Object ${i}`
},
vectors: Array(100).fill(0.25136 + i / 100)
})
}
await questions.data.insertMany(dataObject)
className = 'YourName'; // Replace with your class name
dataObjs = [];
const vectors = [];
for (let i = 1; i <= 5; i++) {
dataObjs.push({ title: `Object ${i}` }); // Replace with your actual objects
vectors.push(Array(10).fill(0.25 + i / 100)); // Replace with your actual vectors
}
let batcherVectors = client.batch.objectsBatcher();
for (let i = 0; i < 5; i++)
batcherVectors = batcherVectors.withObject({
class: className,
properties: dataObjs[i],
vector: vectors[i],
});
// Flush
await batcherVectors.do();
String className = "YourName"; // Replace with your class name
List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, Object> properties = new HashMap<>();
properties.put("title", String.format("Object %s", i)); // Replace with your actual objects
dataObjs.add(properties);
}
List<Float[]> vectors = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Float[] vector = new Float[10];
Arrays.fill(vector, 0.25f + i / 100f);
vectors.add(vector); // Replace with your actual vectors
}
ObjectsBatcher batcher = client.batch().objectsBatcher();
for (int i = 0; i < 5; i++) {
batcher.withObject(WeaviateObject.builder()
.className(className)
.properties(dataObjs.get(i))
.vector(vectors.get(i))
.build()
);
}
// Flush
batcher.run();
className := "YourName" // Replace with your class name
dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
dataObjs = append(dataObjs, map[string]interface{}{
"title": fmt.Sprintf("Object %v", i), // Replace with your actual objects
})
}
// Build one placeholder vector per object: 10 components, all set to a value
// that depends on the object index i.
vectors := [][]float32{}
for i := 0; i < 5; i++ {
	vector := make([]float32, 10)
	for j := range vector {
		// Convert to float32 before dividing. The integer expression j/100
		// truncates to 0, which made every component exactly 0.25.
		vector[j] = 0.25 + float32(i)/100 // Replace with your actual vectors
	}
	vectors = append(vectors, vector)
}
batcher := client.Batch().ObjectsBatcher()
for i, dataObj := range dataObjs {
batcher.WithObjects(&models.Object{
Class: className,
Properties: dataObj,
Vector: vectors[i],
})
}
// Flush
batcher.Do(ctx)
Specify named vectors
v1.24
When you create an object, you can specify named vectors (if configured in your collection).
- Python (v4)
- Python (v3)
- JS/TS (Beta)
- JS/TS
data_rows = [{
"title": f"Object {i+1}",
"body": f"Body {i+1}"
} for i in range(5)]
title_vectors = [[0.12] * 1536 for _ in range(5)]
body_vectors = [[0.34] * 1536 for _ in range(5)]
collection = client.collections.get("YourCollection")
with collection.batch.dynamic() as batch:
for i, data_row in enumerate(data_rows):
batch.add_object(
properties=data_row,
vector={
"title": title_vectors[i],
"body": body_vectors[i],
}
)
# Unfortunately, named vectors are not supported in the v3 API / Python client.
# Please upgrade to the v4 API / Python client to use named vectors.
const questions = client.collections.get("JeopardyQuestion")
let dataObject = []
for (let i = 1; i <= 10; i++) {
dataObject.push({
properties: {
title: `Object ${i}`
},
vectors: {
title: Array(100).fill(0.25136 + i / 100),
body: Array(100).fill(0.89137 + i / 100)
}
})
}
await questions.data.insertMany(dataObject)
className = 'YourCollection'; // Replace with your class name
dataObjs = [];
const title_vectors = [];
const body_vectors = [];
for (let i = 1; i <= 5; i++) {
dataObjs.push({ title: `Object ${i}`, body: `Body ${i}` }); // Replace with your actual objects
title_vectors.push(Array(10).fill(0.25 + i / 100)); // Replace with your actual vectors
body_vectors.push(Array(10).fill(0.25 + i / 100)); // Replace with your actual vectors
}
let namedVectors = client.batch.objectsBatcher();
for (let i = 0; i < 5; i++)
namedVectors = namedVectors.withObject({
class: className,
properties: dataObjs[i],
vectors: {
title: title_vectors[i],
body: body_vectors[i]
},
});
// Flush
await namedVectors.do();
Import with references
You can batch create links from one object to another through cross-references.
- Python (v4)
- Python (v3)
- JS/TS
collection = client.collections.get("Author")
with collection.batch.fixed_size(batch_size=100) as batch:
batch.add_reference(
from_property="writesFor",
from_uuid=from_uuid,
to=target_uuid,
)
with client.batch as batch:
batch.add_reference(
from_object_uuid="36ddd591-2dee-4e7e-a3cc-eb86d30a4303",
from_object_class_name="Author",
from_property_name="wroteArticles",
to_object_uuid="6bb06a43-e7f0-393e-9ecf-3c0f4e129064",
to_object_class_name="Article",
# tenant="tenantA", # Optional; specify the tenant in multi-tenancy collections
)
const response = await client.batch
.referencesBatcher()
.withReference(
client.batch
.referencePayloadBuilder()
.withFromClassName('Author')
.withFromRefProp('wroteArticles')
.withFromId('36ddd591-2dee-4e7e-a3cc-eb86d30a4303')
.withToClassName('Article') // prior to v1.14 omit .withToClassName()
.withToId('6bb06a43-e7f0-393e-9ecf-3c0f4e129064')
.payload()
)
// You can add multiple references
// .withReference(
// client.batch
// .referencePayloadBuilder()
// .withFromClassName('Author')
// .withFromRefProp('wroteArticles')
// .withFromId('36ddd591-2dee-4e7e-a3cc-eb86d30a4303')
// .withToClassName('Article') // prior to v1.14 omit .withToClassName()
// .withToId('b72912b9-e5d7-304e-a654-66dc63c55b32')
// .payload()
// )
.withConsistencyLevel('ALL') // default QUORUM
// .withTenant('tenantA') // Optional; specify the tenant in multi-tenancy collections
.do();
console.log(JSON.stringify(response, null, 2));
Python-specific batching
The Python clients have built-in batching methods to help you optimize your import speed. Please see the relevant documentation for more information.
Stream data from large files
If your dataset is large, consider streaming the import to avoid out-of-memory issues.
- Python (v4) - JSON
- Python (v4) - CSV
- Python (v3) - JSON
- Python (v3) - CSV
- TypeScript - JSON
- TypeScript - CSV
import ijson
# Settings for displaying the import progress
counter = 0
interval = 100 # print progress every this many records; should be bigger than the batch_size
print("JSON streaming, to avoid running out of memory on large files...")
with client.batch.fixed_size(batch_size=200) as batch:
with open("jeopardy_1k.json", "rb") as f:
objects = ijson.items(f, "item")
for obj in objects:
properties = {
"question": obj["Question"],
"answer": obj["Answer"],
}
batch.add_object(
collection="JeopardyQuestion",
properties=properties,
# If you Bring Your Own Vectors, add the `vector` parameter here
# vector=obj.vector["default"]
)
# Calculate and display progress
counter += 1
if counter % interval == 0:
print(f"Imported {counter} articles...")
print(f"Finished importing {counter} articles.")
import pandas as pd
# Settings for displaying the import progress
counter = 0
interval = 100 # print progress every this many records; should be bigger than the batch_size
def add_object(obj) -> None:
global counter
properties = {
"question": obj["Question"],
"answer": obj["Answer"],
}
with client.batch.fixed_size(batch_size=200) as batch:
batch.add_object(
collection="JeopardyQuestion",
properties=properties,
# If you Bring Your Own Vectors, add the `vector` parameter here
# vector=obj.vector["default"]
)
# Calculate and display progress
counter += 1
if counter % interval == 0:
print(f"Imported {counter} articles...")
print("pandas dataframe iterator with lazy-loading, to not load all records in RAM at once...")
# A single batch context wraps the whole import.
with client.batch.fixed_size(batch_size=200) as batch:
    with pd.read_csv(
        "jeopardy_1k.csv",
        usecols=["Question", "Answer", "Category"],
        chunksize=100,  # number of rows per chunk
    ) as csv_iterator:
        # Iterate through the dataframe chunks and add each CSV record to the batch
        for chunk in csv_iterator:
            for index, row in chunk.iterrows():
                # Read from `row`, the loop variable. The name `obj` is not
                # defined in this scope and raised a NameError.
                properties = {
                    "question": row["Question"],
                    "answer": row["Answer"],
                }
                batch.add_object(
                    collection="JeopardyQuestion",
                    properties=properties,
                    # If you Bring Your Own Vectors, add the `vector` parameter here
                    # vector=<the vector for this row>
                )
                # Calculate and display progress
                counter += 1
                if counter % interval == 0:
                    print(f"Imported {counter} articles...")
print(f"Finished importing {counter} articles.")
import weaviate
import ijson
# Settings for displaying the import progress
counter = 0
interval = 20 # print progress every this many records; should be bigger than the batch_size
def add_object(obj) -> None:
    """Queue one record on the shared client batch and report progress.

    Reads the "Question" and "Answer" keys of ``obj``, adds them as a
    ``JeopardyQuestion`` object to ``client.batch``, increments the global
    ``counter``, and prints a progress line every ``interval`` records.
    """
    global counter
    properties = {
        "question": obj["Question"],
        "answer": obj["Answer"],
    }
    # Add the object to the batch
    client.batch.add_data_object(
        data_object=properties,
        class_name="JeopardyQuestion",
        # If you Bring Your Own Vectors, add the `vector` parameter here
        # vector=obj.vector
    )
    # Calculate and display progress
    counter += 1
    if counter % interval == 0:
        print(f"Imported {counter} articles...")


print("JSON streaming, to avoid running out of memory on large files...")
client.batch.configure(batch_size=100)  # Configure batch once, not per object
# Keep one batch context open for the whole import. Entering and leaving the
# context for every object would flush after each one, so every request would
# carry a single object.
with client.batch:
    with open("jeopardy_1k.json", "rb") as f:
        objects = ijson.items(f, "item")
        for o in objects:
            add_object(o)
print(f"Finished importing {counter} articles.")
import weaviate
import pandas as pd
# Settings for displaying the import progress
counter = 0
interval = 20 # print progress every this many records; should be bigger than the batch_size
def add_object(obj) -> None:
    """Queue one record on the shared client batch and report progress.

    Reads the "Question" and "Answer" keys of ``obj``, adds them as a
    ``JeopardyQuestion`` object to ``client.batch``, increments the global
    ``counter``, and prints a progress line every ``interval`` records.
    """
    global counter
    properties = {
        "question": obj["Question"],
        "answer": obj["Answer"],
    }
    # Add the object to the batch
    client.batch.add_data_object(
        data_object=properties,
        class_name="JeopardyQuestion",
        # If you Bring Your Own Vectors, add the `vector` parameter here
        # vector=obj.vector
    )
    # Calculate and display progress
    counter += 1
    if counter % interval == 0:
        print(f"Imported {counter} articles...")


print("pandas dataframe iterator with lazy-loading, to not load all records in RAM at once...")
client.batch.configure(batch_size=100)  # Configure batch once, not per object
# Keep one batch context open for the whole import. Entering and leaving the
# context for every row would flush after each one, so every request would
# carry a single object.
with client.batch:
    with pd.read_csv(
        "jeopardy_1k.csv",
        usecols=["Question", "Answer", "Category"],
        chunksize=100,  # number of rows per chunk
    ) as csv_iterator:
        # Iterate through the dataframe chunks and add each CSV record to the batch
        for chunk in csv_iterator:
            for index, row in chunk.iterrows():
                add_object(row)
print(f"Finished importing {counter} articles.")
import weaviate from 'weaviate-client';
import fs from 'fs';
import parser from 'stream-json';
import StreamArray from 'stream-json/streamers/StreamArray';
import Chain from 'stream-chain';
let batcher = client.batch.objectsBatcher();
let counter = 0;
const batchSize = 20;
/**
 * Queues one parsed record on the shared `batcher` and sends the queue to
 * Weaviate each time `counter` reaches a multiple of `batchSize`.
 *
 * @param obj - Parsed record; its `Question` and `Answer` fields become the
 *   `question` and `answer` properties of a `JeopardyQuestion` object.
 * @throws Error when any object in a flushed batch reports errors.
 */
async function addObject(obj: Record<string, unknown>): Promise<void> {
  const properties = {
    question: obj['Question'],
    answer: obj['Answer'],
  };

  // Add the object to the batch queue
  batcher = batcher.withObject({
    class: 'JeopardyQuestion',
    properties,
    // If you Bring Your Own Vectors, add the `vector` parameter here
    // vector: JSON.parse(obj['Vector']),
  });
  counter++;

  // Nothing to send until a full batch has accumulated
  if (counter % batchSize !== 0) return;

  // Flush the batch queue and restart it
  const response = await batcher.do();
  batcher = client.batch.objectsBatcher();

  // Handle errors: throw a real Error so callers get a message and a stack
  for (const r of response) {
    if (r.result?.errors) {
      throw new Error(`Batch import failed: ${JSON.stringify(r.result.errors)}`);
    }
  }
  console.log(`Imported ${counter} articles...`);
}
/**
 * Streams the file at `filePath` through the stream-json parser and
 * `StreamArray`, passing each emitted `value` to `addObject` in order.
 *
 * @param filePath - Path handed to `fs.createReadStream`.
 */
async function importJson(filePath: string): Promise<void> {
  const pipeline = new Chain([
    fs.createReadStream(filePath),
    parser(),
    new StreamArray(),
  ]);

  // Await each record so a flush finishes before the next record is read
  for await (const { value } of pipeline) {
    await addObject(value);
  }
}
await importJson('jeopardy_1k.json');
// Flush any remaining objects
if (batcher.payload().objects.length > 0)
await batcher.do();
console.log(`Finished importing ${counter} articles.`);
import weaviate from 'weaviate-client';
import fs from 'fs';
import csv from 'csv-parser';
let batcher = client.batch.objectsBatcher();
let counter = 0;
const batchSize = 20;
/**
 * Queues one parsed CSV row on the shared `batcher` and sends the queue to
 * Weaviate each time `counter` reaches a multiple of `batchSize`.
 *
 * @param obj - Parsed row; its `Question` and `Answer` fields become the
 *   `question` and `answer` properties of a `JeopardyQuestion` object.
 * @throws Error when any object in a flushed batch reports errors.
 */
async function addObject(obj: Record<string, unknown>): Promise<void> {
  const properties = {
    question: obj['Question'],
    answer: obj['Answer'],
  };

  // Add the object to the batch queue
  batcher = batcher.withObject({
    class: 'JeopardyQuestion',
    properties,
    // If you Bring Your Own Vectors, add the `vector` parameter here
    // vector: JSON.parse(obj['Vector']),
  });
  counter++;

  // Nothing to send until a full batch has accumulated
  if (counter % batchSize !== 0) return;

  // Flush the batch queue and restart it
  const response = await batcher.do();
  batcher = client.batch.objectsBatcher();

  // Handle errors: throw a real Error so callers get a message and a stack
  for (const r of response) {
    if (r.result?.errors) {
      throw new Error(`Batch import failed: ${JSON.stringify(r.result.errors)}`);
    }
  }
  console.log(`Imported ${counter} articles...`);
}
/**
 * Streams the file at `filePath` through `csv-parser` and passes each parsed
 * row to `addObject` in order.
 *
 * @param filePath - Path handed to `fs.createReadStream`.
 */
async function importCSV(filePath: string): Promise<void> {
  const rows = fs.createReadStream(filePath).pipe(csv());

  // Await each row so a flush finishes before the next row is read
  for await (const row of rows) {
    await addObject(row);
  }
}
await importCSV('jeopardy_1k.csv');
// Flush any remaining objects
if (batcher.payload().objects.length > 0)
await batcher.do();
console.log(`Finished importing ${counter} articles.`);
Additional considerations
v1.23
To maximize import speed, enable asynchronous indexing and use gRPC batch imports.
Asynchronous indexing is an experimental feature in `1.22`, and may not be suitable for production use. The Python client `v4` uses gRPC. If you cannot use the new client, access the gRPC API directly.
To enable asynchronous indexing, set the `ASYNC_INDEXING` environment variable to `true` in your Weaviate configuration file.
weaviate:
image: cr.weaviate.io/semitechnologies/weaviate:1.24.10
...
environment:
ASYNC_INDEXING: 'true'
...
Related pages
Questions and feedback
If you have any questions or feedback, please let us know on our forum. For example, you can: