Skip to content

OpenSearch Vector Search: Python Examples

This document contains practical Python code examples for implementing vector search with OpenSearch.


Index Setup and Configuration

OpenSearch Client Setup and Index Creation

from opensearchpy import OpenSearch
import json

# Client configuration
client = OpenSearch([
    {'host': 'localhost', 'port': 9200}
],
http_auth=('admin', 'admin'),  # Configure authentication
use_ssl=True,
verify_certs=False,
ssl_show_warn=False
)

def create_optimized_vector_index(index_name, vector_config):
    """Create vector index with optimized settings"""

    index_body = {
        "settings": {
            "index": {
                "knn": True,
                "number_of_shards": calculate_optimal_shards(vector_config['expected_docs']),
                "number_of_replicas": 1,
                "refresh_interval": "30s",  # Batch refresh for better performance
                "codec": "best_compression",  # Reduce disk usage
                "max_result_window": 50000  # Allow larger result sets
            }
        },
        "mappings": {
            "properties": create_vector_mapping(vector_config)
        }
    }

    try:
        response = client.indices.create(index=index_name, body=index_body)
        print(f"Created index {index_name}: {response}")
        return True
    except Exception as e:
        print(f"Error creating index: {e}")
        return False

def calculate_optimal_shards(expected_docs):
    """Calculate optimal shard count based on data size"""
    if expected_docs < 100_000:
        return 1
    elif expected_docs < 1_000_000:
        return 2
    elif expected_docs < 10_000_000:
        return 3
    else:
        return 5  # Balance parallelism with overhead

def create_vector_mapping(config):
    """Generate vector field mappings"""
    properties = {
        "title": {"type": "text", "analyzer": "standard"},
        "content": {"type": "text", "analyzer": "standard"},
        "category": {"type": "keyword"},
        "timestamp": {"type": "date"},
        "url": {"type": "keyword", "index": False}  # Store but don't index
    }

    # Add vector fields
    for field_name, field_config in config['vector_fields'].items():
        properties[field_name] = {
            "type": "knn_vector",
            "dimension": field_config['dimension'],
            "method": {
                "name": field_config['algorithm'],
                "space_type": field_config['space_type'],
                "engine": "lucene",
                "parameters": field_config['parameters']
            }
        }

    return properties

# Example usage
vector_config = {
    "expected_docs": 1_000_000,
    "vector_fields": {
        "content_vector": {
            "dimension": 384,
            "algorithm": "hnsw",
            "space_type": "cosinesimil",
            "parameters": {"ef_construction": 256, "m": 24}
        }
    }
}

create_optimized_vector_index("my_documents", vector_config)

Document Indexing

Single Document Indexing

def index_document_with_vector(index_name, doc_id, title, content, vector, metadata=None):
    """Index a single document with vector"""

    document = {
        "title": title,
        "content": content,
        "content_vector": vector,
        "timestamp": datetime.utcnow().isoformat(),
        "doc_length": len(content),
        **(metadata or {})
    }

    try:
        response = client.index(
            index=index_name,
            id=doc_id,
            body=document,
            refresh=False  # Don't force immediate refresh
        )
        return response['result'] == 'created'
    except Exception as e:
        print(f"Error indexing document {doc_id}: {e}")
        return False

Optimized Bulk Indexing

def bulk_index_documents(index_name, documents, batch_size=1000):
    """Efficiently bulk index documents with vectors"""

    total_docs = len(documents)
    successful = 0

    for i in range(0, total_docs, batch_size):
        batch = documents[i:i+batch_size]
        bulk_body = []

        for doc in batch:
            # Index action
            action = {
                "index": {
                    "_index": index_name,
                    "_id": doc.get("id", f"doc_{i}_{len(bulk_body)//2}")
                }
            }

            # Document body
            doc_body = {
                "title": doc["title"],
                "content": doc["content"],
                "content_vector": doc["vector"],
                "timestamp": doc.get("timestamp", datetime.utcnow().isoformat()),
                "category": doc.get("category", "general"),
                "url": doc.get("url", ""),
                "doc_length": len(doc["content"])
            }

            bulk_body.extend([action, doc_body])

        try:
            response = client.bulk(
                body=bulk_body,
                refresh=False,  # Batch refresh later
                timeout='60s'
            )

            # Count successful operations
            for item in response['items']:
                if 'index' in item and item['index']['status'] in [200, 201]:
                    successful += 1

        except Exception as e:
            print(f"Error in bulk indexing batch {i//batch_size}: {e}")

    # Force refresh after bulk operations
    client.indices.refresh(index=index_name)

    print(f"Successfully indexed {successful}/{total_docs} documents")
    return successful

