Migrate data
Follow these examples to migrate data manually when using a backup is not possible. They cover all permutations between:
- a single-tenancy collection (Collection), and
- a tenant in a multi-tenancy collection (Tenant).
Additional information
The examples use two different Weaviate instances, exposed through different ports. The same process can be used for instances running on different hosts as well.
Cross-references in Weaviate are properties. As such, you can retrieve cross-references as a part of the object.
What about cross-references?
These scripts should migrate cross-references as well.
Cross-references are properties. As such, these cursor-based exports will include them. During restoration, restore the cross-referenced (i.e. "to") object first, then the object that contains the cross-reference (i.e. "from" object).
Collection → Collection
Step 1: Create the target collection(s)
Create a collection (e.g. `WineReview`) at the target instance, matching the collection (e.g. `WineReview`) at the source instance.
- Python Client v4
- Python Client v3
- JS/TS Client v3
import weaviate
import weaviate.classes as wvc
from weaviate.collections import Collection
from weaviate.client import WeaviateClient
import os  # required by the os.getenv() calls below

# Source instance, reached with the default local connection settings
client_src = weaviate.connect_to_local(
    headers={
        "X-OpenAI-Api-Key": os.getenv("OPENAI_APIKEY")
    }
)

# Target instance, exposed on different HTTP and gRPC ports
client_tgt = weaviate.connect_to_local(
    port=8090,
    grpc_port=50061,
    headers={
        "X-OpenAI-Api-Key": os.getenv("OPENAI_APIKEY")
    }
)
def create_collection(client_in: WeaviateClient, collection_name: str, enable_mt=False):
    """Create `collection_name` on `client_in` and return the new collection.

    `enable_mt` switches multi-tenancy on or off for the collection (off by default).
    """
    mt_config = wvc.config.Configure.multi_tenancy(enabled=enable_mt)
    return client_in.collections.create(
        name=collection_name,
        multi_tenancy_config=mt_config,
        # Additional settings not shown
    )


reviews_tgt = create_collection(client_tgt, "WineReview", enable_mt=False)
from weaviate import Client
target_client = Client(url="http://localhost:8099")  # Your target endpoint

# Single-tenancy class definition for the target instance
wine_review_class = {
    "class": "WineReview",
    "multiTenancyConfig": {"enabled": False},  # This is also the default
    # Additional settings not shown
}
target_client.schema.create({"classes": [wine_review_class]})
import weaviate, { Collection, WeaviateClient } from 'weaviate-client'
let client_src: WeaviateClient, client_tgt: WeaviateClient;
let reviews_mt_tgt, reviews_mt_src;
let reviews_tgt, reviews_src;

// Source instance, reached with the default local connection settings
client_src = await weaviate.connectToLocal({
  headers: {
    "X-Cohere-Api-Key": process.env.COHERE_API_KEY as string
  }
})

// Target instance, exposed on different HTTP and gRPC ports
client_tgt = await weaviate.connectToLocal({
  port: 8090,
  grpcPort: 50061,
  headers: {
    "X-Cohere-Api-Key": process.env.COHERE_API_KEY as string
  }
})
// Creates `collectionName` on `clientIn`; `enableMt` toggles multi-tenancy.
async function createCollection(clientIn: WeaviateClient, collectionName: string, enableMt: boolean) {
  // Returning the promise from an async function hands the created collection to the awaiting caller
  return clientIn.collections.create({
    name: collectionName,
    multiTenancy: {
      enabled: enableMt
    },
    // Additional settings not shown
  })
}

// The target of a Collection → Collection migration is single-tenancy, so multi-tenancy stays off
reviews_tgt = await createCollection(client_tgt, "WineReview", false)
Step 2: Migrate the data
Migrate:
- The source collection data in the `client_src` instance
- to the target collection in the `client_tgt` instance
- Python Client v4
- Python Client v3
- JS/TS Client v3
def migrate_data(collection_src: Collection, collection_tgt: Collection):
    """Copy every object of `collection_src` into `collection_tgt`.

    Objects are read through the source iterator (vectors included) and written
    through a fixed-size batch of 100, keeping their properties, vector and UUID.

    Returns True when the batch reported no failed objects, False otherwise.
    """
    with collection_tgt.batch.fixed_size(batch_size=100) as batch:
        for q in tqdm(collection_src.iterator(include_vector=True)):
            batch.add_object(
                properties=q.properties,
                # .get() so an object stored without a vector is migrated as-is
                # instead of aborting the whole run with a KeyError
                vector=q.vector.get("default"),
                uuid=q.uuid,
            )

    # Batch errors are collected by the client rather than raised, so check them here
    failed = collection_tgt.batch.failed_objects
    if failed:
        print(f"{len(failed)} object(s) failed to import. First failure: {failed[0]}")
    return not failed
reviews_src = client_src.collections.get("WineReview")
reviews_tgt = client_tgt.collections.get("WineReview")

try:
    migrate_data(reviews_src, reviews_tgt)
finally:
    # Release both connections even if the migration raises
    client_src.close()
    client_tgt.close()
