orchestrating-agents

$npx mdskill add oaustegard/claude-skills/orchestrating-agents

Orchestrates parallel API instances for complex multi-agent workflows.

  • Enables simultaneous independent streams for parallel analysis tasks.
  • Integrates with Anthropic API for programmatic invocations.
  • Decides execution by delegating subtasks to specialized instances.
  • Delivers results through streaming responses with tool-enabled patterns.

SKILL.md

.github/skills/orchestrating-agentsView on GitHub ↗
---
name: orchestrating-agents
description: Orchestrates parallel API instances, delegated sub-tasks, and multi-agent workflows with streaming and tool-enabled delegation patterns. Use for parallel analysis, multi-perspective reviews, or complex task decomposition.
metadata:
  version: 0.3.0
---

# Orchestrating Agents

This skill enables programmatic API invocations for advanced workflows including parallel processing, task delegation, and multi-agent analysis using the Anthropic API.

## When to Use This Skill

**Primary use cases:**
- **Parallel sub-tasks**: Break complex analysis into simultaneous independent streams
- **Multi-perspective analysis**: Get 3-5 different expert viewpoints concurrently
- **Delegation**: Offload specific subtasks to specialized API instances
- **Recursive workflows**: Orchestrator coordinating multiple API instances
- **High-volume processing**: Batch process multiple items concurrently

**Trigger patterns:**
- "Parallel analysis", "multi-perspective review", "concurrent processing"
- "Delegate subtasks", "coordinate multiple agents"
- "Run analyses from different perspectives"
- "Get expert opinions from multiple angles"

## Quick Start

### Single Invocation

```python
import sys
sys.path.append('/home/user/claude-skills/orchestrating-agents/scripts')
from claude_client import invoke_claude

response = invoke_claude(
    prompt="Analyze this code for security vulnerabilities: ...",
    model="claude-sonnet-4-6"
)
print(response)
```

### Parallel Multi-Perspective Analysis

```python
from claude_client import invoke_parallel

prompts = [
    {
        "prompt": "Analyze from security perspective: ...",
        "system": "You are a security expert"
    },
    {
        "prompt": "Analyze from performance perspective: ...",
        "system": "You are a performance optimization expert"
    },
    {
        "prompt": "Analyze from maintainability perspective: ...",
        "system": "You are a software architecture expert"
    }
]

results = invoke_parallel(prompts, model="claude-sonnet-4-6")

for i, result in enumerate(results):
    print(f"\n=== Perspective {i+1} ===")
    print(result)
```

### Parallel with Shared Cached Context (Recommended)

For parallel operations with shared base context, use caching to reduce costs by up to 90%:

```python
from claude_client import invoke_parallel

# Large context shared across all sub-agents (e.g., codebase, documentation)
base_context = """
<codebase>
...large codebase or documentation (1000+ tokens)...
</codebase>
"""

prompts = [
    {"prompt": "Find security vulnerabilities in the authentication module"},
    {"prompt": "Identify performance bottlenecks in the API layer"},
    {"prompt": "Suggest refactoring opportunities in the database layer"}
]

# First sub-agent creates cache, subsequent ones reuse it
results = invoke_parallel(
    prompts,
    shared_system=base_context,
    cache_shared_system=True  # 90% cost reduction for cached content
)
```

### Multi-Turn Conversation with Auto-Caching

For sub-agents that need multiple rounds of conversation:

```python
from claude_client import ConversationThread

# Create a conversation thread (auto-caches history)
agent = ConversationThread(
    system="You are a code refactoring expert with access to the codebase",
    cache_system=True
)

# Turn 1: Initial analysis
response1 = agent.send("Analyze the UserAuth class for issues")
print(response1)

# Turn 2: Follow-up (reuses cached system + turn 1)
response2 = agent.send("How would you refactor the login method?")
print(response2)

# Turn 3: Implementation (reuses all previous context)
response3 = agent.send("Show me the refactored code")
print(response3)
```

### Streaming Responses

For real-time feedback from sub-agents:

```python
from claude_client import invoke_claude_streaming

def show_progress(chunk):
    print(chunk, end='', flush=True)

response = invoke_claude_streaming(
    "Write a comprehensive security analysis...",
    callback=show_progress
)
```

### Parallel Streaming

Monitor multiple sub-agents simultaneously:

```python
from claude_client import invoke_parallel_streaming

def agent1_callback(chunk):
    print(f"[Security] {chunk}", end='', flush=True)

def agent2_callback(chunk):
    print(f"[Performance] {chunk}", end='', flush=True)

results = invoke_parallel_streaming(
    [
        {"prompt": "Security review: ..."},
        {"prompt": "Performance review: ..."}
    ],
    callbacks=[agent1_callback, agent2_callback]
)
```