# Example usage
documents = [
    {
        "id": "doc_1",
        "title": "Introduction to Machine Learning",
        "content": "Machine learning is a subset of artificial intelligence...",
        "vector": generate_embedding("Introduction to Machine Learning..."),
        "category": "education",
        "url": "https://example.com/ml-intro"
    },
    # ... more documents
]

bulk_index_documents("my_documents", documents)

def simple_vector_search(index_name, query_vector, k=10):
    """Perform basic vector similarity search"""

    search_body = {
        "size": k,
        "query": {
            "knn": {
                "content_vector": {
                    "vector": query_vector,
                    "k": k
                }
            }
        },
        "_source": ["title", "content", "category", "timestamp"]
    }

    response = client.search(index=index_name, body=search_body)
    return format_search_results(response)

def format_search_results(response):
    """Format OpenSearch response for easy consumption"""
    results = []

    for hit in response["hits"]["hits"]:
        result = {
            "id": hit["_id"],
            "score": hit["_score"],
            "title": hit["_source"].get("title", ""),
            "content": hit["_source"].get("content", "")[:200] + "...",
            "category": hit["_source"].get("category", ""),
            "timestamp": hit["_source"].get("timestamp", "")
        }
        results.append(result)

    return {
        "total_hits": response["hits"]["total"]["value"],
        "max_score": response["hits"]["max_score"],
        "results": results,
        "took_ms": response["took"]
    }

# Example usage
query_embedding = generate_embedding("machine learning algorithms")
results = simple_vector_search("my_documents", query_embedding, k=5)

for result in results["results"]:
    print(f"Score: {result['score']:.4f} - {result['title']}")

Advanced Vector Search with Parameters

def advanced_vector_search(index_name, query_vector, k=10, ef_search=None, algorithm_params=None):
    """Vector search with algorithm-specific parameter tuning"""

    knn_query = {
        "content_vector": {
            "vector": query_vector,
            "k": k
        }
    }

    # Add algorithm-specific parameters
    if ef_search:  # [HNSW](./glossary.md#hnsw-hierarchical-navigable-small-world) parameter
        knn_query["content_vector"]["ef_search"] = ef_search

    if algorithm_params:  # Additional algorithm parameters
        knn_query["content_vector"].update(algorithm_params)

    search_body = {
        "size": k,
        "query": {"knn": knn_query},
        "_source": ["title", "content", "category", "timestamp"],
        "explain": False  # Set to True for debugging relevance scores
    }

    response = client.search(index=index_name, body=search_body)
    return format_search_results(response)

# Examples of parameter tuning
# High accuracy search (slower)
high_accuracy_results = advanced_vector_search(
    "my_documents",
    query_vector,
    k=10,
    ef_search=200
)

# Balanced search
balanced_results = advanced_vector_search(
    "my_documents",
    query_vector,
    k=10,
    ef_search=100
)

# Fast search (lower accuracy)
fast_results = advanced_vector_search(
    "my_documents",
    query_vector,
    k=10,
    ef_search=50
)

Advanced Search Techniques

Text Search Implementation

def text_search(index_name, query_text, k=10):
    """Perform traditional text search"""

    search_body = {
        "size": k,
        "query": {
            "multi_match": {
                "query": query_text,
                "fields": ["title^2", "content"],
                "type": "best_fields"
            }
        }
    }

    response = client.search(index=index_name, body=search_body)
    return format_search_results(response)