The `migrate_data_from_weaviate_to_weaviate` function is called to migrate the data.
from typing import List, Optional
from tqdm import tqdm
from weaviate import Client
def migrate_data_from_weaviate_to_weaviate(
    client_src: Client,
    client_tgt: Client,
    from_class_name: str,
    to_class_name: str,
    from_tenant: Optional[str] = None,
    to_tenant: Optional[str] = None,
    limit: int = 500,
    batch_size: int = 50,
    after_uuid: Optional[str] = None,
    count: int = 0,
) -> None:
    """
    Migrate Weaviate data from a Source Weaviate to a Target Weaviate.

    Four configurations are supported:
        1. Class -> Class
        2. Class -> Tenant
        3. Tenant -> Class
        4. Tenant -> Tenant

    Note that this is meant to migrate data that has no cross-reference properties.
    If the class to migrate has cross-references, some changes might be needed for
    this script.

    Parameters
    ----------
    client_src: Client
        The Source Weaviate Client from which to query the data (including the UUID
        and the underlying vector, if one is present).
    client_tgt: Client
        The Target Weaviate Client to which to ingest the data.
        NOTE: The batch config is overridden in this function. To keep your previous
        batch config, remove the `batch.configure` call in this function.
    from_class_name: str
        The Source Weaviate class that should be migrated.
    to_class_name: str
        The Target Weaviate class that should host the Source Weaviate data.
    from_tenant: Optional[str] = None
        The Source Weaviate class tenant that should be migrated. If None, the Source
        class has no Multi-Tenancy enabled and the whole class is migrated.
    to_tenant: Optional[str] = None
        The Target Weaviate class tenant that should host the migrated data. If None,
        the Target class has no Multi-Tenancy enabled.
    limit: int = 500
        The page size used for querying data from the Source Weaviate.
        NOTE: Do not set it too high, to avoid long requests.
    batch_size: int = 50
        The batch size configured for the Target Weaviate.
        NOTE: Do not set it too high, to avoid long requests.
    after_uuid: Optional[str] = None
        The UUID after which to resume with the cursor API. Meant for re-running the
        script after it failed mid-migration. Leave it as None on the first run.
    count: int = 0
        The number of objects already ingested in the Target Weaviate. Meant for
        re-runs after a failure and used ONLY for the progress bar. Can be ignored.

    Raises
    ------
    Exception
        If a page query against the Source Weaviate returns errors. Any exception is
        re-raised after the resume position has been printed.
    """
    # get source class properties
    properties = [
        prop["name"] for prop in client_src.schema.get(from_class_name)["properties"]
    ]

    # get number of items in the class/tenant (only used to size the progress bar)
    obj_count_query = client_src.query.aggregate(
        class_name=from_class_name
    ).with_meta_count()
    if from_tenant is not None:
        obj_count_query = obj_count_query.with_tenant(from_tenant)
    resp = obj_count_query.do()
    num_objects = resp["data"]["Aggregate"][from_class_name][0]["meta"]["count"]

    try:
        # configure Target Weaviate Batch
        client_tgt.batch.configure(batch_size=batch_size)

        with client_tgt.batch as target_batch, tqdm(total=(num_objects - count)) as pbar:
            # migrate data, one cursor page at a time
            while True:
                query = (
                    client_src.query.get(class_name=from_class_name, properties=properties)
                    .with_additional(["vector", "id"])
                    .with_limit(limit)
                )
                if after_uuid:
                    query = query.with_after(after_uuid)
                if from_tenant:
                    query = query.with_tenant(from_tenant)
                source_data = query.do()

                if "errors" in source_data:
                    tenant_part = f" from '{from_tenant}'" if from_tenant else ""
                    raise Exception(
                        f"Failed to get data after object UUID '{after_uuid}' "
                        f"for class '{from_class_name}'{tenant_part}!\n"
                        f"{source_data['errors']}"
                    )

                page_object = source_data["data"]["Get"][from_class_name]
                if len(page_object) == 0:
                    break

                for obj in page_object:
                    # Shallow copy so the page entry keeps its `_additional` data
                    weaviate_obj = obj.copy()
                    additional = weaviate_obj.pop("_additional")
                    item_config = {"tenant": to_tenant}
                    # An object without a vector comes back with an empty (or None)
                    # vector; send no vector at all in that case.
                    if additional["vector"]:
                        item_config["vector"] = additional["vector"]
                    target_batch.add_data_object(
                        data_object=weaviate_obj,
                        class_name=to_class_name,
                        uuid=additional["id"],
                        **item_config,
                    )

                # The cursor advances only once the whole page was handed to the batch
                after_uuid = page_object[-1]["_additional"]["id"]
                # Count what was actually read; the last page is usually shorter than `limit`
                pbar.update(len(page_object))
    except BaseException:
        # Deliberately as broad as a bare `except:` so the resume position is
        # printed even on Ctrl-C; the exception is always re-raised.
        source_desc = f"class {from_class_name}"
        if from_tenant:
            source_desc += f" from tenant {from_tenant}"
        target_desc = f"class {to_class_name}"
        if to_tenant:
            target_desc += f" with tenant {to_tenant}"
        print(
            f"Something went wrong. The last after_uuid was: '{after_uuid}' for Source Weaviate "
            f"{source_desc}. The Target Weaviate was {target_desc}!\n"
        )
        raise
    finally:
        # The migration function uses the batch API in a context manager and when it exits
        # the context manager it also shuts down the BatchExecutor, so we can re-start it here.
        # It gets automatically started when entering a new context manager but prints a warning.
        # It is started in 'finally' in case there is a re-try mechanism on errors
        client_tgt.batch.start()
# Your source and target endpoints
SOURCE_WEAVIATE_URL = "http://localhost:8080"
TARGET_WEAVIATE_URL = "http://localhost:8099"

source_client = Client(url=SOURCE_WEAVIATE_URL)
target_client = Client(url=TARGET_WEAVIATE_URL)

# Migrate the data with the `migrate_data_from_weaviate_to_weaviate` function defined above.
# Source and target use the same class name, each on its own instance.
source_class = "WineReview"
target_class = "WineReview"

print(f"Start migration for class '{source_class}'")
migrate_data_from_weaviate_to_weaviate(
    client_src=source_client,
    client_tgt=target_client,
    from_class_name=source_class,
    to_class_name=target_class,
)
print(f"Class '{source_class}' migrated to '{target_class}' in '{TARGET_WEAVIATE_URL}'")
let reviews_tgt, reviews_src;
reviews_src = client_src.collections.get("WineReview")
reviews_tgt = client_tgt.collections.get("WineReview")

// Size of the source when the migration starts; used as an upper bound on migrated objects
let maxItems = await reviews_src.length()
// Must start at 0: an uninitialized counter turns into NaN on `counter++`,
// which makes every comparison below false
let counter: number = 0

// Copies every object (properties, vector, uuid) from collection_src to collection_tgt
// in insertMany batches of up to 1000 objects.
async function migrateData(collection_src: Collection, collection_tgt: Collection) {
  const batchSize = 1000
  let itemsToInsert = []
  const promises = []

  // Sends the buffered objects as one insertMany request and resets the buffer
  const flush = () => {
    if (itemsToInsert.length == 0) return

    const promise = collection_tgt.data.insertMany(itemsToInsert)
      .then((response) => {
        console.log(`Successfully imported batch of ${Object.keys(response.uuids).length} items`);
        if (response.hasErrors) {
          throw new Error("Error in batch import!");
        }
      })
      .catch((error) => {
        // Best effort: a failed batch is logged and the remaining batches still run
        console.error('Error importing batch:', error);
      })

    promises.push(promise)
    itemsToInsert = [];
  }

  for await (const item of collection_src.iterator({ includeVector: true })) {
    // Check if we've reached the maximum items
    if (counter >= maxItems) {
      console.log(`Reached maximum items limit of ${maxItems}`);
      break;
    }

    counter++;
    if (counter % 1000 == 0)
      console.log(`Import: ${counter}`)

    // Add object to batching array
    itemsToInsert.push({
      properties: item.properties,
      vector: item.vectors.default,
      uuid: item.uuid,
    })

    if (itemsToInsert.length == batchSize) {
      flush()
    }
  }

  // Send the final, partially filled batch
  flush()

  // Wait for all pending insertMany requests
  await Promise.all(promises)
}

// Wait for the migration to finish before the connections are closed
await migrateData(reviews_src, reviews_tgt)

client_src.close()
client_tgt.close()
Collection → Tenant
Step 1: Create the target collection(s)
Create a collection (e.g. `WineReviewMT`) at the target instance, matching the collection (e.g. `WineReview`) at the source instance, and enable multi-tenancy for the target collection.
- Python Client v4
- Python Client v3
- JS/TS Client v3
import weaviate
import weaviate.classes as wvc
from weaviate.collections import Collection
from weaviate.client import WeaviateClient
import os  # required by the os.getenv() calls below

# Source instance, reached with the default local connection settings
client_src = weaviate.connect_to_local(
    headers={
        "X-OpenAI-Api-Key": os.getenv("OPENAI_APIKEY")
    }
)

# Target instance, exposed on different HTTP and gRPC ports
client_tgt = weaviate.connect_to_local(
    port=8090,
    grpc_port=50061,
    headers={
        "X-OpenAI-Api-Key": os.getenv("OPENAI_APIKEY")
    }
)
def create_collection(client_in: WeaviateClient, collection_name: str, enable_mt=False):
    """Create `collection_name` on `client_in` and return the new collection.

    `enable_mt` switches multi-tenancy on or off for the collection (off by default).
    """
    mt_config = wvc.config.Configure.multi_tenancy(enabled=enable_mt)
    return client_in.collections.create(
        name=collection_name,
        multi_tenancy_config=mt_config,
        # Additional settings not shown
    )


