Skip to content

Error Handling Guide

Learn how to handle errors gracefully in SteadyText, implement robust fallback strategies, and build resilient applications.

Overview

SteadyText follows a "never fail" philosophy (with v2.1.0+ updates):

  • Functions return None when models are unavailable (v2.1.0+)
  • No exceptions are raised during normal operations
  • Graceful degradation with predictable behavior
  • Clear error indicators for proper handling
  • Deterministic fallbacks respect seed values (pre-v2.1.0 only; from v2.1.0 on, functions return None instead of falling back)

Breaking Change in v2.1.0

The deterministic fallback behavior has been disabled. Functions now return None instead of generating fallback text/embeddings when models are unavailable.

Table of Contents

Error Types and Handling

Common Error Scenarios

import steadytext
import logging
from typing import Optional, Union

# Configure logging: emit INFO-and-above records, and create the module-level
# logger that the helper functions in this guide write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def handle_generation_result(result: Optional[str], prompt: str) -> str:
    """Turn a raw generation result into a string that is always printable.

    Args:
        result: Value returned by the generation call; ``None`` signals failure.
        prompt: The prompt that was sent, used in log and error messages.

    Returns:
        ``result`` unchanged when it contains non-whitespace text, otherwise
        a bracketed error marker describing what went wrong.
    """
    if result is not None:
        # Non-blank text is passed straight through.
        if result.strip():
            return result

        # The model answered, but only with whitespace (or nothing at all).
        logger.warning(f"Empty generation for prompt: {prompt}")
        return "[Error: Empty response generated]"

    # None means no result was produced at all.
    logger.error(f"Generation failed for prompt: {prompt}")
    # Implement your fallback strategy
    return f"[Error: Unable to generate response for: {prompt}]"

# Usage example: generate with a fixed seed, then pass the (possibly None)
# result through the handler so a printable string is always available.
prompt = "Write a summary"
result = steadytext.generate(prompt, seed=42)
handled_result = handle_generation_result(result, prompt)
print(handled_result)

Error Categories

from enum import Enum
from dataclasses import dataclass
from typing import Optional, Any

class ErrorType(Enum):
    """Types of errors in SteadyText operations.

    Each member's value is a lowercase snake_case string identifier.
    """
    MODEL_NOT_LOADED = "model_not_loaded"
    INVALID_INPUT = "invalid_input"
    DAEMON_UNAVAILABLE = "daemon_unavailable"
    CACHE_ERROR = "cache_error"
    TIMEOUT = "timeout"
    MEMORY_ERROR = "memory_error"
    # Catch-all for failures that fit none of the categories above.
    UNKNOWN = "unknown"

@dataclass
class SteadyTextError:
    """Structured error information.

    A plain record describing one failure; it is a dataclass, not an
    exception, so it is returned and stored rather than raised.
    """
    # Category of the failure.
    error_type: ErrorType
    # Human-readable description of what went wrong.
    message: str
    # Free-form details supplied by the caller (e.g. prompt, seed).
    context: dict
    # Whether the caller considers the failure recoverable.
    recoverable: bool
    # Optional hint on how to resolve the problem.
    suggested_action: Optional[str] = None

class ErrorHandler:
    """Centralized error handling for SteadyText operations.

    Records every reported error in ``error_log`` (in arrival order), keeps a
    per-``ErrorType`` counter in ``error_counts``, and logs each error through
    the module-level ``logger``.
    """

    # Remediation hint for each error category; built once for the class.
    _SUGGESTED_ACTIONS = {
        ErrorType.MODEL_NOT_LOADED: "Run 'st models download' to download models",
        ErrorType.INVALID_INPUT: "Check input format and constraints",
        ErrorType.DAEMON_UNAVAILABLE: "Start daemon with 'st daemon start'",
        ErrorType.CACHE_ERROR: "Clear cache with 'st cache --clear'",
        ErrorType.TIMEOUT: "Increase timeout or retry operation",
        ErrorType.MEMORY_ERROR: "Reduce batch size or restart daemon",
        ErrorType.UNKNOWN: "Check logs for more information"
    }

    def __init__(self):
        self.error_log = []
        self.error_counts = {error_type: 0 for error_type in ErrorType}

    def handle_error(self, error_type: ErrorType, message: str,
                     context: Optional[dict] = None, recoverable: bool = True) -> SteadyTextError:
        """Record and log an error.

        Args:
            error_type: Category of the failure.
            message: Human-readable description.
            context: Optional extra details; stored as ``{}`` when omitted.
            recoverable: Whether the failure is considered recoverable.

        Returns:
            The ``SteadyTextError`` record that was appended to ``error_log``.
        """
        error = SteadyTextError(
            error_type=error_type,
            message=message,
            context=context or {},
            recoverable=recoverable,
            suggested_action=self._get_suggested_action(error_type)
        )

        self.error_log.append(error)
        self.error_counts[error_type] += 1

        # Nest the context under one key: logging raises KeyError when a key
        # in `extra` clashes with a LogRecord attribute (e.g. "message",
        # "name", "args"), and an error handler must not fail on such input.
        logger.error(f"{error_type.value}: {message}",
                     extra={"error_context": error.context})

        return error

    def _get_suggested_action(self, error_type: ErrorType) -> str:
        """Return the remediation hint for ``error_type`` ("" if unknown)."""
        return self._SUGGESTED_ACTIONS.get(error_type, "")

    def get_error_summary(self) -> dict:
        """Summarize everything recorded so far.

        Returns:
            A dict with ``total_errors``, a copy of ``error_counts``, the last
            ten entries as ``recent_errors``, and ``most_common`` as an
            ``(ErrorType, count)`` pair, or ``None`` when no error has been
            recorded (instead of reporting an arbitrary type with count 0).
        """
        most_common = None
        if self.error_log:
            most_common = max(self.error_counts.items(), key=lambda x: x[1])

        return {
            "total_errors": len(self.error_log),
            "error_counts": dict(self.error_counts),
            "recent_errors": self.error_log[-10:],
            "most_common": most_common
        }

# Global error handler: a single shared ErrorHandler instance, so the error
# log and per-type counts accumulate in one place.
error_handler = ErrorHandler()

Generation Error Handling

Basic Error Handling

import steadytext
from typing import Optional

def safe_generate(prompt: str, seed: int = 42, max_retries: int = 3) -> Optional[str]:
    """Generate text, retrying on failure, without ever raising.

    A ``None`` result and an exception from ``steadytext.generate`` both count
    as failed attempts. Between attempts the function sleeps with a linearly
    growing delay (0.5s, 1.0s, 1.5s, ...).

    Args:
        prompt: Prompt passed to ``steadytext.generate``.
        seed: Seed passed to ``steadytext.generate``.
        max_retries: Maximum number of attempts; values <= 0 make no attempt.

    Returns:
        The generated text, or ``None`` when every attempt failed.
    """
    # Local import: the import block of this snippet does not bring in `time`.
    import time

    for attempt in range(max_retries):
        is_last_attempt = attempt >= max_retries - 1

        # Keep the try body to the one call that is expected to fail.
        try:
            result = steadytext.generate(prompt, seed=seed)
        except Exception as e:
            logger.exception(f"Unexpected error in generation: {e}")
            error_handler.handle_error(
                ErrorType.UNKNOWN,
                str(e),
                {"prompt": prompt, "seed": seed, "attempt": attempt + 1}
            )
        else:
            if result is not None:
                return result

            logger.warning(f"Generation returned None (attempt {attempt + 1}/{max_retries})")
            if is_last_attempt:
                error_handler.handle_error(
                    ErrorType.MODEL_NOT_LOADED,
                    "Generation failed after all retries",
                    {"prompt": prompt, "seed": seed, "attempts": max_retries}
                )

        if is_last_attempt:
            return None

        time.sleep(0.5 * (attempt + 1))  # Linear backoff

    return None

# Usage: any falsy result (None, or an empty string) takes the failure branch.
result = safe_generate("Write a poem", seed=123)
if result:
    print(result)
else:
    print("Failed to generate text. Please check the error log.")

Advanced Generation Error Handling

import steadytext
from typing import Optional, Dict, Any, Callable
import time
import hashlib

