(Batch) Import items
Overview
This page shows how to efficiently add data objects and cross-references to Weaviate. We will collectively refer to these as "items".
We suggest you use batch imports unless you have a specific reason not to. A batch import drastically improves import speed by processing multiple items per request, and clients can parallelize requests.
Requirements
To import items in batches using a Weaviate client,
- Initialize a batcher,
- For each source data row:
- Build an item (object or cross-reference) for Weaviate,
- Specify the class to add to, and
- Add it to the batcher.
- Ensure that the batches are flushed (some clients require manual flushing).
Depending on the client, additional settings or auto-flushing of batches may be available.
Basic batch import example
The following example adds objects to the `YourClassName` class using a batch import.
- Python
- TypeScript
```python
class_name = "YourClassName"  # Replace with your class name
data_objs = [
    {"title": f"Object {i+1}"} for i in range(5)
]
with client.batch() as batch:
    for data_obj in data_objs:
        batch.add_data_object(
            data_obj,
            class_name
        )
```
```typescript
let className = 'YourClassName';  // Replace with your class name
let dataObjs = [];
for (let i = 1; i <= 5; i++)
  dataObjs.push({ title: `Object ${i}` });  // Replace with your actual objects

let batcher5 = client.batch.objectsBatcher();
for (const dataObj of dataObjs)
  batcher5 = batcher5.withObject({
    class: className,
    properties: dataObj,
  });

// Flush
await batcher5.do();
```
In this example, if the class `YourClassName` does not already exist, Weaviate will create it with its auto-schema feature.
Optional object parameters
The following apply to objects only (not to cross-references).
`id`
You can optionally specify an ID in UUID format. Otherwise, Weaviate will generate a random UUID.
- Python
- TypeScript
```python
from weaviate.util import generate_uuid5

class_name = "YourClassName"  # Replace with your class name
data_objs = [
    {"title": f"Object {i+1}"} for i in range(5)  # Replace with your actual objects
]
with client.batch() as batch:
    for data_obj in data_objs:
        batch.add_data_object(
            data_obj,
            class_name,
            uuid=generate_uuid5(data_obj)  # Optional: Specify an object ID
        )
```
```typescript
import { generateUuid5 } from 'weaviate-ts-client';  // requires v1.3.2+

className = 'YourClassName';  // Replace with your class name
dataObjs = [];
for (let i = 1; i <= 5; i++)
  dataObjs.push({ title: `Object ${i}` });  // Replace with your actual objects

let batcherId = client.batch.objectsBatcher();
for (const dataObj of dataObjs)
  batcherId = batcherId.withObject({
    class: className,
    properties: dataObj,
    id: generateUuid5(dataObj.title),
  });

// Flush
await batcherId.do();
```
`vector`
You can optionally specify a vector to represent each object. Otherwise, Weaviate will follow the relevant vectorizer setting.
- Python
- TypeScript
```python
class_name = "YourClassName"  # Replace with your class name
data_objs = [
    {"title": f"Object {i+1}"} for i in range(5)  # Replace with your actual objects
]
vectors = [
    [0.25 + i/100] * 10 for i in range(5)  # Replace with your actual vectors
]
with client.batch() as batch:
    for i, data_obj in enumerate(data_objs):
        batch.add_data_object(
            data_obj,
            class_name,
            vector=vectors[i]  # Optional: Specify an object vector
        )
```
```typescript
className = 'YourClassName';  // Replace with your class name
dataObjs = [];
const vectors = [];
for (let i = 1; i <= 5; i++) {
  dataObjs.push({ title: `Object ${i}` });  // Replace with your actual objects
  vectors.push(Array(10).fill(0.25 + i / 100));  // Replace with your actual vectors
}

let batcherVectors = client.batch.objectsBatcher();
for (let i = 0; i < 5; i++)
  batcherVectors = batcherVectors.withObject({
    class: className,
    properties: dataObjs[i],
    vector: vectors[i],
  });

// Flush
await batcherVectors.do();
```
Batch parameters - Python client
Performance parameters
The following parameters have the greatest impact on batch import speed:

- `batch_size` (int) - the number of items to batch before flushing
- `num_workers` (int) - the number of parallel workers to use for the batch
- `dynamic` (bool) - whether to dynamically adjust the `batch_size` based on the number of items in the batch

We recommend starting with a batch size of 100-300 and 1-4 workers. The `dynamic` parameter is useful if you are not sure what the optimal batch size is.

Unless `batch_size` is set to `None`, the batch will be flushed when the number of items in the batch reaches `batch_size`.
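The flushing rule can be pictured with a small stand-in batcher (plain Python, no Weaviate required; `TinyBatcher` is purely illustrative and not a client class):

```python
class TinyBatcher:
    def __init__(self, batch_size=None):
        self.batch_size = batch_size
        self.queue = []
        self.flushed = []  # one list of items per simulated request

    def add(self, item):
        self.queue.append(item)
        # Auto-flush once the queue reaches batch_size (if one is set)
        if self.batch_size is not None and len(self.queue) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.queue:
            self.flushed.append(self.queue)  # "send" one request
            self.queue = []

b = TinyBatcher(batch_size=100)
for i in range(250):
    b.add({"title": f"Object {i+1}"})
b.flush()  # send the 50 leftover items
# 250 items -> requests of 100, 100 and 50
```

With `batch_size=None`, `add` would never flush on its own and everything would wait for the explicit `flush()` call, which is why manual flushing matters in that mode.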
Using these parameters, you can set dynamic or automatic batching:
- Automatic
- Dynamic
```python
with client.batch(
    batch_size=100,  # Specify the batch size
    num_workers=2,   # Parallelize the process
    # dynamic=False  # By default
) as batch:
    ...  # Add objects to batch
```
```python
with client.batch(
    batch_size=100,  # Specify the batch size
    num_workers=2,   # Parallelize the process
    dynamic=True,    # Weaviate will dynamically adjust the batch size
) as batch:
    ...  # Add objects to batch
```
Error handling parameters
The following parameters have the most impact on error handling:

- `timeout_retries` (int) & `connection_error_retries` (int) - batch-level number of retries
- `weaviate_error_retries` (int) - object-level number of retries for an error originating from Weaviate (for example, inference / transformer timeouts)
- `callback` - call a function at the end of batching, for example to inspect the response. The default is `weaviate.util.check_batch_result`.
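As a sketch of what such a callback might look like, the function below scans a batch response for per-object errors, in the same spirit as the default `weaviate.util.check_batch_result` (which prints them); the response shape shown is illustrative:

```python
def collect_errors(results):
    # Scan a batch response and collect any per-object errors
    errors = []
    for result in results or []:
        if "errors" in result.get("result", {}):
            errors.append(result["result"]["errors"])
    return errors

# An illustrative batch response: one failed object, one successful one
sample_response = [
    {"result": {"errors": {"error": [{"message": "inference timed out"}]}}},
    {"result": {}},
]
found = collect_errors(sample_response)  # one error entry
```

Passing such a function as the `callback` parameter would let you log failed objects, or queue them for a retry, instead of only printing the errors.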
For a complete list of batch parameters and details about the types of batching, see the Python client batching section.
How to set batch parameters
We recommend setting batch parameters via the `client.batch()` context manager, which will also automatically flush the batch when exiting.
The example below specifies a batch size of 200 and parallelizes the import process with 2 threads:
```python
with client.batch(
    batch_size=200,  # Specify the batch size for auto batching
    num_workers=2,   # Maximum number of parallel threads used during import
) as batch:
    ...  # Add objects to batch
```
Batch parameters - other clients
The other clients do not currently support batch parameters, so batches must be flushed manually.
Managing potential duplication
In a Weaviate collection, each object has a unique ID in UUID format. If one is not supplied at object creation time, Weaviate generates a random UUID.

Weaviate does not check whether you are uploading items with the same properties as ones that already exist. So, to avoid duplication, we recommend using a deterministic ID generation method, such as the `generate_uuid5` function in the Python client or the `generateUuid5` function in the TypeScript client.

Classes generally act like namespaces, so it is possible to have duplicate IDs across classes.
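The idea behind deterministic IDs can be shown with the standard library alone (a sketch; the Weaviate helpers apply the same principle, though their namespace and serialization details differ):

```python
import json
import uuid

def deterministic_id(obj: dict) -> str:
    # Serialize with sorted keys so identical properties always map to
    # the same string, which uuid5 then maps to the same UUID
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, json.dumps(obj, sort_keys=True)))

id_a = deterministic_id({"title": "Object 1"})
id_b = deterministic_id({"title": "Object 1"})
id_c = deterministic_id({"title": "Object 2"})
# id_a == id_b, so re-importing the same row targets the same object
# instead of creating a duplicate
```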
Tip: Stream data from large files
A good practice for importing large datasets is to stream the input file rather than risking running out of memory by loading it all at once. For Python, this can be achieved with libraries like ijson for JSON files and pandas for CSV files. For Node, a streams-based solution is presented below.
- Python - JSON
- Python - CSV
- TypeScript - JSON
- TypeScript - CSV
```python
import weaviate
import ijson

# Settings for displaying the import progress
counter = 0
interval = 20  # print progress every this many records; should be bigger than the batch_size

def add_object(obj) -> None:
    global counter
    properties = {
        'question': obj['Question'],
        'answer': obj['Answer'],
    }

    # Add the object to the batch
    client.batch.add_data_object(
        data_object=properties,
        class_name='JeopardyQuestion',
        # If you Bring Your Own Vectors, add the `vector` parameter here
        # vector=obj.vector
    )

    # Calculate and display progress
    counter += 1
    if counter % interval == 0:
        print(f'Imported {counter} articles...')


# Configure the batch import
client.batch.configure(
    batch_size=10,
)

print('JSON streaming, to avoid running out of memory on large files...')
with open('jeopardy_1k.json', 'rb') as f:
    objects = ijson.items(f, 'item')
    for o in objects:
        add_object(o)

# Flush any remaining objects in the batch
client.batch.flush()
print(f'Finished importing {counter} articles.')
```
```python
import weaviate
import pandas as pd

# Settings for displaying the import progress
counter = 0
interval = 20  # print progress every this many records; should be bigger than the batch_size

def add_object(obj) -> None:
    global counter
    properties = {
        'question': obj['Question'],
        'answer': obj['Answer'],
    }

    # Add the object to the batch
    client.batch.add_data_object(
        data_object=properties,
        class_name='JeopardyQuestion',
        # If you Bring Your Own Vectors, add the `vector` parameter here
        # vector=obj.vector
    )

    # Calculate and display progress
    counter += 1
    if counter % interval == 0:
        print(f'Imported {counter} articles...')


# Configure the batch import
client.batch.configure(
    batch_size=10,
)

print('pandas dataframe iterator with lazy-loading, to not load all records in RAM at once...')
with pd.read_csv(
    'jeopardy_1k.csv',
    usecols=['Question', 'Answer', 'Category'],
    chunksize=100,  # number of rows per chunk
) as csv_iterator:
    # Iterate through the dataframe chunks and add each CSV record to the batch
    for chunk in csv_iterator:
        for index, row in chunk.iterrows():
            add_object(row)

# Flush any remaining objects in the batch
client.batch.flush()
print(f'Finished importing {counter} articles.')
```
```typescript
import weaviate from 'weaviate-ts-client';
import fs from 'fs';
import parser from 'stream-json';
import StreamArray from 'stream-json/streamers/StreamArray';
import Chain from 'stream-chain';

let batcher = client.batch.objectsBatcher();
let counter = 0;
const batchSize = 20;

async function addObject(obj: object): Promise<void> {
  const properties = {
    question: obj['Question'],
    answer: obj['Answer'],
  };

  // Add the object to the batch queue
  batcher = batcher.withObject({
    class: 'JeopardyQuestion',
    properties,
    // If you Bring Your Own Vectors, add the `vector` parameter here
    // vector: JSON.parse(obj['Vector']),
  });
  counter++;

  // When the batch counter reaches batchSize, push the objects to Weaviate
  if (counter % batchSize === 0) {
    console.log(`Imported ${counter} articles...`);

    // Flush the batch queue and restart it
    const response = await batcher.do();
    batcher = client.batch.objectsBatcher();

    // Handle errors
    for (const r of response)
      if (r.result.errors)
        throw r.result.errors;
  }
}

async function importJson(filePath) {
  const pipeline = new Chain([
    fs.createReadStream(filePath),
    parser(),
    new StreamArray(),
  ]);

  for await (const { value } of pipeline) {
    await addObject(value);
  }
}

await importJson('jeopardy_1k.json');

// Flush any remaining objects
if (batcher.payload().objects.length > 0)
  await batcher.do();
console.log(`Finished importing ${counter} articles.`);
```
```typescript
import weaviate from 'weaviate-ts-client';
import fs from 'fs';
import csv from 'csv-parser';

let batcher = client.batch.objectsBatcher();
let counter = 0;
const batchSize = 20;

async function addObject(obj: object): Promise<void> {
  const properties = {
    question: obj['Question'],
    answer: obj['Answer'],
  };

  // Add the object to the batch queue
  batcher = batcher.withObject({
    class: 'JeopardyQuestion',
    properties,
    // If you Bring Your Own Vectors, add the `vector` parameter here
    // vector: JSON.parse(obj['Vector']),
  });
  counter++;

  // When the batch counter reaches batchSize, push the objects to Weaviate
  if (counter % batchSize === 0) {
    console.log(`Imported ${counter} articles...`);

    // Flush the batch queue and restart it
    const response = await batcher.do();
    batcher = client.batch.objectsBatcher();

    // Handle errors
    for (const r of response)
      if (r.result.errors)
        throw r.result.errors;
  }
}

async function importCSV(filePath) {
  const stream = fs.createReadStream(filePath).pipe(csv());

  for await (const row of stream) {
    await addObject(row);
  }
}

await importCSV('jeopardy_1k.csv');

// Flush any remaining objects
if (batcher.payload().objects.length > 0)
  await batcher.do();
console.log(`Finished importing ${counter} articles.`);
```
More Resources
If you can't find the answer to your question here, please have a look at:

- the Frequently Asked Questions,
- the knowledge base of old issues,
- Stack Overflow, for questions,
- the Weaviate Community Forum, for more involved discussion, or
- our Slack channel.