def vector_search(index_name, query_vector, k=10):
    """Perform pure vector search"""
    return simple_vector_search(index_name, query_vector, k)

Rank Fusion for Combined Results

def rank_fusion_search(index_name, query_text, query_vector, k=10):
    """Use reciprocal rank fusion to combine text and vector search results"""

    # Perform separate text and vector searches
    text_results = text_search(index_name, query_text, k=k*2)
    vector_results = vector_search(index_name, query_vector, k=k*2)

    # Apply reciprocal rank fusion
    fused_scores = reciprocal_rank_fusion(
        text_results["results"],
        vector_results["results"],
        k_constant=60
    )

    # Get final results
    final_results = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)[:k]

    return format_fusion_results(final_results, index_name)

def reciprocal_rank_fusion(text_results, vector_results, k_constant=60):
    """Apply reciprocal rank fusion algorithm"""

    doc_scores = {}

    # Add text search scores
    for rank, result in enumerate(text_results, 1):
        doc_id = result["id"]
        doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1.0 / (k_constant + rank)

    # Add vector search scores
    for rank, result in enumerate(vector_results, 1):
        doc_id = result["id"]
        doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1.0 / (k_constant + rank)

    return doc_scores

Hybrid Search Implementation

def hybrid_search(index_name, text_query, query_vector, k=10, text_boost=1.0, vector_boost=1.0):
    """Combine text and vector search using OpenSearch hybrid query"""

    search_body = {
        "size": k,
        "query": {
            "hybrid": {
                "queries": [
                    {
                        "match": {
                            "content": {
                                "query": text_query,
                                "boost": text_boost
                            }
                        }
                    },
                    {
                        "knn": {
                            "content_vector": {
                                "vector": query_vector,
                                "k": k,
                                "boost": vector_boost
                            }
                        }
                    }
                ]
            }
        }
    }

    response = client.search(index=index_name, body=search_body)
    return format_search_results(response)

# Example usage
results = hybrid_search(
    "my_documents",
    text_query="machine learning algorithms",
    query_vector=generate_embedding("machine learning algorithms"),
    k=10,
    text_boost=1.2,  # Slightly favor text matches
    vector_boost=1.0
)
def adaptive_hybrid_search(index_name, query_text, k=10):
    """Automatically adjust hybrid search strategy based on query characteristics"""

    # Generate query vector
    query_vector = generate_embedding(query_text)

    # Analyze query characteristics
    query_analysis = analyze_query(query_text)

    # Adjust search strategy based on analysis
    if query_analysis["is_factual"]:
        # Factual queries benefit from text search
        text_boost, vector_boost = 2.0, 0.8
    elif query_analysis["is_conceptual"]:
        # Conceptual queries benefit from vector search
        text_boost, vector_boost = 0.8, 2.0
    elif query_analysis["has_specific_terms"]:
        # Queries with specific terms benefit from balanced approach
        text_boost, vector_boost = 1.5, 1.0
    else:
        # Default balanced approach
        text_boost, vector_boost = 1.0, 1.0

    return hybrid_search(index_name, query_text, query_vector, k, text_boost, vector_boost)

def analyze_query(query_text):
    """Analyze query characteristics to inform search strategy"""

    # Simple heuristics (can be replaced with ML models)
    lower_query = query_text.lower()

    factual_indicators = ["what is", "how to", "when did", "where is", "who", "definition"]
    conceptual_indicators = ["similar to", "like", "about", "related to", "concepts"]
    specific_terms = ["api", "function", "class", "method", "error", "code"]

    analysis = {
        "is_factual": any(indicator in lower_query for indicator in factual_indicators),
        "is_conceptual": any(indicator in lower_query for indicator in conceptual_indicators),
        "has_specific_terms": any(term in lower_query for term in specific_terms),
        "query_length": len(query_text.split()),
        "has_quotes": '"' in query_text
    }

    return analysis

# Example usage
results = adaptive_hybrid_search("my_documents", "what is machine learning?", k=10)

Multi-Modal Index Setup