reviews_mt_tgt = create_collection(client_tgt, "WineReviewMT", enable_mt=True)
from weaviate import Client
target_client = Client(url="http://localhost:8099")  # Your target endpoint

# Multi-tenancy class definition for the target instance
wine_review_mt_class = {
    "class": "WineReviewMT",
    "multiTenancyConfig": {"enabled": True},  # Set this to enable multi-tenancy
    # Additional settings not shown
}
target_client.schema.create({"classes": [wine_review_mt_class]})
import weaviate, { Collection, WeaviateClient } from 'weaviate-client'
let client_src: WeaviateClient, client_tgt: WeaviateClient;
let reviews_mt_tgt, reviews_mt_src;

// Source instance, reached with the default local connection settings
client_src = await weaviate.connectToLocal({
  headers: {
    "X-Cohere-Api-Key": process.env.COHERE_API_KEY as string
  }
})

// Target instance, exposed on different HTTP and gRPC ports
client_tgt = await weaviate.connectToLocal({
  port: 8090,
  grpcPort: 50061,
  headers: {
    "X-Cohere-Api-Key": process.env.COHERE_API_KEY as string
  }
})
// Creates `collectionName` on `clientIn`; `enableMt` toggles multi-tenancy.
async function createCollection(clientIn: WeaviateClient, collectionName: string, enableMt: boolean) {
  // Returning the promise from an async function hands the created collection to the awaiting caller
  return clientIn.collections.create({
    name: collectionName,
    multiTenancy: {
      enabled: enableMt
    },
    // Additional settings not shown
  })
}

// Await the creation so that reviews_mt_tgt is the collection, not a pending promise
reviews_mt_tgt = await createCollection(client_tgt, "WineReviewMT", true)
Step 2: Create the tenant(s)
Add tenants at the target instance before adding data objects.
- Python Client v4
- Python Client v3
- JS/TS Client v3
# Tenants to add to the target collection before any objects are added
tenants_tgt = [
    wvc.tenants.Tenant(name=tenant_name) for tenant_name in ("tenantA", "tenantB")
]

reviews_mt_tgt = client_tgt.collections.get("WineReviewMT")
reviews_mt_tgt.tenants.create(tenants_tgt)
from weaviate import Client, Tenant
target_client = Client(url="http://localhost:8099")  # Your target endpoint

# Tenants to add to the target
target_tenants = [Tenant(tenant_name) for tenant_name in ("TenantA", "TenantB")]
target_client.schema.add_class_tenants("WineReviewMT", target_tenants)
// Tenants to add to the target collection
let tenantsTgt = [
  { name: 'tenantA'},
  { name: 'tenantB'}
]

reviews_mt_tgt = client_tgt.collections.get("WineReviewMT")
// Wait for the tenants to exist before any objects are added to them
await reviews_mt_tgt.tenants.create(tenantsTgt)
Step 3: Migrate the data
Migrate:
- The source collection data in the `client_src` instance
- to the target tenant data from the target collection in the `client_tgt` instance
- Python Client v4
- Python Client v3
- JS/TS Client v3
def migrate_data(collection_src: Collection, collection_tgt: Collection):
    """Copy every object of `collection_src` into `collection_tgt`.

    Objects are read through the source iterator (vectors included) and written
    through a fixed-size batch of 100, keeping their properties, vector and UUID.

    Returns True when the batch reported no failed objects, False otherwise.
    """
    with collection_tgt.batch.fixed_size(batch_size=100) as batch:
        for q in tqdm(collection_src.iterator(include_vector=True)):
            batch.add_object(
                properties=q.properties,
                # .get() so an object stored without a vector is migrated as-is
                # instead of aborting the whole run with a KeyError
                vector=q.vector.get("default"),
                uuid=q.uuid,
            )

    # Batch errors are collected by the client rather than raised, so check them here
    failed = collection_tgt.batch.failed_objects
    if failed:
        print(f"{len(failed)} object(s) failed to import. First failure: {failed[0]}")
    return not failed
reviews_src = client_src.collections.get("WineReview")
reviews_mt_tgt = client_tgt.collections.get("WineReviewMT")
# Handle scoped to the first target tenant; objects written through it land in that tenant
reviews_tgt_tenant_a = reviews_mt_tgt.with_tenant(tenants_tgt[0].name)

try:
    migrate_data(reviews_src, reviews_tgt_tenant_a)
finally:
    # Release both connections even if the migration raises
    client_src.close()
    client_tgt.close()
The `migrate_data_from_weaviate_to_weaviate` function is called to migrate the data.
from weaviate import Client, Tenant
def migrate_data_from_weaviate_to_weaviate(
    client_src: Client,
    client_tgt: Client,
    from_class_name: str,
    to_class_name: str,
    from_tenant: Optional[str] = None,
    to_tenant: Optional[str] = None,
    limit: int = 500,
    batch_size: int = 50,
    after_uuid: Optional[str] = None,
    count: int = 0,
) -> None:
    """
    Migrate Weaviate data from a Source Weaviate to a Target Weaviate.

    Four configurations are supported:
        1. Class -> Class
        2. Class -> Tenant
        3. Tenant -> Class
        4. Tenant -> Tenant

    Note that this is meant to migrate data that has no cross-reference properties.
    If the class to migrate has cross-references, some changes might be needed for
    this script.

    Parameters
    ----------
    client_src: Client
        The Source Weaviate Client from which to query the data (including the UUID
        and the underlying vector, if one is present).
    client_tgt: Client
        The Target Weaviate Client to which to ingest the data.
        NOTE: The batch config is overridden in this function. To keep your previous
        batch config, remove the `batch.configure` call in this function.
    from_class_name: str
        The Source Weaviate class that should be migrated.
    to_class_name: str
        The Target Weaviate class that should host the Source Weaviate data.
    from_tenant: Optional[str] = None
        The Source Weaviate class tenant that should be migrated. If None, the Source
        class has no Multi-Tenancy enabled and the whole class is migrated.
    to_tenant: Optional[str] = None
        The Target Weaviate class tenant that should host the migrated data. If None,
        the Target class has no Multi-Tenancy enabled.
    limit: int = 500
        The page size used for querying data from the Source Weaviate.
        NOTE: Do not set it too high, to avoid long requests.
    batch_size: int = 50
        The batch size configured for the Target Weaviate.
        NOTE: Do not set it too high, to avoid long requests.
    after_uuid: Optional[str] = None
        The UUID after which to resume with the cursor API. Meant for re-running the
        script after it failed mid-migration. Leave it as None on the first run.
    count: int = 0
        The number of objects already ingested in the Target Weaviate. Meant for
        re-runs after a failure and used ONLY for the progress bar. Can be ignored.

    Raises
    ------
    Exception
        If a page query against the Source Weaviate returns errors. Any exception is
        re-raised after the resume position has been printed.
    """
    # get source class properties
    properties = [
        prop["name"] for prop in client_src.schema.get(from_class_name)["properties"]
    ]

    # get number of items in the class/tenant (only used to size the progress bar)
    obj_count_query = client_src.query.aggregate(
        class_name=from_class_name
    ).with_meta_count()
    if from_tenant is not None:
        obj_count_query = obj_count_query.with_tenant(from_tenant)
    resp = obj_count_query.do()
    num_objects = resp["data"]["Aggregate"][from_class_name][0]["meta"]["count"]

    try:
        # configure Target Weaviate Batch
        client_tgt.batch.configure(batch_size=batch_size)

        with client_tgt.batch as target_batch, tqdm(total=(num_objects - count)) as pbar:
            # migrate data, one cursor page at a time
            while True:
                query = (
                    client_src.query.get(class_name=from_class_name, properties=properties)
                    .with_additional(["vector", "id"])
                    .with_limit(limit)
                )
                if after_uuid:
                    query = query.with_after(after_uuid)
                if from_tenant:
                    query = query.with_tenant(from_tenant)
                source_data = query.do()

                if "errors" in source_data:
                    tenant_part = f" from '{from_tenant}'" if from_tenant else ""
                    raise Exception(
                        f"Failed to get data after object UUID '{after_uuid}' "
                        f"for class '{from_class_name}'{tenant_part}!\n"
                        f"{source_data['errors']}"
                    )

                page_object = source_data["data"]["Get"][from_class_name]
                if len(page_object) == 0:
                    break

                for obj in page_object:
                    # Shallow copy so the page entry keeps its `_additional` data
                    weaviate_obj = obj.copy()
                    additional = weaviate_obj.pop("_additional")
                    item_config = {"tenant": to_tenant}
                    # An object without a vector comes back with an empty (or None)
                    # vector; send no vector at all in that case.
                    if additional["vector"]:
                        item_config["vector"] = additional["vector"]
                    target_batch.add_data_object(
                        data_object=weaviate_obj,
                        class_name=to_class_name,
                        uuid=additional["id"],
                        **item_config,
                    )

                # The cursor advances only once the whole page was handed to the batch
                after_uuid = page_object[-1]["_additional"]["id"]
                # Count what was actually read; the last page is usually shorter than `limit`
                pbar.update(len(page_object))
    except BaseException:
        # Deliberately as broad as a bare `except:` so the resume position is
        # printed even on Ctrl-C; the exception is always re-raised.
        source_desc = f"class {from_class_name}"
        if from_tenant:
            source_desc += f" from tenant {from_tenant}"
        target_desc = f"class {to_class_name}"
        if to_tenant:
            target_desc += f" with tenant {to_tenant}"
        print(
            f"Something went wrong. The last after_uuid was: '{after_uuid}' for Source Weaviate "
            f"{source_desc}. The Target Weaviate was {target_desc}!\n"
        )
        raise
    finally:
        # The migration function uses the batch API in a context manager and when it exits
        # the context manager it also shuts down the BatchExecutor, so we can re-start it here.
        # It gets automatically started when entering a new context manager but prints a warning.
        # It is started in 'finally' in case there is a re-try mechanism on errors
        client_tgt.batch.start()
