Log Enrichment Pipeline
A log processing system using fenic's text extraction and semantic enrichment capabilities to transform unstructured logs into actionable incident response data.
Overview
This pipeline demonstrates log enrichment through multi-stage processing:
- Template-based parsing without regex
- Service metadata enrichment via joins
- LLM-powered error categorization and remediation
- Incident severity assessment with business context
Prerequisites
- Install fenic:
  pip install fenic
- Configure OpenAI API key:
  export OPENAI_API_KEY="your-api-key-here"
Usage
python enrichment.py
Implementation
The pipeline processes logs through three stages:
- Parse: Extract structured fields from syslog-format messages
- Enrich: Join with service ownership and criticality data
- Analyze: Apply LLM operations for incident response
API Structure
from fenic.api.session import Session, SessionConfig, SemanticConfig, OpenAIModelConfig
from fenic.api.functions import col, lit, text, semantic
from pydantic import BaseModel, Field
# Configure session
config = SessionConfig(
    app_name="log_enrichment",
    semantic=SemanticConfig(
        language_models={
            "mini": OpenAIModelConfig(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000
            )
        }
    )
)
# Create the session from the config
session = Session.get_or_create(config)
# Define extraction schema with Pydantic
class ErrorAnalysis(BaseModel):
    error_category: str = Field(..., description="Main category of the error")
    affected_component: str = Field(..., description="Specific component affected")
    potential_cause: str = Field(..., description="Most likely root cause")
# Stage 1: Template extraction
# (assumes text.extract takes the source column first, then the template)
parsed = logs_df.select(
    "raw_message",
    text.extract("raw_message", "${timestamp:none} [${level:none}] ${service:none}: ${message:none}")
)
# Stage 2: Metadata join
enriched = parsed.join(metadata_df, on="service", how="left")
# Stage 3: Semantic enrichment
final = enriched.select(
    semantic.extract("message", ErrorAnalysis).alias("analysis"),
    semantic.classify(
        text.concat(col("message"), lit(" (criticality: "), col("criticality"), lit(")")),
        ["low", "medium", "high", "critical"]
    ).alias("incident_severity"),
    semantic.map(
        "Generate remediation steps for: {message} | Service: {service} | Team: {team_owner}"
    ).alias("remediation_steps")
)
Output Format
✅ Pipeline Complete! Final enriched logs:
----------------------------------------------------------------------
timestamp            level  service       message                 team_owner     error_category  incident_severity  remediation_steps
2024-01-15 14:32:01  ERROR  payment-api   Connection timeout...   payments-team  database        critical           1. Check Database Connectivity...
2024-01-15 14:32:15  WARN   user-service  Rate limit exceeded...  identity-team  resource        critical           1. Review Rate Limiting Config...
📈 Analytics Examples:
Error Category Distribution:
error_category  count
database        1
resource        5
authentication  4
network         5
High-Priority Incidents (Critical/High severity):
service       team_owner     incident_severity  on_call_channel   remediation_steps
payment-api   payments-team  critical           #payments-oncall  1. Check Database Connectivity...
user-service  identity-team  critical           #identity-alerts  1. Review Rate Limiting Config...
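The two analytics views above amount to a group-by count and a filter on severity. A minimal sketch in plain Python follows; it does not use fenic's DataFrame API. The function names are assumptions, and the rows are illustrative: the first two mirror the sample output, the third is a hypothetical low-severity entry.

```python
from collections import Counter

# Illustrative rows: the first two mirror the sample output above,
# the third is a hypothetical low-severity entry added for contrast.
rows = [
    {"service": "payment-api", "error_category": "database", "incident_severity": "critical"},
    {"service": "user-service", "error_category": "resource", "incident_severity": "critical"},
    {"service": "user-service", "error_category": "resource", "incident_severity": "low"},
]


def category_distribution(rows):
    """Count rows per error_category (a group-by with count)."""
    return Counter(row["error_category"] for row in rows)


def high_priority(rows, levels=("critical", "high")):
    """Keep only rows whose incident_severity is one of `levels`."""
    return [row for row in rows if row["incident_severity"] in levels]


distribution = category_distribution(rows)
urgent = high_priority(rows)
```

In the actual pipeline the same two views would presumably be expressed as DataFrame operations on the enriched result.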
Configuration
Custom Log Templates
Parse different log formats:
# Syslog format
log_template = "${timestamp:none} [${level:none}] ${service:none}: ${message:none}"
# Custom application format
log_template = "${service:none} | ${timestamp:none} | ${level:none} - ${message:none}"
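To make the idea of template-based parsing without regex concrete, here is a minimal plain-Python sketch. It is not fenic's implementation: it assumes each `${name:format}` field is separated from the next by literal text, and it ignores the format part after the colon. `split_template` and `parse_line` are hypothetical helper names.

```python
SYSLOG = "${timestamp:none} [${level:none}] ${service:none}: ${message:none}"
CUSTOM = "${service:none} | ${timestamp:none} | ${level:none} - ${message:none}"


def split_template(template):
    """Split a template into ("literal", text) and ("field", name) parts."""
    parts = []
    pos = 0
    while pos < len(template):
        start = template.find("${", pos)
        if start == -1:
            parts.append(("literal", template[pos:]))
            break
        if start > pos:
            parts.append(("literal", template[pos:start]))
        end = template.index("}", start)
        # Keep only the field name; the ":none" format hint is ignored here.
        name = template[start + 2:end].split(":", 1)[0]
        parts.append(("field", name))
        pos = end + 1
    return parts


def parse_line(template, line):
    """Return a dict of fields, or None if the line does not fit the template."""
    parts = split_template(template)
    fields = {}
    pos = 0
    for i, (kind, value) in enumerate(parts):
        if kind == "literal":
            if not line.startswith(value, pos):
                return None
            pos += len(value)
            continue
        if i + 1 == len(parts):
            end = len(line)  # last field takes the rest of the line
        else:
            next_kind, delimiter = parts[i + 1]
            if next_kind != "literal":
                raise ValueError("adjacent fields need a literal between them")
            end = line.find(delimiter, pos)
            if end == -1:
                return None
        fields[value] = line[pos:end]
        pos = end
    return fields if pos == len(line) else None


syslog = parse_line(SYSLOG, "2024-01-15 14:32:01 [ERROR] payment-api: Connection timeout")
custom = parse_line(CUSTOM, "payment-api | 2024-01-15 14:32:01 | ERROR - Connection timeout")
```

Because each field ends at the first occurrence of the following literal, delimiters such as `": "` or `" | "` must not appear inside earlier field values.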
Troubleshooting
Issue: Template extraction returns empty fields
Solution: Check that the template matches the log structure exactly, including spaces
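One way to locate such a mismatch is to check that every literal piece of the template appears in the log line, in order. The sketch below is a plain-Python diagnostic, not part of fenic; `template_literals` and `first_missing_literal` are hypothetical helper names.

```python
def template_literals(template):
    """Return the literal text between ${...} fields, in order."""
    literals = []
    pos = 0
    while pos < len(template):
        start = template.find("${", pos)
        if start == -1:
            literals.append(template[pos:])
            break
        if start > pos:
            literals.append(template[pos:start])
        pos = template.index("}", start) + 1
    return literals


def first_missing_literal(template, line):
    """Return the first template literal not found in the line, or None."""
    pos = 0
    for literal in template_literals(template):
        found = line.find(literal, pos)
        if found == -1:
            return literal
        pos = found + len(literal)
    return None


template = "${timestamp:none} [${level:none}] ${service:none}: ${message:none}"
good = "2024-01-15 14:32:01 [ERROR] payment-api: Connection timeout"
bad = "2024-01-15 14:32:01 [ERROR] payment-api:Connection timeout"  # no space after ':'

missing_in_good = first_missing_literal(template, good)
missing_in_bad = first_missing_literal(template, bad)
```

Printing the result with `repr()` makes whitespace differences visible, which is usually where the mismatch hides.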
Issue: Missing service metadata after join
Solution: Use a left join to preserve all logs, and add default values for missing metadata
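The left-join-plus-defaults behaviour can be sketched in plain Python as follows. This only illustrates the semantics and is not fenic code; the default values, the `legacy-batch` service, and the function name are hypothetical.

```python
# Hypothetical defaults for services with no metadata entry.
DEFAULT_METADATA = {"team_owner": "unassigned", "criticality": "unknown"}


def left_join_with_defaults(logs, metadata, key="service", defaults=DEFAULT_METADATA):
    """Keep every log row; fill metadata columns from defaults when no match exists."""
    by_key = {row[key]: row for row in metadata}
    joined = []
    for log in logs:
        match = by_key.get(log[key], {})
        # Later dicts win: defaults < matched metadata < the log row itself.
        joined.append({**defaults, **match, **log})
    return joined


logs = [
    {"service": "payment-api", "message": "Connection timeout"},
    {"service": "legacy-batch", "message": "Disk full"},  # hypothetical, no metadata
]
metadata = [
    {"service": "payment-api", "team_owner": "payments-team", "criticality": "critical"},
]

joined = left_join_with_defaults(logs, metadata)
```

Every log row survives the join, and rows without a metadata match carry explicit placeholder values instead of nulls, so downstream prompts do not receive empty fields.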
Issue: Generic remediation steps
Solution: Include more context in the semantic.map prompt (service, team, criticality)
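As an illustration of adding context, the sketch below assembles a richer prompt string from a row in plain Python. The extended template with `{criticality}` and the `build_prompt` helper are assumptions for illustration; how `semantic.map` itself resolves placeholders is not shown here.

```python
# Hypothetical prompt template; {criticality} is the added context field.
PROMPT_TEMPLATE = (
    "Generate remediation steps for: {message} | Service: {service} | "
    "Team: {team_owner} | Criticality: {criticality}"
)


class _UnknownDefault(dict):
    """Dict that yields 'unknown' for any field missing from the row."""

    def __missing__(self, key):
        return "unknown"


def build_prompt(row):
    """Fill the template from a row dict, tolerating missing metadata."""
    return PROMPT_TEMPLATE.format_map(_UnknownDefault(row))


prompt = build_prompt(
    {"message": "Connection timeout", "service": "payment-api", "team_owner": "payments-team"}
)
```

Previewing the assembled string for a few rows is a quick way to confirm that the context actually reaches the model before paying for LLM calls.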