def setup_multimodal_index():
    """Create index supporting multiple content modalities"""

    mapping = {
        "settings": {
            "index": {"knn": True}
        },
        "mappings": {
            "properties": {
                # Content metadata
                "title": {"type": "text"},
                "description": {"type": "text"},
                "content_type": {"type": "keyword"},  # "text", "image", "audio", "video"
                "source_url": {"type": "keyword"},
                "timestamp": {"type": "date"},

                # Raw content storage
                "text_content": {"type": "text"},
                "image_url": {"type": "keyword", "index": False},
                "image_metadata": {
                    "properties": {
                        "width": {"type": "integer"},
                        "height": {"type": "integer"},
                        "format": {"type": "keyword"},
                        "file_size": {"type": "long"}
                    }
                },

                # Unified embedding for cross-modal search
                "unified_vector": {
                    "type": "knn_vector",
                    "dimension": 512,  # Shared embedding dimension
                    "method": {
                        "name": "hnsw",
                        "space_type": "cosinesimil",
                        "parameters": {"ef_construction": 256, "m": 32}
                    }
                },

                # Modality-specific vectors for within-modal search
                "text_vector": {
                    "type": "knn_vector",
                    "dimension": 384,
                    "method": {
                        "name": "hnsw",
                        "space_type": "cosinesimil",
                        "parameters": {"ef_construction": 128, "m": 24}
                    }
                },

                "image_vector": {
                    "type": "knn_vector",
                    "dimension": 512,
                    "method": {
                        "name": "hnsw",
                        "space_type": "l2",
                        "parameters": {"ef_construction": 256, "m": 32}
                    }
                }
            }
        }
    }

    client.indices.create(index="multimodal_content", body=mapping)

Cross-Modal Search Functions

def text_to_image_search(text_query, k=10, similarity_threshold=0.7):
    """Find images using text descriptions"""

    # Generate text embedding using multimodal model
    text_embedding = generate_multimodal_embedding(text_query, modality="text")

    search_body = {
        "size": k,
        "query": {
            "bool": {
                "must": [
                    {
                        "knn": {
                            "unified_vector": {
                                "vector": text_embedding,
                                "k": k * 2  # Get extra results for filtering
                            }
                        }
                    }
                ],
                "filter": [
                    {"term": {"content_type": "image"}}
                ]
            }
        },
        "min_score": similarity_threshold  # Filter low-similarity results
    }

    response = client.search(index="multimodal_content", body=search_body)
    return format_multimodal_results(response)

# Example usage
results = text_to_image_search("sunset over mountains", k=5)
for result in results["results"]:
    print(f"Score: {result['score']:.3f} - {result['title']}")
    print(f"Image: {result['image_url']}")

def image_to_text_search(image_path, k=10):
    """Find text content similar to an image"""

    # Generate image embedding
    image_embedding = generate_multimodal_embedding(image_path, modality="image")

    search_body = {
        "size": k,
        "query": {
            "bool": {
                "must": [
                    {
                        "knn": {
                            "unified_vector": {
                                "vector": image_embedding,
                                "k": k * 2
                            }
                        }
                    }
                ],
                "filter": [
                    {"term": {"content_type": "text"}}
                ]
            }
        }
    }

    response = client.search(index="multimodal_content", body=search_body)
    return format_multimodal_results(response)

def universal_multimodal_search(query_content, query_type, target_types=None, k=10):
    """Search across all modalities with any input type"""

    # Generate embedding based on input type
    if query_type == "text":
        query_embedding = generate_multimodal_embedding(query_content, "text")
    elif query_type == "image":
        query_embedding = generate_multimodal_embedding(query_content, "image")
    elif query_type == "audio":
        query_embedding = generate_multimodal_embedding(query_content, "audio")
    else:
        raise ValueError(f"Unsupported query type: {query_type}")

    # Build search query
    search_query = {
        "knn": {
            "unified_vector": {
                "vector": query_embedding,
                "k": k * 2
            }
        }
    }

    # Filter by target content types if specified
    if target_types:
        search_query = {
            "bool": {
                "must": [search_query],
                "filter": [
                    {"terms": {"content_type": target_types}}
                ]
            }
        }

    search_body = {"size": k, "query": search_query}
    response = client.search(index="multimodal_content", body=search_body)

    return format_multimodal_results(response)