class RobustGenerator:
    """Text generation wrapper that never raises and always returns a string.

    Wraps ``steadytext.generate``. When that call returns ``None`` or raises,
    a fallback string is produced according to ``fallback_strategy``:

    * ``"template"`` - canned sentence chosen by a keyword found in the prompt
    * ``"hash"``     - marker containing a SHA-256 prefix of prompt and seed
    * ``"empty"``    - the empty string
    * ``"error"``    - bracketed error message quoting the prompt
    * anything else  - the literal ``"[Generation failed]"``

    Consecutive failures are counted; once the count reaches
    ``alert_threshold`` every further failure triggers an alert, regardless
    of whether the failure was a ``None`` result or an exception.
    """

    # Keyword -> canned response used by the "template" strategy.
    _TEMPLATES = {
        "summary": "This is a summary of the requested content.",
        "explanation": "This explains the requested concept.",
        "code": "# Code implementation would go here",
        "story": "Once upon a time, there was a story to be told.",
        "default": "Response generated for: {}"
    }

    def __init__(self,
                 fallback_strategy: str = "template",
                 cache_fallbacks: bool = True,
                 alert_threshold: int = 5):
        """Configure the fallback strategy, fallback caching and alerting.

        Args:
            fallback_strategy: One of the strategy names listed on the class.
            cache_fallbacks: Reuse a computed fallback for the same
                prompt/seed pair instead of recomputing it.
            alert_threshold: Consecutive-failure count at which alerts start.
        """
        self.fallback_strategy = fallback_strategy
        self.cache_fallbacks = cache_fallbacks
        self.alert_threshold = alert_threshold
        self.fallback_cache = {}
        self.consecutive_failures = 0
        self.success_callbacks = []
        self.failure_callbacks = []

    def on_success(self, callback: Callable):
        """Register a callback invoked as ``callback(prompt, result, duration)``."""
        self.success_callbacks.append(callback)

    def on_failure(self, callback: Callable):
        """Register a callback invoked as ``callback(prompt, error_message)``."""
        self.failure_callbacks.append(callback)

    def generate(self, prompt: str, seed: int = 42, **kwargs) -> str:
        """Generate text, substituting a fallback string on any failure.

        Args:
            prompt: Prompt passed to ``steadytext.generate``.
            seed: Seed passed to ``steadytext.generate``.
            **kwargs: Forwarded unchanged to ``steadytext.generate``.

        Returns:
            The generated text, or the configured fallback string.
        """
        start_time = time.time()

        try:
            result = steadytext.generate(prompt, seed=seed, **kwargs)
        except Exception as e:
            return self._handle_failure(prompt, seed, str(e))

        if result is None:
            return self._handle_failure(prompt, seed, "Generation returned None")

        # Success resets the failure streak.
        self.consecutive_failures = 0
        self._notify_success(prompt, result, time.time() - start_time)
        return result

    def _handle_failure(self, prompt: str, seed: int, reason: str) -> str:
        """Count a failure, notify, alert if needed, and return the fallback.

        Shared by both failure paths so that exceptions count toward the
        alert threshold exactly like ``None`` results do.
        """
        self.consecutive_failures += 1
        self._notify_failure(prompt, reason)

        if self.consecutive_failures >= self.alert_threshold:
            self._trigger_alert(f"Generation failures exceeded threshold: {self.consecutive_failures}")

        return self._apply_fallback(prompt, seed)

    def _apply_fallback(self, prompt: str, seed: int) -> str:
        """Return the fallback string for ``prompt``/``seed``, using the cache."""
        cache_key = self._get_cache_key(prompt, seed)
        if self.cache_fallbacks and cache_key in self.fallback_cache:
            return self.fallback_cache[cache_key]

        if self.fallback_strategy == "template":
            fallback = self._template_fallback(prompt)
        elif self.fallback_strategy == "hash":
            fallback = self._hash_fallback(prompt, seed)
        elif self.fallback_strategy == "empty":
            fallback = ""
        elif self.fallback_strategy == "error":
            fallback = f"[Error: Unable to generate response for: {prompt[:50]}...]"
        else:
            fallback = "[Generation failed]"

        if self.cache_fallbacks:
            self.fallback_cache[cache_key] = fallback

        return fallback

    def _template_fallback(self, prompt: str) -> str:
        """Pick a canned response by the first template keyword in the prompt."""
        prompt_lower = prompt.lower()
        for key, template in self._TEMPLATES.items():
            if key in prompt_lower:
                # Only the "default" template has a placeholder; for the
                # others format() leaves the text unchanged.
                return template.format(prompt[:30] + "...")

        return self._TEMPLATES["default"].format(prompt[:30] + "...")

    def _hash_fallback(self, prompt: str, seed: int) -> str:
        """Build a deterministic marker from a SHA-256 of prompt and seed."""
        hash_input = f"{prompt}:{seed}"
        hash_value = hashlib.sha256(hash_input.encode()).hexdigest()[:8]

        return f"[Fallback response {hash_value} for: {prompt[:30]}...]"

    def _get_cache_key(self, prompt: str, seed: int) -> str:
        """Return the fallback-cache key for a prompt/seed pair."""
        return f"{prompt}:{seed}"

    def _notify_success(self, prompt: str, result: str, duration: float):
        """Call every success callback; a failing callback is only logged."""
        for callback in self.success_callbacks:
            try:
                callback(prompt, result, duration)
            except Exception as e:
                logger.error(f"Error in success callback: {e}")

    def _notify_failure(self, prompt: str, error: str):
        """Call every failure callback; a failing callback is only logged."""
        for callback in self.failure_callbacks:
            try:
                callback(prompt, error)
            except Exception as e:
                logger.error(f"Error in failure callback: {e}")

    def _trigger_alert(self, message: str):
        """Log a critical alert; the hook for a real alerting mechanism."""
        logger.critical(f"ALERT: {message}")
        # Implement your alerting mechanism here
        # e.g., send email, Slack message, PagerDuty alert

    def get_stats(self) -> Dict[str, Any]:
        """Return the current failure streak, cache size and strategy name."""
        return {
            "consecutive_failures": self.consecutive_failures,
            "fallback_cache_size": len(self.fallback_cache),
            "fallback_strategy": self.fallback_strategy
        }

# Usage example: failures are answered with a keyword-matched canned response.
generator = RobustGenerator(fallback_strategy="template")

# Register callbacks: success callbacks receive (prompt, result, duration),
# failure callbacks receive (prompt, error_message).
generator.on_success(lambda p, r, d: logger.info(f"Generated in {d:.2f}s"))
generator.on_failure(lambda p, e: logger.error(f"Failed: {e}"))

# Generate with robust error handling; the call always returns a string.
result = generator.generate("Write a technical blog post", seed=42)
print(result)

# Check stats: failure streak, fallback cache size and active strategy.
stats = generator.get_stats()
print(f"Generator stats: {stats}")

Embedding Error Handling

Basic Embedding Error Handling

import steadytext
import numpy as np
from typing import Optional

def safe_embed(text: str, seed: int = 42) -> Optional[np.ndarray]:
    """Embed ``text`` and return the vector only if it passes validation.

    Args:
        text: Text passed to ``steadytext.embed``.
        seed: Seed passed to ``steadytext.embed``.

    Returns:
        The embedding when it is a finite ``np.ndarray`` of shape ``(1024,)``;
        ``None`` when the call returns ``None``, raises, or yields anything
        else. Every rejection is logged.
    """
    try:
        vector = steadytext.embed(text, seed=seed)

        # Find the first validation failure, if any; checks run in order
        # because the later ones assume the earlier ones passed.
        problem = None
        if vector is None:
            problem = f"Embedding returned None for text: {text[:50]}..."
        elif not isinstance(vector, np.ndarray):
            problem = f"Invalid embedding type: {type(vector)}"
        elif vector.shape != (1024,):
            problem = f"Invalid embedding shape: {vector.shape}"
        elif np.isnan(vector).any() or np.isinf(vector).any():
            problem = "Embedding contains NaN or Inf values"

        if problem is not None:
            logger.error(problem)
            return None

        return vector

    except Exception as e:
        logger.exception(f"Error generating embedding: {e}")
        return None

# Usage with fallback
def get_embedding_with_fallback(text: str, seed: int = 42) -> np.ndarray:
    """Return the embedding of ``text``, or a zero vector if none is available.

    Args:
        text: Text to embed.
        seed: Seed forwarded to ``safe_embed``.

    Returns:
        The validated embedding from ``safe_embed``; when that is ``None``,
        a float32 zero vector of length 1024 (a warning is logged).
    """
    embedding = safe_embed(text, seed=seed)
    if embedding is not None:
        return embedding

    logger.warning("Using zero-vector fallback for embedding")
    # Same shape as a real embedding, so callers need no special case.
    return np.zeros(1024, dtype=np.float32)

Advanced Embedding Error Handling

import steadytext
import numpy as np
from typing import List, Optional, Dict, Tuple
import hashlib

class RobustEmbedder:
    """Embedding wrapper that never raises and always returns a vector.

    Wraps ``steadytext.embed``. Valid embeddings are optionally cached by
    ``(text, seed)``; when embedding fails, a length-1024 fallback vector is
    produced according to ``fallback_method``:

    * ``"zero"``   - all zeros
    * ``"random"`` - unit-norm Gaussian vector seeded from SHA-256(text:seed)
    * ``"hash"``   - unit-norm vector derived from the SHA-256 digest bytes
    * anything else - all zeros

    Both non-zero fallbacks are reproducible across processes because they
    depend only on the text and the seed.
    """

    def __init__(self,
                 fallback_method: str = "zero",
                 cache_embeddings: bool = True,
                 similarity_threshold: float = 0.95):
        """Configure the fallback method, caching and similarity cut-off.

        Args:
            fallback_method: One of the method names listed on the class.
            cache_embeddings: Cache successful embeddings per text/seed.
            similarity_threshold: Minimum dot product for a cached embedding
                to be reported by ``find_similar_cached``.
        """
        self.fallback_method = fallback_method
        self.cache_embeddings = cache_embeddings
        self.similarity_threshold = similarity_threshold
        self.embedding_cache = {}
        self.error_count = 0
        self.success_count = 0

    def embed(self, text: str, seed: int = 42) -> np.ndarray:
        """Return the embedding of ``text``, or a fallback vector on failure.

        Cache hits are returned without calling the model and without
        touching the success/error counters. Fallback vectors are not cached.
        """
        cache_key = self._get_cache_key(text, seed)
        if self.cache_embeddings and cache_key in self.embedding_cache:
            return self.embedding_cache[cache_key]

        try:
            embedding = steadytext.embed(text, seed=seed)

            if embedding is not None and self._validate_embedding(embedding):
                self.success_count += 1

                if self.cache_embeddings:
                    self.embedding_cache[cache_key] = embedding

                return embedding

            # None or an invalid array both count as a failed embedding.
            self.error_count += 1
            return self._generate_fallback(text, seed)

        except Exception as e:
            logger.exception(f"Embedding error: {e}")
            self.error_count += 1
            return self._generate_fallback(text, seed)

    def embed_batch(self, texts: List[str], seed: int = 42) -> List[np.ndarray]:
        """Embed several texts; the result has one vector per input, in order.

        The text at position ``i`` is embedded with seed ``seed + i``.
        """
        embeddings = []
        failed_indices = []

        for i, text in enumerate(texts):
            # Use different seed for each text in batch
            text_seed = seed + i
            try:
                embeddings.append(self.embed(text, seed=text_seed))
            except Exception as e:
                logger.error(f"Failed to embed text {i}: {e}")
                failed_indices.append(i)
                embeddings.append(self._generate_fallback(text, text_seed))

        if failed_indices:
            logger.warning(f"Failed to embed {len(failed_indices)} texts: {failed_indices}")

        return embeddings

    def find_similar_cached(self, embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
        """Return up to ``top_k`` cached texts similar to ``embedding``.

        Similarity is the dot product with each cached embedding; only
        entries at or above ``similarity_threshold`` are returned, sorted by
        descending similarity.
        """
        if not self.embedding_cache:
            return []

        similarities = []
        for cache_key, cached_embedding in self.embedding_cache.items():
            similarity = np.dot(embedding, cached_embedding)
            if similarity >= self.similarity_threshold:
                # The key is "<text>:<seed>"; split on the LAST colon so text
                # that itself contains ":" is recovered intact.
                text = cache_key.rsplit(":", 1)[0]
                similarities.append((text, similarity))

        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]

    def _validate_embedding(self, embedding: np.ndarray) -> bool:
        """Return True for a finite ``np.ndarray`` of shape ``(1024,)``.

        A norm different from 1 is only logged as a warning, not rejected.
        """
        if not isinstance(embedding, np.ndarray):
            return False

        if embedding.shape != (1024,):
            return False

        if not np.all(np.isfinite(embedding)):
            return False

        norm = np.linalg.norm(embedding)
        if not np.isclose(norm, 1.0, atol=1e-6):
            logger.warning(f"Embedding not normalized: norm={norm}")

        return True

    def _generate_fallback(self, text: str, seed: int) -> np.ndarray:
        """Build the fallback vector for ``text``/``seed`` (length 1024)."""
        if self.fallback_method == "random":
            # Seed from SHA-256, not the built-in hash(): str hashes are
            # salted per process, so hash() would give different vectors on
            # every run. A private Generator also leaves the global NumPy
            # random state untouched.
            digest = hashlib.sha256(f"{text}:{seed}".encode()).digest()
            rng = np.random.default_rng(int.from_bytes(digest[:8], "big"))
            embedding = rng.standard_normal(1024).astype(np.float32)
            return embedding / np.linalg.norm(embedding)

        if self.fallback_method == "hash":
            digest = hashlib.sha256(f"{text}:{seed}".encode()).digest()

            # 32 digest bytes repeated 32 times = 1024 bytes, read as one
            # uint8 per dimension. (Reading them as float32 would give only
            # 256 values, some of them possibly NaN/Inf.)
            raw = np.frombuffer(digest * 32, dtype=np.uint8)[:1024]
            # Map 0..255 to [-1, 1], then L2-normalize.
            embedding = 2 * (raw.astype(np.float32) / 255.0) - 1
            return embedding / np.linalg.norm(embedding)

        # "zero" and any unrecognized method.
        return np.zeros(1024, dtype=np.float32)

    def _get_cache_key(self, text: str, seed: int) -> str:
        """Return the cache key ``"<text>:<seed>"``."""
        return f"{text}:{seed}"

    def get_stats(self) -> Dict[str, object]:
        """Return counters, success rate (0 if nothing ran), cache size, method."""
        total = self.success_count + self.error_count
        return {
            "success_count": self.success_count,
            "error_count": self.error_count,
            "success_rate": self.success_count / total if total > 0 else 0,
            "cache_size": len(self.embedding_cache),
            "fallback_method": self.fallback_method
        }