### Interruptible Operations

Cancel long-running parallel operations:

```python
from claude_client import invoke_parallel_interruptible, InterruptToken
import threading
import time

token = InterruptToken()

# Run in background
def run_analysis():
    results = invoke_parallel_interruptible(
        prompts=[...],
        interrupt_token=token
    )
    return results

thread = threading.Thread(target=run_analysis)
thread.start()

# Interrupt after 5 seconds
time.sleep(5)
token.interrupt()
```

## Core Functions

### `invoke_claude()`

Single synchronous invocation with full control:

```python
invoke_claude(
    prompt: str | list[dict],
    model: str = "claude-sonnet-4-6",
    system: str | list[dict] | None = None,
    max_tokens: int = 4096,
    temperature: float = 1.0,
    streaming: bool = False,
    cache_system: bool = False,
    cache_prompt: bool = False,
    messages: list[dict] | None = None,
    **kwargs
) -> str
```

**Parameters:**
- `prompt`: The user message (string or list of content blocks)
- `model`: Claude model to use (default: claude-sonnet-4-6)
- `system`: Optional system prompt (string or list of content blocks)
- `max_tokens`: Maximum tokens in response (default: 4096)
- `temperature`: Randomness 0-1 (default: 1.0)
- `streaming`: Enable streaming response (default: False)
- `cache_system`: Add cache_control to system prompt (requires 1024+ tokens, default: False)
- `cache_prompt`: Add cache_control to user prompt (requires 1024+ tokens, default: False)
- `messages`: Pre-built messages list for multi-turn (overrides prompt)
- `**kwargs`: Additional API parameters (top_p, top_k, etc.)

**Returns:** Response text as string

**Note:** Caching requires minimum 1,024 tokens per cache breakpoint. Cache lifetime is 5 minutes (refreshed on use).

### `invoke_parallel()`

Concurrent invocations using lightweight workflow pattern:

```python
invoke_parallel(
    prompts: list[dict],
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 4096,
    max_workers: int = 5,
    shared_system: str | list[dict] | None = None,
    cache_shared_system: bool = False
) -> list[str]
```

**Parameters:**
- `prompts`: List of dicts with 'prompt' (required) and optional 'system', 'temperature', 'cache_system', 'cache_prompt', etc.
- `model`: Claude model for all invocations
- `max_tokens`: Max tokens per response
- `max_workers`: Max concurrent API calls (default: 5, max: 10)
- `shared_system`: System context shared across ALL invocations (for cache efficiency)
- `cache_shared_system`: Add cache_control to shared_system (default: False)

**Returns:** List of response strings in same order as prompts

**Note:** For optimal cost savings, put large common context (1024+ tokens) in `shared_system` with `cache_shared_system=True`. First invocation creates cache, subsequent ones reuse it (90% cost reduction).

### `invoke_claude_streaming()`

Stream responses in real-time with optional callbacks:

```python
invoke_claude_streaming(
    prompt: str | list[dict],
    callback: callable = None,
    model: str = "claude-sonnet-4-6",
    system: str | list[dict] | None = None,
    max_tokens: int = 4096,
    temperature: float = 1.0,
    cache_system: bool = False,
    cache_prompt: bool = False,
    **kwargs
) -> str
```

**Parameters:**
- `callback`: Optional function called with each text chunk (str) as it arrives
- (other parameters same as invoke_claude)

**Returns:** Complete accumulated response text

### `invoke_parallel_streaming()`

Parallel invocations with per-agent streaming callbacks:

```python
invoke_parallel_streaming(
    prompts: list[dict],
    callbacks: list[callable] = None,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 4096,
    max_workers: int = 5,
    shared_system: str | list[dict] | None = None,
    cache_shared_system: bool = False
) -> list[str]
```

**Parameters:**
- `callbacks`: Optional list of callback functions, one per prompt
- (other parameters same as invoke_parallel)

### `invoke_parallel_interruptible()`

Parallel invocations with cancellation support:

```python
invoke_parallel_interruptible(
    prompts: list[dict],
    interrupt_token: InterruptToken = None,
    # ... same other parameters as invoke_parallel
) -> list[str]
```

**Parameters:**
- `interrupt_token`: Optional InterruptToken to signal cancellation
- (other parameters same as invoke_parallel)

**Returns:** List of response strings (None for interrupted tasks)

### `ConversationThread`

Manages multi-turn conversations with automatic caching:

```python
thread = ConversationThread(
    system: str | list[dict] | None = None,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 4096,
    temperature: float = 1.0,
    cache_system: bool = True
)

response = thread.send(
    user_message: str | list[dict],
    cache_history: bool = True
) -> str
```

**Methods:**
- `send(message, cache_history=True)`: Send message and get response
- `get_messages()`: Get conversation history
- `clear()`: Clear conversation history
- `__len__()`: Get number of turns

**New in 0.3.0:**
- `turn_count` property: Number of completed turn pairs
- `send_continuation(guidance, cache_history)`: Lightweight continuation turn (requires prior `send()`)
- `max_turns` constructor parameter: Optional turn limit
- `continuation_prompt` constructor parameter: Default continuation guidance

### `StallDetector`

Monitors activity timestamps and detects unresponsive operations:

```python
from claude_client import StallDetector

def handle_stall(task_id, idle_seconds):
    print(f"Task {task_id} stalled for {idle_seconds:.1f}s")

detector = StallDetector(timeout=60.0, on_stall=handle_stall)
detector.register("task-1")
detector.start_monitoring(poll_interval=5.0)

# Call heartbeat() during streaming/progress
detector.heartbeat("task-1")

# When done
detector.unregister("task-1")
detector.stop_monitoring()
```

### `TaskTracker` (task_state module)

Formal task lifecycle state machine with enforced transitions:

```python
from task_state import TaskTracker, TaskState

tracker = TaskTracker(max_retries=3)
tracker.add("task-1", category="security")

tracker.claim("task-1")    # UNCLAIMED → CLAIMED
tracker.start("task-1")    # CLAIMED → RUNNING (increments attempt)
tracker.complete("task-1")  # RUNNING → COMPLETED

# On failure with retry:
tracker.fail("task-2", error="timeout")
tracker.retry("task-2")     # FAILED → RETRY_QUEUED (if under max_retries)
tracker.claim("task-2")     # RETRY_QUEUED → CLAIMED

# Query state
tracker.active_count(category="security")
tracker.get_by_state(TaskState.RUNNING)
tracker.summary()  # {"completed": 1, "running": 1, ...}
```

### `invoke_with_retry()` (orchestration module)

Single invocation with exponential backoff:

```python
from orchestration import invoke_with_retry

response = invoke_with_retry(
    "Analyze this code...",
    max_retries=3,
    base_delay_ms=1000,   # 1s, 2s, 4s backoff
    max_delay_ms=10000,   # capped at 10s
)
```

### `invoke_parallel_managed()` (orchestration module)

Full-featured parallel invocations with all Symphony patterns:

```python
from orchestration import invoke_parallel_managed, ConcurrencyLimiter

limiter = ConcurrencyLimiter(
    global_limit=10,
    category_limits={"security": 3, "perf": 3}
)

def reconcile(prompts, tracker):
    # Filter out invalid/duplicate work before dispatch
    return [p for p in prompts if should_run(p)]

results = invoke_parallel_managed(
    prompts=[
        {"prompt": "Security review...", "task_id": "sec-1", "category": "security"},
        {"prompt": "Perf review...", "task_id": "perf-1", "category": "perf"},
    ],
    reconcile=reconcile,
    concurrency_limiter=limiter,
    max_retries=3,
    stall_timeout=60.0,
    on_stall=lambda tid, idle: print(f"{tid} stalled"),
)
```

## Example Workflows

See [references/workflows.md](references/workflows.md) for detailed examples including:
- Multi-expert code review
- Parallel document analysis
- Recursive task delegation
- Advanced Agent SDK delegation patterns
- Prompt caching workflows



## Setup

**Prerequisites:**

1. Install anthropic library:
   ```bash
   uv pip install anthropic
   ```

2. Configure API key via project knowledge file:

   **Option 1 (recommended): Individual file**
   - Create document: `ANTHROPIC_API_KEY.txt`
   - Content: Your API key (e.g., `sk-ant-api03-...`)

   **Option 2: Combined file**
   - Create document: `API_CREDENTIALS.json`
   - Content:
     ```json
     {
       "anthropic_api_key": "sk-ant-api03-..."
     }
     ```

   Get your API key: https://console.anthropic.com/settings/keys

Installation check:
```bash
python3 -c "import anthropic; print(f'✓ anthropic {anthropic.__version__}')"
```

## Error Handling

The module provides comprehensive error handling:

```python
from claude_client import invoke_claude, ClaudeInvocationError

try:
    response = invoke_claude("Your prompt here")
except ClaudeInvocationError as e:
    print(f"API Error: {e}")
    print(f"Status: {e.status_code}")
    print(f"Details: {e.details}")
except ValueError as e:
    print(f"Configuration Error: {e}")
```