# Examples
# Find any content similar to an image
all_results = universal_multimodal_search("path/to/image.jpg", "image", k=10)

# Find only text and video content similar to audio
text_video_results = universal_multimodal_search(
    "path/to/audio.mp3",
    "audio",
    target_types=["text", "video"],
    k=10
)

Modality-Specific Boosting

def boosted_multimodal_search(query_embedding, query_type, k=10):
    """Apply different boosting strategies based on query modality"""

    # Define boost weights based on typical cross-modal similarities
    boost_matrix = {
        "text": {"text": 2.0, "image": 1.0, "audio": 0.8, "video": 1.2},
        "image": {"text": 1.0, "image": 2.0, "audio": 0.6, "video": 1.8},
        "audio": {"text": 0.8, "image": 0.6, "audio": 2.0, "video": 1.5}
    }

    should_queries = []
    for target_type, boost in boost_matrix[query_type].items():
        should_queries.append({
            "bool": {
                "must": [
                    {
                        "knn": {
                            "unified_vector": {
                                "vector": query_embedding,
                                "k": k
                            }
                        }
                    }
                ],
                "filter": [{"term": {"content_type": target_type}}],
                "boost": boost
            }
        })

    search_body = {
        "size": k,
        "query": {
            "bool": {
                "should": should_queries,
                "minimum_should_match": 1
            }
        }
    }

    return client.search(index="multimodal_content", body=search_body)

Recommendation Systems

User Profile Vector Creation

def create_user_profile_vector(user_id, interaction_history, decay_factor=0.95):
    """Build user preference vector from interaction history"""

    content_vectors = []
    weights = []
    current_time = datetime.now()

    for interaction in interaction_history:
        # Get content vector
        content_doc = client.get(
            index="content_library",
            id=interaction["content_id"]
        )
        content_vector = content_doc["_source"]["content_vector"]

        # Calculate weight based on interaction type and recency
        interaction_weights = {
            "view": 1.0,
            "like": 3.0,
            "share": 5.0,
            "bookmark": 4.0,
            "download": 6.0,
            "purchase": 10.0
        }

        base_weight = interaction_weights.get(interaction["type"], 1.0)

        # Apply temporal decay
        days_ago = (current_time - interaction["timestamp"]).days
        temporal_weight = decay_factor ** days_ago

        final_weight = base_weight * temporal_weight

        content_vectors.append(content_vector)
        weights.append(final_weight)

    # Calculate weighted average
    if not content_vectors:
        return None

    user_vector = weighted_vector_average(content_vectors, weights)
    return normalize_vector(user_vector)

def weighted_vector_average(vectors, weights):
    """Calculate weighted average of vectors"""
    total_weight = sum(weights)
    if total_weight == 0:
        return [0] * len(vectors[0])

    result_vector = [0] * len(vectors[0])
    for vector, weight in zip(vectors, weights):
        for i, value in enumerate(vector):
            result_vector[i] += value * weight

    return [value / total_weight for value in result_vector]

Personalized Recommendations

def generate_personalized_recommendations(user_vector, exclude_content=None,
                                        diversity_factor=0.3, k=20):
    """Generate recommendations balancing relevance and diversity"""

    # Base similarity search
    search_body = {
        "size": k * 3,  # Get extra results for diversity filtering
        "query": {
            "knn": {
                "content_vector": {
                    "vector": user_vector,
                    "k": k * 3
                }
            }
        }
    }

    # Exclude already seen content
    if exclude_content:
        search_body["query"] = {
            "bool": {
                "must": [search_body["query"]],
                "must_not": [
                    {"terms": {"_id": exclude_content}}
                ]
            }
        }

    response = client.search(index="content_library", body=search_body)
    candidates = response["hits"]["hits"]

    # Apply diversity and business logic filtering
    recommendations = diversified_selection(
        candidates,
        diversity_factor=diversity_factor,
        final_count=k
    )

    return format_recommendations(recommendations)

