dspy-agent-framework-integration by Qredence
Comprehensive guide to integrating DSPy with Microsoft Agent Framework in AgenticFleet, covering typed signatures, assertions, routing cache, GEPA optimization, and agent handoffs.
Updated Jan 19, 2026, 03:09 AM
---
name: dspy-agent-framework-integration
description: Comprehensive guide to integrating DSPy with Microsoft Agent Framework in AgenticFleet, covering typed signatures, assertions, routing cache, GEPA optimization, and agent handoffs.
---
# DSPy + Microsoft Agent Framework Integration
A comprehensive guide to the integration patterns between DSPy and Microsoft Agent Framework in AgenticFleet. This skill documents how to leverage DSPy's structured reasoning capabilities with the Agent Framework's orchestration primitives.
## Overview
AgenticFleet combines **DSPy** for intelligent prompt optimization and structured outputs with **Microsoft Agent Framework** for reliable multi-agent orchestration. This integration enables:
- **Typed Signatures**: Pydantic-validated DSPy outputs for type-safe orchestration
- **DSPy-Enhanced Agents**: ChatAgent wrappers with Chain of Thought, ReAct, and Program of Thought reasoning
- **Routing Cache**: TTL-based caching of routing decisions to reduce latency
- **GEPA Optimization**: Offline prompt optimization with DSPy's GEPA (genetic-Pareto) optimizer
- **Checkpoint Storage**: Workflow resumption via agent-framework storage
- **Agent Handoffs**: Direct agent-to-agent transfers with context preservation
## Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│                    AgenticFleet Integration                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐        │
│  │ DSPyReasoner│────►│ AgentFactory│────►│  ChatAgent  │        │
│  │ (Signatures)│     │(YAML Config)│     │ (Enhanced)  │        │
│  └──────┬──────┘     └──────┬──────┘     └──────┬──────┘        │
│         │                   │                   │               │
│  ┌──────▼───────────────────▼───────────────────▼──────┐        │
│  │              Microsoft Agent Framework              │        │
│  │  ┌──────────┐  ┌───────────┐  ┌──────────────────┐  │        │
│  │  │ Workflow │  │AgentThread│  │CheckpointStorage │  │        │
│  │  └──────────┘  └───────────┘  └──────────────────┘  │        │
│  └─────────────────────────────────────────────────────┘        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
## Typed Signatures with Pydantic
### Signature Definition Pattern
All DSPy signatures in AgenticFleet use Pydantic models for structured outputs:
```python
# src/agentic_fleet/dspy_modules/signatures.py
import dspy
from pydantic import BaseModel, Field
from typing import Literal
# Define the output model first so the signature below can reference it
class TaskAnalysisOutput(BaseModel):
    """Pydantic model for typed signature output."""
    complexity: Literal["low", "medium", "high"] = Field(
        description="Estimated task complexity"
    )
    required_capabilities: list[str] = Field(
        description="List of required capabilities"
    )
    estimated_steps: int = Field(ge=1, le=50)
    preferred_tools: list[str] = Field(default_factory=list)
    needs_web_search: bool = Field(description="Whether web search is needed")
    reasoning: str = Field(description="Reasoning behind the analysis")

class TaskAnalysis(dspy.Signature):
    """Analyze a task with structured output."""
    task: str = dspy.InputField(desc="The user's task description")
    analysis: TaskAnalysisOutput = dspy.OutputField(
        desc="Structured analysis of the task"
    )
```
### Using TypedPredictor
```python
# src/agentic_fleet/dspy_modules/reasoner.py
from dspy import TypedPredictor

class DSPyReasoner(dspy.Module):
    def __init__(self):
        super().__init__()
        self.analyzer = TypedPredictor(TaskAnalysis)

    def analyze(self, task: str) -> TaskAnalysisOutput:
        result = self.analyzer(task=task)
        return result.analysis
```
### Field Validators
Normalize inputs with Pydantic validators:
```python
from pydantic import BaseModel, Field, field_validator

class RoutingDecisionOutput(BaseModel):
    assigned_to: list[str] = Field(min_length=1)
    execution_mode: Literal["delegated", "sequential", "parallel"]

    @field_validator("assigned_to", mode="before")
    @classmethod
    def normalize_agents(cls, v: str | list[str]) -> list[str]:
        if isinstance(v, str):
            return [a.strip() for a in v.split(",") if a.strip()]
        return v

    @field_validator("execution_mode", mode="before")
    @classmethod
    def normalize_mode(cls, v: str) -> str:
        mapping = {
            "delegate": "delegated",
            "single": "delegated",
            "sequence": "sequential",
            "concurrent": "parallel",
        }
        return mapping.get(v.strip().lower(), v)
```
## DSPy Assertions for Validation
### Hard and Soft Constraints
DSPy 3.x provides two assertion types for routing validation:
```python
# src/agentic_fleet/dspy_modules/assertions.py
import dspy
# Hard constraint: causes backtracking on failure
dspy.Assert(condition, "error message")
# Soft constraint: guides optimization without failure
dspy.Suggest(condition, "guidance message")
```
### Agent Assignment Validation
```python
from dspy import Assert

def validate_agent_exists(
    assigned_agents: list[str],
    available_agents: list[str]
) -> bool:
    """Check all assigned agents exist in the available pool."""
    # Hard constraint: must assign at least one agent
    Assert(len(assigned_agents) > 0, "Must assign at least one agent")
    # Hard constraint: every assigned agent must exist (case-insensitive)
    lowered = [a.lower() for a in available_agents]
    for agent in assigned_agents:
        Assert(
            agent.lower() in lowered,
            f"Agent '{agent}' not in available pool"
        )
    return True
```
### Execution Mode Validation
```python
from dspy import Suggest

def validate_execution_mode(
    assigned_agents: list[str],
    execution_mode: str
) -> bool:
    """Ensure execution mode matches agent count."""
    # Soft constraint: delegated mode is intended for a single agent
    Suggest(
        not (len(assigned_agents) > 1 and execution_mode == "delegated"),
        "Consider 'parallel' or 'sequential' for multiple agents"
    )
    return True
```
### Usage in Signatures
```python
class TaskRouting(dspy.Signature):
    """Route a task to one or more agents."""
    task: str = dspy.InputField(desc="The task to route")
    team: str = dspy.InputField(desc="Available agents")
    context: str = dspy.InputField(desc="Execution context")
    decision: RoutingDecisionOutput = dspy.OutputField()

class TaskRouter(dspy.Module):
    """Wrap the signature so the assertions run on every prediction."""
    def __init__(self):
        super().__init__()
        self.route = dspy.Predict(TaskRouting)

    def forward(self, task: str, team: str, context: str):
        # Extract agent names from the team description
        available_agents = extract_agent_names(team)
        result = self.route(task=task, team=team, context=context)
        # Validate the routing decision before returning it
        validate_agent_exists(result.decision.assigned_to, available_agents)
        validate_execution_mode(
            result.decision.assigned_to,
            result.decision.execution_mode
        )
        return result
```
## DSPy-Enhanced Agents
### Wrapping ChatAgent
```python
# src/agentic_fleet/agents/base.py
from agent_framework._agents import ChatAgent
import dspy

class DSPyEnhancedAgent(ChatAgent):
    def __init__(
        self,
        name: str,
        chat_client,
        instructions: str = "",
        enable_dspy: bool = True,
        reasoning_strategy: str = "chain_of_thought",
        **kwargs
    ):
        super().__init__(
            name=name,
            instructions=instructions,
            chat_client=chat_client,
            **kwargs
        )
        self.enable_dspy = enable_dspy
        self.reasoning_strategy = reasoning_strategy
        # Initialize reasoning modules
        if enable_dspy:
            self._init_reasoning_modules()

    def _init_reasoning_modules(self):
        """Initialize DSPy reasoning strategies."""
        if self.reasoning_strategy == "react":
            self.react_module = dspy.ReAct(
                "question -> answer",
                tools=self.tools
            )
        elif self.reasoning_strategy == "program_of_thought":
            self.pot_module = dspy.ProgramOfThought("question -> answer")
        elif self.reasoning_strategy == "chain_of_thought":
            self.cot_module = dspy.ChainOfThought("question -> answer")
```
### Task Enhancement
```python
class DSPyEnhancedAgent(ChatAgent):
    def _enhance_task_with_dspy(self, task: str, context: str = "") -> str:
        """Enhance a task using DSPy reasoning."""
        if not self.enable_dspy:
            return task
        # Use Chain of Thought for complex tasks
        enhancer = dspy.ChainOfThought("task, context -> enhanced_task")
        result = enhancer(
            task=task,
            context=context or "No prior context"
        )
        return result.enhanced_task

    async def run(self, message, **kwargs):
        # Enhance the task before execution
        enhanced_message = self._enhance_task_with_dspy(
            message,
            kwargs.get("context", "")
        )
        # Run with the enhanced task
        return await super().run(enhanced_message, **kwargs)
```
## Routing Cache
### TTL-Based Cache Implementation
```python
# src/agentic_fleet/dspy_modules/reasoner_cache.py
import time
from typing import Any
from collections import OrderedDict
class RoutingCache:
    """TTL-based cache for routing decisions."""

    def __init__(self, ttl_seconds: int = 300, max_size: int = 1024):
        self.ttl = ttl_seconds
        self.max_size = max_size
        self._cache: OrderedDict[str, tuple[Any, float]] = OrderedDict()

    def get(self, key: str) -> Any | None:
        """Get the cached value if not expired."""
        if key not in self._cache:
            return None
        value, timestamp = self._cache[key]
        # Check TTL
        if time.time() - timestamp > self.ttl:
            del self._cache[key]
            return None
        # Move to end (LRU)
        self._cache.move_to_end(key)
        return value

    def set(self, key: str, value: Any) -> None:
        """Cache a value with the current timestamp."""
        # Evict the oldest entry if at capacity
        if len(self._cache) >= self.max_size:
            self._cache.popitem(last=False)
        self._cache[key] = (value, time.time())

    def clear(self) -> None:
        """Clear all cached entries."""
        self._cache.clear()
```
### Integration with DSPyReasoner
```python
# src/agentic_fleet/dspy_modules/reasoner.py
import hashlib

class DSPyReasoner(dspy.Module):
    def __init__(self, enable_routing_cache: bool = True, **kwargs):
        super().__init__()
        self.enable_routing_cache = enable_routing_cache
        self._routing_cache = RoutingCache(
            ttl_seconds=kwargs.get("cache_ttl_seconds", 300),
            max_size=kwargs.get("cache_max_entries", 1024)
        )

    def _generate_cache_key(
        self,
        task: str,
        team: str,
        context: str
    ) -> str:
        """Generate a cache key from the routing inputs."""
        content = f"{task}:{team}:{context}"
        return hashlib.md5(content.encode()).hexdigest()

    def route(self, task: str, team: str, context: str) -> RoutingDecisionOutput:
        # Check the cache first
        if self.enable_routing_cache:
            cache_key = self._generate_cache_key(task, team, context)
            cached = self._routing_cache.get(cache_key)
            if cached:
                return cached
        # Execute routing
        result = self.router(task=task, team=team, context=context)
        decision = result.decision
        # Cache the result
        if self.enable_routing_cache:
            self._routing_cache.set(cache_key, decision)
        return decision
```
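The cache key is a pure function of the inputs, so identical `(task, team, context)` triples always land on the same entry; MD5 is used for speed here, not security. A standalone sketch (`routing_cache_key` is an illustrative free function, not the AgenticFleet method):

```python
import hashlib

def routing_cache_key(task: str, team: str, context: str) -> str:
    """Deterministic key over the routing inputs (non-cryptographic use)."""
    content = f"{task}:{team}:{context}"
    return hashlib.md5(content.encode()).hexdigest()

k1 = routing_cache_key("summarize repo", "researcher,writer", "")
k2 = routing_cache_key("summarize repo", "researcher,writer", "")
k3 = routing_cache_key("summarize repo", "researcher", "")
print(k1 == k2)  # True  -> same inputs hit the same cache entry
print(k1 == k3)  # False -> changing the team changes the key
```

One consequence worth noting: any change to the team description string, even whitespace, produces a new key and a cache miss.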
## GEPA Optimization
### Configuration
```yaml
# src/agentic_fleet/config/workflow_config.yaml
dspy:
  optimization:
    enabled: true
    examples_path: src/agentic_fleet/data/supervisor_examples.json
    use_gepa: true
    gepa_auto: light  # light|medium|heavy
    gepa_reflection_model: gpt-5-mini
    gepa_history_min_quality: 8.0
    gepa_history_limit: 200
    gepa_val_split: 0.2
    gepa_seed: 13
    gepa_log_dir: .var/logs/dspy/gepa
```
### Optimization Command
```bash
# Run GEPA optimization
agentic-fleet optimize
# Output: .var/cache/dspy/compiled_reasoner.json
```
### Loading Compiled Modules
```python
# src/agentic_fleet/dspy_modules/reasoner.py
def _load_compiled_module(self) -> None:
    """Load optimized prompt weights from disk."""
    compiled_path = get_configured_compiled_reasoner_path()
    meta_path = Path(f"{compiled_path}.meta")
    if compiled_path.exists():
        # Verify the source hash matches before loading
        if meta_path.exists():
            meta = json.loads(meta_path.read_text())
            expected_hash = meta.get("reasoner_source_hash")
            if expected_hash != get_reasoner_source_hash():
                logger.info("Compiled reasoner ignored (source hash mismatch)")
                return
        logger.info(f"Loading compiled reasoner from {compiled_path}")
        self.load(str(compiled_path))
```
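The hash guard above ensures a compiled artifact is only loaded while the reasoner source it was optimized against is unchanged. A self-contained sketch of the same idea (SHA-256 here; `source_hash` and `compiled_is_valid` are illustrative names, not AgenticFleet APIs):

```python
import hashlib
import json
import tempfile
from pathlib import Path

def source_hash(path: Path) -> str:
    """Hash of the current reasoner source file."""
    return hashlib.sha256(path.read_bytes()).hexdigest()

def compiled_is_valid(compiled_path: Path, source_path: Path) -> bool:
    """True when the artifact's recorded hash matches the live source."""
    meta_path = Path(f"{compiled_path}.meta")
    if not (compiled_path.exists() and meta_path.exists()):
        return False
    meta = json.loads(meta_path.read_text())
    return meta.get("reasoner_source_hash") == source_hash(source_path)

with tempfile.TemporaryDirectory() as d:
    src = Path(d) / "reasoner.py"
    compiled = Path(d) / "compiled_reasoner.json"
    src.write_text("def route(): ...")
    compiled.write_text("{}")
    # Record the hash at "compile" time
    Path(f"{compiled}.meta").write_text(
        json.dumps({"reasoner_source_hash": source_hash(src)})
    )
    print(compiled_is_valid(compiled, src))  # True
    # Editing the source invalidates the compiled artifact
    src.write_text("def route(): ...  # edited")
    print(compiled_is_valid(compiled, src))  # False
```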
## Agent Framework Integration
### Creating ChatAgent from YAML
```python
# src/agentic_fleet/agents/coordinator.py
from agent_framework._agents import ChatAgent
class AgentFactory:
    def create_agent(self, name: str, config: dict) -> ChatAgent:
        """Create a ChatAgent from YAML configuration."""
        model_id = config.get("model")
        instructions = self._resolve_instructions(config.get("instructions", ""))
        tools = self._resolve_tools(config.get("tools", []))
        return ChatAgent(
            name=name,
            description=config.get("description", ""),
            instructions=instructions,
            chat_client=self._create_chat_client(model_id),
            tools=tools
        )

    def _resolve_instructions(self, instructions_ref: str) -> str:
        """Resolve dynamic prompts or static references."""
        if instructions_ref.startswith("prompts."):
            # Dynamic DSPy prompt generation
            return self._generate_dynamic_prompt(instructions_ref)
        # Static prompt lookup
        return get_static_prompt(instructions_ref)
```
### Dynamic Prompt Generation
```python
# src/agentic_fleet/agents/coordinator.py
from dspy import ChainOfThought
from agentic_fleet.dspy_modules.signatures import PlannerInstructionSignature
class AgentFactory:
    def __init__(self):
        self.instruction_generator = ChainOfThought(PlannerInstructionSignature)

    def _generate_dynamic_prompt(self, ref: str) -> str:
        """Generate a prompt using DSPy."""
        if ref == "prompts.planner":
            result = self.instruction_generator(
                available_agents=self._get_agent_descriptions(),
                task_goals="Plan and coordinate multi-agent workflows"
            )
            return result.instructions
        return ""
```
### Workflow with Checkpointing
```python
# src/agentic_fleet/workflows/supervisor.py
from agent_framework._workflows import (
    WorkflowStartedEvent,
    WorkflowStatusEvent,
    WorkflowOutputEvent,
    ExecutorCompletedEvent,
    RequestInfoEvent,  # HITL support
    FileCheckpointStorage
)

class SupervisorWorkflow:
    def __init__(self, context, checkpoint_dir: str = ".var/checkpoints"):
        self.context = context
        self.checkpoint_storage = FileCheckpointStorage(checkpoint_dir)

    async def run_stream(self, task: str, checkpoint_id: str | None = None):
        """Run the workflow with optional checkpoint resume."""
        if checkpoint_id:
            # Resume from checkpoint (an async generator, so iterate it)
            async for event in self._resume_from_checkpoint(checkpoint_id):
                yield event
        else:
            # Start fresh
            async for event in self._execute_pipeline(task):
                yield event

    async def _resume_from_checkpoint(self, checkpoint_id: str):
        """Resume workflow execution from a checkpoint."""
        state = self.checkpoint_storage.load(checkpoint_id)
        # Restore workflow state
        self.context.restore_from_state(state)
        # Continue execution
        async for event in self._continue_pipeline():
            yield event
```
### Agent Handoffs
```python
# src/agentic_fleet/workflows/strategies.py
from agent_framework._agents import ChatAgent

class HandoffManager:
    """Manage agent-to-agent transfers with context preservation."""

    def __init__(self):
        self._handoff_history: list[dict] = []

    def prepare_handoff(
        self,
        from_agent: ChatAgent | None,
        to_agent: ChatAgent,
        context: dict
    ) -> dict:
        """Prepare handoff input with accumulated context."""
        handoff_input = {
            "task": context.get("original_task"),
            "findings": context.get("findings", []),
            "decisions": context.get("decisions", []),
            "remaining_work": context.get("remaining_work", []),
            # The first agent in a chain has no predecessor
            "from_agent_summary": (
                self._summarize_agent_work(from_agent) if from_agent else None
            )
        }
        self._handoff_history.append({
            "from": from_agent.name if from_agent else None,
            "to": to_agent.name,
            "input": handoff_input
        })
        return handoff_input

    def execute_sequential_with_handoffs(
        self,
        agents: list[ChatAgent],
        tasks: list[str]
    ) -> list[dict]:
        """Execute tasks with agent handoffs."""
        context = {"original_task": tasks[0], "findings": [], "decisions": []}
        results = []
        for i, (agent, task) in enumerate(zip(agents, tasks)):
            context["remaining_work"] = tasks[i + 1:]
            handoff_input = self.prepare_handoff(
                from_agent=agents[i - 1] if i > 0 else None,
                to_agent=agent,
                context=context
            )
            result = self._run_agent_with_context(agent, task, handoff_input)
            context["findings"].extend(result.get("findings", []))
            context["decisions"].extend(result.get("decisions", []))
            results.append(result)
        return results
```
## Configuration Reference
### workflow_config.yaml
```yaml
# DSPy Configuration
dspy:
  model: gpt-5-mini
  routing_model: gpt-5-mini
  use_typed_signatures: true
  enable_routing_cache: true
  routing_cache_ttl_seconds: 300
  require_compiled: false  # true in production

  # Dynamic Prompts
  dynamic_prompts:
    enabled: true
    signatures_path: src/agentic_fleet/dspy_modules/signatures.py

  # GEPA Optimization
  optimization:
    enabled: true
    use_gepa: true
    gepa_auto: light

# Workflow Configuration
workflow:
  supervisor:
    max_rounds: 15
    enable_streaming: true
  checkpointing:
    checkpoint_dir: .var/checkpoints

# Agent Configuration
agents:
  researcher:
    model: gpt-4.1-mini
    tools: [TavilySearchTool]
    reasoning:
      effort: medium
      verbosity: normal
```
## Common Patterns
### 1. Simple Task (Fast-Path)
```python
# src/agentic_fleet/workflows/helpers.py
import re

def is_simple_task(task: str) -> bool:
    """Check if a task qualifies for fast-path processing."""
    simple_patterns = [
        r"^(hi|hello|hey|how are you|what's up)",
        r"^\d+\s*[\+\-\*/]\s*\d+$",  # Simple math
        r"^(what is|who is|where is|when did)\s+\w+",  # Simple facts
    ]
    return any(re.match(p, task.lower()) for p in simple_patterns)
```
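The fast-path check is easy to exercise directly. A standalone copy using the same regexes as above:

```python
import re

SIMPLE_PATTERNS = [
    r"^(hi|hello|hey|how are you|what's up)",       # greetings
    r"^\d+\s*[\+\-\*/]\s*\d+$",                     # simple math
    r"^(what is|who is|where is|when did)\s+\w+",   # simple facts
]

def is_simple_task(task: str) -> bool:
    # re.match anchors at the start, so patterns only need leading context
    return any(re.match(p, task.lower()) for p in SIMPLE_PATTERNS)

print(is_simple_task("Hello there"))            # True  (greeting)
print(is_simple_task("12 * 7"))                 # True  (simple math)
print(is_simple_task("Refactor the cache"))     # False (goes to full routing)
```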
### 2. Multi-Agent Parallel Execution
```python
# src/agentic_fleet/workflows/strategies.py
import asyncio

async def execute_parallel(
    agents: list[ChatAgent],
    task: str
) -> list[dict]:
    """Execute a task across multiple agents concurrently."""
    async def run_agent(agent: ChatAgent) -> dict:
        return {
            "agent": agent.name,
            "result": await agent.run(task)
        }
    # gather preserves input order, so results align with agents
    return await asyncio.gather(*[run_agent(a) for a in agents])
```
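Because `asyncio.gather` preserves input order, `results[i]` always corresponds to `agents[i]` even when agents finish out of order. A runnable sketch with stub agents (`StubAgent` is an illustrative stand-in for `ChatAgent`):

```python
import asyncio
from dataclasses import dataclass

@dataclass
class StubAgent:
    """Minimal stand-in for a ChatAgent with an async run()."""
    name: str

    async def run(self, task: str) -> str:
        await asyncio.sleep(0)  # yield control, as a real client call would
        return f"{self.name} handled: {task}"

async def execute_parallel(agents: list[StubAgent], task: str) -> list[dict]:
    async def run_agent(agent: StubAgent) -> dict:
        return {"agent": agent.name, "result": await agent.run(task)}
    # gather runs the coroutines concurrently and preserves input order
    return await asyncio.gather(*[run_agent(a) for a in agents])

results = asyncio.run(
    execute_parallel([StubAgent("researcher"), StubAgent("coder")], "audit deps")
)
print([r["agent"] for r in results])  # ['researcher', 'coder']
```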
### 3. Quality-Based Refinement Loop
```python
# src/agentic_fleet/workflows/executors.py
# (method on the workflow executor, shown abridged)
async def run_quality_phase(
    self,
    task: str,
    result: str,
    threshold: float = 7.0
) -> tuple[str, bool]:
    """Evaluate quality and refine if needed."""
    assessment = await self.reasoner.assess_quality(task, result)
    if assessment.score < threshold:
        # Refine the result when quality falls below the threshold
        refined = await self._refine_result(task, result, assessment.feedback)
        return refined, True
    return result, False
```
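The loop reduces to a single score check: assess, and refine only when the score is below the threshold. A dependency-injected stand-in (the free function and the stub `assess`/`refine` callables below are illustrative, not the executor method):

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Assessment:
    score: float
    feedback: str

async def run_quality_phase(result, assess, refine, threshold=7.0):
    """Return (possibly refined result, whether refinement happened)."""
    assessment = await assess(result)
    if assessment.score < threshold:
        refined = await refine(result, assessment.feedback)
        return refined, True
    return result, False

# Stub assessor that always scores below the threshold
async def low_assess(result: str) -> Assessment:
    return Assessment(score=4.0, feedback="add citations")

async def refine(result: str, feedback: str) -> str:
    return f"{result} [revised: {feedback}]"

refined, was_refined = asyncio.run(run_quality_phase("draft", low_assess, refine))
print(was_refined)  # True
print(refined)      # draft [revised: add citations]
```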
## Debugging Tips
1. **Routing issues**: Check `.var/logs/execution_history.jsonl` for routing decisions
2. **Slow workflows**: Reduce `gepa_max_metric_calls` in config
3. **DSPy fallback**: If no compiled reasoner is cached, the system falls back to zero-shot prompts
4. **Type errors**: Run `make type-check` before commits
## Related Documentation
- [DSPy Documentation](https://dspy.ai)
- [Microsoft Agent Framework](https://github.com/microsoft/agent-framework)
- AgenticFleet: `docs/guides/dspy-agent-framework-integration.md`