Skip to content

Log Enrichment Pipeline

View in Github

A log processing system using fenic's text extraction and semantic enrichment capabilities to transform unstructured logs into actionable incident response data.

Overview

This pipeline demonstrates log enrichment through multi-stage processing:

  • Template-based parsing without regex
  • Service metadata enrichment via joins
  • LLM-powered error categorization and remediation
  • Incident severity assessment with business context

Prerequisites

  1. Install fenic:

bash pip install fenic

  1. Configure OpenAI API key: bash export OPENAI_API_KEY="your-api-key-here"

Usage

python enrichment.py

Implementation

The pipeline processes logs through three stages:

  1. Parse: Extract structured fields from syslog-format messages
  2. Enrich: Join with service ownership and criticality data
  3. Analyze: Apply LLM operations for incident response

API Structure

from fenic.api.session import Session, SessionConfig, SemanticConfig, OpenAILanguageModel
from fenic.api.functions import col, text, semantic
from pydantic import BaseModel, Field

# Configure session
config = SessionConfig(
   app_name="log_enrichment",
   semantic=SemanticConfig(
      language_models={
         "mini": OpenAILanguageModel(
            model_name="gpt-4o-mini",
            rpm=500,
            tpm=200_000
         )
      }
   )
)


# Define extraction schema with Pydantic
class ErrorAnalysis(BaseModel):
   error_category: str = Field(description="Main category of the error")
   affected_component: str = Field(description="Specific component affected")
   potential_cause: str = Field(description="Most likely root cause")


# Stage 1: Template extraction
parsed = logs_df.select(
   "raw_message", text.extract("${timestamp:none} [${level:none}] ${service:none}: ${message:none}")
)

# Stage 2: Metadata join
enriched = parsed.join(metadata_df, on="service", how="left")

# Stage 3: Semantic enrichment
final = enriched.select(
   semantic.extract("message", ErrorAnalysis).alias("analysis"),
   semantic.classify(
      text.concat(col("message"), lit(" (criticality: "), col("criticality"), lit(")")),
      ["low", "medium", "high", "critical"]
   ).alias("incident_severity"),
   fc.semantic.map(
      (
         "Generate 2-3 specific remediation steps that the on-call team should take to resolve this issue: "
         "{{message}} | Service: {{service}} | Team: {{team_owner}}"
      ),
      message=fc.col("message"),
      service=fc.col("service"),
      team_owner=fc.col("team_owner")
   ).alias("remediation_steps")
)

Output Format

✅ Pipeline Complete! Final enriched logs:
----------------------------------------------------------------------
timestamp           level  service       message                  team_owner      error_category  incident_severity  remediation_steps
2024-01-15 14:32:01 ERROR  payment-api   Connection timeout...    payments-team   database        critical          1. Check Database Connectivity...
2024-01-15 14:32:15 WARN   user-service  Rate limit exceeded...   identity-team   resource        critical          1. Review Rate Limiting Config...

📈 Analytics Examples:

Error Category Distribution:
error_category    count
database          1
resource          5
authentication    4
network           5

High-Priority Incidents (Critical/High severity):
service           team_owner       incident_severity  on_call_channel    remediation_steps
payment-api       payments-team    critical          #payments-oncall   1. Check Database Connectivity...
user-service      identity-team    critical          #identity-alerts   1. Review Rate Limiting Config...

Configuration

Custom Log Templates

Parse different log formats:

# Syslog format
log_template = "${timestamp:none} [${level:none}] ${service:none}: ${message:none}"

# Custom application format
log_template = "${service:none} | ${timestamp:none} | ${level:none} - ${message:none}"

Troubleshooting

Issue: Template extraction returns empty fields Solution: Check template format matches log structure exactly, including spaces

Issue: Missing service metadata after join Solution: Use left join to preserve all logs; add default values for missing metadata

Issue: Generic remediation steps Solution: Include more context in semantic.map prompt (service, team, criticality)