from weaviate import Client, Tenant
# Your source and target endpoints
SOURCE_WEAVIATE_URL = "http://localhost:8080"
TARGET_WEAVIATE_URL = "http://localhost:8099"

source_client = Client(url=SOURCE_WEAVIATE_URL)
target_client = Client(url=TARGET_WEAVIATE_URL)

# Migrate the data with the `migrate_data_from_weaviate_to_weaviate` function defined above.
# The whole source class goes into a single tenant of the target class.
source_class = "WineReview"
target_class = "WineReviewMT"
target_tenant = target_tenants[0]  # Pick a target tenant

print(f"Start migration for class '{source_class}'")
migrate_data_from_weaviate_to_weaviate(
    client_src=source_client,
    client_tgt=target_client,
    from_class_name=source_class,
    to_class_name=target_class,
    to_tenant=target_tenant.name,
)
print(f"Class '{source_class}' migrated to '{target_class}' and tenant '{target_tenant.name}' in '{TARGET_WEAVIATE_URL}'")
let reviews_tgt, reviews_src;
reviews_src = client_src.collections.get("WineReview")
reviews_mt_tgt = client_tgt.collections.get("WineReviewMT")

// Size of the source when the migration starts; used as an upper bound on migrated objects
let maxItems = await reviews_src.length()
// Must start at 0: an uninitialized counter turns into NaN on `counter++`,
// which makes every comparison below false
let counter: number = 0

// Copies every object (properties, vector, uuid) from collection_src to collection_tgt
// in insertMany batches of up to 1000 objects.
async function migrateData(collection_src: Collection, collection_tgt: Collection) {
  const batchSize = 1000
  let itemsToInsert = []
  const promises = []

  // Sends the buffered objects as one insertMany request and resets the buffer
  const flush = () => {
    if (itemsToInsert.length == 0) return

    const promise = collection_tgt.data.insertMany(itemsToInsert)
      .then((response) => {
        console.log(`Successfully imported batch of ${Object.keys(response.uuids).length} items`);
        if (response.hasErrors) {
          throw new Error("Error in batch import!");
        }
      })
      .catch((error) => {
        // Best effort: a failed batch is logged and the remaining batches still run
        console.error('Error importing batch:', error);
      })

    promises.push(promise)
    itemsToInsert = [];
  }

  for await (const item of collection_src.iterator({ includeVector: true })) {
    // Check if we've reached the maximum items
    if (counter >= maxItems) {
      console.log(`Reached maximum items limit of ${maxItems}`);
      break;
    }

    counter++;
    if (counter % 1000 == 0)
      console.log(`Import: ${counter}`)

    // Add object to batching array
    itemsToInsert.push({
      properties: item.properties,
      vector: item.vectors.default,
      uuid: item.uuid,
    })

    if (itemsToInsert.length == batchSize) {
      flush()
    }
  }

  // Send the final, partially filled batch
  flush()

  // Wait for all pending insertMany requests
  await Promise.all(promises)
}

// Handle scoped to the first target tenant
let reviews_tgt_tenant_a = reviews_mt_tgt.withTenant(tenantsTgt[0].name)

// Wait for the migration to finish before the connections are closed
await migrateData(reviews_src, reviews_tgt_tenant_a)

client_src.close()
client_tgt.close()
Tenant → Collection
Step 1: Create the target collection(s)
Create a collection (e.g. `WineReview`) at the target instance, matching the collection (e.g. `WineReviewMT`) at the source instance, with multi-tenancy disabled for the target collection.
- Python Client v4
- Python Client v3
- JS/TS Client v3
import weaviate
import weaviate.classes as wvc
from weaviate.collections import Collection
from weaviate.client import WeaviateClient
import os  # required by the os.getenv() calls below

# Source instance, reached with the default local connection settings
client_src = weaviate.connect_to_local(
    headers={
        "X-OpenAI-Api-Key": os.getenv("OPENAI_APIKEY")
    }
)

# Target instance, exposed on different HTTP and gRPC ports
client_tgt = weaviate.connect_to_local(
    port=8090,
    grpc_port=50061,
    headers={
        "X-OpenAI-Api-Key": os.getenv("OPENAI_APIKEY")
    }
)
def create_collection(client_in: WeaviateClient, collection_name: str, enable_mt=False):
    """Create `collection_name` on `client_in` and return the new collection.

    `enable_mt` switches multi-tenancy on or off for the collection (off by default).
    """
    mt_config = wvc.config.Configure.multi_tenancy(enabled=enable_mt)
    return client_in.collections.create(
        name=collection_name,
        multi_tenancy_config=mt_config,
        # Additional settings not shown
    )


reviews_tgt = create_collection(client_tgt, "WineReview", enable_mt=False)
from weaviate import Client
target_client = Client(url="http://localhost:8099")  # Your target endpoint