# Usage example: failed embeddings are replaced by a hash-derived vector.
embedder = RobustEmbedder(fallback_method="hash")

# Single embedding: always returns an array, real or fallback.
embedding = embedder.embed("test text", seed=42)
print(f"Embedding shape: {embedding.shape}, norm: {np.linalg.norm(embedding):.4f}")

# Batch embedding: text i is embedded with seed 100 + i.
texts = ["text 1", "text 2", "text 3"]
embeddings = embedder.embed_batch(texts, seed=100)
print(f"Generated {len(embeddings)} embeddings")

# Find similar: searches only embeddings stored in the embedder's cache.
similar = embedder.find_similar_cached(embedding, top_k=3)
print(f"Similar embeddings: {similar}")

# Stats: success/error counters, success rate, cache size, fallback method.
print(f"Embedder stats: {embedder.get_stats()}")

Streaming Error Handling

Basic Streaming Error Handling

import steadytext
from typing import Iterator, Optional

def safe_generate_iter(prompt: str, seed: int = 42) -> Iterator[str]:
    """Stream generated tokens, reporting failures as in-band error markers.

    Args:
        prompt: Prompt passed to ``steadytext.generate_iter``.
        seed: Seed passed to ``steadytext.generate_iter``.

    Yields:
        The tokens of the underlying stream (a falsy first token is dropped).
        An empty stream yields ``"[Error: No content generated]"``; an
        exception yields ``"[Error: <message>]"`` and ends the stream.
    """
    try:
        token_stream = steadytext.generate_iter(prompt, seed=seed)

        # Pull the first token eagerly: a stream that ends immediately is
        # treated as an error.
        exhausted = False
        head = None
        try:
            head = next(token_stream)
        except StopIteration:
            exhausted = True

        if exhausted:
            logger.error("Empty stream returned")
            yield "[Error: No content generated]"
            return

        if head:
            yield head

        # Pass the rest of the stream through untouched.
        for piece in token_stream:
            yield piece

    except Exception as e:
        logger.exception(f"Streaming error: {e}")
        yield f"[Error: {str(e)}]"

# Usage with timeout
def generate_with_timeout(prompt: str, seed: int = 42, timeout: float = 30.0) -> str:
    """Collect a streamed generation into one string, bounded by a timeout.

    Uses a SIGALRM-based interval timer, so it works only on Unix and only
    when called from the main thread.

    Args:
        prompt: Prompt forwarded to ``safe_generate_iter``.
        seed: Seed forwarded to ``safe_generate_iter``.
        timeout: Wall-clock limit in seconds; fractions are honoured. A value
            of 0 disables the limit.

    Returns:
        The concatenated tokens, or ``"[Error: Generation timed out]"`` when
        the ``TimeoutError`` reaches this function. ``safe_generate_iter``
        catches ``Exception`` itself, so a timeout that fires inside it may
        instead surface as an ``"[Error: ...]"`` token in the returned text.
    """
    import signal

    def timeout_handler(signum, frame):
        raise TimeoutError("Generation timed out")

    # Remember the handler that was installed before, to restore it on exit.
    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)

    try:
        # setitimer takes float seconds. alarm(int(timeout)) would truncate
        # the value, and any timeout below 1s would become alarm(0), which
        # means "no alarm at all".
        signal.setitimer(signal.ITIMER_REAL, timeout)

        result = []
        for token in safe_generate_iter(prompt, seed=seed):
            result.append(token)

        return "".join(result)

    except TimeoutError:
        logger.error(f"Generation timed out after {timeout}s")
        return "[Error: Generation timed out]"
    finally:
        # Always disarm the timer and put the previous handler back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal returns None for a handler not installed from
        # Python; that value cannot be passed back in.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

Advanced Streaming Error Handling

import threading
import time
from queue import Empty, Queue
from typing import Callable, Iterator, List, Optional

import steadytext

class RobustStreamer:
    """Streaming generation wrapper that reports failures as in-band markers.

    Wraps ``steadytext.generate_iter`` and enforces a total time limit, a
    token limit, per-token validation and a stall check. Problems are passed
    to the registered error handlers and surface in the stream as bracketed
    markers (``"[Timeout]"``, ``"[Stalled]"``, ``"[No content]"``,
    ``"[Error: ...]"``) instead of exceptions.
    """

    def __init__(self,
                 timeout: float = 30.0,
                 max_tokens: int = 512,
                 heartbeat_interval: float = 1.0):
        """Configure the limits.

        Args:
            timeout: Maximum total time for one stream, in seconds.
            max_tokens: Maximum number of tokens yielded per stream.
            heartbeat_interval: Base interval in seconds; a token that takes
                longer than ten times this to arrive counts as a stall.
        """
        self.timeout = timeout
        self.max_tokens = max_tokens
        self.heartbeat_interval = heartbeat_interval
        self.error_handlers = []
        self.token_validators = []

    def on_error(self, handler: Callable):
        """Register a handler called with an error-info dict on each error."""
        self.error_handlers.append(handler)

    def add_token_validator(self, validator: Callable[[str], bool]):
        """Add a predicate; tokens for which it returns False are skipped."""
        self.token_validators.append(validator)

    def generate_stream(self, prompt: str, seed: int = 42) -> Iterator[str]:
        """Stream tokens for ``prompt`` while enforcing all limits.

        Yields:
            Valid tokens, possibly followed by one or more bracketed markers
            when the stream times out, stalls, is empty, or raises.
        """
        start_time = time.time()
        tokens_generated = 0
        last_token_time = start_time

        try:
            stream = steadytext.generate_iter(prompt, seed=seed)

            for token in stream:
                current_time = time.time()

                # Check timeout
                if current_time - start_time > self.timeout:
                    self._handle_error("Timeout", prompt, tokens_generated)
                    yield "[Timeout]"
                    break

                # Check token count
                if tokens_generated >= self.max_tokens:
                    logger.warning(f"Max tokens ({self.max_tokens}) reached")
                    break

                # Check for stalled generation: how long the producer took to
                # deliver this token since the previous one was handled.
                if current_time - last_token_time > self.heartbeat_interval * 10:
                    self._handle_error("Stalled generation", prompt, tokens_generated)
                    yield "[Stalled]"
                    break

                # Validate token
                if not self._validate_token(token):
                    logger.warning(f"Invalid token detected: {repr(token)}")
                    # A rejected token still proves the stream is alive, so
                    # restart the stall clock before waiting for the next one.
                    last_token_time = time.time()
                    continue

                # Yield valid token
                yield token
                tokens_generated += 1
                # Restart the stall clock only after the consumer resumes us,
                # so time spent by a slow consumer is not counted as a stall.
                last_token_time = time.time()

            # Check if generation completed successfully
            if tokens_generated == 0:
                self._handle_error("No tokens generated", prompt, 0)
                yield "[No content]"

        except Exception as e:
            self._handle_error(str(e), prompt, tokens_generated)
            yield f"[Error: {str(e)}]"

    def generate_async(self, prompt: str, seed: int = 42,
                      callback: Optional[Callable] = None) -> threading.Thread:
        """Run ``generate_stream`` in a background thread.

        Args:
            prompt: Prompt to generate for.
            seed: Seed for generation.
            callback: Optional callable invoked with each token as it arrives.

        Returns:
            The started thread. Its ``result_queue`` attribute receives
            exactly one tuple before the thread ends:
            ``("success", full_text)`` or ``("error", message)``.
        """
        result_queue = Queue()

        def worker():
            try:
                tokens = []
                for token in self.generate_stream(prompt, seed):
                    tokens.append(token)
                    if callback:
                        callback(token)

                result_queue.put(("success", "".join(tokens)))

            except Exception as e:
                result_queue.put(("error", str(e)))

        thread = threading.Thread(target=worker)
        thread.start()

        # Return thread and queue for monitoring
        thread.result_queue = result_queue
        return thread

    def generate_with_fallback(self, prompt: str, seed: int = 42,
                              fallback_prompts: Optional[List[str]] = None) -> Iterator[str]:
        """Generate for ``prompt``, retrying with fallback prompts if invalid.

        Each attempt is collected in full and checked with
        ``_is_valid_generation`` before anything is yielded, so the output is
        not streamed incrementally. Fallback ``i`` uses seed ``seed + i + 1``.

        Yields:
            The tokens of the first valid attempt, or the single marker
            ``"[All generation attempts failed]"``.
        """
        fallback_prompts = fallback_prompts or [
            f"Please provide a response about: {prompt[:50]}...",
            "Generate a helpful response.",
            "Provide relevant information."
        ]

        # Try main prompt
        tokens = list(self.generate_stream(prompt, seed))
        if self._is_valid_generation(tokens):
            for token in tokens:
                yield token
            return

        # Try fallback prompts
        for i, fallback in enumerate(fallback_prompts):
            logger.info(f"Trying fallback prompt {i+1}")
            tokens = list(self.generate_stream(fallback, seed + i + 1))
            if self._is_valid_generation(tokens):
                for token in tokens:
                    yield token
                return

        # All attempts failed
        yield "[All generation attempts failed]"

    def _validate_token(self, token: str) -> bool:
        """Return True if ``token`` is a str accepted by every validator."""
        if not isinstance(token, str):
            return False

        return all(validator(token) for validator in self.token_validators)

    def _is_valid_generation(self, tokens: List[str]) -> bool:
        """Return True for non-empty output without error markers.

        The joined text must also contain at least 10 non-padding characters.
        """
        if not tokens:
            return False

        content = "".join(tokens)

        # Check for error markers
        if any(marker in content for marker in ["[Error", "[Timeout", "[Stalled", "[No content"]):
            return False

        # Check minimum length
        if len(content.strip()) < 10:
            return False

        return True

    def _handle_error(self, error: str, prompt: str, tokens_generated: int):
        """Log a streaming error and pass its details to every handler.

        A handler that raises is logged and does not stop the others.
        """
        error_info = {
            "error": error,
            "prompt": prompt,
            "tokens_generated": tokens_generated,
            "timestamp": time.time()
        }

        logger.error(f"Streaming error: {error_info}")

        for handler in self.error_handlers:
            try:
                handler(error_info)
            except Exception as e:
                logger.error(f"Error in error handler: {e}")