Common errors:
- **API key missing**: Add ANTHROPIC_API_KEY.txt to project knowledge (see Setup above)
- **Rate limits**: Reduce max_workers or add delays
- **Token limits**: Reduce prompt size or max_tokens
- **Network errors**: Automatic retry with exponential backoff


## Prompt Caching

For detailed caching workflows and best practices, see [references/workflows.md](references/workflows.md#prompt-caching-workflows).

## Performance Considerations

**Token efficiency:**
- Parallel calls use more tokens but save wall-clock time
- Use prompt caching for shared context (90% cost reduction)
- Use concise system prompts to reduce overhead
- Consider token budgets when setting max_tokens

**Rate limits:**
- Anthropic API has per-minute rate limits
- Default max_workers=5 is safe for most tiers
- Adjust based on your API tier and rate limits

**Cost management:**
- Each invocation consumes API credits
- Monitor usage in Anthropic Console
- Use smaller models (haiku) for simple tasks
- Use prompt caching for repeated context (90% savings)
- Cache lifetime: 5 minutes, refreshed on each use

## Best Practices

1. **Use parallel invocations for independent tasks only**
   - Don't parallelize sequential dependencies
   - Each parallel task should be self-contained

2. **Set appropriate system prompts**
   - Define clear roles/expertise for each instance
   - Keeps responses focused and relevant

3. **Handle errors gracefully**
   - Always wrap invocations in try-except
   - Provide fallback behavior for failures

4. **Test with small batches first**
   - Verify prompts work before scaling
   - Check token usage and costs

5. **Consider alternatives**
   - Not all tasks benefit from multiple instances
   - Sometimes sequential with context is better

## Token Efficiency

This skill uses ~800 tokens when loaded but enables powerful multi-agent patterns that can dramatically improve complex analysis quality and speed.

## See Also

- [references/api-reference.md](references/api-reference.md) - Detailed API documentation
- [Anthropic API Docs](https://docs.anthropic.com/claude/reference) - Official documentation

More from oaustegard/claude-skills

SkillDescription
accessing-github-reposGitHub repository access in containerized environments using REST API and credential detection. Use when git clone fails, or when accessing private repos/writing files via API.
api-credentialsSecurely manages API credentials for multiple providers (Anthropic Claude, Google Gemini, GitHub). Use when skills need to access stored API keys for external service invocations.
asking-questionsGuidance for asking clarifying questions when user requests are ambiguous, have multiple valid approaches, or require critical decisions. Use when implementation choices exist that could significantly affect outcomes.
browsing-blueskyBrowse Bluesky content via API and firehose - search posts, fetch user activity, sample trending topics, read feeds and lists, analyze and categorize accounts. Supports authenticated access for personalized feeds. Use for Bluesky research, user monitoring, trend analysis, feed reading, firehose sampling, account categorization.
building-github-indexGenerate progressive disclosure indexes for GitHub repositories to use as Claude project knowledge. Use when setting up projects referencing external documentation, creating searchable indexes of technical blogs or knowledge bases, combining multiple repos into one index, or when user mentions "index", "github repo", "project knowledge", or "documentation reference".
categorizing-bsky-accountsAnalyze and categorize Bluesky accounts by topic using keyword extraction. Use when users mention Bluesky account analysis, following/follower lists, topic discovery, account curation, or network analysis.
chartingSelect the right Python charting library (seaborn, matplotlib, graphviz) and produce publication-quality static visualizations. Use when creating charts, plots, graphs, diagrams, heatmaps, visualizations from data, or when choosing between matplotlib/seaborn/graphviz. Also triggers for network diagrams, flowcharts, dependency trees, state machines, and entity-relationship diagrams. For interactive browser-rendered charts or uploaded data exploration, defer to charting-vega-lite instead.
charting-vega-liteCreate interactive data visualizations using Vega-Lite declarative JSON grammar. Supports 20+ chart types (bar, line, scatter, histogram, boxplot, grouped/stacked variations, etc.) via templates and programmatic builders. Use when users upload data for charting, request specific chart types, or mention visualizations. Produces portable JSON specs with inline data islands that work in Claude artifacts and can be adapted for production.
check-toolsValidates development tool installations across Python, Node.js, Java, Go, Rust, C/C++, Git, and system utilities. Use when verifying environments or troubleshooting dependencies.
cloning-projectExports project instructions and knowledge files from the current Claude project. Use when users want to clone, copy, backup, or export a project's configuration and files.