# Single-tenancy class definition for the target instance
wine_review_class = {
    "class": "WineReview",
    "multiTenancyConfig": {"enabled": False},  # This is also the default
    # Additional settings not shown
}
target_client.schema.create({"classes": [wine_review_class]})
import weaviate, { Collection, WeaviateClient } from 'weaviate-client'
let client_src: WeaviateClient, client_tgt: WeaviateClient;
let reviews_mt_tgt, reviews_mt_src;

// Source instance, reached with the default local connection settings
client_src = await weaviate.connectToLocal({
  headers: {
    "X-Cohere-Api-Key": process.env.COHERE_API_KEY as string
  }
})

// Target instance, exposed on different HTTP and gRPC ports
client_tgt = await weaviate.connectToLocal({
  port: 8090,
  grpcPort: 50061,
  headers: {
    "X-Cohere-Api-Key": process.env.COHERE_API_KEY as string
  }
})
// Creates `collectionName` on `clientIn`; `enableMt` toggles multi-tenancy.
async function createCollection(clientIn: WeaviateClient, collectionName: string, enableMt: boolean) {
  // Returning the promise from an async function hands the created collection to the awaiting caller
  return clientIn.collections.create({
    name: collectionName,
    multiTenancy: {
      enabled: enableMt
    },
    // Additional settings not shown
  })
}

// The target of a Tenant → Collection migration is a single-tenancy collection
reviews_tgt = await createCollection(client_tgt, "WineReview", false)
Step 2: Migrate the data
Migrate:
- The source tenant data from the source collection in the `client_src` instance
- to the target collection in the `client_tgt` instance
- Python Client v4
- Python Client v3
- JS/TS Client v3
def migrate_data(collection_src: Collection, collection_tgt: Collection):
    """Copy every object of `collection_src` into `collection_tgt`.

    Objects are read through the source iterator (vectors included) and written
    through a fixed-size batch of 100, keeping their properties, vector and UUID.

    Returns True when the batch reported no failed objects, False otherwise.
    """
    with collection_tgt.batch.fixed_size(batch_size=100) as batch:
        for q in tqdm(collection_src.iterator(include_vector=True)):
            batch.add_object(
                properties=q.properties,
                # .get() so an object stored without a vector is migrated as-is
                # instead of aborting the whole run with a KeyError
                vector=q.vector.get("default"),
                uuid=q.uuid,
            )

    # Batch errors are collected by the client rather than raised, so check them here
    failed = collection_tgt.batch.failed_objects
    if failed:
        print(f"{len(failed)} object(s) failed to import. First failure: {failed[0]}")
    return not failed
reviews_src = client_src.collections.get("WineReviewMT")
# Handle scoped to the source tenant; only that tenant's objects are read through it
reviews_src_tenant_a = reviews_src.with_tenant("tenantA")
reviews_tgt = client_tgt.collections.get("WineReview")

try:
    migrate_data(reviews_src_tenant_a, reviews_tgt)
finally:
    # Release both connections even if the migration raises
    client_src.close()
    client_tgt.close()
The `migrate_data_from_weaviate_to_weaviate` function is called to migrate the data.
from weaviate import Client, Tenant
def migrate_data_from_weaviate_to_weaviate(
    client_src: Client,
    client_tgt: Client,
    from_class_name: str,
    to_class_name: str,
    from_tenant: Optional[str] = None,
    to_tenant: Optional[str] = None,
    limit: int = 500,
    batch_size: int = 50,
    after_uuid: Optional[str] = None,
    count: int = 0,
) -> None:
    """
    Migrate Weaviate data from a Source Weaviate to a Target Weaviate.

    Four configurations are supported:
        1. Class -> Class
        2. Class -> Tenant
        3. Tenant -> Class
        4. Tenant -> Tenant

    Note that this is meant to migrate data that has no cross-reference properties.
    If the class to migrate has cross-references, some changes might be needed for
    this script.

    Parameters
    ----------
    client_src: Client
        The Source Weaviate Client from which to query the data (including the UUID
        and the underlying vector, if one is present).
    client_tgt: Client
        The Target Weaviate Client to which to ingest the data.
        NOTE: The batch config is overridden in this function. To keep your previous
        batch config, remove the `batch.configure` call in this function.
    from_class_name: str
        The Source Weaviate class that should be migrated.
    to_class_name: str
        The Target Weaviate class that should host the Source Weaviate data.
    from_tenant: Optional[str] = None
        The Source Weaviate class tenant that should be migrated. If None, the Source
        class has no Multi-Tenancy enabled and the whole class is migrated.
    to_tenant: Optional[str] = None
        The Target Weaviate class tenant that should host the migrated data. If None,
        the Target class has no Multi-Tenancy enabled.
    limit: int = 500
        The page size used for querying data from the Source Weaviate.
        NOTE: Do not set it too high, to avoid long requests.
    batch_size: int = 50
        The batch size configured for the Target Weaviate.
        NOTE: Do not set it too high, to avoid long requests.
    after_uuid: Optional[str] = None
        The UUID after which to resume with the cursor API. Meant for re-running the
        script after it failed mid-migration. Leave it as None on the first run.
    count: int = 0
        The number of objects already ingested in the Target Weaviate. Meant for
        re-runs after a failure and used ONLY for the progress bar. Can be ignored.

    Raises
    ------
    Exception
        If a page query against the Source Weaviate returns errors. Any exception is
        re-raised after the resume position has been printed.
    """
    # get source class properties
    properties = [
        prop["name"] for prop in client_src.schema.get(from_class_name)["properties"]
    ]

    # get number of items in the class/tenant (only used to size the progress bar)
    obj_count_query = client_src.query.aggregate(
        class_name=from_class_name
    ).with_meta_count()
    if from_tenant is not None:
        obj_count_query = obj_count_query.with_tenant(from_tenant)
    resp = obj_count_query.do()
    num_objects = resp["data"]["Aggregate"][from_class_name][0]["meta"]["count"]

    try:
        # configure Target Weaviate Batch
        client_tgt.batch.configure(batch_size=batch_size)

        with client_tgt.batch as target_batch, tqdm(total=(num_objects - count)) as pbar:
            # migrate data, one cursor page at a time
            while True:
                query = (
                    client_src.query.get(class_name=from_class_name, properties=properties)
                    .with_additional(["vector", "id"])
                    .with_limit(limit)
                )
                if after_uuid:
                    query = query.with_after(after_uuid)
                if from_tenant:
                    query = query.with_tenant(from_tenant)
                source_data = query.do()

                if "errors" in source_data:
                    tenant_part = f" from '{from_tenant}'" if from_tenant else ""
                    raise Exception(
                        f"Failed to get data after object UUID '{after_uuid}' "
                        f"for class '{from_class_name}'{tenant_part}!\n"
                        f"{source_data['errors']}"
                    )

                page_object = source_data["data"]["Get"][from_class_name]
                if len(page_object) == 0:
                    break

                for obj in page_object:
                    # Shallow copy so the page entry keeps its `_additional` data
                    weaviate_obj = obj.copy()
                    additional = weaviate_obj.pop("_additional")
                    item_config = {"tenant": to_tenant}
                    # An object without a vector comes back with an empty (or None)
                    # vector; send no vector at all in that case.
                    if additional["vector"]:
                        item_config["vector"] = additional["vector"]
                    target_batch.add_data_object(
                        data_object=weaviate_obj,
                        class_name=to_class_name,
                        uuid=additional["id"],
                        **item_config,
                    )

                # The cursor advances only once the whole page was handed to the batch
                after_uuid = page_object[-1]["_additional"]["id"]
                # Count what was actually read; the last page is usually shorter than `limit`
                pbar.update(len(page_object))
    except BaseException:
        # Deliberately as broad as a bare `except:` so the resume position is
        # printed even on Ctrl-C; the exception is always re-raised.
        source_desc = f"class {from_class_name}"
        if from_tenant:
            source_desc += f" from tenant {from_tenant}"
        target_desc = f"class {to_class_name}"
        if to_tenant:
            target_desc += f" with tenant {to_tenant}"
        print(
            f"Something went wrong. The last after_uuid was: '{after_uuid}' for Source Weaviate "
            f"{source_desc}. The Target Weaviate was {target_desc}!\n"
        )
        raise
    finally:
        # The migration function uses the batch API in a context manager and when it exits
        # the context manager it also shuts down the BatchExecutor, so we can re-start it here.
        # It gets automatically started when entering a new context manager but prints a warning.
        # It is started in 'finally' in case there is a re-try mechanism on errors
        client_tgt.batch.start()