def diversified_selection(candidates, diversity_factor=0.3, final_count=20):
    """Select diverse recommendations using maximal marginal relevance"""

    if len(candidates) <= final_count:
        return candidates

    selected = []
    remaining = list(candidates)

    # Always select the top result first
    selected.append(remaining.pop(0))

    # Select remaining items balancing relevance and diversity
    while len(selected) < final_count and remaining:
        best_score = -1
        best_idx = 0

        for i, candidate in enumerate(remaining):
            # Relevance score (similarity to user vector)
            relevance = candidate["_score"]

            # Diversity score (distance from already selected items)
            if selected:
                similarities = []
                candidate_vector = candidate["_source"]["content_vector"]

                for selected_item in selected:
                    selected_vector = selected_item["_source"]["content_vector"]
                    sim = cosine_similarity(candidate_vector, selected_vector)
                    similarities.append(sim)

                diversity = 1.0 - max(similarities)  # Distance from closest selected
            else:
                diversity = 1.0

            # Combined score
            combined_score = (1 - diversity_factor) * relevance + diversity_factor * diversity

            if combined_score > best_score:
                best_score = combined_score
                best_idx = i

        selected.append(remaining.pop(best_idx))

    return selected

Collaborative Filtering

def collaborative_vector_recommendations(target_user_id, user_vectors_index, k=20):
    """Generate recommendations based on similar users' preferences"""

    # Get target user's profile vector
    target_user_doc = client.get(
        index=user_vectors_index,
        id=target_user_id
    )
    target_vector = target_user_doc["_source"]["user_vector"]

    # Find similar users
    similar_users = client.search(
        index=user_vectors_index,
        body={
            "size": 50,  # Consider top 50 similar users
            "query": {
                "knn": {
                    "user_vector": {
                        "vector": target_vector,
                        "k": 50
                    }
                }
            }
        }
    )

    # Aggregate content preferences from similar users
    content_scores = {}

    for user_hit in similar_users["hits"]["hits"]:
        user_similarity = user_hit["_score"]
        user_preferences = user_hit["_source"]["recent_interactions"]

        for content_id, interaction_strength in user_preferences.items():
            if content_id not in content_scores:
                content_scores[content_id] = 0

            # Weight by user similarity and interaction strength
            content_scores[content_id] += user_similarity * interaction_strength

    # Get top content recommendations
    top_content_ids = sorted(
        content_scores.items(),
        key=lambda x: x[1],
        reverse=True
    )[:k]

    return format_collaborative_recommendations(top_content_ids)

Hybrid Recommendation Systems