# Usage example: stop after 20 seconds or 300 tokens, whichever comes first.
streamer = RobustStreamer(timeout=20.0, max_tokens=300)

# Add custom token validator: tokens of 100 characters or more are skipped.
streamer.add_token_validator(lambda token: len(token) < 100)

# Add error handler: it receives a dict that includes an "error" entry.
streamer.on_error(lambda info: print(f"Error handled: {info['error']}"))

# Stream with error handling: print tokens as they arrive.
print("Streaming with error handling:")
for token in streamer.generate_stream("Write a story", seed=42):
    print(token, end="", flush=True)
print()

# Async generation: the callback prints each token from the worker thread.
print("\nAsync generation:")
thread = streamer.generate_async(
    "Explain quantum computing",
    seed=123,
    callback=lambda token: print(token, end="", flush=True)
)

# Wait for completion, then read the (status, text) tuple the worker left on
# the queue attached to the thread.
thread.join()
try:
    status, result = thread.result_queue.get(timeout=1)
    print(f"\nAsync result: {status}")
except Empty:
    print("\nAsync generation did not complete")

# Generation with fallbacks: alternative prompts are tried if the first
# attempt does not produce valid output.
print("\nGeneration with fallbacks:")
for token in streamer.generate_with_fallback("Complex prompt that might fail", seed=456):
    print(token, end="", flush=True)
print()

Daemon Error Handling

Basic Daemon Error Handling

import steadytext
from steadytext.daemon import use_daemon
from steadytext.daemon.client import is_daemon_running
import subprocess
import time

def ensure_daemon_running(max_retries: int = 3) -> bool:
    """Make sure the daemon is up, starting it via the CLI if necessary.

    Each attempt first checks whether the daemon already runs; if not, it
    runs ``st daemon start`` (10s limit), waits 2s after a zero exit code and
    checks again. Attempts are separated by a 1s pause.

    Args:
        max_retries: Maximum number of start attempts.

    Returns:
        True as soon as the daemon is found running, False once all attempts
        are used up.
    """
    start_command = ["st", "daemon", "start"]

    for attempt in range(max_retries):
        if is_daemon_running():
            return True

        logger.info(f"Daemon not running, attempting to start (attempt {attempt + 1}/{max_retries})")

        try:
            result = subprocess.run(
                start_command,
                capture_output=True,
                text=True,
                timeout=10
            )

            if result.returncode != 0:
                logger.error(f"Failed to start daemon: {result.stderr}")
            else:
                # Give the freshly started daemon a moment to become ready.
                time.sleep(2)
                if is_daemon_running():
                    logger.info("Daemon started successfully")
                    return True

        except subprocess.TimeoutExpired:
            logger.error("Daemon start timed out")
        except Exception as e:
            logger.error(f"Error starting daemon: {e}")

        # Pause before the next attempt.
        time.sleep(1)

    return False

def generate_with_daemon_fallback(prompt: str, seed: int = 42) -> Optional[str]:
    """Generate through the daemon, retrying once in direct mode on error.

    Args:
        prompt: Prompt passed to ``steadytext.generate``.
        seed: Seed passed to ``steadytext.generate``.

    Returns:
        Whatever the daemon-backed call returns when it does not raise;
        otherwise the result of a direct call, or ``None`` if that raises too.
    """
    # First choice: run the generation inside the daemon context.
    try:
        with use_daemon():
            daemon_output = steadytext.generate(prompt, seed=seed)
    except Exception as e:
        logger.warning(f"Daemon generation failed: {e}, falling back to direct mode")
    else:
        return daemon_output

    # Second choice: the same call without the daemon context.
    try:
        direct_output = steadytext.generate(prompt, seed=seed)
    except Exception as e2:
        logger.error(f"Direct generation also failed: {e2}")
        return None

    return direct_output

# Usage: prefer the daemon; if it cannot be started, generate directly.
if ensure_daemon_running():
    result = generate_with_daemon_fallback("Hello world", seed=42)
else:
    print("Could not start daemon, using direct mode")
    result = steadytext.generate("Hello world", seed=42)

# Both paths can produce None, so report the outcome in one place instead of
# silently dropping the direct-mode result.
if result:
    print(result)
else:
    print("Generation failed")

Advanced Daemon Error Handling

import steadytext
from steadytext.daemon import use_daemon
from steadytext.daemon.client import DaemonClient, is_daemon_running
import time
import threading
from typing import Optional, Dict, Any, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class DaemonHealth:
    """Daemon health status.

    Snapshot record of the most recent health information for the daemon.
    """
    # Whether the daemon was found running.
    is_running: bool
    # Measured response time, or None when no measurement is available
    # (presumably seconds - TODO confirm against the health-check code).
    response_time: Optional[float]
    # When this status was recorded.
    last_check: datetime
    # Number of failures in a row.
    consecutive_failures: int
    # Description of the latest problem, if any.
    error_message: Optional[str] = None