from weaviate import Client, Tenant
# Your source and target endpoints
SOURCE_WEAVIATE_URL = "http://localhost:8080"
TARGET_WEAVIATE_URL = "http://localhost:8099"

source_client = Client(url=SOURCE_WEAVIATE_URL)
target_client = Client(url=TARGET_WEAVIATE_URL)

# Migrate the data with the `migrate_data_from_weaviate_to_weaviate` function defined above
source_class = "WineReviewMT"
# Look up the tenants of the source class; the snippet never defined `source_tenants`
source_tenants = source_client.schema.get_class_tenants(source_class)
source_tenant = source_tenants[0]  # Pick a source tenant
target_class = "WineReview"

print(f"Start migration for class '{source_class}'")
migrate_data_from_weaviate_to_weaviate(
    client_src=source_client,
    client_tgt=target_client,
    from_class_name=source_class,
    from_tenant=source_tenant.name,
    to_class_name=target_class,
)
print(f"Tenant '{source_tenant.name}' in class '{source_class}' migrated to '{target_class}' in '{TARGET_WEAVIATE_URL}'")
let reviews_tgt, reviews_src;
let reviews_src_tenant_a;

// Resolve the collections first: the object count below needs the source tenant handle
reviews_src = client_src.collections.get("WineReviewMT")
reviews_src_tenant_a = reviews_src.withTenant("tenantA")
reviews_tgt = client_tgt.collections.get("WineReview")

// Size of the source tenant when the migration starts; used as an upper bound on migrated objects
let maxItems = await reviews_src_tenant_a.length()
// Must start at 0: an uninitialized counter turns into NaN on `counter++`,
// which makes every comparison below false
let counter: number = 0

// Copies every object (properties, vector, uuid) from collection_src to collection_tgt
// in insertMany batches of up to 1000 objects.
async function migrateData(collection_src: Collection, collection_tgt: Collection) {
  const batchSize = 1000
  let itemsToInsert = []
  const promises = []

  // Sends the buffered objects as one insertMany request and resets the buffer
  const flush = () => {
    if (itemsToInsert.length == 0) return

    const promise = collection_tgt.data.insertMany(itemsToInsert)
      .then((response) => {
        console.log(`Successfully imported batch of ${Object.keys(response.uuids).length} items`);
        if (response.hasErrors) {
          throw new Error("Error in batch import!");
        }
      })
      .catch((error) => {
        // Best effort: a failed batch is logged and the remaining batches still run
        console.error('Error importing batch:', error);
      })

    promises.push(promise)
    itemsToInsert = [];
  }

  for await (const item of collection_src.iterator({ includeVector: true })) {
    // Check if we've reached the maximum items
    if (counter >= maxItems) {
      console.log(`Reached maximum items limit of ${maxItems}`);
      break;
    }

    counter++;
    if (counter % 1000 == 0)
      console.log(`Import: ${counter}`)

    // Add object to batching array
    itemsToInsert.push({
      properties: item.properties,
      vector: item.vectors.default,
      uuid: item.uuid,
    })

    if (itemsToInsert.length == batchSize) {
      flush()
    }
  }

  // Send the final, partially filled batch
  flush()

  // Wait for all pending insertMany requests
  await Promise.all(promises)
}

// Wait for the migration to finish before the connections are closed
await migrateData(reviews_src_tenant_a, reviews_tgt)

client_src.close()
client_tgt.close()
Tenant → Tenant
Step 1: Create the target collection(s)
Create a collection (e.g. WineReviewMT) at the target instance, matching the collection (e.g. WineReviewMT) at the source instance, including enabling multi-tenancy.
- Python Client v4
- Python Client v3
- JS/TS Client v3
import weaviate
import weaviate.classes as wvc
from weaviate.collections import Collection
from weaviate.client import WeaviateClient
# NOTE(review): `os.getenv` is used below but `import os` is not part of this snippet -
# make sure `os` is imported before running it.
# Source instance: no ports are passed, so `connect_to_local` falls back to its defaults
client_src = weaviate.connect_to_local(
headers={
"X-OpenAI-Api-Key": os.getenv("OPENAI_APIKEY")
}
)
# Target instance: a second local instance reached on HTTP port 8090 and gRPC port 50061
client_tgt = weaviate.connect_to_local(
port=8090,
grpc_port=50061,
headers={
"X-OpenAI-Api-Key": os.getenv("OPENAI_APIKEY")
}
)
def create_collection(client_in: WeaviateClient, collection_name: str, enable_mt=False):
    """
    Create a collection on the given Weaviate instance and return it.

    Parameters
    ----------
    client_in: WeaviateClient
        Client connected to the instance on which the collection is created.
    collection_name: str
        Name of the collection to create.
    enable_mt: bool = False
        Whether multi-tenancy is enabled for the new collection.

    Returns
    -------
    The object returned by `client_in.collections.create`.
    """
    mt_config = wvc.config.Configure.multi_tenancy(enabled=enable_mt)
    return client_in.collections.create(
        name=collection_name,
        multi_tenancy_config=mt_config,
        # Additional settings not shown
    )
# Create the multi-tenancy enabled target collection on the target instance
reviews_mt_tgt = create_collection(client_tgt, "WineReviewMT", enable_mt=True)
from weaviate import Client
# Client for the instance that will receive the data
target_client = Client(url="http://localhost:8099") # Your target endpoint
# Create the target class with multi-tenancy switched on
target_client.schema.create(
{
"classes": [
{
"class": "WineReviewMT",
"multiTenancyConfig": {"enabled": True}, # Set this to enable multi-tenancy
# Additional settings not shown
}
]
}
)
import weaviate, { Collection, WeaviateClient } from 'weaviate-client'
// Handles for the source and the target instance
let client_src: WeaviateClient,client_tgt: WeaviateClient;
// Handles for the multi-tenant collection on the target and on the source instance
let reviews_mt_tgt, reviews_mt_src;
// Source instance: no ports are passed, so connectToLocal uses its defaults
client_src = await weaviate.connectToLocal({
headers: {
"X-Cohere-Api-Key": process.env.COHERE_API_KEY as string
}
})
// Target instance: a second local instance reached on HTTP port 8090 and gRPC port 50061
client_tgt = await weaviate.connectToLocal({
port: 8090,
grpcPort: 50061,
headers: {
"X-Cohere-Api-Key": process.env.COHERE_API_KEY as string
}
})
/**
 * Create a collection named `collectionName` on the instance behind `clientIn`,
 * with multi-tenancy enabled or disabled according to `enableMt`.
 * Resolves to whatever `clientIn.collections.create` resolves to.
 */
