Daemon Usage Guide¶
Learn how to use SteadyText's daemon mode for persistent model serving and 160x faster response times.
Overview¶
The SteadyText daemon is a background service that keeps models loaded in memory, eliminating the 2-3 second startup overhead for each operation. It provides:
- 160x faster first response - No model loading delay
- Shared cache - All clients benefit from cached results
- Automatic fallback - Operations work without daemon
- Zero configuration - Used by default when available
- Thread-safe - Handles concurrent requests efficiently
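As a rough sanity check on the 160x figure, the sketch below assumes that the whole speedup comes from skipping the 2-3 second model load. The source does not state this, and the function name is hypothetical. It solves `(load + t) / t = speedup` for `t`, the time a request takes once the model is already in memory.

```python
def implied_request_seconds(load_seconds: float, speedup: float = 160.0) -> float:
    """Solve (load + t) / t = speedup for t, the per-request time with a loaded model.

    Assumption (not from the guide): the speedup is entirely due to avoided load time.
    """
    return load_seconds / (speedup - 1.0)


# The 2-3 second load time quoted in the overview brackets the implied request time.
for load in (2.0, 3.0):
    t = implied_request_seconds(load)
    print(f"load={load:.1f}s -> implied warm request time {t * 1000:.1f} ms")
```

Under that assumption the warm request time is roughly 13 to 19 ms, so the headline number mainly describes short or cached requests, not long generations.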
Table of Contents¶
- Understanding the Daemon
- Starting and Stopping
- Configuration
- Python SDK Usage
- CLI Integration
- Production Deployment
- Monitoring and Debugging
- Performance Optimization
- Troubleshooting
- Best Practices
Understanding the Daemon¶
Architecture¶
┌─────────────────────────────────────────────┐
│ Client Applications │
├─────────────────────────────────────────────┤
│ Python SDK │ CLI Tools │ Custom Apps │
├─────────────────────────────────────────────┤
│ ZeroMQ Client Layer │
│ (Automatic Fallback) │
├─────────────────────────────────────────────┤
│ ZeroMQ REP Server │
│ (TCP Port 5557) │
├─────────────────────────────────────────────┤
│ Daemon Server Process │
│ ┌─────────────────────────────────────┐ │
│ │ Loaded Models │ Shared Cache │ │
│ │ - Gemma-3n │ - Generation │ │
│ │ - Qwen3 │ - Embeddings │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
How It Works¶
- First Request: Client checks if daemon is running
- Daemon Available: Sends request via ZeroMQ
- Daemon Unavailable: Falls back to direct model loading
- Response: Client receives result (cached or generated)
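The decision flow above can be sketched with the standard library alone. This is an illustration, not SteadyText's actual client code: `daemon_reachable` and `run_with_fallback` are hypothetical names, and a plain TCP connect only shows that something is listening on the port, not that it is a SteadyText daemon.

```python
import socket
from typing import Callable


def daemon_reachable(host: str = "127.0.0.1", port: int = 5557, timeout: float = 0.2) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def run_with_fallback(
    via_daemon: Callable[[], str],
    direct: Callable[[], str],
    host: str = "127.0.0.1",
    port: int = 5557,
) -> str:
    """Use the daemon path when the port is reachable; otherwise run directly."""
    if daemon_reachable(host, port):
        try:
            return via_daemon()
        except OSError:
            pass  # Daemon went away mid-request: fall through to direct mode
    return direct()


# Stand-in callables; a real client would send a request or load the model here.
print(run_with_fallback(lambda: "answered by daemon", lambda: "answered directly"))
```

Passing the two paths as callables keeps the sketch independent of any particular transport, which is why it runs without ZeroMQ installed.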
Starting and Stopping¶
Basic Commands¶
# Start daemon in background (default)
st daemon start
# Start with custom settings
st daemon start --host 0.0.0.0 --port 5557 --seed 42
# Check status
st daemon status
# Stop daemon
st daemon stop
# Restart daemon
st daemon restart
Foreground Mode (Debugging)¶
# Run in foreground to see logs
st daemon start --foreground
# Output:
# SteadyText daemon starting...
# Loading generation model...
# Loading embedding model...
# Daemon ready on tcp://127.0.0.1:5557
# [2024-01-15 10:23:45] Request: generate (seed=42)
# [2024-01-15 10:23:45] Cache hit for generation
Systemd Service (Production)¶
# /etc/systemd/system/steadytext.service
[Unit]
Description=SteadyText Daemon
After=network.target
[Service]
Type=simple
User=steadytext
Group=steadytext
WorkingDirectory=/var/lib/steadytext
ExecStart=/usr/local/bin/st daemon start --foreground
ExecStop=/usr/local/bin/st daemon stop
Restart=always
RestartSec=10
StandardOutput=append:/var/log/steadytext/daemon.log
StandardError=append:/var/log/steadytext/daemon.error.log
# Environment
Environment="STEADYTEXT_GENERATION_CACHE_CAPACITY=1024"
Environment="STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB=200"
Environment="PYTHONUNBUFFERED=1"
[Install]
WantedBy=multi-user.target
Enable and start:
sudo systemctl daemon-reload
sudo systemctl enable steadytext
sudo systemctl start steadytext
Configuration¶
Environment Variables¶
# Daemon settings
export STEADYTEXT_DAEMON_HOST=0.0.0.0 # Bind address
export STEADYTEXT_DAEMON_PORT=5557 # Port number
# export STEADYTEXT_DISABLE_DAEMON=1 # Uncomment only to bypass the daemon (forces direct mode)
# Cache settings (shared by daemon)
export STEADYTEXT_GENERATION_CACHE_CAPACITY=1024
export STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB=200
export STEADYTEXT_EMBEDDING_CACHE_CAPACITY=2048
export STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB=400
# Model settings
export STEADYTEXT_DEFAULT_SEED=42
export STEADYTEXT_MODEL_DIR=/path/to/models
Configuration File¶
# steadytext_config.py
import os
# Daemon configuration
DAEMON_CONFIG = {
"host": os.getenv("STEADYTEXT_DAEMON_HOST", "127.0.0.1"),
"port": int(os.getenv("STEADYTEXT_DAEMON_PORT", 5557)),
"timeout": 5000, # milliseconds
"max_retries": 3,
"retry_delay": 0.1 # seconds
}
# Cache configuration
CACHE_CONFIG = {
"generation": {
"capacity": int(os.getenv("STEADYTEXT_GENERATION_CACHE_CAPACITY", 256)),
"max_size_mb": float(os.getenv("STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB", 50.0))
},
"embedding": {
"capacity": int(os.getenv("STEADYTEXT_EMBEDDING_CACHE_CAPACITY", 512)),
"max_size_mb": float(os.getenv("STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB", 100.0))
}
}
# Apply configuration
os.environ.update({
"STEADYTEXT_DAEMON_HOST": DAEMON_CONFIG["host"],
"STEADYTEXT_DAEMON_PORT": str(DAEMON_CONFIG["port"]),
"STEADYTEXT_GENERATION_CACHE_CAPACITY": str(CACHE_CONFIG["generation"]["capacity"]),
"STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB": str(CACHE_CONFIG["generation"]["max_size_mb"]),
"STEADYTEXT_EMBEDDING_CACHE_CAPACITY": str(CACHE_CONFIG["embedding"]["capacity"]),
"STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB": str(CACHE_CONFIG["embedding"]["max_size_mb"])
})
Python SDK Usage¶
Automatic Daemon Usage¶
import steadytext
# Daemon is used automatically if available
text = steadytext.generate("Hello world", seed=42) # Fast if daemon running
embedding = steadytext.embed("test text", seed=123) # Uses daemon
# Check whether a daemon is reachable (this does not prove the calls above went through it)
from steadytext.daemon.client import is_daemon_running
if is_daemon_running():
print("Using daemon for fast responses")
else:
print("Daemon not available, using direct mode")
Explicit Daemon Context¶
from steadytext.daemon import use_daemon
import steadytext
# Force daemon usage (raises error if not available)
with use_daemon():
text = steadytext.generate("Hello world", seed=42)
embedding = steadytext.embed("test", seed=123)
# All operations in this context use daemon
for i in range(100):
result = steadytext.generate(f"Item {i}", seed=i)
Connection Management¶
from steadytext.daemon.client import DaemonClient
import time
class ManagedDaemonClient:
    """Daemon client that reuses a single connection and retries on failure."""

    def __init__(self, max_retries=3, timeout=5000):
        self.max_retries = max_retries
        self.timeout = timeout  # milliseconds
        self._client = None

    def _get_client(self):
        """Get or create daemon client."""
        if self._client is None:
            self._client = DaemonClient(timeout=self.timeout)
        return self._client

    def generate_with_retry(self, prompt, **kwargs):
        """Generate with automatic retry on failure."""
        for attempt in range(self.max_retries):
            try:
                return self._get_client().generate(prompt, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise
                print(f"Retry {attempt + 1}/{self.max_retries} after error: {e}")
                self.close()  # Close the broken connection so the next attempt reconnects
                time.sleep(0.1 * (attempt + 1))  # Linear backoff

    def close(self):
        """Close daemon connection."""
        if self._client:
            try:
                self._client.close()
            finally:
                self._client = None
# Usage
client = ManagedDaemonClient()
try:
text = client.generate_with_retry("Hello world", seed=42)
print(text)
finally:
client.close()
Streaming with Daemon¶
import steadytext
from steadytext.daemon import use_daemon
# Streaming works identically with daemon
with use_daemon():
print("Streaming with daemon:")
for token in steadytext.generate_iter("Tell me a story", seed=42):
print(token, end="", flush=True)
print()
# The daemon handles streaming efficiently:
# 1. Client sends streaming request
# 2. Daemon generates tokens
# 3. Tokens sent with acknowledgment protocol
# 4. Client controls flow with ACK messages
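The acknowledgment-based flow control listed in the comments above can be illustrated in-process with a Python generator. This is only a model of the idea: the real wire format is not described in this guide, and `token_sender`, `receive_all`, and the `"ACK"` string are hypothetical names chosen for the sketch.

```python
from typing import Generator, Iterable, List, Optional

ACK = "ACK"  # Hypothetical acknowledgment message


def token_sender(tokens: Iterable[str]) -> Generator[str, str, None]:
    """Daemon side: emit one token, then wait for the client's reply before continuing."""
    for token in tokens:
        reply = yield token
        if reply != ACK:
            return  # Client cancelled or sent something unexpected: stop generating


def receive_all(stream: Generator[str, str, None], limit: Optional[int] = None) -> List[str]:
    """Client side: acknowledge each token, optionally stopping early after `limit` tokens."""
    received: List[str] = []
    try:
        token = next(stream)
        while True:
            received.append(token)
            if limit is not None and len(received) >= limit:
                stream.close()  # Stop the sender instead of acknowledging
                break
            token = stream.send(ACK)
    except StopIteration:
        pass  # Sender has no more tokens
    return received


print(receive_all(token_sender(["Once", " upon", " a", " time"])))
```

Because the sender only advances after a reply, a slow or cancelled client never leaves the daemon generating tokens nobody will read.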
Batch Operations¶
import concurrent.futures
import steadytext
from steadytext.daemon import use_daemon
import time
def benchmark_daemon_performance():
    """Compare daemon vs direct performance."""
    import os

    prompts = [f"Generate text for item {i}" for i in range(20)]

    # Test without daemon: force direct mode once for the whole loop
    os.environ["STEADYTEXT_DISABLE_DAEMON"] = "1"
    try:
        start = time.time()
        results_direct = [steadytext.generate(prompt, seed=42) for prompt in prompts]
        direct_time = time.time() - start
    finally:
        del os.environ["STEADYTEXT_DISABLE_DAEMON"]

    # Test with daemon. The prompts and seed match the direct run, so these
    # requests may be answered from the shared cache instead of being regenerated.
    start = time.time()
    results_daemon = []
    with use_daemon():
        for prompt in prompts:
            results_daemon.append(steadytext.generate(prompt, seed=42))
    daemon_time = time.time() - start

    print(f"Direct mode: {direct_time:.2f}s")
    print(f"Daemon mode: {daemon_time:.2f}s")
    print(f"Speedup: {direct_time / daemon_time:.1f}x")
# Parallel batch processing
def process_batch_parallel(prompts, max_workers=4):
"""Process prompts in parallel using daemon."""
with use_daemon():
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
futures = {
executor.submit(steadytext.generate, prompt, seed=idx): (prompt, idx)
for idx, prompt in enumerate(prompts)
}
# Collect results
results = {}
for future in concurrent.futures.as_completed(futures):
prompt, idx = futures[future]
try:
result = future.result()
results[idx] = result
except Exception as e:
print(f"Error processing {prompt}: {e}")
results[idx] = None
# Return in order
return [results[i] for i in range(len(prompts))]
# Usage
prompts = ["Task 1", "Task 2", "Task 3", "Task 4"]
results = process_batch_parallel(prompts)
CLI Integration¶
Automatic Daemon Usage¶
# CLI automatically uses daemon if available
st generate "Hello world" --seed 42
# Check if daemon is being used
st daemon status && echo "Daemon active" || echo "No daemon"
# Force direct mode (bypass daemon)
STEADYTEXT_DISABLE_DAEMON=1 st generate "Hello world"
Shell Script Integration¶
#!/bin/bash
# daemon_batch.sh - Batch processing with daemon
# Ensure daemon is running
ensure_daemon() {
if ! st daemon status >/dev/null 2>&1; then
echo "Starting daemon..."
st daemon start
sleep 2 # Wait for startup
fi
}
# Process files with daemon
process_files() {
local files=("$@")
ensure_daemon
for file in "${files[@]}"; do
echo "Processing: $file"
# Generate summary using daemon
summary=$(cat "$file" | st generate "Summarize this text" --wait --seed 42)
# Generate embedding using daemon
embedding=$(cat "$file" | st embed --format json --seed 42)
# Save results
echo "$summary" > "${file%.txt}_summary.txt"
echo "$embedding" > "${file%.txt}_embedding.json"
done
}
# Main
if [ $# -eq 0 ]; then
echo "Usage: $0 file1.txt file2.txt ..."
exit 1
fi
process_files "$@"
echo "Processing complete!"
Performance Monitoring¶
#!/bin/bash
# monitor_daemon.sh - Monitor daemon performance
# Function to time operations
time_operation() {
local operation="$1"
local start=$(date +%s.%N)
eval "$operation" >/dev/null 2>&1
local end=$(date +%s.%N)
echo "$(echo "$end - $start" | bc)"
}
# Monitor daemon performance
monitor_daemon() {
echo "Daemon Performance Monitor"
echo "========================="
# Check daemon status
if st daemon status --json | jq -e '.running' >/dev/null; then
echo "✓ Daemon is running"
else
echo "✗ Daemon is not running"
return 1
fi
# Test generation speed
echo -e "\nGeneration Performance:"
for i in {1..5}; do
time=$(time_operation "echo 'test' | st --seed $i")
echo " Request $i: ${time}s"
done
# Test embedding speed
echo -e "\nEmbedding Performance:"
for i in {1..5}; do
time=$(time_operation "st embed 'test text' --seed $i")
echo " Request $i: ${time}s"
done
# Cache statistics
echo -e "\nCache Statistics:"
st cache --status
}
# Run monitoring
monitor_daemon
Production Deployment¶
Docker Deployment¶
# Dockerfile
FROM python:3.11-slim
# Install dependencies
RUN apt-get update && apt-get install -y \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*
# Create app user
RUN useradd -m -s /bin/bash steadytext
# Install SteadyText
RUN pip install steadytext
# Create directories
RUN mkdir -p /var/log/steadytext /var/lib/steadytext && \
chown -R steadytext:steadytext /var/log/steadytext /var/lib/steadytext
# Switch to app user
USER steadytext
WORKDIR /home/steadytext
# Download models during build
RUN st models download --all
# Expose daemon port
EXPOSE 5557
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
CMD st daemon status || exit 1
# Start daemon
CMD ["st", "daemon", "start", "--foreground", "--host", "0.0.0.0"]
# docker-compose.yml
version: '3.8'
services:
steadytext:
build: .
ports:
- "5557:5557"
volumes:
- steadytext-cache:/home/steadytext/.cache/steadytext
- ./logs:/var/log/steadytext
environment:
- STEADYTEXT_GENERATION_CACHE_CAPACITY=1024
- STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB=200
- STEADYTEXT_EMBEDDING_CACHE_CAPACITY=2048
- STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB=400
- STEADYTEXT_DEFAULT_SEED=42
restart: unless-stopped
healthcheck:
test: ["CMD", "st", "daemon", "status"]
interval: 30s
timeout: 3s
retries: 3
volumes:
steadytext-cache:
Kubernetes Deployment¶
# steadytext-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: steadytext-daemon
labels:
app: steadytext
spec:
replicas: 3
selector:
matchLabels:
app: steadytext
template:
metadata:
labels:
app: steadytext
spec:
containers:
- name: steadytext
image: steadytext:latest
ports:
- containerPort: 5557
env:
- name: STEADYTEXT_GENERATION_CACHE_CAPACITY
value: "2048"
- name: STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB
value: "500"
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
livenessProbe:
exec:
command:
- st
- daemon
- status
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
tcpSocket:
port: 5557
initialDelaySeconds: 15
periodSeconds: 5
volumeMounts:
- name: cache
mountPath: /home/steadytext/.cache/steadytext
volumes:
- name: cache
persistentVolumeClaim:
claimName: steadytext-cache-pvc
---
apiVersion: v1
kind: Service
metadata:
name: steadytext-service
spec:
selector:
app: steadytext
ports:
- protocol: TCP
port: 5557
targetPort: 5557
type: LoadBalancer
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: steadytext-cache-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 50Gi
High Availability Setup¶
# ha_daemon_client.py - High availability daemon client
import random
import time
from typing import List, Optional
import steadytext
from steadytext.daemon.client import DaemonClient
class HADaemonClient:
    """High availability client with multiple daemon endpoints."""

    def __init__(self, endpoints: List[tuple]):
        """
        Initialize with multiple endpoints.

        Args:
            endpoints: List of (host, port) tuples
        """
        self.endpoints = endpoints
        self.clients = {}
        self.failed_endpoints = set()
        self.last_health_check = 0
        self.health_check_interval = 60  # seconds

    def _get_client(self, endpoint: tuple) -> Optional[DaemonClient]:
        """Get or create client for endpoint."""
        if endpoint not in self.clients:
            try:
                host, port = endpoint
                client = DaemonClient(host=host, port=port, timeout=2000)
                # Test connection
                client._send_request({"type": "ping"})
                self.clients[endpoint] = client
            except Exception:
                return None
        return self.clients.get(endpoint)

    def _health_check(self):
        """Periodic health check of failed endpoints."""
        if time.time() - self.last_health_check > self.health_check_interval:
            recovered = set()
            for endpoint in self.failed_endpoints:
                if self._get_client(endpoint):
                    recovered.add(endpoint)
            self.failed_endpoints -= recovered
            self.last_health_check = time.time()

    def _get_available_endpoint(self) -> Optional[tuple]:
        """Get random available endpoint."""
        self._health_check()
        available = [ep for ep in self.endpoints if ep not in self.failed_endpoints]
        return random.choice(available) if available else None

    def generate(self, prompt: str, **kwargs):
        """Generate with automatic failover."""
        for _ in range(len(self.endpoints)):
            endpoint = self._get_available_endpoint()
            if endpoint is None:
                break
            client = self._get_client(endpoint)
            if client is not None:
                try:
                    return client.generate(prompt, **kwargs)
                except Exception as e:
                    print(f"Failed on {endpoint}: {e}")
            # Connection or request failed: skip this endpoint until the next
            # health check so the loop moves on to the remaining endpoints.
            self.failed_endpoints.add(endpoint)
            self.clients.pop(endpoint, None)
        # All endpoints failed, fall back to direct mode
        print("All daemon endpoints failed, using direct mode")
        return steadytext.generate(prompt, **kwargs)

    def embed(self, text: str, **kwargs):
        """Embed with automatic failover."""
        # Not shown here; mirror generate(). Raising avoids silently returning None.
        raise NotImplementedError("embed failover follows the same pattern as generate()")
# Usage
ha_client = HADaemonClient([
("daemon1.example.com", 5557),
("daemon2.example.com", 5557),
("daemon3.example.com", 5557)
])
# Automatic failover
result = ha_client.generate("Hello world", seed=42)
Monitoring and Debugging¶
Logging Configuration¶
# logging_config.py
import logging
import sys
import time  # Needed by RequestLogger for timestamps
from pathlib import Path
def setup_daemon_logging(log_dir="/var/log/steadytext"):
"""Configure comprehensive daemon logging."""
log_dir = Path(log_dir)
log_dir.mkdir(parents=True, exist_ok=True)
# Configure formatters
detailed_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
)
simple_formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
# File handler for all logs
all_handler = logging.FileHandler(log_dir / "daemon.log")
all_handler.setLevel(logging.DEBUG)
all_handler.setFormatter(detailed_formatter)
# File handler for errors only
error_handler = logging.FileHandler(log_dir / "daemon.error.log")
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(detailed_formatter)
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(simple_formatter)
# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(all_handler)
root_logger.addHandler(error_handler)
root_logger.addHandler(console_handler)
# Configure specific loggers
logging.getLogger("steadytext.daemon").setLevel(logging.DEBUG)
logging.getLogger("zmq").setLevel(logging.WARNING)
return root_logger
# Request logging middleware
class RequestLogger:
"""Log all daemon requests for debugging."""
def __init__(self, daemon_server):
self.daemon_server = daemon_server
self.logger = logging.getLogger("steadytext.daemon.requests")
def log_request(self, request_id, request_type, request_data):
"""Log incoming request."""
self.logger.info(f"Request {request_id}: {request_type}", extra={
"request_id": request_id,
"request_type": request_type,
"seed": request_data.get("seed"),
"prompt_length": len(request_data.get("prompt", "")),
"timestamp": time.time()
})
def log_response(self, request_id, response_data, duration):
"""Log outgoing response."""
self.logger.info(f"Response {request_id}: {duration:.3f}s", extra={
"request_id": request_id,
"success": response_data.get("success"),
"cached": response_data.get("cached", False),
"duration": duration,
"timestamp": time.time()
})
Performance Metrics¶
# metrics.py - Daemon performance metrics
import time
import psutil
import json
from collections import deque
from datetime import datetime
from typing import Dict, Any
class DaemonMetrics:
"""Collect and report daemon performance metrics."""
def __init__(self, window_size=1000):
self.request_times = deque(maxlen=window_size)
self.cache_hits = 0
self.cache_misses = 0
self.total_requests = 0
self.errors = 0
self.start_time = time.time()
self.process = psutil.Process()
def record_request(self, duration: float, cached: bool, success: bool):
"""Record request metrics."""
self.total_requests += 1
self.request_times.append(duration)
if cached:
self.cache_hits += 1
else:
self.cache_misses += 1
if not success:
self.errors += 1
def get_metrics(self) -> Dict[str, Any]:
"""Get current metrics snapshot."""
uptime = time.time() - self.start_time
# Calculate percentiles
if self.request_times:
sorted_times = sorted(self.request_times)
p50 = sorted_times[len(sorted_times) // 2]
p95 = sorted_times[int(len(sorted_times) * 0.95)]
p99 = sorted_times[int(len(sorted_times) * 0.99)]
avg_time = sum(sorted_times) / len(sorted_times)
else:
p50 = p95 = p99 = avg_time = 0
# System metrics
cpu_percent = self.process.cpu_percent()
memory_info = self.process.memory_info()
return {
"timestamp": datetime.now().isoformat(),
"uptime_seconds": uptime,
"total_requests": self.total_requests,
"requests_per_second": self.total_requests / uptime if uptime > 0 else 0,
"cache_hit_rate": self.cache_hits / self.total_requests if self.total_requests > 0 else 0,
"error_rate": self.errors / self.total_requests if self.total_requests > 0 else 0,
"response_times": {
"average": avg_time,
"p50": p50,
"p95": p95,
"p99": p99
},
"system": {
"cpu_percent": cpu_percent,
"memory_mb": memory_info.rss / 1024 / 1024,
"threads": self.process.num_threads()
}
}
def export_prometheus(self) -> str:
"""Export metrics in Prometheus format."""
metrics = self.get_metrics()
lines = []
# Request metrics
lines.append(f'steadytext_requests_total {metrics["total_requests"]}')
lines.append(f'steadytext_requests_per_second {metrics["requests_per_second"]:.2f}')
lines.append(f'steadytext_cache_hit_rate {metrics["cache_hit_rate"]:.4f}')
lines.append(f'steadytext_error_rate {metrics["error_rate"]:.4f}')
# Response time metrics
lines.append(f'steadytext_response_time_seconds{{quantile="0.5"}} {metrics["response_times"]["p50"]:.4f}')
lines.append(f'steadytext_response_time_seconds{{quantile="0.95"}} {metrics["response_times"]["p95"]:.4f}')
lines.append(f'steadytext_response_time_seconds{{quantile="0.99"}} {metrics["response_times"]["p99"]:.4f}')
# System metrics
lines.append(f'steadytext_cpu_percent {metrics["system"]["cpu_percent"]:.2f}')
lines.append(f'steadytext_memory_megabytes {metrics["system"]["memory_mb"]:.2f}')
lines.append(f'steadytext_threads {metrics["system"]["threads"]}')
return '\n'.join(lines)
# HTTP metrics endpoint
from flask import Flask, Response
app = Flask(__name__)
metrics = DaemonMetrics()
@app.route('/metrics')
def prometheus_metrics():
return Response(metrics.export_prometheus(), mimetype='text/plain')
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9090)
Debug Tools¶
#!/bin/bash
# debug_daemon.sh - Comprehensive daemon debugging
# Function to trace daemon requests
trace_requests() {
echo "Tracing daemon requests..."
# Start tcpdump on daemon port
sudo tcpdump -i lo -w daemon_trace.pcap port 5557 &
TCPDUMP_PID=$!
# Run test requests
for i in {1..10}; do
st generate "Test $i" --seed $i &
done
wait
# Stop tcpdump
sudo kill $TCPDUMP_PID
echo "Trace saved to daemon_trace.pcap"
}
# Function to profile daemon
profile_daemon() {
echo "Profiling daemon performance..."
# Get daemon PID
DAEMON_PID=$(st daemon status --json | jq -r '.pid // empty') # "// empty" avoids the literal string "null"
if [ -z "$DAEMON_PID" ]; then
echo "Daemon not running"
return 1
fi
# CPU profiling
echo "CPU profiling for 30 seconds..."
sudo perf record -F 99 -p $DAEMON_PID -g -- sleep 30
sudo perf report > daemon_cpu_profile.txt
# Memory profiling
echo "Memory snapshot..."
sudo gcore -o daemon_memory $DAEMON_PID
# Strace
echo "System call trace for 10 seconds..."
sudo strace -p $DAEMON_PID -o daemon_strace.log -f -T &
STRACE_PID=$!
sleep 10
sudo kill $STRACE_PID
echo "Profiling complete"
}
# Function to stress test daemon
stress_test() {
local concurrent=${1:-10}
local requests=${2:-100}
echo "Stress testing with $concurrent concurrent clients, $requests requests each"
# Start monitoring
st daemon status --json > stress_test_before.json
# Run concurrent requests
for i in $(seq 1 $concurrent); do
(
for j in $(seq 1 $requests); do
st generate "Stress test $i-$j" --seed $((i*1000+j)) >/dev/null 2>&1
done
echo "Client $i completed"
) &
done
# Wait for completion
wait
# Get final status
st daemon status --json > stress_test_after.json
echo "Stress test complete"
}
# Main menu
echo "SteadyText Daemon Debug Tools"
echo "1. Trace requests"
echo "2. Profile daemon"
echo "3. Stress test"
echo "4. View logs"
echo "5. Export metrics"
read -p "Select option: " choice
case $choice in
1) trace_requests ;;
2) profile_daemon ;;
3)
read -p "Concurrent clients (default 10): " concurrent
read -p "Requests per client (default 100): " requests
stress_test ${concurrent:-10} ${requests:-100}
;;
4)
tail -f /var/log/steadytext/daemon.log
;;
5)
curl -s http://localhost:9090/metrics
;;
*) echo "Invalid option" ;;
esac
Performance Optimization¶
Cache Warming¶
# cache_warmer.py - Pre-populate daemon cache
import steadytext
from steadytext.daemon import use_daemon
import json
from pathlib import Path
class DaemonCacheWarmer:
"""Warm up daemon cache with common requests."""
def __init__(self, warmup_file="warmup_prompts.json"):
self.warmup_file = Path(warmup_file)
self.load_prompts()
def load_prompts(self):
"""Load warmup prompts from file."""
if self.warmup_file.exists():
with open(self.warmup_file) as f:
self.warmup_data = json.load(f)
else:
# Default warmup prompts
self.warmup_data = {
"generation": [
{"prompt": "Hello", "seed": 42},
{"prompt": "Write a summary", "seed": 42},
{"prompt": "Explain this concept", "seed": 42},
{"prompt": "Generate code", "seed": 42},
{"prompt": "Create documentation", "seed": 42}
],
"embedding": [
{"text": "search query", "seed": 42},
{"text": "document text", "seed": 42},
{"text": "user input", "seed": 42}
]
}
def warm_generation_cache(self):
"""Warm up generation cache."""
print("Warming generation cache...")
with use_daemon():
for item in self.warmup_data["generation"]:
try:
result = steadytext.generate(
item["prompt"],
seed=item.get("seed", 42),
max_new_tokens=item.get("max_tokens", 512)
)
print(f"✓ Cached: {item['prompt'][:30]}...")
except Exception as e:
print(f"✗ Failed: {item['prompt'][:30]}... - {e}")
def warm_embedding_cache(self):
"""Warm up embedding cache."""
print("\nWarming embedding cache...")
with use_daemon():
for item in self.warmup_data["embedding"]:
try:
result = steadytext.embed(
item["text"],
seed=item.get("seed", 42)
)
print(f"✓ Cached: {item['text'][:30]}...")
except Exception as e:
print(f"✗ Failed: {item['text'][:30]}... - {e}")
def run(self):
"""Run complete cache warming."""
print("Starting daemon cache warming...")
self.warm_generation_cache()
self.warm_embedding_cache()
print("\nCache warming complete!")
def save_common_prompts(self, prompts_file="access.log"):
"""Extract common prompts from access logs."""
# Parse access logs to find common prompts
prompt_counts = {}
with open(prompts_file) as f:
for line in f:
# Extract prompt from log line
# Adjust parsing based on your log format
if "prompt:" in line:
prompt = line.split("prompt:")[1].strip()
prompt_counts[prompt] = prompt_counts.get(prompt, 0) + 1
# Get top prompts
top_prompts = sorted(
prompt_counts.items(),
key=lambda x: x[1],
reverse=True
)[:50]
# Update warmup data
self.warmup_data["generation"] = [
{"prompt": prompt, "seed": 42}
for prompt, _ in top_prompts
]
# Save to file
with open(self.warmup_file, 'w') as f:
json.dump(self.warmup_data, f, indent=2)
# Usage
if __name__ == "__main__":
warmer = DaemonCacheWarmer()
warmer.run()
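The counting step in `save_common_prompts` can also be written with `collections.Counter`, which handles the tallying and the top-N selection. The sketch below is a standalone variant under the same assumption as the method above, namely that log lines contain a `prompt:` marker. `top_prompts` is a hypothetical helper, not part of SteadyText.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def top_prompts(
    log_lines: Iterable[str], limit: int = 50, marker: str = "prompt:"
) -> List[Tuple[str, int]]:
    """Return the most frequent prompts found after `marker`, most common first."""
    counts: Counter = Counter()
    for line in log_lines:
        _, found, rest = line.partition(marker)
        prompt = rest.strip()
        if found and prompt:  # Skip lines without the marker or with an empty prompt
            counts[prompt] += 1
    return counts.most_common(limit)


sample = [
    "10:00 prompt: Write a summary",
    "10:01 prompt: Hello",
    "10:02 prompt: Write a summary",
    "10:03 status: ok",
]
for prompt, count in top_prompts(sample, limit=2):
    print(f"{count}x {prompt}")
```

Accepting any iterable of lines means the same function works on an open file handle or on a list in tests.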
Connection Pooling¶
# connection_pool.py - Daemon connection pooling
import queue
import threading
from contextlib import contextmanager
from steadytext.daemon.client import DaemonClient
class DaemonConnectionPool:
"""Thread-safe connection pool for daemon clients."""
def __init__(self, host="127.0.0.1", port=5557, pool_size=10, timeout=5000):
self.host = host
self.port = port
self.timeout = timeout
self.pool_size = pool_size
self._pool = queue.Queue(maxsize=pool_size)
self._all_connections = []
self._lock = threading.Lock()
self._initialize_pool()
def _initialize_pool(self):
"""Create initial connections."""
for _ in range(self.pool_size):
conn = self._create_connection()
if conn:
self._pool.put(conn)
self._all_connections.append(conn)
def _create_connection(self):
"""Create new daemon connection."""
try:
return DaemonClient(
host=self.host,
port=self.port,
timeout=self.timeout
)
except Exception as e:
print(f"Failed to create connection: {e}")
return None
@contextmanager
def get_connection(self, timeout=None):
"""Get connection from pool."""
connection = None
try:
connection = self._pool.get(timeout=timeout)
yield connection
finally:
if connection:
self._pool.put(connection)
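    def close_all(self):
        """Close every connection the pool created, including checked-out ones."""
        with self._lock:
            # Drain the queue so closed connections are not handed out again
            while True:
                try:
                    self._pool.get_nowait()
                except queue.Empty:
                    break
            for conn in self._all_connections:
                try:
                    conn.close()
                except Exception as e:
                    print(f"Error closing connection: {e}")
            self._all_connections.clear()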
# Global connection pool
_connection_pool = None
def get_connection_pool():
"""Get or create global connection pool."""
global _connection_pool
if _connection_pool is None:
_connection_pool = DaemonConnectionPool()
return _connection_pool
# Usage example
def parallel_generate(prompts):
    """Generate text in parallel using connection pool."""
    pool = get_connection_pool()
    results = {}

    def process_prompt(idx, prompt):
        # Always record an entry for idx so the ordered return below cannot raise KeyError
        try:
            with pool.get_connection() as conn:
                results[idx] = conn.generate(prompt, seed=idx)
        except Exception as e:
            results[idx] = f"Error: {e}"

    # One thread per prompt; threads beyond the pool size wait for a free connection
    threads = []
    for idx, prompt in enumerate(prompts):
        t = threading.Thread(target=process_prompt, args=(idx, prompt))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return [results[i] for i in range(len(prompts))]
Memory Optimization¶
# memory_optimization.py - Optimize daemon memory usage
import gc
import resource
import psutil
from steadytext import get_cache_manager
class DaemonMemoryOptimizer:
"""Optimize memory usage for long-running daemons."""
def __init__(self, max_memory_mb=4096):
self.max_memory_mb = max_memory_mb
self.process = psutil.Process()
self.cache_manager = get_cache_manager()
def set_memory_limits(self):
"""Set process memory limits."""
# Convert MB to bytes
max_memory_bytes = self.max_memory_mb * 1024 * 1024
# Set soft and hard limits
resource.setrlimit(
resource.RLIMIT_AS,
(max_memory_bytes, max_memory_bytes)
)
print(f"Memory limit set to {self.max_memory_mb}MB")
def get_memory_usage(self):
"""Get current memory usage."""
memory_info = self.process.memory_info()
return {
"rss_mb": memory_info.rss / 1024 / 1024,
"vms_mb": memory_info.vms / 1024 / 1024,
"percent": self.process.memory_percent()
}
    def optimize_caches(self):
        """Trim caches when resident memory exceeds 80% of the configured limit."""
        usage = self.get_memory_usage()
        # Compare against max_memory_mb rather than the share of total system RAM,
        # so the threshold matches the limit this class was configured with.
        if usage["rss_mb"] > 0.8 * self.max_memory_mb:
            print("High memory usage, reducing cache sizes...")
            # Clear least recently used entries
            self.cache_manager.clear_old_entries(keep_ratio=0.5)
            # Force garbage collection
            gc.collect()
def periodic_optimization(self, interval=300):
"""Run periodic memory optimization."""
import time
import threading
def optimize():
while True:
try:
self.optimize_caches()
usage = self.get_memory_usage()
print(f"Memory: {usage['rss_mb']:.1f}MB ({usage['percent']:.1f}%)")
except Exception as e:
print(f"Optimization error: {e}")
time.sleep(interval)
thread = threading.Thread(target=optimize, daemon=True)
thread.start()
# Apply optimizations at daemon startup
optimizer = DaemonMemoryOptimizer(max_memory_mb=4096)
optimizer.set_memory_limits()
optimizer.periodic_optimization()
Troubleshooting¶
Common Issues and Solutions¶
Daemon Won't Start¶
# Problem: Address already in use
$ st daemon start
Error: Address already in use (127.0.0.1:5557)
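# Solution 1: Stop whatever is holding the port
$ lsof -i :5557
$ st daemon stop # If it is a stale SteadyText daemon
$ kill <PID> # For any other process; use kill -9 only if it ignores SIGTERM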
# Solution 2: Use different port
$ st daemon start --port 5558
Connection Timeouts¶
# Problem: Timeout errors with daemon
# Solution 1: Increase timeout
from steadytext.daemon.client import DaemonClient
client = DaemonClient(timeout=10000) # 10 seconds
# Solution 2: Check daemon health
# (port 9090 only serves the optional metrics app, so ask the daemon client instead)
from steadytext.daemon.client import is_daemon_running
if is_daemon_running():
    print("Daemon healthy")
else:
    print("Daemon unhealthy")
# Solution 3: Restart daemon
import subprocess
subprocess.run(["st", "daemon", "restart"])
Memory Issues¶
# Problem: Daemon using too much memory
# Solution 1: Clear caches
$ st cache --clear
# Solution 2: Reduce cache sizes
$ export STEADYTEXT_GENERATION_CACHE_CAPACITY=128
$ export STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB=25
$ st daemon restart
# Solution 3: Monitor memory usage
$ watch -n 1 'ps aux | grep "st daemon" | grep -v grep'
Performance Degradation¶
# diagnose_performance.py
import time
import statistics
import steadytext
from steadytext.daemon import use_daemon
def diagnose_daemon_performance():
    """Diagnose daemon performance issues."""
    import os

    # Test direct mode. The daemon is used automatically when available,
    # so it has to be disabled explicitly for this half of the comparison.
    os.environ["STEADYTEXT_DISABLE_DAEMON"] = "1"
    direct_times = []
    try:
        for i in range(10):
            start = time.time()
            steadytext.generate("test", seed=i)
            direct_times.append(time.time() - start)
    finally:
        del os.environ["STEADYTEXT_DISABLE_DAEMON"]

    # Test daemon mode
    daemon_times = []
    with use_daemon():
        for i in range(10):
            start = time.time()
            steadytext.generate("test", seed=i + 100)
            daemon_times.append(time.time() - start)

    print("Direct mode:")
    print(f"  Mean: {statistics.mean(direct_times):.3f}s")
    print(f"  Stdev: {statistics.stdev(direct_times):.3f}s")
    print("\nDaemon mode:")
    print(f"  Mean: {statistics.mean(daemon_times):.3f}s")
    print(f"  Stdev: {statistics.stdev(daemon_times):.3f}s")

    if statistics.mean(daemon_times) > statistics.mean(direct_times):
        print("\nWARNING: Daemon is slower than direct mode!")
        print("Possible causes:")
        print("- Network latency")
        print("- Daemon overloaded")
        print("- Cache thrashing")


diagnose_daemon_performance()
Debug Checklist¶
- Check daemon status
- Verify connectivity
- Check logs
- Monitor resources
- Test basic operations
Best Practices¶
1. Production Configuration¶
# production.env
STEADYTEXT_DAEMON_HOST=0.0.0.0
STEADYTEXT_DAEMON_PORT=5557
STEADYTEXT_GENERATION_CACHE_CAPACITY=2048
STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB=500
STEADYTEXT_EMBEDDING_CACHE_CAPACITY=4096
STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB=1000
STEADYTEXT_DEFAULT_SEED=42
PYTHONUNBUFFERED=1
2. Health Monitoring¶
# health_check.py
import json
import subprocess
import time

import psutil
import steadytext
from steadytext import get_cache_manager


def health_check():
    """Comprehensive daemon health check."""
    checks = {
        "daemon_running": False,
        "response_time": None,
        "cache_available": False,
        "memory_ok": False,
    }
    status = {}

    # Check if daemon is running
    try:
        result = subprocess.run(
            ["st", "daemon", "status", "--json"],
            capture_output=True,
            text=True,
        )
        if result.returncode == 0:
            status = json.loads(result.stdout)
            checks["daemon_running"] = status.get("running", False)
    except (OSError, json.JSONDecodeError):
        pass

    # Check response time
    if checks["daemon_running"]:
        start = time.time()
        try:
            steadytext.generate("health check", seed=42)
            checks["response_time"] = time.time() - start
        except Exception:
            pass

    # Check cache
    try:
        get_cache_manager().get_cache_stats()
        checks["cache_available"] = True
    except Exception:
        pass

    # Check memory of the daemon process, not of this script.
    # Assumes the status JSON reports a "pid" field, as debug_daemon.sh does.
    pid = status.get("pid")
    if checks["daemon_running"] and pid:
        try:
            memory = psutil.Process(pid).memory_info().rss / 1024 / 1024
            checks["memory_ok"] = memory < 4096  # 4GB limit
        except psutil.Error:
            pass

    return checks
3. Graceful Degradation¶
# graceful_degradation.py
import steadytext
from steadytext.daemon import use_daemon
class ResilientClient:
"""Client with graceful degradation."""
def __init__(self):
self.use_daemon = True
self.fallback_count = 0
self.max_fallbacks = 3
def generate(self, prompt, **kwargs):
"""Generate with automatic fallback."""
if self.use_daemon and self.fallback_count < self.max_fallbacks:
try:
with use_daemon():
return steadytext.generate(prompt, **kwargs)
except Exception as e:
self.fallback_count += 1
print(f"Daemon failed ({self.fallback_count}/{self.max_fallbacks}): {e}")
if self.fallback_count >= self.max_fallbacks:
self.use_daemon = False
print("Disabling daemon due to repeated failures")
# Direct mode fallback
return steadytext.generate(prompt, **kwargs)
4. Security Considerations¶
# secure_daemon.py
import ssl
import secrets
class SecureDaemonConfig:
"""Secure daemon configuration."""
@staticmethod
def generate_auth_token():
"""Generate secure auth token."""
return secrets.token_urlsafe(32)
@staticmethod
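def configure_tls(cert_path, key_path):
"""Build a server-side TLS context, e.g. for a TLS-terminating proxy in front of the daemon.
ZeroMQ sockets do not accept an ssl.SSLContext directly."""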
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(cert_path, key_path)
return context
@staticmethod
def restrict_bind_address():
"""Restrict daemon to localhost only."""
return {
"host": "127.0.0.1", # Prefer loopback; bind 0.0.0.0 only behind a firewall or on a private container network
"port": 5557
}
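The class above generates a token but does not show how a presented token would be checked. A minimal sketch of that step, assuming the daemon or a proxy in front of it compares a client-supplied token against a stored one, uses a constant-time comparison so response timing does not leak how many leading characters matched. `token_matches` is a hypothetical helper; nothing here is confirmed SteadyText API.

```python
import hmac
import secrets


def generate_auth_token() -> str:
    """Generate a URL-safe token from 32 random bytes."""
    return secrets.token_urlsafe(32)


def token_matches(presented: str, expected: str) -> bool:
    """Compare tokens in constant time to avoid timing side channels."""
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))


expected = generate_auth_token()
print(token_matches(expected, expected))         # Same token
print(token_matches("not-the-token", expected))  # Wrong token
```

A plain `==` would also return the right boolean, but `hmac.compare_digest` is the standard-library function intended for comparing secrets.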
This comprehensive guide covers all aspects of using SteadyText's daemon mode, from basic usage to advanced production deployments. The daemon provides significant performance benefits while maintaining the simplicity and reliability that SteadyText is known for.