class ResilientDaemonClient:
    """Resilient daemon client with comprehensive error handling.

    Requests are routed through the SteadyText daemon while it is considered
    healthy and are served by direct calls otherwise. Health is tracked in
    ``self.health`` and can be refreshed periodically by a background thread
    (``start_monitoring``). Callers can subscribe to the events
    ``daemon_down``, ``daemon_up``, ``daemon_slow`` and ``fallback_activated``
    through ``on``.

    Note: ``health.is_running`` starts as ``False``, so requests are served in
    direct mode until a health check has succeeded.
    """

    # Daemon calls slower than this many seconds trigger a "daemon_slow" event.
    SLOW_RESPONSE_SECONDS = 5.0

    def __init__(self,
                 health_check_interval: int = 60,
                 max_consecutive_failures: int = 5,
                 auto_restart: bool = True):
        """Create a client.

        Args:
            health_check_interval: Seconds between background health checks.
            max_consecutive_failures: Failures in a row after which the daemon
                is marked as not running and fallback mode is activated.
            auto_restart: Whether a failed health check may try to restart
                the daemon.
        """
        self.health_check_interval = health_check_interval
        self.max_consecutive_failures = max_consecutive_failures
        self.auto_restart = auto_restart
        self.health = DaemonHealth(
            is_running=False,
            response_time=None,
            last_check=datetime.now(),
            consecutive_failures=0
        )
        self._health_check_thread = None
        self._stop_health_check = threading.Event()
        self._fallback_mode = False
        self._callbacks = {
            "daemon_down": [],
            "daemon_up": [],
            "daemon_slow": [],
            "fallback_activated": []
        }

    def on(self, event: str, callback: Callable):
        """Register a callback for one of the known events.

        Unknown event names are ignored, but a warning is logged so that a
        typo in the event name does not go unnoticed.
        """
        if event in self._callbacks:
            self._callbacks[event].append(callback)
        else:
            logger.warning(f"Ignoring callback for unknown event: {event}")

    def start_monitoring(self):
        """Start the health monitoring thread unless one is already alive."""
        if self._health_check_thread is None or not self._health_check_thread.is_alive():
            self._stop_health_check.clear()
            self._health_check_thread = threading.Thread(
                target=self._health_monitor_loop,
                daemon=True
            )
            self._health_check_thread.start()

    def stop_monitoring(self):
        """Signal the monitoring thread to stop and wait up to 5 seconds for it."""
        self._stop_health_check.set()
        if self._health_check_thread:
            self._health_check_thread.join(timeout=5)

    def generate(self, prompt: str, seed: int = 42, **kwargs) -> Optional[str]:
        """Generate text, preferring the daemon and falling back to direct mode.

        Args:
            prompt: Prompt forwarded to ``steadytext.generate``.
            seed: Seed forwarded to ``steadytext.generate``.
            **kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            The generated text, or ``None`` if the fallback call raised.
        """
        # Check if we should use fallback mode
        if self._fallback_mode or not self.health.is_running:
            return self._fallback_generate(prompt, seed, **kwargs)

        try:
            # Attempt daemon generation
            start_time = time.time()

            with use_daemon():
                result = steadytext.generate(prompt, seed=seed, **kwargs)

            # Update response time
            response_time = time.time() - start_time
            self._update_health(True, response_time)

            # Check for slow responses
            if response_time > self.SLOW_RESPONSE_SECONDS:
                self._trigger_event("daemon_slow", {"response_time": response_time})

            return result

        except Exception as e:
            logger.error(f"Daemon generation error: {e}")
            self._update_health(False, error=str(e))

            # Check if we should switch to fallback mode
            if self.health.consecutive_failures >= self.max_consecutive_failures:
                self._activate_fallback_mode()

            # Try direct generation
            return self._fallback_generate(prompt, seed, **kwargs)

    def embed(self, text: str, seed: int = 42) -> Optional[np.ndarray]:
        """Embed text, preferring the daemon and falling back to direct mode.

        Mirrors ``generate``: same health bookkeeping, same events, same
        fallback rules.

        Args:
            text: Text forwarded to ``steadytext.embed``.
            seed: Seed forwarded to ``steadytext.embed``.

        Returns:
            The embedding, or ``None`` if the fallback call raised.
        """
        if self._fallback_mode or not self.health.is_running:
            return self._fallback_embed(text, seed)

        try:
            start_time = time.time()

            with use_daemon():
                result = steadytext.embed(text, seed=seed)

            response_time = time.time() - start_time
            self._update_health(True, response_time)

            if response_time > self.SLOW_RESPONSE_SECONDS:
                self._trigger_event("daemon_slow", {"response_time": response_time})

            return result

        except Exception as e:
            logger.error(f"Daemon embedding error: {e}")
            self._update_health(False, error=str(e))

            if self.health.consecutive_failures >= self.max_consecutive_failures:
                self._activate_fallback_mode()

            return self._fallback_embed(text, seed)

    def _health_monitor_loop(self):
        """Background loop: run a health check, then wait for the next interval."""
        while not self._stop_health_check.is_set():
            try:
                self._perform_health_check()
            except Exception as e:
                logger.error(f"Health check error: {e}")

            # Event.wait doubles as an interruptible sleep: stop_monitoring()
            # sets the event and ends the wait early.
            self._stop_health_check.wait(self.health_check_interval)

    def _perform_health_check(self):
        """Check that the daemon is running and answers a ping."""
        try:
            start_time = time.time()

            # Check if daemon is running
            if not is_daemon_running():
                self._update_health(False, error="Daemon not running")

                # _update_health(False) has just incremented the counter, so
                # the second condition holds whenever this line is reached.
                if self.auto_restart and self.health.consecutive_failures > 0:
                    self._attempt_restart()
                return

            # Test daemon responsiveness
            client = DaemonClient()
            response = client._send_request({"type": "ping"})

            if response and response.get("success"):
                response_time = time.time() - start_time
                self._update_health(True, response_time)

                # Check if we can deactivate fallback mode
                if self._fallback_mode and self.health.consecutive_failures == 0:
                    self._deactivate_fallback_mode()
            else:
                self._update_health(False, error="Ping failed")

        except Exception as e:
            self._update_health(False, error=str(e))

    def _update_health(self, success: bool, response_time: Optional[float] = None,
                      error: Optional[str] = None):
        """Record the outcome of a daemon interaction.

        A success marks the daemon as running and, if failures had been
        counted, resets the counter and fires ``daemon_up``. A failure
        increments the counter, fires ``daemon_down`` on the first failure
        and marks the daemon as not running once
        ``max_consecutive_failures`` is reached.
        """
        self.health.last_check = datetime.now()

        if success:
            self.health.is_running = True
            self.health.response_time = response_time
            self.health.error_message = None

            if self.health.consecutive_failures > 0:
                # Daemon recovered
                self.health.consecutive_failures = 0
                self._trigger_event("daemon_up", {"response_time": response_time})
        else:
            self.health.consecutive_failures += 1
            self.health.error_message = error

            if self.health.consecutive_failures == 1:
                # Daemon just went down
                self._trigger_event("daemon_down", {"error": error})

            if self.health.consecutive_failures >= self.max_consecutive_failures:
                self.health.is_running = False

    def _activate_fallback_mode(self):
        """Switch to fallback mode (idempotent) and fire ``fallback_activated``."""
        if not self._fallback_mode:
            self._fallback_mode = True
            logger.warning("Activating fallback mode due to daemon failures")
            self._trigger_event("fallback_activated", {
                "consecutive_failures": self.health.consecutive_failures
            })

    def _deactivate_fallback_mode(self):
        """Leave fallback mode (idempotent)."""
        if self._fallback_mode:
            self._fallback_mode = False
            logger.info("Deactivating fallback mode - daemon recovered")

    def _fallback_generate(self, prompt: str, seed: int = 42, **kwargs) -> Optional[str]:
        """Generate without the daemon context; return None if the call raises."""
        try:
            logger.info("Using direct generation (fallback mode)")
            return steadytext.generate(prompt, seed=seed, **kwargs)
        except Exception as e:
            logger.error(f"Fallback generation error: {e}")
            return None

    def _fallback_embed(self, text: str, seed: int = 42) -> Optional[np.ndarray]:
        """Embed without the daemon context; return None if the call raises."""
        try:
            logger.info("Using direct embedding (fallback mode)")
            return steadytext.embed(text, seed=seed)
        except Exception as e:
            logger.error(f"Fallback embedding error: {e}")
            return None

    def _attempt_restart(self):
        """Stop and start the daemon through the ``st`` CLI; errors are logged."""
        logger.info("Attempting to restart daemon")

        try:
            # Stop daemon if running
            subprocess.run(["st", "daemon", "stop"], timeout=5)
            time.sleep(1)

            # Start daemon
            result = subprocess.run(
                ["st", "daemon", "start"],
                capture_output=True,
                text=True,
                timeout=10
            )

            if result.returncode == 0:
                logger.info("Daemon restart successful")
                time.sleep(2)  # Wait for startup
            else:
                logger.error(f"Daemon restart failed: {result.stderr}")

        except Exception as e:
            logger.error(f"Error restarting daemon: {e}")

    def _trigger_event(self, event: str, data: Dict[str, Any]):
        """Call every callback registered for ``event`` with ``data``.

        A failing callback is logged and does not prevent the remaining
        callbacks from running.
        """
        for callback in self._callbacks.get(event, []):
            try:
                callback(data)
            except Exception as e:
                logger.error(f"Error in {event} callback: {e}")

    def get_status(self) -> Dict[str, Any]:
        """Return health data, fallback flag and monitoring state as a dict."""
        return {
            "health": {
                "is_running": self.health.is_running,
                "response_time": self.health.response_time,
                "last_check": self.health.last_check.isoformat(),
                "consecutive_failures": self.health.consecutive_failures,
                "error_message": self.health.error_message
            },
            "fallback_mode": self._fallback_mode,
            "monitoring": self._health_check_thread.is_alive() if self._health_check_thread else False
        }

# Usage example
client = ResilientDaemonClient(auto_restart=True)

# Register event handlers: one printer per event, each with its own label
for event_name, label in (
    ("daemon_down", "Daemon down"),
    ("daemon_up", "Daemon recovered"),
    ("daemon_slow", "Slow response"),
    ("fallback_activated", "Fallback mode"),
):
    # label=label binds the current label; a plain closure would see the last one
    client.on(event_name, lambda data, label=label: print(f"{label}: {data}"))

# Start monitoring
client.start_monitoring()

# Use with automatic error handling
result = client.generate("Write a poem", seed=42)
print(result if result else "Generation failed")

# Check status
status = client.get_status()
print(f"Client status: {status}")

# Stop monitoring when done
client.stop_monitoring()

CLI Error Handling

Shell Script Error Handling

#!/bin/bash
# robust_cli.sh - Robust CLI usage with error handling

# -e: exit on error, -u: error on undefined variable, -o pipefail: a pipeline
# fails if any of its stages fails. -E (errtrace) lets shell functions,
# command substitutions and subshells inherit the ERR trap; without it the
# trap only covers top-level commands, and this script runs inside main().
set -Eeuo pipefail

# Error handling function
handle_error() {
    # ERR-trap handler: report the failure, log it, clean up and exit.
    #   $1 - line number of the failing command (passed in by the trap)
    # $? must be captured first; any later command would overwrite it.
    local exit_code=$?
    local line_number=$1
    echo "Error on line $line_number: Command exited with status $exit_code" >&2

    # Log error (appended to steadytext_errors.log in the current directory)
    echo "[$(date)] Error on line $line_number, exit code $exit_code" >> steadytext_errors.log

    # Cleanup if needed
    cleanup

    # Exit with the status of the command that failed
    exit $exit_code
}

# Set error trap. The single quotes defer expansion of ${LINENO} until the
# trap fires, so handle_error receives the line of the failing command.
trap 'handle_error ${LINENO}' ERR

# Cleanup function
cleanup() {
    # Remove temporary files matching /tmp/steadytext_temp_*.
    # -f keeps rm quiet (and successful) when nothing matches.
    rm -f /tmp/steadytext_temp_*
}

# Function to safely generate text
safe_generate() {
    # Generate text with retries and print it on stdout.
    #   $1 - prompt
    #   $2 - seed (optional, default 42)
    # Returns 0 on success, 1 once all attempts have failed. Diagnostics go
    # to stderr so that stdout carries only the generated text.
    local prompt="$1"
    local seed="${2:-42}"
    local max_retries=3
    # Declared local so a direct (non-substituted) call cannot clobber
    # same-named variables of the caller.
    local attempt result text

    for ((attempt = 1; attempt <= max_retries; attempt++)); do
        if result=$(st generate "$prompt" --seed "$seed" --json 2>/dev/null); then
            # -e makes jq exit non-zero when .text is null or missing, so a
            # JSON reply without text is not passed on as the literal "null".
            if text=$(jq -er '.text' <<<"$result" 2>/dev/null); then
                printf '%s\n' "$text"
                return 0
            fi
            echo "Error: Invalid JSON response" >&2
        else
            echo "Error: Generation failed (attempt $attempt/$max_retries)" >&2
        fi

        # Pause between attempts, but not after the last one.
        if ((attempt < max_retries)); then
            sleep 1
        fi
    done

    return 1
}

# Function to check daemon status
check_daemon() {
    # Report whether the daemon is running: prints a status line and returns
    # 0 (running) or 1 (not running). Only the exit code of `st daemon status`
    # is used; its own output is discarded.
    if ! st daemon status &>/dev/null; then
        echo "Daemon is not running"
        return 1
    fi

    echo "Daemon is running"
    return 0
}

# Function to ensure daemon is running
ensure_daemon() {
    # Make sure the daemon is up, starting it when necessary.
    # Returns 0 when the daemon is running afterwards, 1 otherwise.

    # Already running: nothing to do.
    if check_daemon; then
        return 0
    fi

    echo "Starting daemon..."
    if st daemon start; then
        # Give the daemon a moment to come up before re-checking.
        sleep 2
        if check_daemon; then
            echo "Daemon started successfully"
            return 0
        fi
    fi

    echo "Failed to start daemon" >&2
    return 1
}