async function createCollection(clientIn: WeaviateClient, collectionName: string, enableMt: boolean) {
  const multiTenancy = {
    enabled: enableMt
  }
  return clientIn.collections.create({
    name: collectionName,
    multiTenancy,
    // Additional settings not shown
  })
}
// createCollection is async: await it so the collection exists (and creation errors
// surface here) before tenants or objects are added to it
reviews_mt_tgt = await createCollection(client_tgt, "WineReviewMT", true)
Step 2: Create the tenant(s)
Add tenants at the target instance before adding data objects.
- Python Client v4
- Python Client v3
- JS/TS Client v3
# Tenants to create at the target; they must exist before objects are added to them
tenants_tgt = [wvc.tenants.Tenant(name="tenantA"), wvc.tenants.Tenant(name="tenantB")]
# Fetch the multi-tenant collection from the target instance and register the tenants on it
reviews_mt_tgt = client_tgt.collections.get("WineReviewMT")
reviews_mt_tgt.tenants.create(tenants_tgt)
from weaviate import Client, Tenant
# Client for the instance that will receive the data
target_client = Client(url="http://localhost:8099") # Your target endpoint
target_tenants = [Tenant("TenantA"), Tenant("TenantB")] # Tenants to add to the target
# Register the tenants on the multi-tenant target class
target_client.schema.add_class_tenants("WineReviewMT", target_tenants)
// Tenants to create at the target; they must exist before objects are added to them
let tenantsTgt = [
  { name: 'tenantA'},
  { name: 'tenantB'}
]

reviews_mt_tgt = client_tgt.collections.get("WineReviewMT")
// Await the request so the tenants exist (and failures surface) before data is migrated
await reviews_mt_tgt.tenants.create(tenantsTgt)
Step 3: Migrate the data
Migrate:
- the source tenant data from the source collection in the client_src instance
- to the target tenant of the target collection in the client_tgt instance
- Python Client v4
- Python Client v3
- JS/TS Client v3
def migrate_data(collection_src: Collection, collection_tgt: Collection):
    """
    Copy every object of `collection_src` into `collection_tgt`.

    Objects are read through the source iterator with their vectors included and
    written through a fixed-size batch (100 objects per batch) on the target. Each
    object keeps its properties, its "default" vector and its UUID.

    Parameters
    ----------
    collection_src: Collection
        The collection (or tenant of a collection) to read from.
    collection_tgt: Collection
        The collection (or tenant of a collection) to write to.

    Returns
    -------
    bool
        Always True once the loop has completed.
    """
    with collection_tgt.batch.fixed_size(batch_size=100) as batch:
        source_objects = collection_src.iterator(include_vector=True)
        # tqdm only wraps the iterator to display progress
        for source_obj in tqdm(source_objects):
            batch.add_object(
                properties=source_obj.properties,
                vector=source_obj.vector["default"],
                uuid=source_obj.uuid,
            )
    return True
# Source side: the multi-tenant collection, narrowed to the tenant to migrate
reviews_mt_src = client_src.collections.get("WineReviewMT")
reviews_src_tenant_a = reviews_mt_src.with_tenant("tenantA")
# Target side: the multi-tenant collection, narrowed to the first tenant in `tenants_tgt`
reviews_mt_tgt = client_tgt.collections.get("WineReviewMT")
reviews_tgt_tenant_a = reviews_mt_tgt.with_tenant(tenants_tgt[0].name)
# Tenant -> Tenant copy
migrate_data(reviews_src_tenant_a, reviews_tgt_tenant_a)
# Close both connections once the migration call has returned
client_src.close()
client_tgt.close()
The migrate_data_from_weaviate_to_weaviate function is called to migrate the data.
from weaviate import Client, Tenant
def migrate_data_from_weaviate_to_weaviate(
    client_src: Client,
    client_tgt: Client,
    from_class_name: str,
    to_class_name: str,
    from_tenant: Optional[str] = None,
    to_tenant: Optional[str] = None,
    limit: int = 500,
    batch_size: int = 50,
    after_uuid: Optional[str] = None,
    count: int = 0,
) -> None:
    """
    Migrate Weaviate data from a Source Weaviate to a Target Weaviate. This function
    allows to migrate data in 4 different configs:
        1. Class -> Class
        2. Class -> Tenant
        3. Tenant -> Class
        4. Tenant -> Tenant

    Note that this is meant to migrate data that has no cross-reference properties. If
    you have cross-references for the class to migrate, some changes might be needed for
    this script.

    Parameters
    ----------
    client_src: Client
        The Source Weaviate Client object instance from which to query the data
        (including the UUID and the underlying vector, if one is present.)
    client_tgt: Client
        The Target Weaviate Client object instance to which to ingest the data.
        NOTE: The batch config is going to be overridden in this function. If you want
        to keep your previous config of the batch, you can remove the `batch.configure`
        call in this function.
    from_class_name: str
        The Source Weaviate class that should be migrated.
    to_class_name: str
        The Target Weaviate class that should host the Source Weaviate data.
    from_tenant: Optional[str] = None
        The Source Weaviate class tenant that should be migrated. If it is None,
        then it means that the Source class has no Multi-Tenancy enabled and the whole
        class needs to be migrated.
        By default None
    to_tenant: Optional[str] = None
        The Target Weaviate class tenant that should host the migrated data. If it is
        None then it means that Target Weaviate has no Multi-Tenancy enabled and the
        data from the Source Weaviate will be in a non-Multi-Tenancy class.
        By default None
    limit: int = 500
        The limit used for querying data from Source Weaviate.
        NOTE: Do not set to a high value to avoid long requests.
    batch_size: int = 50
        The batch size configured for the Target Weaviate.
        NOTE: Do not set to a high value to avoid long requests.
    after_uuid: Optional[str] = None
        The after UUID to be used in cursor API. It is meant to be used in case the script
        failed in the middle of the process of migration. Leave it to None on first run.
        By default None
    count: int = 0
        The number of objects that were already ingested in the Target Weaviate. It is
        meant to be used in case the script failed in the middle of the process of migration,
        and is used ONLY for the progress bar. Can be ignored.

    Raises
    ------
    Exception
        If a page query against the Source Weaviate returns errors. Any exception is
        re-raised after a message with the last cursor position has been printed.
    """

    # get source class properties
    properties = [
        prop["name"] for prop in client_src.schema.get(from_class_name)["properties"]
    ]

    # get number of items in the class/tenant
    obj_count_query = client_src.query.aggregate(
        class_name=from_class_name
    ).with_meta_count()
    if from_tenant is not None:
        obj_count_query = obj_count_query.with_tenant(from_tenant)
    resp = obj_count_query.do()
    num_objects = resp["data"]["Aggregate"][from_class_name][0]["meta"]["count"]

    try:
        # configure Target Weaviate Batch
        client_tgt.batch.configure(
            batch_size=batch_size,
        )
        additional_item_config = {"tenant": to_tenant}
        with client_tgt.batch as target_batch, tqdm(total=(num_objects - count)) as pbar:

            # helper function to ingest data into Target Weaviate
            def ingest_data_in_batches(objects: List[dict]) -> str:
                """
                Ingest data into Target Weaviate using Batch API.

                Parameters
                ----------
                objects: List[dict]
                    A list of Weaviate objects from the Source Weaviate, the list contains
                    all objects of the current Source Weaviate page.
                    Cannot be empty list!!!

                Returns
                -------
                str
                    The last UUID in the Page to be used with cursor API feature.
                """
                for obj in objects:
                    # shallow copy so the page returned by the query is left untouched
                    weaviate_obj = obj.copy()
                    vector = weaviate_obj["_additional"]["vector"]
                    uuid = weaviate_obj["_additional"]["id"]
                    del weaviate_obj["_additional"]

                    item_kwargs = dict(additional_item_config)
                    # Only forward a vector when there is one; a missing vector may
                    # come back as None as well as an empty list.
                    if vector:
                        item_kwargs["vector"] = vector
                    target_batch.add_data_object(
                        data_object=weaviate_obj,
                        class_name=to_class_name,
                        uuid=uuid,
                        **item_kwargs,
                    )
                return uuid

            # migrate data
            while True:
                query = (
                    client_src.query.get(
                        class_name=from_class_name, properties=properties
                    )
                    .with_additional(["vector", "id"])
                    .with_limit(limit)
                )
                if after_uuid:
                    query = query.with_after(after_uuid)
                if from_tenant:
                    query = query.with_tenant(from_tenant)
                source_data = query.do()

                if "errors" in source_data:
                    tenant_info = f" from '{from_tenant}'" if from_tenant else ""
                    raise Exception(
                        f"Failed to get data after object UUID '{after_uuid}' for class "
                        f"'{from_class_name}'{tenant_info}!\n{source_data['errors']}"
                    )
                page_object = source_data["data"]["Get"][from_class_name]

                if len(page_object) == 0:
                    break
                after_uuid = ingest_data_in_batches(objects=page_object)
                # advance by what was actually ingested; the last page may hold
                # fewer than `limit` objects
                pbar.update(len(page_object))
    except BaseException:
        # Deliberately catches everything (including KeyboardInterrupt): the cursor
        # position is printed so the migration can be resumed, then the error is re-raised.
        # The two optional parts are computed separately because a conditional
        # expression binds looser than adjacent string literals.
        source_info = f" from tenant {from_tenant}! " if from_tenant else ". "
        target_info = f" with tenant {to_tenant}!\n" if to_tenant else "!\n"
        print(
            f"Something went wrong. The last after_uuid was: '{after_uuid}' for Source Weaviate "
            f"class {from_class_name}{source_info}"
            f"The Target Weaviate class was {to_class_name}{target_info}"
        )
        raise
    finally:
        # The migration function uses the batch API in a context manager and when it exits
        # the context manager it also shuts down the BatchExecutor, so we can re-start it here.
        # It gets automatically started when entering a new context manager but prints a warning.
        # It is started in 'finally' in case there is a re-try mechanism on errors
        client_tgt.batch.start()
