# Caching Guide

Learn how to configure and optimize SteadyText's caching system for maximum performance.

## Overview

SteadyText uses a frecency cache (frequency + recency) that combines:

- **LRU (Least Recently Used)**: Recent items stay cached
- **Frequency counting**: Popular items are retained longer
- **Disk persistence**: Cache survives restarts
- **Thread safety**: Safe for concurrent access
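To make the eviction policy concrete, here is a minimal sketch of how a frecency score can combine hit counts with recency. It is illustrative only (the `frecency_score` helper is hypothetical) and does not mirror SteadyText's internal scoring:

```python
import time

def frecency_score(hits: int, last_access: float, half_life_s: float = 3600.0) -> float:
    """Illustrative frecency score: frequency damped by age.

    Entries accessed often *and* recently score high; the lowest-scoring
    entry is the natural eviction candidate.
    """
    age_s = time.time() - last_access
    return hits * 0.5 ** (age_s / half_life_s)
```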
## Cache Architecture

### Two-Tier Cache System

```
┌─────────────────────────────────────┐
│          Application Layer          │
├─────────────────────────────────────┤
│ Generation Cache    │ Embedding     │
│ (256 entries, 50MB) │ Cache         │
│                     │ (512, 100MB)  │
├─────────────────────────────────────┤
│    SQLite Backend (Thread-Safe)     │
└─────────────────────────────────────┘
```
### Cache Files Location

```python
import steadytext
from pathlib import Path

# Get cache directory
cache_dir = Path.home() / ".cache" / "steadytext" / "caches"
print(f"Cache location: {cache_dir}")

# Cache files
generation_cache = cache_dir / "generation_cache.db"
embedding_cache = cache_dir / "embedding_cache.db"
```
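Because both caches are plain SQLite files, you can check their on-disk footprint directly; this snippet assumes only the default paths shown above.

```python
from pathlib import Path

cache_dir = Path.home() / ".cache" / "steadytext" / "caches"
for name in ("generation_cache.db", "embedding_cache.db"):
    db = cache_dir / name
    if db.exists():
        # st_size is in bytes; report megabytes for comparison with the limits
        print(f"{name}: {db.stat().st_size / (1024 * 1024):.1f} MB")
    else:
        print(f"{name}: not created yet")
```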
## Configuration

### Environment Variables

```bash
# Generation cache settings
export STEADYTEXT_GENERATION_CACHE_CAPACITY=256       # Max entries
export STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB=50.0   # Max file size

# Embedding cache settings
export STEADYTEXT_EMBEDDING_CACHE_CAPACITY=512        # Max entries
export STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB=100.0   # Max file size

# Disable cache entirely (not recommended)
export STEADYTEXT_DISABLE_CACHE=1
```
### Python Configuration

```python
import os

# Configure before importing/using steadytext
os.environ['STEADYTEXT_GENERATION_CACHE_CAPACITY'] = '1024'
os.environ['STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB'] = '200.0'

# Verify configuration
from steadytext import get_cache_manager

cache_manager = get_cache_manager()
stats = cache_manager.get_cache_stats()
print(f"Generation cache capacity: {stats['generation']['capacity']}")
```
## Cache Management

### Monitoring Cache Performance

```python
import time

import steadytext
from steadytext import get_cache_manager


class CacheMonitor:
    """Monitor cache performance and hit rates."""

    def __init__(self):
        self.cache_manager = get_cache_manager()
        self.initial_stats = self.cache_manager.get_cache_stats()

    def get_hit_rate(self, cache_type='generation'):
        """Calculate cache hit rate."""
        stats = self.cache_manager.get_cache_stats()[cache_type]
        hits = stats.get('hits', 0)
        misses = stats.get('misses', 0)
        total = hits + misses
        if total == 0:
            return 0.0
        return hits / total * 100

    def monitor_operation(self, operation, *args, **kwargs):
        """Monitor a single operation's cache behavior."""
        stats_before = self.cache_manager.get_cache_stats()
        start_time = time.time()
        result = operation(*args, **kwargs)
        duration = time.time() - start_time
        stats_after = self.cache_manager.get_cache_stats()

        # Determine if it was a cache hit
        gen_hits_diff = stats_after['generation']['hits'] - stats_before['generation']['hits']
        emb_hits_diff = stats_after['embedding']['hits'] - stats_before['embedding']['hits']
        cache_hit = gen_hits_diff > 0 or emb_hits_diff > 0

        return {
            'result': result,
            'duration': duration,
            'cache_hit': cache_hit,
            'stats_delta': {
                'generation_hits': gen_hits_diff,
                'embedding_hits': emb_hits_diff
            }
        }

    def print_summary(self):
        """Print cache performance summary."""
        stats = self.cache_manager.get_cache_stats()
        print("=== Cache Performance Summary ===")
        for cache_type in ['generation', 'embedding']:
            cache_stats = stats[cache_type]
            hit_rate = self.get_hit_rate(cache_type)
            print(f"\n{cache_type.title()} Cache:")
            print(f"  Size: {cache_stats['size']} entries")
            print(f"  Hit Rate: {hit_rate:.1f}%")
            print(f"  Hits: {cache_stats.get('hits', 0)}")
            print(f"  Misses: {cache_stats.get('misses', 0)}")


# Usage example
monitor = CacheMonitor()

# Monitor text generation
result1 = monitor.monitor_operation(
    steadytext.generate,
    "Write a haiku about caching"
)
print(f"First call: {result1['duration']:.3f}s (cache hit: {result1['cache_hit']})")

# Same prompt - should be cached
result2 = monitor.monitor_operation(
    steadytext.generate,
    "Write a haiku about caching"
)
print(f"Second call: {result2['duration']:.3f}s (cache hit: {result2['cache_hit']})")

monitor.print_summary()
```
### Cache Warming

```python
import concurrent.futures
from typing import List, Optional

import steadytext


def warm_cache_sequential(prompts: List[str], seeds: Optional[List[int]] = None):
    """Warm cache with common prompts sequentially."""
    if seeds is None:
        seeds = [42]  # Default seed only
    warmed = 0
    for prompt in prompts:
        for seed in seeds:
            _ = steadytext.generate(prompt, seed=seed, max_new_tokens=100)
            warmed += 1
    return warmed


def warm_cache_parallel(prompts: List[str], seeds: Optional[List[int]] = None, max_workers: int = 4):
    """Warm cache with parallel generation."""
    if seeds is None:
        seeds = [42]
    tasks = [(prompt, seed) for prompt in prompts for seed in seeds]

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(steadytext.generate, prompt, seed=seed, max_new_tokens=100)
            for prompt, seed in tasks
        ]
        # Wait for all to complete
        completed = 0
        for future in concurrent.futures.as_completed(futures):
            future.result()  # Get result to ensure completion
            completed += 1
    return completed


# Common prompts to cache
common_prompts = [
    "Write a Python function",
    "Explain this error",
    "Generate test data",
    "Create documentation",
    "Write unit tests",
    "Optimize this code",
    "Review this pull request",
    "Suggest improvements"
]

# Common seeds if using multiple
common_seeds = [42, 100, 200]  # Add your common seeds

# Warm cache
print("Warming cache...")
warmed = warm_cache_parallel(common_prompts, common_seeds)
print(f"Cache warmed with {warmed} entries")

# Verify cache is warm
from steadytext import get_cache_manager

stats = get_cache_manager().get_cache_stats()
print(f"Generation cache size: {stats['generation']['size']}")
```
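The same pattern warms the embedding cache. Assuming the standard `steadytext.embed()` call (each embedding is cached just like a generation), a minimal sketch:

```python
import steadytext
from steadytext import get_cache_manager

common_texts = [
    "user question",
    "product description",
    "support ticket",
]

# Each call stores its vector in the embedding cache.
for text in common_texts:
    _ = steadytext.embed(text)

stats = get_cache_manager().get_cache_stats()
print(f"Embedding cache size: {stats['embedding']['size']}")
```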
### Cache Optimization Strategies

```python
from collections import defaultdict
from datetime import datetime

import steadytext


class CacheOptimizer:
    """Optimize cache usage patterns."""

    def __init__(self):
        self.usage_patterns = defaultdict(lambda: {
            'count': 0,
            'last_used': None,
            'avg_generation_time': 0
        })

    def track_usage(self, prompt: str, seed: int, generation_time: float):
        """Track prompt usage patterns."""
        key = f"{prompt}:{seed}"
        pattern = self.usage_patterns[key]
        pattern['count'] += 1
        pattern['last_used'] = datetime.now()

        # Update average generation time
        avg = pattern['avg_generation_time']
        count = pattern['count']
        pattern['avg_generation_time'] = (avg * (count - 1) + generation_time) / count

    def get_cache_priorities(self, top_n: int = 20):
        """Get prompts that should be prioritized for caching."""
        # Score based on frequency and recency
        now = datetime.now()
        scores = []

        for key, pattern in self.usage_patterns.items():
            # Frequency score
            freq_score = pattern['count']

            # Recency score (higher for more recent)
            if pattern['last_used']:
                age = (now - pattern['last_used']).total_seconds()
                recency_score = 1 / (1 + age / 3600)  # Decay over hours
            else:
                recency_score = 0

            # Generation time score (prioritize slow generations)
            time_score = pattern['avg_generation_time']

            # Combined score
            score = freq_score * 0.5 + recency_score * 0.3 + time_score * 0.2
            scores.append((score, key, pattern))

        # Sort by score
        scores.sort(key=lambda item: item[0], reverse=True)
        return scores[:top_n]

    def recommend_cache_size(self):
        """Recommend optimal cache size based on usage."""
        total_unique = len(self.usage_patterns)
        frequently_used = sum(1 for p in self.usage_patterns.values() if p['count'] > 5)

        # Recommend 1.5x frequently used items + buffer
        recommended = int(frequently_used * 1.5 + 50)

        return {
            'total_unique_prompts': total_unique,
            'frequently_used': frequently_used,
            'recommended_size': recommended,
            'current_default': 256
        }


# Example usage
optimizer = CacheOptimizer()

# Simulate usage tracking
import time

test_prompts = [
    ("Write a function to sort a list", 42),
    ("Explain machine learning", 42),
    ("Write a function to sort a list", 42),  # Repeated
    ("Generate test cases", 100),
    ("Write a function to sort a list", 42),  # Popular
]

for prompt, seed in test_prompts:
    start = time.time()
    _ = steadytext.generate(prompt, seed=seed)
    duration = time.time() - start
    optimizer.track_usage(prompt, seed, duration)

# Get optimization recommendations
print("=== Cache Optimization Report ===")

priorities = optimizer.get_cache_priorities(5)
print("\nTop prompts to keep cached:")
for score, key, pattern in priorities:
    prompt, seed = key.rsplit(':', 1)
    print(f"  Score: {score:.2f} - {prompt[:50]}... (seed: {seed})")
    print(f"    Used: {pattern['count']}x, Avg time: {pattern['avg_generation_time']:.3f}s")

recommendations = optimizer.recommend_cache_size()
print(f"\nCache size recommendations:")
print(f"  Total unique: {recommendations['total_unique_prompts']}")
print(f"  Frequently used: {recommendations['frequently_used']}")
print(f"  Recommended size: {recommendations['recommended_size']}")
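Continuing the example above, one way to act on the recommendation is to export it as the capacity for processes started afterwards. Note the assumption here: capacity is read from the environment at startup, so setting it does not resize the cache of the already-running process.

```python
import os

recommended = recommendations['recommended_size']
current = int(os.environ.get('STEADYTEXT_GENERATION_CACHE_CAPACITY', '256'))
if recommended > current:
    # Only affects processes that inherit this environment.
    os.environ['STEADYTEXT_GENERATION_CACHE_CAPACITY'] = str(recommended)
    print(f"Capacity raised from {current} to {recommended} for child processes")
```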
## Advanced Cache Patterns

### Hierarchical Caching

```python
import hashlib
from typing import Any, Dict, List

import steadytext


class HierarchicalCache:
    """Implement hierarchical caching for complex workflows."""

    def __init__(self):
        self.memory_cache = {}  # Fast in-memory cache
        self.cache_manager = steadytext.get_cache_manager()

    def _generate_cache_key(self, category: str, subcategory: str,
                            prompt: str, seed: int) -> str:
        """Generate hierarchical cache key."""
        components = [category, subcategory, prompt, str(seed)]
        combined = ":".join(components)
        # Create hash for consistent key length
        key_hash = hashlib.md5(combined.encode()).hexdigest()
        return f"{category}:{subcategory}:{key_hash}"

    def get_or_generate(self, category: str, subcategory: str,
                        prompt: str, seed: int = 42, **kwargs) -> str:
        """Get from cache or generate with hierarchical key."""
        cache_key = self._generate_cache_key(category, subcategory, prompt, seed)

        # Check memory cache first
        if cache_key in self.memory_cache:
            return self.memory_cache[cache_key]

        # Generate and cache
        result = steadytext.generate(prompt, seed=seed, **kwargs)

        # Store in memory cache
        self.memory_cache[cache_key] = result
        return result

    def preload_category(self, category: str, items: List[Dict[str, Any]]):
        """Preload entire category into cache."""
        loaded = 0
        for item in items:
            self.get_or_generate(
                category,
                item.get('subcategory', 'default'),
                item['prompt'],
                item.get('seed', 42),
                **item.get('kwargs', {})
            )
            loaded += 1
        return loaded

    def clear_category(self, category: str):
        """Clear all cache entries for a category."""
        keys_to_remove = [k for k in self.memory_cache if k.startswith(f"{category}:")]
        for key in keys_to_remove:
            del self.memory_cache[key]
        return len(keys_to_remove)


# Usage example
h_cache = HierarchicalCache()

# Generate with hierarchy
email_subject = h_cache.get_or_generate(
    "emails",
    "marketing",
    "Write a subject line for Black Friday sale",
    seed=100
)

email_body = h_cache.get_or_generate(
    "emails",
    "marketing",
    "Write email body for Black Friday sale",
    seed=100
)

# Preload documentation category
docs_to_cache = [
    {
        'subcategory': 'api',
        'prompt': 'Document a REST API endpoint',
        'seed': 42,
        'kwargs': {'max_new_tokens': 200}
    },
    {
        'subcategory': 'functions',
        'prompt': 'Document a Python function',
        'seed': 42,
        'kwargs': {'max_new_tokens': 150}
    }
]

loaded = h_cache.preload_category('documentation', docs_to_cache)
print(f"Preloaded {loaded} documentation templates")
```
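Note that the in-memory tier of `HierarchicalCache` is lost on restart; only SteadyText's own SQLite caches persist. If you want the hierarchy to survive restarts too, a simple JSON dump/load pair is enough, since the cached values are plain strings. The helpers below are a sketch, not part of the class above.

```python
import json
from pathlib import Path

def save_memory_cache(cache: HierarchicalCache, path: str) -> None:
    """Persist the in-memory tier; values are strings, so JSON round-trips them."""
    Path(path).write_text(json.dumps(cache.memory_cache))

def load_memory_cache(cache: HierarchicalCache, path: str) -> None:
    """Restore a previously saved in-memory tier, if the file exists."""
    p = Path(path)
    if p.exists():
        cache.memory_cache.update(json.loads(p.read_text()))

save_memory_cache(h_cache, "hierarchical_cache.json")
```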
### Cache-Aware Generation

```python
import time
from typing import List, Optional, Tuple

import steadytext


class CacheAwareGenerator:
    """Generator that adapts based on cache state."""

    def __init__(self):
        self.cache_manager = steadytext.get_cache_manager()
        self.performance_threshold = 0.1  # 100ms

    def is_likely_cached(self, prompt: str, seed: int = 42) -> bool:
        """Check if a prompt is likely cached without generating."""
        # This is a heuristic - actual implementation would need
        # to check cache internals
        stats = self.cache_manager.get_cache_stats()

        # Simple heuristic: if we have items in cache and
        # this is a common prompt pattern
        if stats['generation']['size'] > 0:
            common_patterns = ['Write a', 'Explain', 'Create', 'Generate']
            return any(prompt.startswith(p) for p in common_patterns)
        return False

    def generate_with_fallback(self, primary_prompt: str,
                               fallback_prompt: Optional[str] = None,
                               seed: int = 42, **kwargs) -> Tuple[str, bool]:
        """Generate with fallback if primary isn't cached."""
        start_time = time.time()

        # Try primary prompt
        result = steadytext.generate(primary_prompt, seed=seed, **kwargs)
        duration = time.time() - start_time

        # If slow (not cached) and we have fallback
        if duration > self.performance_threshold and fallback_prompt:
            # Check if fallback might be cached
            if self.is_likely_cached(fallback_prompt, seed):
                fallback_result = steadytext.generate(fallback_prompt, seed=seed, **kwargs)
                return fallback_result, True

        return result, False

    def batch_generate_optimized(self, prompts: List[str], seed: int = 42, **kwargs):
        """Generate batch with cache-aware ordering."""
        results = {}
        timings = {}

        # First pass: try all prompts and measure timing
        for prompt in prompts:
            start = time.time()
            result = steadytext.generate(prompt, seed=seed, **kwargs)
            duration = time.time() - start
            results[prompt] = result
            timings[prompt] = duration

        # Analyze cache performance
        cached_prompts = [p for p, t in timings.items() if t < self.performance_threshold]
        uncached_prompts = [p for p, t in timings.items() if t >= self.performance_threshold]

        stats = {
            'total': len(prompts),
            'cached': len(cached_prompts),
            'uncached': len(uncached_prompts),
            'cache_rate': len(cached_prompts) / len(prompts) * 100,
            'avg_cached_time': sum(timings[p] for p in cached_prompts) / len(cached_prompts) if cached_prompts else 0,
            'avg_uncached_time': sum(timings[p] for p in uncached_prompts) / len(uncached_prompts) if uncached_prompts else 0
        }

        return results, stats


# Usage
cache_gen = CacheAwareGenerator()

# Single generation with fallback
primary = "Generate a complex analysis of quantum computing applications in cryptography"
fallback = "Explain quantum computing"  # Likely cached

result, used_fallback = cache_gen.generate_with_fallback(
    primary,
    fallback,
    max_new_tokens=200
)
print(f"Used fallback: {used_fallback}")

# Batch generation with analysis
test_prompts = [
    "Write a Python function",  # Likely cached
    "Explain machine learning",  # Likely cached
    "Analyze the socioeconomic impact of automation on rural communities",  # Unlikely
    "Generate test data",  # Possibly cached
    "Describe the philosophical implications of consciousness in AI systems"  # Unlikely
]

results, stats = cache_gen.batch_generate_optimized(test_prompts, max_new_tokens=100)

print("\n=== Batch Generation Cache Stats ===")
print(f"Total prompts: {stats['total']}")
print(f"Cached: {stats['cached']} ({stats['cache_rate']:.1f}%)")
print(f"Average cached time: {stats['avg_cached_time']:.3f}s")
print(f"Average uncached time: {stats['avg_uncached_time']:.3f}s")
if stats['avg_cached_time'] > 0:
    print(f"Speed improvement: {stats['avg_uncached_time'] / stats['avg_cached_time']:.1f}x")
```
### Cache Persistence Patterns

```python
import json
from datetime import datetime
from pathlib import Path
from typing import Dict

import steadytext


class CachePersistenceManager:
    """Manage cache persistence and restoration."""

    def __init__(self, backup_dir: str = "./cache_backups"):
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self.cache_manager = steadytext.get_cache_manager()

    def export_cache_metadata(self) -> Dict:
        """Export cache metadata for analysis."""
        stats = self.cache_manager.get_cache_stats()

        metadata = {
            'timestamp': datetime.now().isoformat(),
            'generation_cache': {
                'size': stats['generation']['size'],
                'capacity': stats['generation'].get('capacity', 256),
                'hit_rate': self._calculate_hit_rate(stats['generation'])
            },
            'embedding_cache': {
                'size': stats['embedding']['size'],
                'capacity': stats['embedding'].get('capacity', 512),
                'hit_rate': self._calculate_hit_rate(stats['embedding'])
            }
        }
        return metadata

    def _calculate_hit_rate(self, cache_stats: Dict) -> float:
        """Calculate cache hit rate."""
        hits = cache_stats.get('hits', 0)
        misses = cache_stats.get('misses', 0)
        total = hits + misses
        return (hits / total * 100) if total > 0 else 0.0

    def save_cache_state(self, name: str):
        """Save current cache state metadata."""
        metadata = self.export_cache_metadata()
        filename = self.backup_dir / f"cache_state_{name}.json"

        with open(filename, 'w') as f:
            json.dump(metadata, f, indent=2)

        print(f"Cache state saved to {filename}")
        return filename

    def analyze_cache_history(self) -> Dict:
        """Analyze cache performance over time."""
        history_files = list(self.backup_dir.glob("cache_state_*.json"))

        if not history_files:
            return {"error": "No cache history found"}

        history = []
        for file in history_files:
            with open(file, 'r') as f:
                data = json.load(f)
                data['filename'] = file.name
                history.append(data)

        # Sort chronologically by embedded timestamp (filenames are not ordered)
        history.sort(key=lambda h: h['timestamp'])

        # Analyze trends
        analysis = {
            'total_snapshots': len(history),
            'date_range': {
                'start': history[0]['timestamp'],
                'end': history[-1]['timestamp']
            },
            'generation_cache_trend': {
                'min_size': min(h['generation_cache']['size'] for h in history),
                'max_size': max(h['generation_cache']['size'] for h in history),
                'avg_hit_rate': sum(h['generation_cache']['hit_rate'] for h in history) / len(history)
            },
            'embedding_cache_trend': {
                'min_size': min(h['embedding_cache']['size'] for h in history),
                'max_size': max(h['embedding_cache']['size'] for h in history),
                'avg_hit_rate': sum(h['embedding_cache']['hit_rate'] for h in history) / len(history)
            }
        }
        return analysis


# Usage
persistence = CachePersistenceManager()

# Save current state
persistence.save_cache_state("before_optimization")

# Do some work...
for i in range(10):
    steadytext.generate(f"Test prompt {i}", seed=42)

# Save after work
persistence.save_cache_state("after_batch_generation")

# Analyze history
analysis = persistence.analyze_cache_history()
print("\n=== Cache History Analysis ===")
print(json.dumps(analysis, indent=2))
```
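To compare two specific snapshots rather than the whole history, a small diff helper is enough; this sketch assumes the JSON layout written by `save_cache_state` above.

```python
import json
from pathlib import Path

def diff_snapshots(backup_dir: str, before: str, after: str) -> dict:
    """Report how the generation cache moved between two named snapshots."""
    def load(name: str) -> dict:
        return json.loads((Path(backup_dir) / f"cache_state_{name}.json").read_text())

    a, b = load(before), load(after)
    return {
        'size_delta': b['generation_cache']['size'] - a['generation_cache']['size'],
        'hit_rate_delta': b['generation_cache']['hit_rate'] - a['generation_cache']['hit_rate'],
    }

print(diff_snapshots("./cache_backups", "before_optimization", "after_batch_generation"))
```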
## Cache Performance Tuning

### Benchmark Cache Impact

```python
import os
import statistics
import time
from typing import Dict, List

import steadytext


class CacheBenchmark:
    """Benchmark cache performance impact."""

    def __init__(self):
        self.cache_manager = steadytext.get_cache_manager()

    def benchmark_single_prompt(self, prompt: str, seed: int = 42,
                                iterations: int = 10) -> Dict:
        """Benchmark a single prompt with cold and warm cache."""
        # Clear cache for cold start
        self.cache_manager.clear_all_caches()

        timings = {
            'cold': [],
            'warm': []
        }

        # Cold cache timing (first call)
        start = time.time()
        _ = steadytext.generate(prompt, seed=seed)
        timings['cold'].append(time.time() - start)

        # Warm cache timings
        for _ in range(iterations - 1):
            start = time.time()
            _ = steadytext.generate(prompt, seed=seed)
            timings['warm'].append(time.time() - start)

        return {
            'prompt': prompt[:50] + '...' if len(prompt) > 50 else prompt,
            'cold_time': timings['cold'][0],
            'warm_avg': statistics.mean(timings['warm']),
            'warm_std': statistics.stdev(timings['warm']) if len(timings['warm']) > 1 else 0,
            'speedup': timings['cold'][0] / statistics.mean(timings['warm'])
        }

    def benchmark_cache_sizes(self, test_prompts: List[str],
                              cache_sizes: List[int]) -> Dict:
        """Benchmark performance with different cache sizes."""
        results = {}
        original_capacity = os.environ.get('STEADYTEXT_GENERATION_CACHE_CAPACITY', '256')

        try:
            for size in cache_sizes:
                # Set cache size
                os.environ['STEADYTEXT_GENERATION_CACHE_CAPACITY'] = str(size)

                # Restart cache with new size
                # Note: In practice, this would require restarting the process
                self.cache_manager.clear_all_caches()

                # Benchmark with this cache size
                hit_count = 0
                total_time = 0

                for prompt in test_prompts:
                    start = time.time()
                    _ = steadytext.generate(prompt, seed=42)
                    duration = time.time() - start
                    total_time += duration

                    # Simple hit detection (fast = hit)
                    if duration < 0.1:
                        hit_count += 1

                results[size] = {
                    'hit_rate': hit_count / len(test_prompts) * 100,
                    'avg_time': total_time / len(test_prompts),
                    'total_time': total_time
                }
        finally:
            # Restore original capacity
            os.environ['STEADYTEXT_GENERATION_CACHE_CAPACITY'] = original_capacity

        return results

    def find_optimal_cache_size(self, typical_prompts: List[str]) -> int:
        """Find optimal cache size for typical usage."""
        unique_prompts = len(set(typical_prompts))
        prompt_frequency = {}
        for prompt in typical_prompts:
            prompt_frequency[prompt] = prompt_frequency.get(prompt, 0) + 1

        # Prompts that appear more than once
        repeated_prompts = sum(1 for count in prompt_frequency.values() if count > 1)

        # Recommend size based on usage pattern
        if repeated_prompts / unique_prompts > 0.5:
            # High repetition - smaller cache OK
            optimal = int(unique_prompts * 0.7)
        else:
            # Low repetition - need larger cache
            optimal = int(unique_prompts * 1.2)

        # Ensure reasonable bounds
        optimal = max(64, min(optimal, 1024))
        return optimal


# Run benchmarks
benchmark = CacheBenchmark()

# Single prompt benchmark
prompt = "Write a comprehensive guide to Python decorators"
result = benchmark.benchmark_single_prompt(prompt, iterations=20)

print("=== Single Prompt Benchmark ===")
print(f"Prompt: {result['prompt']}")
print(f"Cold cache: {result['cold_time']:.3f}s")
print(f"Warm cache: {result['warm_avg']:.3f}s ± {result['warm_std']:.3f}s")
print(f"Speedup: {result['speedup']:.1f}x")

# Typical usage pattern
typical_prompts = [
    "Write a function",
    "Explain this error",
    "Write a function",  # Repeated
    "Generate test data",
    "Write a function",  # Popular
    "Create documentation",
    "Explain this error",  # Repeated
    "Optimize code",
    "Write unit tests",
    "Write a function"  # Very popular
]

optimal = benchmark.find_optimal_cache_size(typical_prompts)
print(f"\nRecommended cache size for your usage: {optimal}")
```
## Cache Debugging

### Cache Inspector

```python
import json
import time

import steadytext


class CacheInspector:
    """Debug and inspect cache behavior."""

    def __init__(self):
        self.cache_manager = steadytext.get_cache_manager()
        self.generation_log = []

    def trace_generation(self, prompt: str, seed: int = 42, **kwargs):
        """Trace a generation through the cache system."""
        # Get initial stats
        stats_before = self.cache_manager.get_cache_stats()

        # Time the generation
        start_time = time.time()
        result = steadytext.generate(prompt, seed=seed, **kwargs)
        duration = time.time() - start_time

        # Get final stats
        stats_after = self.cache_manager.get_cache_stats()

        # Analyze what happened
        gen_cache_before = stats_before['generation']
        gen_cache_after = stats_after['generation']

        cache_hit = gen_cache_after.get('hits', 0) > gen_cache_before.get('hits', 0)

        trace = {
            'prompt': prompt,
            'seed': seed,
            'duration': duration,
            'cache_hit': cache_hit,
            'cache_size_before': gen_cache_before['size'],
            'cache_size_after': gen_cache_after['size'],
            'result_preview': result[:100] + '...' if len(result) > 100 else result
        }

        self.generation_log.append(trace)
        return trace

    def analyze_cache_behavior(self):
        """Analyze patterns in cache behavior."""
        if not self.generation_log:
            return "No generation logs to analyze"

        total = len(self.generation_log)
        hits = sum(1 for log in self.generation_log if log['cache_hit'])

        hit_timings = [log['duration'] for log in self.generation_log if log['cache_hit']]
        miss_timings = [log['duration'] for log in self.generation_log if not log['cache_hit']]

        avg_hit = sum(hit_timings) / len(hit_timings) if hit_timings else 0
        avg_miss = sum(miss_timings) / len(miss_timings) if miss_timings else 0

        analysis = {
            'total_generations': total,
            'cache_hits': hits,
            'cache_misses': total - hits,
            'hit_rate': hits / total * 100 if total > 0 else 0,
            'avg_hit_time': avg_hit,
            'avg_miss_time': avg_miss,
            # Each hit saved roughly the difference between a miss and a hit
            'time_saved': hits * (avg_miss - avg_hit) if hit_timings and miss_timings else 0
        }
        return analysis

    def export_trace_log(self, filename: str):
        """Export trace log for analysis."""
        with open(filename, 'w') as f:
            json.dump(self.generation_log, f, indent=2)
        print(f"Trace log exported to {filename}")


# Debug cache behavior
inspector = CacheInspector()

# Trace various generations
test_cases = [
    ("Write a hello world program", 42),
    ("Write a hello world program", 42),  # Should hit
    ("Explain recursion", 42),
    ("Write a hello world program", 100),  # Different seed
    ("Explain recursion", 42),  # Should hit
]

print("=== Cache Trace Log ===")
for prompt, seed in test_cases:
    trace = inspector.trace_generation(prompt, seed)
    print(f"Prompt: {prompt[:30]}... | Seed: {seed}")
    print(f"  Hit: {trace['cache_hit']} | Time: {trace['duration']:.3f}s")
    print(f"  Cache size: {trace['cache_size_before']} -> {trace['cache_size_after']}")
    print()

# Analyze behavior
analysis = inspector.analyze_cache_behavior()
print("\n=== Cache Behavior Analysis ===")
print(f"Hit rate: {analysis['hit_rate']:.1f}%")
print(f"Average hit time: {analysis['avg_hit_time']:.3f}s")
print(f"Average miss time: {analysis['avg_miss_time']:.3f}s")
print(f"Time saved by cache: {analysis['time_saved']:.3f}s")

# Export for further analysis
inspector.export_trace_log("cache_trace.json")
```
## Best Practices

### 1. Cache Configuration

```python
# optimal_config.py - Optimal cache configuration
import os


def configure_cache_for_production():
    """Configure cache for production use."""
    # Larger cache for production
    os.environ['STEADYTEXT_GENERATION_CACHE_CAPACITY'] = '1024'
    os.environ['STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB'] = '200.0'

    # Even larger for embeddings (they're smaller)
    os.environ['STEADYTEXT_EMBEDDING_CACHE_CAPACITY'] = '2048'
    os.environ['STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB'] = '500.0'


def configure_cache_for_development():
    """Configure cache for development."""
    # Smaller cache for development
    os.environ['STEADYTEXT_GENERATION_CACHE_CAPACITY'] = '128'
    os.environ['STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB'] = '25.0'
    os.environ['STEADYTEXT_EMBEDDING_CACHE_CAPACITY'] = '256'
    os.environ['STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB'] = '50.0'


def configure_cache_for_testing():
    """Configure cache for testing."""
    # Minimal cache for testing
    os.environ['STEADYTEXT_GENERATION_CACHE_CAPACITY'] = '32'
    os.environ['STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB'] = '10.0'
    os.environ['STEADYTEXT_EMBEDDING_CACHE_CAPACITY'] = '64'
    os.environ['STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB'] = '20.0'
```
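A common pattern is to select one of these profiles from an environment variable at startup, before SteadyText is first imported; `APP_ENV` here is an example name, not something SteadyText itself reads.

```python
import os

profiles = {
    'production': configure_cache_for_production,
    'development': configure_cache_for_development,
    'testing': configure_cache_for_testing,
}
# Pick the profile and apply it before the first steadytext import.
profiles[os.environ.get('APP_ENV', 'development')]()

import steadytext  # noqa: E402 - imported after configuration on purpose
```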
### 2. Cache Warming Strategy

```python
# cache_warmer.py - Strategic cache warming
import time
from typing import List

import steadytext


class StrategicCacheWarmer:
    """Warm cache based on usage patterns."""

    def __init__(self):
        self.priority_prompts = {
            'high': [],    # Always cache
            'medium': [],  # Cache if space
            'low': []      # Cache opportunistically
        }

    def add_prompts(self, prompts: List[str], priority: str = 'medium'):
        """Add prompts to warming queue."""
        self.priority_prompts[priority].extend(prompts)

    def warm_cache(self, available_time: float = 30.0):
        """Warm cache within time budget."""
        start_time = time.time()
        warmed = {'high': 0, 'medium': 0, 'low': 0}

        # Process by priority
        for priority in ['high', 'medium', 'low']:
            for prompt in self.priority_prompts[priority]:
                if time.time() - start_time > available_time:
                    return warmed  # Budget exhausted - stop entirely
                _ = steadytext.generate(prompt, max_new_tokens=100)
                warmed[priority] += 1

        return warmed


# Configure warming
warmer = StrategicCacheWarmer()

# High priority - critical paths
warmer.add_prompts([
    "Generate error message",
    "Create validation response",
    "Format API response"
], priority='high')

# Medium priority - common operations
warmer.add_prompts([
    "Write documentation",
    "Generate test data",
    "Create example"
], priority='medium')

# Low priority - nice to have
warmer.add_prompts([
    "Explain concept",
    "Generate tutorial"
], priority='low')

# Warm with 10 second budget
warmed = warmer.warm_cache(available_time=10.0)
print(f"Cache warmed: {warmed}")
```
### 3. Cache Monitoring

```python
# monitor_cache.py - Production cache monitoring
import logging
import time
from datetime import datetime
from typing import Dict

import steadytext


class ProductionCacheMonitor:
    """Monitor cache in production."""

    def __init__(self, alert_threshold: float = 50.0):
        self.alert_threshold = alert_threshold
        self.logger = logging.getLogger(__name__)

    def check_cache_health(self) -> Dict:
        """Check cache health metrics."""
        cache_manager = steadytext.get_cache_manager()
        stats = cache_manager.get_cache_stats()

        health = {
            'timestamp': datetime.now().isoformat(),
            'healthy': True,
            'warnings': []
        }

        # Check generation cache
        gen_stats = stats['generation']
        gen_hit_rate = self._calculate_hit_rate(gen_stats)

        if gen_hit_rate < self.alert_threshold:
            health['warnings'].append(
                f"Low generation cache hit rate: {gen_hit_rate:.1f}%"
            )
            health['healthy'] = False

        # Check embedding cache
        emb_stats = stats['embedding']
        emb_hit_rate = self._calculate_hit_rate(emb_stats)

        if emb_hit_rate < self.alert_threshold:
            health['warnings'].append(
                f"Low embedding cache hit rate: {emb_hit_rate:.1f}%"
            )
            health['healthy'] = False

        # Check cache size
        if gen_stats['size'] >= gen_stats.get('capacity', 256) * 0.95:
            health['warnings'].append("Generation cache near capacity")

        if emb_stats['size'] >= emb_stats.get('capacity', 512) * 0.95:
            health['warnings'].append("Embedding cache near capacity")

        return health

    def _calculate_hit_rate(self, stats: Dict) -> float:
        """Calculate hit rate from stats."""
        hits = stats.get('hits', 0)
        misses = stats.get('misses', 0)
        total = hits + misses
        return (hits / total * 100) if total > 0 else 0.0

    def continuous_monitoring(self, interval: int = 300):
        """Monitor cache continuously."""
        while True:
            health = self.check_cache_health()

            if not health['healthy']:
                self.logger.warning(f"Cache health issues: {health['warnings']}")
            else:
                self.logger.info("Cache healthy")

            time.sleep(interval)


# Set up monitoring
monitor = ProductionCacheMonitor(alert_threshold=60.0)
health = monitor.check_cache_health()

print("=== Cache Health Check ===")
print(f"Status: {'Healthy' if health['healthy'] else 'Issues Detected'}")
if health['warnings']:
    print("Warnings:")
    for warning in health['warnings']:
        print(f"  - {warning}")
```
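Because `continuous_monitoring()` blocks forever, long-lived services typically run it on a daemon thread; a minimal sketch using the standard library:

```python
import threading

monitor_thread = threading.Thread(
    target=monitor.continuous_monitoring,
    kwargs={'interval': 300},  # Check every five minutes
    daemon=True,               # Don't block process shutdown
)
monitor_thread.start()
```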
## Summary

Effective cache management in SteadyText involves:

- **Configuration**: Size caches appropriately for your workload
- **Warming**: Pre-populate the cache with common prompts
- **Monitoring**: Track hit rates and performance
- **Optimization**: Adjust based on usage patterns
- **Debugging**: Use tools to understand cache behavior

Remember: a well-tuned cache can provide a 10-100x speedup for repeated operations!