# Main script
main() {
    # Demo driver: pick daemon or direct mode, run one generation (fatal on
    # failure), then a best-effort batch that writes output_<index>.txt files.
    printf '%s\n' "SteadyText Robust CLI Example" "============================="

    # Ensure daemon is running (optional)
    if ensure_daemon; then
        echo "Using daemon mode"
    else
        echo "Using direct mode"
    fi

    # Generate with error handling; a failure here aborts the script
    printf '\nGenerating text...\n'
    if ! text=$(safe_generate "Write a haiku about error handling" 123); then
        echo "Failed to generate text"
        exit 1
    fi
    echo "Generated text:"
    echo "$text"

    # Batch processing with error handling
    printf '\nBatch processing...\n'
    prompts=("Task 1" "Task 2" "Task 3")

    for i in "${!prompts[@]}"; do
        prompt="${prompts[$i]}"
        printf "Processing '%s': " "$prompt"

        # Each prompt gets its own seed (100 + index)
        if result=$(safe_generate "$prompt" $((100 + i))); then
            echo "Success"
            echo "$result" > "output_$i.txt"
        else
            # Continue with next prompt instead of exiting
            echo "Failed"
        fi
    done

    printf '\nCompleted successfully\n'
}

# Run main function
main "$@"

Python CLI Wrapper

#!/usr/bin/env python3
"""
robust_cli_wrapper.py - Robust wrapper for SteadyText CLI
"""

import subprocess
import json
import time
import sys
import logging
from typing import Optional, Dict, Any, List
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SteadyTextCLI:
    """Robust wrapper for SteadyText CLI with error handling.

    Every public method shells out to the ``st`` executable and reports
    failure through its return value (``None`` / ``False``) instead of
    raising.
    """

    def __init__(self,
                 timeout: int = 30,
                 max_retries: int = 3,
                 retry_delay: float = 1.0):
        """Create a wrapper.

        Args:
            timeout: Seconds a single CLI invocation may take.
            max_retries: Attempts made by ``generate`` and ``embed``.
            retry_delay: Base delay in seconds between attempts.
        """
        self.timeout = timeout
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.daemon_checked = False
        self.daemon_available = False

    def _run_command(self, cmd: List[str], input_text: Optional[str] = None) -> subprocess.CompletedProcess:
        """Run a CLI command with the configured timeout.

        Raises:
            subprocess.TimeoutExpired: If the command exceeds ``self.timeout``.
            Exception: Any other error from ``subprocess.run`` is logged and
                re-raised.
        """
        try:
            return subprocess.run(
                cmd,
                input=input_text,
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
        except subprocess.TimeoutExpired:
            logger.error(f"Command timed out: {' '.join(cmd)}")
            raise
        except Exception as e:
            logger.error(f"Command error: {e}")
            raise

    def check_daemon(self) -> bool:
        """Check if daemon is running.

        The CLI is queried only on the first call; later calls return the
        cached answer.
        """
        if not self.daemon_checked:
            try:
                result = self._run_command(["st", "daemon", "status", "--json"])
                if result.returncode == 0:
                    status = json.loads(result.stdout)
                    self.daemon_available = status.get("running", False)
            except Exception:
                # Any failure (timeout, missing binary, bad JSON) means "not
                # available". Unlike a bare except, this lets
                # KeyboardInterrupt and SystemExit propagate.
                self.daemon_available = False

            self.daemon_checked = True
            logger.info(f"Daemon available: {self.daemon_available}")

        return self.daemon_available

    def generate(self, prompt: str, seed: int = 42, **kwargs) -> Optional[str]:
        """Generate text with error handling and retries.

        Args:
            prompt: Prompt passed to ``st generate``.
            seed: Value for ``--seed``.
            **kwargs: Only ``max_new_tokens`` is recognised; it is passed as
                ``--max-new-tokens``.

        Returns:
            The ``text`` field of the JSON response, or ``None`` after all
            attempts have failed.
        """
        cmd = ["st", "generate", prompt, "--seed", str(seed), "--json", "--wait"]

        # Add additional options
        if "max_new_tokens" in kwargs:
            cmd.extend(["--max-new-tokens", str(kwargs["max_new_tokens"])])

        for attempt in range(self.max_retries):
            try:
                result = self._run_command(cmd)

                if result.returncode == 0:
                    # Parse JSON response
                    try:
                        data = json.loads(result.stdout)
                        return data.get("text")
                    except json.JSONDecodeError:
                        logger.error(f"Invalid JSON response: {result.stdout}")
                else:
                    logger.error(f"Generation failed: {result.stderr}")

            except subprocess.TimeoutExpired:
                logger.error(f"Generation timed out (attempt {attempt + 1}/{self.max_retries})")
            except Exception as e:
                logger.error(f"Generation error: {e}")

            # Linearly growing delay between attempts, none after the last one
            if attempt < self.max_retries - 1:
                time.sleep(self.retry_delay * (attempt + 1))

        return None

    def generate_stream(self, prompt: str, seed: int = 42, callback=None) -> bool:
        """Stream generation with error handling.

        Output is forwarded character by character to ``callback`` or, when no
        callback is given, printed to stdout. ``self.timeout`` only bounds the
        final wait after the child has closed its stdout.

        Returns:
            True if the process exited with status 0, False otherwise.
        """
        cmd = ["st", "generate", prompt, "--seed", str(seed)]
        process = None

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                # stderr is never read here; an unread pipe could fill up and
                # block the child, so it is discarded instead.
                stderr=subprocess.DEVNULL,
                text=True,
                bufsize=1
            )

            # Read output character by character
            while True:
                char = process.stdout.read(1)
                if not char:
                    break

                if callback:
                    callback(char)
                else:
                    print(char, end="", flush=True)

            # Wait for process to complete
            process.wait(timeout=self.timeout)

            return process.returncode == 0

        except subprocess.TimeoutExpired:
            logger.error("Streaming generation timed out")
            process.kill()
            process.wait()  # reap the killed child
            return False
        except Exception as e:
            logger.error(f"Streaming error: {e}")
            # Do not leave the child running, e.g. when the callback raised.
            if process is not None and process.poll() is None:
                process.kill()
                process.wait()
            return False

    def embed(self, text: str, seed: int = 42) -> Optional[List[float]]:
        """Generate embedding with error handling.

        Returns:
            The embedding as a list of 1024 values, or ``None`` after all
            attempts have failed or returned something else.
        """
        cmd = ["st", "embed", text, "--seed", str(seed), "--format", "json"]

        for attempt in range(self.max_retries):
            try:
                result = self._run_command(cmd)

                if result.returncode == 0:
                    # Parse JSON array
                    try:
                        embedding = json.loads(result.stdout)
                        if isinstance(embedding, list) and len(embedding) == 1024:
                            return embedding
                        else:
                            logger.error("Invalid embedding format")
                    except json.JSONDecodeError:
                        logger.error("Invalid JSON embedding")
                else:
                    logger.error(f"Embedding failed: {result.stderr}")

            except Exception as e:
                logger.error(f"Embedding error: {e}")

            if attempt < self.max_retries - 1:
                time.sleep(self.retry_delay)

        return None

    def batch_generate(self, prompts: List[str], seeds: Optional[List[int]] = None) -> List[Optional[str]]:
        """Batch generate with parallel processing.

        Args:
            prompts: Prompts to generate for.
            seeds: One seed per prompt; defaults to 42, 43, ... Prompts and
                seeds are paired with ``zip``, so the shorter list decides how
                many calls are made.

        Returns:
            One entry per submitted prompt, in the same order as ``prompts``;
            ``None`` marks a failed generation.
        """
        import concurrent.futures

        if seeds is None:
            seeds = [42 + i for i in range(len(prompts))]

        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            futures = [
                executor.submit(self.generate, prompt, seed)
                for prompt, seed in zip(prompts, seeds)
            ]

            # Collect in submission order (not completion order) so that
            # results[i] always belongs to prompts[i].
            for future in futures:
                try:
                    results.append(future.result())
                except Exception as e:
                    logger.error(f"Batch generation error: {e}")
                    results.append(None)

        return results

def main():
    """Demonstrate the CLI wrapper: daemon check, single, streaming and batch generation."""
    cli = SteadyTextCLI()

    # Check daemon
    daemon_line = (
        "✓ Daemon is running"
        if cli.check_daemon()
        else "✗ Daemon not available, using direct mode"
    )
    print(daemon_line)

    # Single generation
    print("\nGenerating text...")
    text = cli.generate("Write a short poem", seed=123)
    print(f"Generated: {text}" if text else "Generation failed")

    # Streaming generation
    print("\nStreaming generation...")
    success = cli.generate_stream("Tell me a story", seed=456)
    outcome = "succeeded" if success else "failed"
    print(f"\nStreaming {outcome}")

    # Batch generation
    print("\nBatch generation...")
    prompts = ["Task 1", "Task 2", "Task 3"]
    results = cli.batch_generate(prompts)
    for prompt, result in zip(prompts, results):
        # Show a mark plus the first 50 characters, or "Failed" when empty/None
        if result:
            status, preview = "✓", result[:50]
        else:
            status, preview = "✗", "Failed"
        print(f"{status} {prompt}: {preview}...")

if __name__ == "__main__":
    main()

Production Patterns

Circuit Breaker Pattern

from enum import Enum
from datetime import datetime, timedelta
from typing import Optional, Callable, Any
import threading