from weaviate import Client, Tenant
# Endpoints of the two Weaviate instances taking part in the migration
SOURCE_WEAVIATE_URL = "http://localhost:8080" # Your source endpoint
TARGET_WEAVIATE_URL = "http://localhost:8099" # Your target endpoint
# One client per instance
source_client = Client(url=SOURCE_WEAVIATE_URL)
target_client = Client(url=TARGET_WEAVIATE_URL)
# Migrate the data with the `migrate_data_from_weaviate_to_weaviate` function defined above
source_class = "WineReviewMT"
# NOTE(review): `source_tenants` and `target_tenants` are not defined in this snippet;
# presumably they are the tenant lists created in the earlier steps - confirm
source_tenant = source_tenants[0] # Pick a source tenant
target_class = "WineReviewMT"
target_tenant = target_tenants[0] # Pick a target tenant
print(f"Start migration for class '{source_class}'")
# Tenant -> Tenant: both `from_tenant` and `to_tenant` are passed
migrate_data_from_weaviate_to_weaviate(
client_src=source_client,
client_tgt=target_client,
from_class_name=source_class,
from_tenant=source_tenant.name,
to_class_name=target_class,
to_tenant=target_tenant.name
)
print(f"Tenant '{source_tenant.name}' in class '{source_class}' migrated to tenant '{target_tenant.name}' in '{target_class}' in '{TARGET_WEAVIATE_URL}'")
# END TenantToCollection
# Sanity check: aggregate the object count stored for the first target tenant
agg_response = target_client.query.aggregate("WineReviewMT").with_meta_count().with_tenant(target_tenants[0].name).do()
# NOTE(review): `DATASET_SIZE` is not defined in this snippet; the check is a substring
# match of its string form against the stringified aggregate response
assert str(DATASET_SIZE) in str(agg_response)
import weaviate, { Collection, WeaviateClient } from 'weaviate-client'
let client_src: WeaviateClient,client_tgt: WeaviateClient;
let reviews_mt_tgt, reviews_mt_src;
// Upper bound on the number of objects to copy; assigned once the source tenant handle exists
let maxItems: number
// Number of objects read from the source so far. It must start at 0: an uninitialised
// counter turns `counter++` into NaN and disables every comparison below.
let counter: number = 0

/**
 * Copy all objects (properties, default vector and UUID) from `collection_src`
 * into `collection_tgt`.
 *
 * Objects are buffered and sent with `insertMany` in batches of 1000; whatever is
 * left in the buffer when the iteration ends is sent as a final, smaller batch.
 * Reading stops early once the module-level `counter` reaches `maxItems`.
 * Batch errors are logged, not re-thrown, so one failed batch does not abort the rest.
 */
async function migrateData(collection_src: Collection, collection_tgt: Collection) {
  let itemsToInsert = []
  const promises = []

  // Send the buffered objects as one request and start a fresh buffer
  const flushBatch = () => {
    const promise = collection_tgt.data.insertMany(itemsToInsert)
      .then((response) => {
        console.log(`Successfully imported batch of ${Object.keys(response.uuids).length} items`);
        if (response.hasErrors) {
          throw new Error("Error in batch import!");
        }
      })
      .catch((error) => {
        console.error('Error importing batch:', error);
      })
    promises.push(promise)
    itemsToInsert = [];
  }

  for await (const item of collection_src.iterator({ includeVector: true })) {
    // Check if we've reached the maximum items
    if (counter >= maxItems) {
      console.log(`Reached maximum items limit of ${maxItems}`);
      break;
    }

    counter++;
    if (counter % 1000 == 0)
      console.log(`Import: ${counter}`)

    // Add object to batching array
    itemsToInsert.push({
      properties: item.properties,
      vector: item.vectors.default,
      uuid: item.uuid,
    })

    if (itemsToInsert.length == 1000) {
      flushBatch()
    }
  }

  // Send the last partial batch; without this the trailing objects would be dropped
  if (itemsToInsert.length > 0) {
    flushBatch()
  }

  // Wait until every batch request has settled
  await Promise.all(promises)
}

// Variables initialized above
reviews_mt_src = client_src.collections.get("WineReviewMT")
reviews_src_tenant_a = reviews_mt_src.withTenant("tenantA")
reviews_mt_tgt = client_tgt.collections.get("WineReviewMT")
reviews_tgt_tenant_a = reviews_mt_tgt.withTenant(tenantsTgt[0].name)

// Count the objects of the tenant being migrated (the handle must exist before it is queried)
maxItems = await reviews_src_tenant_a.length()

// Wait for the migration to finish before the connections are closed
await migrateData(reviews_src_tenant_a, reviews_tgt_tenant_a)

client_src.close()
client_tgt.close()
Related pages
Questions and feedback
If you have any questions or feedback, let us know in the user forum.