def hybrid_recommendations(user_id, content_based_weight=0.6,
                          collaborative_weight=0.3, popularity_weight=0.1, k=20):
    """Combine multiple recommendation signals"""

    # Get different types of recommendations
    content_based = generate_content_based_recs(user_id, k=k*2)
    collaborative = generate_collaborative_recs(user_id, k=k*2)
    popularity_based = get_trending_content(k=k//2)

    # Combine scores using weighted fusion
    combined_scores = {}

    # Content-based recommendations
    for i, rec in enumerate(content_based):
        content_id = rec["content_id"]
        # Decay rank-based score
        score = content_based_weight * (1.0 / (i + 1))
        combined_scores[content_id] = combined_scores.get(content_id, 0) + score

    # Collaborative recommendations
    for i, rec in enumerate(collaborative):
        content_id = rec["content_id"]
        score = collaborative_weight * (1.0 / (i + 1))
        combined_scores[content_id] = combined_scores.get(content_id, 0) + score

    # Popularity boost
    for i, rec in enumerate(popularity_based):
        content_id = rec["content_id"]
        score = popularity_weight * (1.0 / (i + 1))
        combined_scores[content_id] = combined_scores.get(content_id, 0) + score

    # Sort and return top k
    final_recommendations = sorted(
        combined_scores.items(),
        key=lambda x: x[1],
        reverse=True
    )[:k]

    return enrich_recommendations(final_recommendations)

Real-Time Applications

Real-Time Indexing System

import asyncio
from concurrent.futures import ThreadPoolExecutor
import queue

class RealTimeVectorIndexer:
    def __init__(self, index_name, max_workers=4, batch_size=100):
        self.index_name = index_name
        self.max_workers = max_workers
        self.batch_size = batch_size
        self.document_queue = queue.Queue()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.running = False

    async def start_indexing(self):
        """Start background indexing process"""
        self.running = True

        # Start batch processing coroutines
        tasks = []
        for _ in range(self.max_workers):
            task = asyncio.create_task(self.process_document_batches())
            tasks.append(task)

        await asyncio.gather(*tasks)

    async def process_document_batches(self):
        """Process documents in batches for efficiency"""
        batch = []

        while self.running:
            try:
                # Collect batch
                while len(batch) < self.batch_size:
                    try:
                        doc = self.document_queue.get(timeout=1.0)
                        batch.append(doc)
                    except queue.Empty:
                        break

                if batch:
                    await self.index_batch(batch)
                    batch.clear()
                else:
                    await asyncio.sleep(0.1)  # Brief pause if no documents

            except Exception as e:
                print(f"Error processing batch: {e}")
                await asyncio.sleep(1.0)

    async def index_batch(self, documents):
        """Index a batch of documents"""
        loop = asyncio.get_event_loop()

        # Run indexing in thread pool to avoid blocking
        await loop.run_in_executor(
            self.executor,
            self._sync_index_batch,
            documents
        )

    def _sync_index_batch(self, documents):
        """Synchronous batch indexing"""
        bulk_body = []

        for doc in documents:
            action = {"index": {"_index": self.index_name, "_id": doc["id"]}}
            bulk_body.extend([action, doc["content"]])

        if bulk_body:
            client.bulk(body=bulk_body, refresh=True)

    def add_document(self, document):
        """Add document to indexing queue"""
        self.document_queue.put(document)

    def stop(self):
        """Stop indexing process"""
        self.running = False

# Usage example
indexer = RealTimeVectorIndexer("live_content")

# Start indexing in background
asyncio.create_task(indexer.start_indexing())

# Add documents as they arrive
new_document = {
    "id": "doc_123",
    "content": {
        "title": "Breaking News",
        "content": "Latest developments in AI...",
        "content_vector": generate_embedding("Latest developments in AI..."),
        "timestamp": datetime.now().isoformat()
    }
}

indexer.add_document(new_document)

High-Performance Search API

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
from typing import List, Optional

app = FastAPI()

class SearchRequest(BaseModel):
    query: str
    query_type: str = "hybrid"  # "text", "vector", "hybrid"
    k: int = 10
    filters: Optional[dict] = None
    ef_search: Optional[int] = None

class SearchResult(BaseModel):
    id: str
    title: str
    content: str
    score: float
    timestamp: str

class SearchResponse(BaseModel):
    results: List[SearchResult]
    total_hits: int
    took_ms: int
    query_info: dict

class HighPerformanceSearchAPI:
    def __init__(self, opensearch_client):
        self.client = opensearch_client
        self.embedding_cache = {}

    async def search(self, request: SearchRequest) -> SearchResponse:
        """Main search endpoint with async processing"""
        start_time = asyncio.get_event_loop().time()

        try:
            if request.query_type == "vector":
                results = await self.vector_search(request)
            elif request.query_type == "text":
                results = await self.text_search(request)
            else:  # hybrid
                results = await self.hybrid_search(request)

            end_time = asyncio.get_event_loop().time()
            took_ms = int((end_time - start_time) * 1000)

            return SearchResponse(
                results=results["results"],
                total_hits=results["total_hits"],
                took_ms=took_ms,
                query_info={
                    "query_type": request.query_type,
                    "processed_query": request.query,
                    "k": request.k
                }
            )

        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    async def vector_search(self, request: SearchRequest):
        """Async vector search with caching"""
        # Get or generate embedding
        query_vector = await self.get_embedding_cached(request.query)

        search_body = self.build_vector_query(
            query_vector,
            request.k,
            request.filters,
            request.ef_search
        )

        # Execute search in thread pool
        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(
            None,
            lambda: self.client.search(index="live_content", body=search_body)
        )

        return self.format_results(response)

    async def get_embedding_cached(self, text: str):
        """Get embedding with LRU cache"""
        if text in self.embedding_cache:
            return self.embedding_cache[text]

        # Generate embedding asynchronously
        loop = asyncio.get_event_loop()
        embedding = await loop.run_in_executor(
            None,
            generate_embedding,
            text
        )

        # Cache with size limit
        if len(self.embedding_cache) > 1000:
            # Remove oldest entry
            oldest_key = next(iter(self.embedding_cache))
            del self.embedding_cache[oldest_key]

        self.embedding_cache[text] = embedding
        return embedding

# FastAPI endpoints
search_api = HighPerformanceSearchAPI(client)

@app.post("/search", response_model=SearchResponse)
async def search_endpoint(request: SearchRequest):
    return await search_api.search(request)

@app.get("/health")
async def health_check():
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}