class CircuitState(Enum):
    """States a CircuitBreaker can be in."""
    # Normal operation: calls are executed.
    CLOSED = "closed"
    # Calls are rejected until the recovery timeout has elapsed.
    OPEN = "open"
    # Recovery timeout elapsed: calls are executed again; a success closes
    # the circuit, a failure re-opens it.
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker guarding calls to SteadyText operations.

    After ``failure_threshold`` failures the circuit opens and calls are
    rejected. Once ``recovery_timeout`` seconds have passed since the last
    failure, calls are let through again (half-open) and the circuit closes
    on the first success.
    """

    def __init__(self,
                 failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 expected_exception: type = Exception):
        # Configuration
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        # Mutable state, guarded by _lock
        self._lock = threading.Lock()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Run ``func(*args, **kwargs)`` under circuit breaker protection.

        Raises:
            Exception: "Circuit breaker is OPEN" while the circuit is open and
                the recovery timeout has not elapsed; otherwise whatever
                ``func`` raises is re-raised.
        """
        with self._lock:
            circuit_is_open = self.state == CircuitState.OPEN
            if circuit_is_open and not self._should_attempt_reset():
                raise Exception("Circuit breaker is OPEN")
            if circuit_is_open:
                # Recovery timeout elapsed: let this call through as a probe.
                self.state = CircuitState.HALF_OPEN

        # The lock is not held while func runs.
        try:
            outcome = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return outcome

    def _should_attempt_reset(self) -> bool:
        """Tell whether the recovery timeout has elapsed since the last failure."""
        recovery_window = timedelta(seconds=self.recovery_timeout)
        return (self.last_failure_time and
                datetime.now() - self.last_failure_time > recovery_window)

    def _on_success(self):
        """Reset the failure count; close the circuit if it was half-open."""
        with self._lock:
            was_half_open = self.state == CircuitState.HALF_OPEN
            self.failure_count = 0
            if was_half_open:
                self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Count a failure and open the circuit when warranted."""
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            # Open on reaching the threshold, or on any failure while probing.
            threshold_reached = self.failure_count >= self.failure_threshold
            if threshold_reached or self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.OPEN

    def get_state(self) -> Dict[str, Any]:
        """Return state name, failure count and last failure time (ISO string or None)."""
        with self._lock:
            last_failure = None
            if self.last_failure_time:
                last_failure = self.last_failure_time.isoformat()
            return {
                "state": self.state.value,
                "failure_count": self.failure_count,
                "last_failure": last_failure
            }

# Usage with SteadyText
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def protected_generate(prompt: str, seed: int = 42) -> Optional[str]:
    """Generate with circuit breaker protection.

    Args:
        prompt: Prompt passed to ``steadytext.generate``.
        seed: Seed passed to ``steadytext.generate``.

    Returns:
        The generated text, or ``None`` when generation failed or the
        circuit is open.
    """

    def _generate_or_raise() -> str:
        # steadytext.generate signals failure by returning None (v2.1.0+).
        # The breaker only counts exceptions, so None is turned into one;
        # otherwise the circuit could never open.
        result = steadytext.generate(prompt, seed=seed)
        if result is None:
            raise RuntimeError("Generation returned None")
        return result

    try:
        return circuit_breaker.call(_generate_or_raise)
    except Exception as e:
        logger.error(f"Circuit breaker triggered: {e}")
        return None

Retry with Exponential Backoff

import time
import random
from typing import TypeVar, Callable, Optional, Any

T = TypeVar('T')

def retry_with_backoff(
    func: Optional[Callable[..., T]] = None,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> Callable[..., Any]:
    """Decorator for retry with exponential backoff.

    Usable bare (``@retry_with_backoff``), with options
    (``@retry_with_backoff(max_retries=5)``) or as a plain call
    (``retry_with_backoff(func, max_retries=5)``).

    Args:
        func: Function to wrap; omitted when the decorator is given options.
        max_retries: Total number of attempts.
        base_delay: Delay in seconds before the second attempt.
        max_delay: Upper bound for the un-jittered delay.
        exponential_base: Factor by which the delay grows per attempt.
        jitter: If true, scale each delay by a random factor in [0.5, 1.5).

    Returns:
        The wrapped function, or a decorator when ``func`` is omitted. The
        wrapped function returns the result of ``func``, or ``None`` once
        every attempt has raised.
    """
    import functools

    def decorate(target: Callable[..., T]) -> Callable[..., Optional[T]]:
        # wraps() keeps the name and docstring of the decorated function.
        @functools.wraps(target)
        def wrapper(*args, **kwargs) -> Optional[T]:
            for attempt in range(max_retries):
                try:
                    return target(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        logger.error(f"All {max_retries} attempts failed: {e}")
                        break

                    # Calculate delay with exponential backoff
                    delay = min(base_delay * (exponential_base ** attempt), max_delay)

                    # Add jitter so concurrent callers do not retry in lockstep
                    if jitter:
                        delay = delay * (0.5 + random.random())

                    logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s: {e}")
                    time.sleep(delay)

            return None

        return wrapper

    if func is None:
        # Called with options only: hand back the actual decorator.
        return decorate
    return decorate(func)

# Apply to SteadyText functions
@retry_with_backoff
def robust_generate(prompt: str, seed: int = 42) -> Optional[str]:
    """Generate text, letting the retry decorator repeat failed attempts."""
    generated = steadytext.generate(prompt, seed=seed)
    if generated is not None:
        return generated
    # None means the generation failed; raising lets the decorator retry.
    raise Exception("Generation returned None")

@retry_with_backoff
def robust_embed(text: str, seed: int = 42) -> Optional[np.ndarray]:
    """Embed text, letting the retry decorator repeat failed attempts."""
    embedding = steadytext.embed(text, seed=seed)
    if embedding is not None:
        return embedding
    # None means the embedding failed; raising lets the decorator retry.
    raise Exception("Embedding returned None")

Monitoring and Alerting

Error Monitoring System

import time
import json
from datetime import datetime, timedelta
from collections import deque, defaultdict
from typing import Dict, List, Any, Optional
import smtplib
from email.mime.text import MIMEText

class ErrorMonitor:
    """Comprehensive error monitoring for SteadyText.

    Keeps the most recent ``window_size`` errors in memory, maintains
    counters per error type and per hour, and raises an alert when one error
    type occurs ``alert_threshold`` times within ``alert_window`` seconds.
    """

    def __init__(self,
                 window_size: int = 1000,
                 alert_threshold: int = 10,
                 alert_window: int = 300):  # 5 minutes
        """Create a monitor.

        Args:
            window_size: Maximum number of errors kept for inspection.
            alert_threshold: Errors of one type within ``alert_window`` that
                trigger an alert.
            alert_window: Length in seconds of the alert window; also the
                minimum time between two alerts for the same error type.
        """
        self.window_size = window_size
        self.alert_threshold = alert_threshold
        self.alert_window = alert_window
        self.errors = deque(maxlen=window_size)
        self.error_counts = defaultdict(int)
        self.alert_sent = {}
        self.metrics = {
            "total_errors": 0,
            "errors_by_type": defaultdict(int),
            "errors_by_hour": defaultdict(int),
            "recent_error_rate": 0.0
        }

    def record_error(self, error_type: str, error_message: str,
                    context: Optional[Dict[str, Any]] = None):
        """Record an error occurrence and check whether an alert is due.

        Args:
            error_type: Category used for counting and alerting.
            error_message: Human-readable description.
            context: Optional extra data stored with the error.
        """
        # One timestamp for both the record and its hourly bucket, so the two
        # cannot disagree when the call straddles an hour boundary.
        now = datetime.now()
        error = {
            "timestamp": now,
            "type": error_type,
            "message": error_message,
            "context": context or {}
        }

        self.errors.append(error)
        self.error_counts[error_type] += 1
        self.metrics["total_errors"] += 1
        self.metrics["errors_by_type"][error_type] += 1

        # Update hourly metrics
        hour = now.strftime("%Y-%m-%d %H:00")
        self.metrics["errors_by_hour"][hour] += 1

        # Keep the advertised rate metric current (errors/second, last 60 s)
        self.metrics["recent_error_rate"] = self.get_error_rate()

        # Check if alert needed
        self._check_alert_condition(error_type)

    def _check_alert_condition(self, error_type: str):
        """Send an alert if ``error_type`` crossed the threshold and none was sent recently."""
        # Count recent errors of this type
        cutoff_time = datetime.now() - timedelta(seconds=self.alert_window)
        recent_errors = sum(
            1 for error in self.errors
            if error["type"] == error_type and error["timestamp"] > cutoff_time
        )

        # Check threshold
        if recent_errors >= self.alert_threshold:
            last_alert = self.alert_sent.get(error_type)
            # At most one alert per error type per alert window
            if not last_alert or datetime.now() - last_alert > timedelta(seconds=self.alert_window):
                self._send_alert(error_type, recent_errors)
                self.alert_sent[error_type] = datetime.now()

    def _send_alert(self, error_type: str, error_count: int):
        """Build the alert text (including the last five matching errors) and log it."""
        message = f"""
        SteadyText Error Alert

        Error Type: {error_type}
        Count: {error_count} errors in last {self.alert_window} seconds
        Threshold: {self.alert_threshold}
        Time: {datetime.now().isoformat()}

        Recent errors:
        """

        # Add recent errors
        recent = [e for e in self.errors if e["type"] == error_type][-5:]
        for error in recent:
            message += f"\n- {error['timestamp']}: {error['message']}"

        logger.critical(f"ALERT: {message}")

        # Implement your alert mechanism here
        # e.g., send email, Slack, PagerDuty, etc.

    def get_error_rate(self, window_seconds: int = 60) -> float:
        """Calculate error rate in errors per second.

        Only errors still held in the in-memory window are counted.

        Raises:
            ValueError: If ``window_seconds`` is not positive.
        """
        if window_seconds <= 0:
            raise ValueError("window_seconds must be positive")

        cutoff_time = datetime.now() - timedelta(seconds=window_seconds)
        recent_errors = sum(
            1 for error in self.errors
            if error["timestamp"] > cutoff_time
        )
        return recent_errors / window_seconds

    def get_report(self) -> Dict[str, Any]:
        """Generate error report.

        Returns:
            A JSON-serialisable dict with a summary, counts per type, the
            last ten errors and the time of the last alert per error type.
        """
        return {
            "summary": {
                "total_errors": self.metrics["total_errors"],
                "unique_error_types": len(self.error_counts),
                "error_rate_per_minute": self.get_error_rate(60) * 60,
                "most_common_error": max(self.error_counts.items(), key=lambda x: x[1]) if self.error_counts else None
            },
            "errors_by_type": dict(self.metrics["errors_by_type"]),
            "recent_errors": [
                {
                    "timestamp": e["timestamp"].isoformat(),
                    "type": e["type"],
                    "message": e["message"]
                }
                for e in list(self.errors)[-10:]
            ],
            "alerts_sent": {
                error_type: timestamp.isoformat()
                for error_type, timestamp in self.alert_sent.items()
            }
        }

    def export_metrics(self, filepath: str):
        """Write the report from ``get_report`` to ``filepath`` as indented JSON."""
        # Explicit encoding: the default depends on the platform locale.
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(self.get_report(), f, indent=2)

# Global error monitor
error_monitor = ErrorMonitor(alert_threshold=5, alert_window=300)

# Integration with SteadyText operations
def monitored_generate(prompt: str, seed: int = 42) -> Optional[str]:
    """Generate text and report failures to the global error monitor.

    A ``None`` result is recorded as "generation_failed" and returned as is;
    an exception is recorded as "generation_exception" and turned into
    ``None``.
    """

    def _error_context() -> Dict[str, Any]:
        # Only the first 50 characters of the prompt are stored.
        return {"prompt": prompt[:50], "seed": seed}

    try:
        generated = steadytext.generate(prompt, seed=seed)

        if generated is None:
            error_monitor.record_error(
                "generation_failed",
                "Generation returned None",
                _error_context()
            )

        return generated

    except Exception as exc:
        error_monitor.record_error(
            "generation_exception",
            str(exc),
            _error_context()
        )
        return None

Recovery Strategies

Graceful Degradation

class GracefulDegradationManager:
    """Manage graceful degradation strategies.

    Maps service metrics to a degradation level (0 = full service ...
    4 = maintenance mode) and executes operations according to the strategy
    of that level.
    """

    def __init__(self):
        self.degradation_levels = {
            0: "full_service",
            1: "reduced_quality",
            2: "cached_only",
            3: "static_responses",
            4: "maintenance_mode"
        }
        self.current_level = 0
        # Entry i is the value above which level i + 1 applies.
        self.level_thresholds = {
            "error_rate": [0.1, 0.3, 0.5, 0.7, 0.9],
            "response_time": [2.0, 5.0, 10.0, 20.0, 30.0]
        }

    def evaluate_service_health(self, metrics: Dict[str, float]) -> int:
        """Evaluate service health and determine degradation level.

        Args:
            metrics: May contain ``error_rate`` and ``response_time``;
                missing values count as 0.0.

        Returns:
            A level between 0 and 4. The level is driven by the highest
            threshold pair for which either metric exceeds its threshold.
        """
        error_rate = metrics.get("error_rate", 0.0)
        response_time = metrics.get("response_time", 0.0)

        threshold_pairs = zip(self.level_thresholds["error_rate"],
                              self.level_thresholds["response_time"])
        level = max(
            (
                index + 1
                for index, (error_threshold, time_threshold) in enumerate(threshold_pairs)
                if error_rate > error_threshold or response_time > time_threshold
            ),
            default=0,
        )

        # There are five thresholds but only levels 0-4, hence the clamp.
        return min(level, 4)

    def apply_degradation_strategy(self, level: int, operation: str, **kwargs) -> Any:
        """Apply appropriate degradation strategy.

        Args:
            level: Degradation level, a key of ``self.degradation_levels``.
            operation: ``"generate"`` or ``"embed"``.
            **kwargs: Arguments for the underlying SteadyText call.

        Returns:
            Whatever the selected strategy returns.

        Raises:
            KeyError: If ``level`` is not a known degradation level.
        """
        self.current_level = level
        strategy = self.degradation_levels[level]

        logger.info(f"Applying degradation strategy: {strategy}")

        # Strategy "x" is implemented by method "_x"; anything without a
        # matching method is treated as maintenance mode.
        handler = getattr(self, f"_{strategy}", self._maintenance_mode)
        return handler(operation, **kwargs)

    def _full_service(self, operation: str, **kwargs):
        """Normal operation; returns None for unknown operations."""
        if operation == "generate":
            return steadytext.generate(**kwargs)
        elif operation == "embed":
            return steadytext.embed(**kwargs)

    def _reduced_quality(self, operation: str, **kwargs):
        """Reduced quality but faster; returns None for unknown operations."""
        if operation == "generate":
            # Reduce token count
            kwargs["max_new_tokens"] = min(kwargs.get("max_new_tokens", 512), 100)
            return steadytext.generate(**kwargs)
        elif operation == "embed":
            # Nothing is reduced for embeddings here; serve them normally
            # rather than silently returning None at this level.
            return steadytext.embed(**kwargs)

    def _cached_only(self, operation: str, **kwargs):
        """Return only cached responses.

        Placeholder: the cache lookup is not implemented yet, so every
        request is answered with None.
        """
        # Implement cache-only logic here, e.g. by looking the request up
        # through steadytext's cache manager (get_cache_manager()).
        return None

    def _static_responses(self, operation: str, **kwargs):
        """Return static pre-defined responses; None for unknown operations."""
        static_responses = {
            "generate": "Service is currently limited. Please try again later.",
            "embed": np.zeros(1024, dtype=np.float32)
        }
        return static_responses.get(operation)

    def _maintenance_mode(self, operation: str, **kwargs):
        """System in maintenance mode: nothing is served."""
        return None

Best Practices

1. Comprehensive Error Handler

class SteadyTextErrorHandler:
    """Comprehensive error handler for all SteadyText operations.

    Errors are routed by operation name (``"generation"``, ``"embedding"``,
    ``"streaming"``, ``"daemon"``) to a dedicated handler; any other name is
    sent to ``_default_handler``. Each handler logs the error and returns a
    fallback value instead of raising.
    """

    def __init__(self):
        # Every method referenced below is defined on this class, so
        # constructing the handler cannot fail with AttributeError.
        self.handlers = {
            "generation": self._handle_generation_error,
            "embedding": self._handle_embedding_error,
            "streaming": self._handle_streaming_error,
            "daemon": self._handle_daemon_error,
        }
        self.fallback_strategies = {
            "generation": self._generation_fallback,
            "embedding": self._embedding_fallback,
        }

    def handle(self, operation: str, error: Any, context: Dict[str, Any]) -> Any:
        """Dispatch ``error`` to the handler registered for ``operation``.

        Args:
            operation: Operation name used to select the handler.
            error: The error (exception, message, ...) being handled.
            context: Details about the failed call, e.g. ``prompt``/``seed``.

        Returns:
            The fallback value produced by the selected handler.
        """
        handler = self.handlers.get(operation, self._default_handler)
        return handler(error, context)

    def _handle_generation_error(self, error: Any, context: Dict[str, Any]):
        """Log a generation error and return the generation fallback text."""
        logger.error(f"Generation error: {error}", extra=context)

        # Try fallback
        fallback = self.fallback_strategies["generation"]
        return fallback(context)

    def _handle_embedding_error(self, error: Any, context: Dict[str, Any]):
        """Log an embedding error and return the embedding fallback."""
        logger.error(f"Embedding error: {error}", extra=context)

        fallback = self.fallback_strategies["embedding"]
        return fallback(context)

    def _handle_streaming_error(self, error: Any, context: Dict[str, Any]):
        """Log a streaming error and return the generation fallback text.

        The caller can emit the returned string as a single final chunk.
        """
        logger.error(f"Streaming error: {error}", extra=context)

        fallback = self.fallback_strategies["generation"]
        return fallback(context)

    def _handle_daemon_error(self, error: Any, context: Dict[str, Any]):
        """Log a daemon error; return ``None`` so the caller can pick another path."""
        logger.error(f"Daemon error: {error}", extra=context)
        return None

    def _default_handler(self, error: Any, context: Dict[str, Any]):
        """Log an error for an unregistered operation and return ``None``."""
        logger.error(f"Unhandled operation error: {error}", extra=context)
        return None

    def _generation_fallback(self, context: Dict[str, Any]) -> str:
        """Build a placeholder response for a failed generation.

        Candidate messages are tried from most to least informative; the
        first one that can be built is returned.
        """
        prompt = context.get("prompt", "")

        # Try different approaches
        approaches = [
            lambda: f"[Unable to generate response for: {prompt[:50]}...]",
            lambda: "[Service temporarily unavailable]",
            lambda: "",
        ]

        for approach in approaches:
            try:
                return approach()
            except Exception:
                # e.g. the prompt is None or otherwise not sliceable; fall
                # through to the next, simpler message. Narrower than a bare
                # ``except`` so KeyboardInterrupt/SystemExit still propagate.
                continue

        return "[Critical error]"

    def _embedding_fallback(self, context: Dict[str, Any]):
        """Return ``None``: no substitute embedding is fabricated."""
        return None

2. Error Context Manager

from contextlib import contextmanager

@contextmanager
def error_handling(operation: str, **context):
    """Wrap a block so that any failure is logged and recorded uniformly.

    Args:
        operation: Label for the guarded operation; used in the log message
            and to build the ``"<operation>_error"`` monitoring key.
        **context: Extra key/value pairs attached to the log record and
            passed to the error monitor.

    Exceptions for which ``should_reraise`` is true propagate to the caller.
    All others are handed to ``handle_gracefully`` and then suppressed. A
    ``with`` block cannot receive a value from its context manager on exit,
    so whatever ``handle_gracefully`` returns is not visible to the caller.
    """
    began = time.time()
    try:
        yield
    except Exception as exc:
        elapsed = time.time() - began

        # Log error with context; caller-supplied entries are merged last so
        # they take precedence over the standard fields.
        log_fields = {
            "operation": operation,
            "error": str(exc),
            "error_type": type(exc).__name__,
            "duration": elapsed,
        }
        log_fields.update(context)
        logger.error(f"{operation} failed after {elapsed:.2f}s", extra=log_fields)

        # Record in monitoring
        error_monitor.record_error(f"{operation}_error", str(exc), context)

        # Re-raise or handle based on configuration
        if should_reraise(exc):
            raise
        return handle_gracefully(operation, exc, context)

# Usage
# Initialise the result first: if generate() raises and error_handling
# suppresses the exception, the assignment inside the block never runs and
# `result` would otherwise be unbound afterwards.
result = None
with error_handling("generation", prompt="test", seed=42):
    result = steadytext.generate("test", seed=42)

3. Testing Error Scenarios

import unittest
from unittest.mock import patch, MagicMock

class TestErrorHandling(unittest.TestCase):
    """Test error handling scenarios."""

    def test_generation_returns_none(self):
        """Test handling when generation returns None."""
        # No SteadyTextErrorHandler is built here: the test never used one,
        # and constructing it only added an unrelated failure point.
        with patch('steadytext.generate', return_value=None):
            result = monitored_generate("test prompt", seed=42)
            self.assertIsNone(result)

            # Check error was recorded
            report = error_monitor.get_report()
            self.assertGreater(report["summary"]["total_errors"], 0)

    def test_daemon_failure_fallback(self):
        """Test daemon failure with fallback."""
        with patch('steadytext.daemon.client.is_daemon_running', return_value=False):
            result = generate_with_daemon_fallback("test", seed=42)
            self.assertIsNotNone(result)  # Should use direct mode

    def test_circuit_breaker_opens(self):
        """Test circuit breaker opening after failures."""
        breaker = CircuitBreaker(failure_threshold=2)

        def failing_func():
            raise Exception("Test failure")

        # First failures: each one propagates while the breaker counts it.
        for _ in range(2):
            with self.assertRaises(Exception):
                breaker.call(failing_func)

        # Circuit should be open
        self.assertEqual(breaker.state, CircuitState.OPEN)

        # Further calls should fail immediately, with the breaker's own
        # message rather than the wrapped function's.
        with self.assertRaises(Exception) as ctx:
            breaker.call(failing_func)
        self.assertIn("Circuit breaker is OPEN", str(ctx.exception))

This comprehensive guide covers all aspects of error handling in SteadyText, from basic None checks to advanced production patterns like circuit breakers and graceful degradation. The key principle is that SteadyText's "never fail" philosophy requires careful handling of None returns and proper fallback strategies.