import json
import time
from datetime import datetime
from mem0 import MemoryClient
from alchemyst_ai import AlchemystAI
MEM0_KEY = "mem0_api_key"
ALCHEMYST_KEY = "alchemyst_api_key"
# 1. Export from Mem0
mem0 = MemoryClient(api_key=MEM0_KEY)
export = mem0.create_memory_export(
schema={
"type": "object",
"properties": {
"memories": {"type": "array", "items": {"type": "object"}},
},
},
filters={},
)
print(f"Started export {export['id']}")
time.sleep(30)
payload = mem0.get_memory_export(memory_export_id=export["id"])
memories = payload.get("memories", [])
print(f"Fetched {len(memories)} memories")
# 2. Import into Alchemyst AI
alchemyst = AlchemystAI(api_key=ALCHEMYST_KEY)
documents = []
for memory in memories:
content = memory.get("memory") or memory.get("content") or memory.get("summary")
if not content:
continue
documents.append(
{
"content": content,
"metadata": {
"memoryId": memory.get("id"),
"userId": memory.get("user_id"),
"source": "mem0",
},
}
)
if documents:
response = alchemyst.v1.context.add(
documents=documents,
source="mem0_migration",
context_type="resource",
scope="internal",
metadata={
"fileName": f"mem0_migration-{export['id']}.json",
"fileType": "json",
"fileSize": len(json.dumps(documents).encode("utf-8")),
"lastModified": datetime.utcnow().isoformat() + "Z",
"groupName": ["mem0", "imported"],
},
)
print("Migration complete ✅")
print(response)
else:
print("No memories to migrate")