pyats-dynamic-test
$
npx mdskill add automateyournetwork/netclaw/pyats-dynamic-testGenerate and execute pyATS aetest scripts for network validation
- Validate interface state, OSPF, BGP, and custom compliance checks
- Uses pyATS and aetest frameworks with Python3 and environment variables
- Executes user-provided Python scripts in a sandboxed environment
- Returns structured test results with pass/fail status and logs
SKILL.md
.github/skills/pyats-dynamic-testView on GitHub ↗
---
name: pyats-dynamic-test
description: "Generate and execute deterministic pyATS aetest validation scripts - interface state, OSPF neighbors, BGP paths, ping matrices, and custom compliance tests. Use when writing a network test, validating post-change state, running pass/fail checks, or building automated regression tests."
license: Apache-2.0
user-invocable: true
metadata:
{ "openclaw": { "requires": { "bins": ["python3"], "env": ["PYATS_TESTBED_PATH"] } } }
---
# Dynamic pyATS Test Execution
## When to Use
- Automated pass/fail validation after configuration changes
- Compliance checks that require multi-step assertions
- Regression testing before and after maintenance windows
- Data-driven validation across multiple interfaces, neighbors, or routes
- Any scenario where you need a formal test report with PASSED/FAILED verdicts
## How the Tool Works
The `pyats_run_dynamic_test` tool accepts a **complete Python aetest script as a string**. The script is executed in a sandboxed environment with a 300-second timeout.
```bash
PYATS_TESTBED_PATH=$PYATS_TESTBED_PATH python3 $MCP_CALL "python3 -u $PYATS_MCP_SCRIPT" pyats_run_dynamic_test '{"test_script":"<FULL PYTHON SOURCE CODE>"}'
```
The `test_script` parameter takes the **entire Python source code** as a single JSON string. Newlines are encoded as `\n`, quotes as `\"`.
## Script Template Structure
Every aetest script follows this structure:
```
1. Imports (logging, aetest)
2. TEST_DATA dict (all expected values as a Python literal)
3. CommonSetup class (connect to devices)
4. Testcase class(es) (test methods with assertions)
5. CommonCleanup class (disconnect)
6. if __name__ == "__main__": aetest.main()
```
### Rules for TEST_DATA
- Must be a **Python dict literal** defined at module level
- Contains all expected values the test will validate against
- Cannot be loaded from files, network, or environment variables
- Keeps the test deterministic and self-documenting
### Rules for the Script
- **Banned imports:** os, sys, subprocess, shutil, socket, pathlib, pickle, yaml, requests, urllib, http, ssl
- **Banned functions:** `__import__()`, `eval()`, `exec()`, `compile()`, `open()`, `json.loads()`
- The script cannot connect to devices directly; embed all validation data inline in TEST_DATA
- Use `logger.info()` for verbose output and `self.failed()` or `assert` for test verdicts
## Example 1: Interface State Validation
Verify that all expected interfaces are in up/up state.
```bash
PYATS_TESTBED_PATH=$PYATS_TESTBED_PATH python3 $MCP_CALL "python3 -u $PYATS_MCP_SCRIPT" pyats_run_dynamic_test '{"test_script":"import logging\nfrom pyats import aetest\n\nlogger = logging.getLogger(__name__)\n\nTEST_DATA = {\n \"device\": \"R1\",\n \"expected_interfaces\": {\n \"GigabitEthernet1\": {\"status\": \"up\", \"protocol\": \"up\"},\n \"GigabitEthernet2\": {\"status\": \"up\", \"protocol\": \"up\"},\n \"GigabitEthernet3\": {\"status\": \"up\", \"protocol\": \"up\"},\n \"Loopback0\": {\"status\": \"up\", \"protocol\": \"up\"}\n }\n}\n\nclass CommonSetup(aetest.CommonSetup):\n @aetest.subsection\n def connect_to_device(self, testbed):\n device = testbed.devices[TEST_DATA[\"device\"]]\n device.connect(learn_hostname=True, log_stdout=False)\n self.parent.parameters[\"device\"] = device\n\nclass InterfaceStateValidation(aetest.Testcase):\n @aetest.setup\n def gather_interface_data(self):\n device = self.parent.parameters[\"device\"]\n self.parsed = device.parse(\"show ip interface brief\")\n logger.info(\"Parsed interface data: %s\", self.parsed)\n\n @aetest.test\n def verify_interface_status(self):\n interfaces = self.parsed.get(\"interface\", {})\n failed_interfaces = []\n for intf_name, expected in TEST_DATA[\"expected_interfaces\"].items():\n if intf_name not in interfaces:\n failed_interfaces.append(f\"{intf_name}: NOT FOUND in device output\")\n continue\n actual = interfaces[intf_name]\n actual_status = actual.get(\"status\", \"unknown\")\n actual_protocol = actual.get(\"protocol\", \"unknown\")\n if actual_status != expected[\"status\"] or actual_protocol != expected[\"protocol\"]:\n failed_interfaces.append(\n f\"{intf_name}: expected {expected[\"status\"]}/{expected[\"protocol\"]}, \"\n f\"got {actual_status}/{actual_protocol}\"\n )\n else:\n logger.info(\"PASS: %s is %s/%s\", intf_name, actual_status, actual_protocol)\n if failed_interfaces:\n self.failed(\"Interface state failures:\\n\" + \"\\n\".join(failed_interfaces))\n else:\n self.passed(\"All interfaces in expected state\")\n\nclass CommonCleanup(aetest.CommonCleanup):\n @aetest.subsection\n def disconnect(self, testbed):\n for device in testbed.devices.values():\n if device.connected:\n device.disconnect()\n\nif __name__ == \"__main__\":\n aetest.main()"}'
```
**What this tests:**
- Connects to R1 via the testbed
- Parses `show ip interface brief` with Genie
- Checks each expected interface exists and is up/up
- Reports per-interface PASS/FAIL with details
## Example 2: OSPF Neighbor Validation
Verify that expected OSPF neighbors are present and in FULL state.
```bash
PYATS_TESTBED_PATH=$PYATS_TESTBED_PATH python3 $MCP_CALL "python3 -u $PYATS_MCP_SCRIPT" pyats_run_dynamic_test '{"test_script":"import logging\nfrom pyats import aetest\n\nlogger = logging.getLogger(__name__)\n\nTEST_DATA = {\n \"device\": \"R1\",\n \"ospf_process\": \"1\",\n \"expected_neighbors\": [\n {\"neighbor_id\": \"2.2.2.2\", \"interface\": \"GigabitEthernet1\", \"state\": \"FULL\"},\n {\"neighbor_id\": \"3.3.3.3\", \"interface\": \"GigabitEthernet2\", \"state\": \"FULL\"}\n ]\n}\n\nclass CommonSetup(aetest.CommonSetup):\n @aetest.subsection\n def connect_to_device(self, testbed):\n device = testbed.devices[TEST_DATA[\"device\"]]\n device.connect(learn_hostname=True, log_stdout=False)\n self.parent.parameters[\"device\"] = device\n\nclass OSPFNeighborValidation(aetest.Testcase):\n @aetest.setup\n def gather_ospf_data(self):\n device = self.parent.parameters[\"device\"]\n self.parsed = device.parse(\"show ip ospf neighbor\")\n logger.info(\"OSPF neighbor data: %s\", self.parsed)\n\n @aetest.test\n def verify_neighbor_count(self):\n interfaces = self.parsed.get(\"interfaces\", {})\n total_neighbors = sum(\n len(nbrs.get(\"neighbors\", {})) for nbrs in interfaces.values()\n )\n expected_count = len(TEST_DATA[\"expected_neighbors\"])\n if total_neighbors < expected_count:\n self.failed(\n f\"Expected at least {expected_count} OSPF neighbors, found {total_neighbors}\"\n )\n else:\n self.passed(f\"Found {total_neighbors} OSPF neighbors (expected {expected_count})\")\n\n @aetest.test\n def verify_each_neighbor(self):\n interfaces = self.parsed.get(\"interfaces\", {})\n failures = []\n for expected in TEST_DATA[\"expected_neighbors\"]:\n intf = expected[\"interface\"]\n nbr_id = expected[\"neighbor_id\"]\n if intf not in interfaces:\n failures.append(f\"{intf}: interface not found in OSPF output\")\n continue\n neighbors = interfaces[intf].get(\"neighbors\", {})\n if nbr_id not in neighbors:\n failures.append(f\"{nbr_id} on {intf}: neighbor not found\")\n continue\n actual_state = neighbors[nbr_id].get(\"state\", \"unknown\")\n if expected[\"state\"] not in actual_state.upper():\n failures.append(\n f\"{nbr_id} on {intf}: expected {expected[\"state\"]}, got {actual_state}\"\n )\n else:\n logger.info(\"PASS: %s on %s is %s\", nbr_id, intf, actual_state)\n if failures:\n self.failed(\"OSPF neighbor failures:\\n\" + \"\\n\".join(failures))\n else:\n self.passed(\"All expected OSPF neighbors verified\")\n\nclass CommonCleanup(aetest.CommonCleanup):\n @aetest.subsection\n def disconnect(self, testbed):\n for device in testbed.devices.values():\n if device.connected:\n device.disconnect()\n\nif __name__ == \"__main__\":\n aetest.main()"}'
```
**What this tests:**
- Parses `show ip ospf neighbor` with Genie structured parser
- Verifies total neighbor count meets minimum expected
- Checks each expected neighbor by router ID, interface, and adjacency state
- Distinguishes between missing neighbors and wrong-state neighbors
## Example 3: BGP Path Validation
Verify that a specific route exists in the BGP table with the expected next-hop and AS path.
```bash
PYATS_TESTBED_PATH=$PYATS_TESTBED_PATH python3 $MCP_CALL "python3 -u $PYATS_MCP_SCRIPT" pyats_run_dynamic_test '{"test_script":"import logging\nfrom pyats import aetest\n\nlogger = logging.getLogger(__name__)\n\nTEST_DATA = {\n \"device\": \"R1\",\n \"expected_routes\": [\n {\n \"prefix\": \"10.0.0.0/8\",\n \"next_hop\": \"10.1.1.2\",\n \"as_path\": \"65002\",\n \"origin\": \"IGP\"\n },\n {\n \"prefix\": \"172.16.0.0/16\",\n \"next_hop\": \"10.1.1.2\",\n \"as_path\": \"65002 65003\",\n \"origin\": \"IGP\"\n }\n ]\n}\n\nclass CommonSetup(aetest.CommonSetup):\n @aetest.subsection\n def connect_to_device(self, testbed):\n device = testbed.devices[TEST_DATA[\"device\"]]\n device.connect(learn_hostname=True, log_stdout=False)\n self.parent.parameters[\"device\"] = device\n\nclass BGPPathValidation(aetest.Testcase):\n @aetest.setup\n def gather_bgp_data(self):\n device = self.parent.parameters[\"device\"]\n self.parsed = device.parse(\"show ip bgp\")\n logger.info(\"BGP table parsed successfully\")\n\n @aetest.test\n def verify_bgp_routes(self):\n vrf_default = self.parsed.get(\"vrf\", {}).get(\"default\", {})\n address_family = vrf_default.get(\"address_family\", {}).get(\"ipv4 unicast\", {})\n prefixes = address_family.get(\"prefixes\", {})\n failures = []\n for route in TEST_DATA[\"expected_routes\"]:\n prefix = route[\"prefix\"]\n if prefix not in prefixes:\n failures.append(f\"{prefix}: NOT FOUND in BGP table\")\n continue\n paths = prefixes[prefix].get(\"index\", {})\n found_match = False\n for idx, path in paths.items():\n nh = path.get(\"next_hop\", \"\")\n if nh == route[\"next_hop\"]:\n found_match = True\n logger.info(\"PASS: %s via %s found in BGP table\", prefix, nh)\n break\n if not found_match:\n failures.append(\n f\"{prefix}: expected next-hop {route[\"next_hop\"]}, not found in any path\"\n )\n if failures:\n self.failed(\"BGP path failures:\\n\" + \"\\n\".join(failures))\n else:\n self.passed(\"All expected BGP routes verified\")\n\nclass CommonCleanup(aetest.CommonCleanup):\n @aetest.subsection\n def disconnect(self, testbed):\n for device in testbed.devices.values():\n if device.connected:\n device.disconnect()\n\nif __name__ == \"__main__\":\n aetest.main()"}'
```
**What this tests:**
- Parses the full BGP table with Genie
- Navigates the VRF/address-family/prefix hierarchy
- Checks each expected prefix exists with the correct next-hop
- Reports missing prefixes and next-hop mismatches separately
## Example 4: Ping Reachability Matrix
Execute a ping matrix between device pairs and validate reachability.
```bash
PYATS_TESTBED_PATH=$PYATS_TESTBED_PATH python3 $MCP_CALL "python3 -u $PYATS_MCP_SCRIPT" pyats_run_dynamic_test '{"test_script":"import logging\nfrom pyats import aetest\n\nlogger = logging.getLogger(__name__)\n\nTEST_DATA = {\n \"ping_matrix\": [\n {\"device\": \"R1\", \"destination\": \"10.1.1.2\", \"source\": \"GigabitEthernet1\", \"min_success\": 80},\n {\"device\": \"R1\", \"destination\": \"10.2.2.2\", \"source\": \"GigabitEthernet2\", \"min_success\": 80},\n {\"device\": \"R1\", \"destination\": \"8.8.8.8\", \"source\": \"GigabitEthernet1\", \"min_success\": 100},\n {\"device\": \"R1\", \"destination\": \"1.1.1.1\", \"source\": \"Loopback0\", \"min_success\": 100}\n ]\n}\n\nclass CommonSetup(aetest.CommonSetup):\n @aetest.subsection\n def connect_all_devices(self, testbed):\n devices = {}\n for entry in TEST_DATA[\"ping_matrix\"]:\n dev_name = entry[\"device\"]\n if dev_name not in devices:\n device = testbed.devices[dev_name]\n device.connect(learn_hostname=True, log_stdout=False)\n devices[dev_name] = device\n self.parent.parameters[\"devices\"] = devices\n\nclass PingReachabilityMatrix(aetest.Testcase):\n @aetest.test\n def execute_ping_matrix(self):\n devices = self.parent.parameters[\"devices\"]\n results = []\n failures = []\n for entry in TEST_DATA[\"ping_matrix\"]:\n device = devices[entry[\"device\"]]\n dst = entry[\"destination\"]\n src = entry.get(\"source\", \"\")\n min_pct = entry[\"min_success\"]\n try:\n if src:\n ping_result = device.ping(dst, source=src, count=5, timeout=10)\n else:\n ping_result = device.ping(dst, count=5, timeout=10)\n logger.info(\n \"PASS: %s -> %s from %s: reachable\",\n entry[\"device\"], dst, src or \"default\"\n )\n results.append({\n \"device\": entry[\"device\"],\n \"destination\": dst,\n \"status\": \"PASS\"\n })\n except Exception as e:\n msg = f\"{entry[\"device\"]} -> {dst} from {src or \"default\"}: FAILED ({str(e)[:80]})\"\n logger.error(msg)\n failures.append(msg)\n results.append({\n \"device\": entry[\"device\"],\n \"destination\": dst,\n \"status\": \"FAIL\"\n })\n logger.info(\"\\n=== PING MATRIX RESULTS ===\")\n for r in results:\n logger.info(\"%s -> %s: %s\", r[\"device\"], r[\"destination\"], r[\"status\"])\n if failures:\n self.failed(\n f\"{len(failures)}/{len(TEST_DATA[\"ping_matrix\"])} pings failed:\\n\"\n + \"\\n\".join(failures)\n )\n else:\n self.passed(f\"All {len(results)} pings successful\")\n\nclass CommonCleanup(aetest.CommonCleanup):\n @aetest.subsection\n def disconnect_all(self, testbed):\n for device in testbed.devices.values():\n if device.connected:\n device.disconnect()\n\nif __name__ == \"__main__\":\n aetest.main()"}'
```
**What this tests:**
- Connects to all unique devices referenced in the ping matrix
- Executes pings from each device to each destination with optional source interface
- Collects pass/fail results into a summary matrix
- Fails the test if any ping drops below the minimum success threshold
## Writing Your Own Test Scripts
### Step 1: Collect Data First
Before writing a dynamic test, collect the device data you need using the other pyATS tools:
```bash
# Gather current state
PYATS_TESTBED_PATH=$PYATS_TESTBED_PATH python3 $MCP_CALL "python3 -u $PYATS_MCP_SCRIPT" pyats_run_show_command '{"device_name":"R1","command":"show ip ospf neighbor"}'
```
### Step 2: Build TEST_DATA from Collected Output
Use the collected output to populate the `TEST_DATA` dictionary with the expected values. This makes the test deterministic -- it validates that the network matches the known-good state.
### Step 3: Write the Script Following the Template
```python
import logging
from pyats import aetest
logger = logging.getLogger(__name__)
TEST_DATA = {
# All expected values go here as Python literals
}
class CommonSetup(aetest.CommonSetup):
@aetest.subsection
def connect_to_device(self, testbed):
device = testbed.devices["R1"]
device.connect(learn_hostname=True, log_stdout=False)
self.parent.parameters["device"] = device
class YourTestcase(aetest.Testcase):
@aetest.setup
def gather_data(self):
device = self.parent.parameters["device"]
self.parsed = device.parse("show COMMAND")
@aetest.test
def verify_something(self):
# Compare self.parsed against TEST_DATA
# Use self.passed(), self.failed(), or assert statements
pass
class CommonCleanup(aetest.CommonCleanup):
@aetest.subsection
def disconnect(self, testbed):
for device in testbed.devices.values():
if device.connected:
device.disconnect()
if __name__ == "__main__":
aetest.main()
```
### Step 4: Serialize and Invoke
Convert the script to a single-line JSON string (replace newlines with `\n`, escape quotes) and pass it as the `test_script` parameter.
## Interpreting Results
The tool returns the aetest execution results:
- **Passed** -- All assertions succeeded, the network state matches TEST_DATA
- **Failed** -- One or more assertions did not match; the failure message explains what was wrong
- **Errored** -- The script itself had a runtime error (syntax error, import violation, timeout)
- **Blocked** -- CommonSetup failed (could not connect to device), so testcases were skipped
## Integration with Other Skills
- Use **pyats-health-check** to collect device state, then write dynamic tests to formalize pass/fail criteria for that state
- Use **pyats-config-mgmt** Phase 4 (post-change verification) to trigger dynamic tests after configuration changes
- Use **pyats-routing** data to build BGP/OSPF validation tests with real expected values
- Use **pyats-parallel-ops** to run dynamic tests as part of fleet-wide validation sweeps
More from automateyournetwork/netclaw
- aap-automationRed Hat Ansible Automation Platform — inventory management, job template execution, project SCM sync, ad-hoc commands, host management, Galaxy content discovery. Use when automating infrastructure with Ansible, running playbooks, managing inventories, or searching for Ansible collections and roles.
- aap-edaEvent-Driven Ansible (EDA) — activation lifecycle, rulebook management, decision environments, event stream monitoring. Use when managing event-driven automation triggers, enabling/disabling activations, or reviewing EDA rulebooks.
- aap-lintansible-lint playbook and role validation — syntax checking, best practice enforcement, project-wide analysis, rule filtering. Use when validating Ansible playbooks, checking code quality, or enforcing automation best practices before deployment.
- aci-change-deploySafe ACI policy change deployment - ServiceNow CR lifecycle, pre/post-change fault baselines, APIC policy application, automatic rollback on fault delta, and GAIT audit trail. Use when deploying ACI policy changes, creating tenants or EPGs, pushing config to APIC, or running a change window with rollback protection.
- aci-fabric-auditComprehensive Cisco ACI fabric health audit - node status, tenant/VRF/BD/EPG policy review, contract analysis, fault triage, and endpoint learning verification. Use when auditing ACI fabric health, checking for faults, reviewing tenant policies, or running pre/post-change baselines on APIC.
- arista-cvpArista CloudVision Portal (CVP) automation via REST API — device inventory, events, connectivity monitoring, tag management (4 tools). Use when managing Arista devices, checking CloudVision events, monitoring network connectivity probes, or tagging devices in CVP.
- aruba-cx-configView and manage Aruba CX switch configurations, perform ISSU upgrades, and firmware operations
- aruba-cx-interfacesMonitor Aruba CX switch interface status, LLDP neighbors, and optical transceiver health
- aruba-cx-switchingView and manage Aruba CX switch VLANs and MAC address tables for Layer 2 operations
- aruba-cx-systemDiscover Aruba CX switch system information, firmware versions, and VSF topology