# WebSocket for real-time search
@app.websocket("/ws/search")
async def websocket_search(websocket):
    await websocket.accept()

    try:
        while True:
            # Receive search request
            data = await websocket.receive_json()
            request = SearchRequest(**data)

            # Process search
            response = await search_api.search(request)

            # Send results
            await websocket.send_json(response.dict())

    except Exception as e:
        await websocket.send_json({"error": str(e)})
        await websocket.close()

Performance Monitoring

import time
from collections import defaultdict, deque
import statistics

class SearchPerformanceMonitor:
    def __init__(self, window_size=1000):
        self.window_size = window_size
        self.latencies = deque(maxlen=window_size)
        self.query_types = defaultdict(lambda: deque(maxlen=window_size))
        self.error_counts = defaultdict(int)
        self.total_queries = 0

    def record_query(self, query_type: str, latency_ms: float, success: bool):
        """Record query performance metrics"""
        self.total_queries += 1

        if success:
            self.latencies.append(latency_ms)
            self.query_types[query_type].append(latency_ms)
        else:
            self.error_counts[query_type] += 1

    def get_performance_stats(self):
        """Get current performance statistics"""
        if not self.latencies:
            return {"status": "no_data"}

        overall_stats = {
            "total_queries": self.total_queries,
            "avg_latency_ms": statistics.mean(self.latencies),
            "p95_latency_ms": statistics.quantiles(self.latencies, n=20)[18],  # 95th percentile
            "p99_latency_ms": statistics.quantiles(self.latencies, n=100)[98],  # 99th percentile
            "error_rate": sum(self.error_counts.values()) / self.total_queries
        }

        # Per-query-type stats
        type_stats = {}
        for query_type, latencies in self.query_types.items():
            if latencies:
                type_stats[query_type] = {
                    "count": len(latencies),
                    "avg_latency_ms": statistics.mean(latencies),
                    "p95_latency_ms": statistics.quantiles(latencies, n=20)[18] if len(latencies) >= 20 else max(latencies)
                }

        return {
            "overall": overall_stats,
            "by_type": type_stats,
            "errors": dict(self.error_counts)
        }

# Integration with search API
monitor = SearchPerformanceMonitor()

@app.middleware("http")
async def performance_middleware(request, call_next):
    if request.url.path.startswith("/search"):
        start_time = time.time()

        try:
            response = await call_next(request)
            latency_ms = (time.time() - start_time) * 1000

            # Extract query type from request or response
            query_type = "unknown"  # Extract from actual request

            monitor.record_query(query_type, latency_ms, True)
            return response

        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000
            monitor.record_query("error", latency_ms, False)
            raise
    else:
        return await call_next(request)

@app.get("/metrics")
async def get_metrics():
    return monitor.get_performance_stats()

This collection provides comprehensive Python implementation examples for building production-ready vector search systems with OpenSearch.


See important disclaimers.