Select and optimize embedding models for semantic search and RAG applications. Use when choosing embedding models, implementing chunking strategies, or optimizing embedding quality for specific domains.
Inherits all available tools
Additional assets for this skill
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Guide to selecting and optimizing embedding models for vector search applications.
| Model | Dimensions | Max Tokens | Best For |
|---|---|---|---|
| text-embedding-3-large | 3072 | 8191 | High accuracy |
| text-embedding-3-small | 1536 | 8191 | Cost-effective |
| voyage-2 | 1024 | 4000 | Code, legal |
| bge-large-en-v1.5 | 1024 | 512 | Open source |
| all-MiniLM-L6-v2 | 384 | 256 | Fast, lightweight |
| multilingual-e5-large | 1024 | 512 | Multi-language |
Document → Chunking → Preprocessing → Embedding Model → Vector
↓
[Overlap, Size] [Clean, Normalize] [API/Local]
from openai import OpenAI
from typing import List
import numpy as np
client = OpenAI()
def get_embeddings(
texts: List[str],
model: str = "text-embedding-3-small",
dimensions: int = None
) -> List[List[float]]:
"""Get embeddings from OpenAI."""
# Handle batching for large lists
batch_size = 100
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
kwargs = {"input": batch, "model": model}
if dimensions:
kwargs["dimensions"] = dimensions
response = client.embeddings.create(**kwargs)
embeddings = [item.embedding for item in response.data]
all_embeddings.extend(embeddings)
return all_embeddings
def get_embedding(text: str, **kwargs) -> List[float]:
"""Get single embedding."""
return get_embeddings([text], **kwargs)[0]
# Dimension reduction with OpenAI
def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:
"""Get embedding with reduced dimensions (Matryoshka)."""
return get_embedding(
text,
model="text-embedding-3-small",
dimensions=dimensions
)
from sentence_transformers import SentenceTransformer
from typing import List, Optional
import numpy as np
class LocalEmbedder:
"""Local embedding with sentence-transformers."""
def __init__(
self,
model_name: str = "BAAI/bge-large-en-v1.5",
device: str = "cuda"
):
self.model = SentenceTransformer(model_name, device=device)
def embed(
self,
texts: List[str],
normalize: bool = True,
show_progress: bool = False
) -> np.ndarray:
"""Embed texts with optional normalization."""
embeddings = self.model.encode(
texts,
normalize_embeddings=normalize,
show_progress_bar=show_progress,
convert_to_numpy=True
)
return embeddings
def embed_query(self, query: str) -> np.ndarray:
"""Embed a query with BGE-style prefix."""
# BGE models benefit from query prefix
if "bge" in self.model.get_sentence_embedding_dimension():
query = f"Represent this sentence for searching relevant passages: {query}"
return self.embed([query])[0]
def embed_documents(self, documents: List[str]) -> np.ndarray:
"""Embed documents for indexing."""
return self.embed(documents)
# E5 model with instructions
class E5Embedder:
def __init__(self, model_name: str = "intfloat/multilingual-e5-large"):
self.model = SentenceTransformer(model_name)
def embed_query(self, query: str) -> np.ndarray:
return self.model.encode(f"query: {query}")
def embed_document(self, document: str) -> np.ndarray:
return self.model.encode(f"passage: {document}")
from typing import List, Tuple
import re
def chunk_by_tokens(
text: str,
chunk_size: int = 512,
chunk_overlap: int = 50,
tokenizer=None
) -> List[str]:
"""Chunk text by token count."""
import tiktoken
tokenizer = tokenizer or tiktoken.get_encoding("cl100k_base")
tokens = tokenizer.encode(text)
chunks = []
start = 0
while start < len(tokens):
end = start + chunk_size
chunk_tokens = tokens[start:end]
chunk_text = tokenizer.decode(chunk_tokens)
chunks.append(chunk_text)
start = end - chunk_overlap
return chunks
def chunk_by_sentences(
text: str,
max_chunk_size: int = 1000,
min_chunk_size: int = 100
) -> List[str]:
"""Chunk text by sentences, respecting size limits."""
import nltk
sentences = nltk.sent_tokenize(text)
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
sentence_size = len(sentence)
if current_size + sentence_size > max_chunk_size and current_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = []
current_size = 0
current_chunk.append(sentence)
current_size += sentence_size
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def chunk_by_semantic_sections(
text: str,
headers_pattern: str = r'^#{1,3}\s+.+$'
) -> List[Tuple[str, str]]:
"""Chunk markdown by headers, preserving hierarchy."""
lines = text.split('\n')
chunks = []
current_header = ""
current_content = []
for line in lines:
if re.match(headers_pattern, line, re.MULTILINE):
if current_content:
chunks.append((current_header, '\n'.join(current_content)))
current_header = line
current_content = []
else:
current_content.append(line)
if current_content:
chunks.append((current_header, '\n'.join(current_content)))
return chunks
def recursive_character_splitter(
text: str,
chunk_size: int = 1000,
chunk_overlap: int = 200,
separators: List[str] = None
) -> List[str]:
"""LangChain-style recursive splitter."""
separators = separators or ["\n\n", "\n", ". ", " ", ""]
def split_text(text: str, separators: List[str]) -> List[str]:
if not text:
return []
separator = separators[0]
remaining_separators = separators[1:]
if separator == "":
# Character-level split
return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size - chunk_overlap)]
splits = text.split(separator)
chunks = []
current_chunk = []
current_length = 0
for split in splits:
split_length = len(split) + len(separator)
if current_length + split_length > chunk_size and current_chunk:
chunk_text = separator.join(current_chunk)
# Recursively split if still too large
if len(chunk_text) > chunk_size and remaining_separators:
chunks.extend(split_text(chunk_text, remaining_separators))
else:
chunks.append(chunk_text)
# Start new chunk with overlap
overlap_splits = []
overlap_length = 0
for s in reversed(current_chunk):
if overlap_length + len(s) <= chunk_overlap:
overlap_splits.insert(0, s)
overlap_length += len(s)
else:
break
current_chunk = overlap_splits
current_length = overlap_length
current_chunk.append(split)
current_length += split_length
if current_chunk:
chunks.append(separator.join(current_chunk))
return chunks
return split_text(text, separators)
class DomainEmbeddingPipeline:
"""Pipeline for domain-specific embeddings."""
def __init__(
self,
embedding_model: str = "text-embedding-3-small",
chunk_size: int = 512,
chunk_overlap: int = 50,
preprocessing_fn=None
):
self.embedding_model = embedding_model
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.preprocess = preprocessing_fn or self._default_preprocess
def _default_preprocess(self, text: str) -> str:
"""Default preprocessing."""
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text)
# Remove special characters
text = re.sub(r'[^\w\s.,!?-]', '', text)
return text.strip()
async def process_documents(
self,
documents: List[dict],
id_field: str = "id",
content_field: str = "content",
metadata_fields: List[str] = None
) -> List[dict]:
"""Process documents for vector storage."""
processed = []
for doc in documents:
content = doc[content_field]
doc_id = doc[id_field]
# Preprocess
cleaned = self.preprocess(content)
# Chunk
chunks = chunk_by_tokens(
cleaned,
self.chunk_size,
self.chunk_overlap
)
# Create embeddings
embeddings = get_embeddings(chunks, self.embedding_model)
# Create records
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
record = {
"id": f"{doc_id}_chunk_{i}",
"document_id": doc_id,
"chunk_index": i,
"text": chunk,
"embedding": embedding
}
# Add metadata
if metadata_fields:
for field in metadata_fields:
if field in doc:
record[field] = doc[field]
processed.append(record)
return processed
# Code-specific pipeline
class CodeEmbeddingPipeline:
"""Specialized pipeline for code embeddings."""
def __init__(self, model: str = "voyage-code-2"):
self.model = model
def chunk_code(self, code: str, language: str) -> List[dict]:
"""Chunk code by functions/classes."""
import tree_sitter
# Parse with tree-sitter
# Extract functions, classes, methods
# Return chunks with context
pass
def embed_with_context(self, chunk: str, context: str) -> List[float]:
"""Embed code with surrounding context."""
combined = f"Context: {context}\n\nCode:\n{chunk}"
return get_embedding(combined, model=self.model)
import numpy as np
from typing import List, Tuple
def evaluate_retrieval_quality(
queries: List[str],
relevant_docs: List[List[str]], # List of relevant doc IDs per query
retrieved_docs: List[List[str]], # List of retrieved doc IDs per query
k: int = 10
) -> dict:
"""Evaluate embedding quality for retrieval."""
def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
retrieved_k = retrieved[:k]
relevant_retrieved = len(set(retrieved_k) & relevant)
return relevant_retrieved / k
def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
retrieved_k = retrieved[:k]
relevant_retrieved = len(set(retrieved_k) & relevant)
return relevant_retrieved / len(relevant) if relevant else 0
def mrr(relevant: set, retrieved: List[str]) -> float:
for i, doc in enumerate(retrieved):
if doc in relevant:
return 1 / (i + 1)
return 0
def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:
dcg = sum(
1 / np.log2(i + 2) if doc in relevant else 0
for i, doc in enumerate(retrieved[:k])
)
ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))
return dcg / ideal_dcg if ideal_dcg > 0 else 0
metrics = {
f"precision@{k}": [],
f"recall@{k}": [],
"mrr": [],
f"ndcg@{k}": []
}
for relevant, retrieved in zip(relevant_docs, retrieved_docs):
relevant_set = set(relevant)
metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))
metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))
metrics["mrr"].append(mrr(relevant_set, retrieved))
metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))
return {name: np.mean(values) for name, values in metrics.items()}
def compute_embedding_similarity(
embeddings1: np.ndarray,
embeddings2: np.ndarray,
metric: str = "cosine"
) -> np.ndarray:
"""Compute similarity matrix between embedding sets."""
if metric == "cosine":
# Normalize
norm1 = embeddings1 / np.linalg.norm(embeddings1, axis=1, keepdims=True)
norm2 = embeddings2 / np.linalg.norm(embeddings2, axis=1, keepdims=True)
return norm1 @ norm2.T
elif metric == "euclidean":
from scipy.spatial.distance import cdist
return -cdist(embeddings1, embeddings2, metric='euclidean')
elif metric == "dot":
return embeddings1 @ embeddings2.T