enfinitos-sdk-robotics (Python)
EnfinitOS reference SDK for the ROBOTICS substrate — Python edition.
Python is widely used in production robot fleets: Starship, Nuro, Refraction AI, and many ROS 2 nodes are Python-heavy. This is the SDK most customers will start with.
The Python SDK has the same surface and contract as the TypeScript SDK at packages/sdks/robotics-ts/; the two are kept in sync. See the TS README for the long-form research synthesis, architecture diagram, and adapter pattern — duplicated below in condensed form so a Python-only reader has everything in one place.
Platform-side counterpart. This SDK consumes the platform's /v1/robotics/connect WebSocket endpoint. That endpoint is a separate engineering item. The SDK targets the documented wire contract in src/enfinitos_robotics/types.py; the platform-side service that accepts these connections is its own tranche.
Architecture

                           ┌──────────────────────────┐
                           │    EnfinitOS Platform    │
                           │  (rights, policy, audit) │
                           └────────────┬─────────────┘
                                        │  WebSocket (JWT in handshake)
                                        │
    ┌────────────── (1) policy_push ────▼──────────────┐
    │                                                  │
    │    ┌────────────────────────────────────────┐    │
    │    │          EnfinitOSRobotClient          │    │
    │    │ Lifecycle | Policy | Reporter | E-stop │    │
    │    └────────────────────────────────────────┘    │
    │                                                  │
    └──────────────────────────────────────────────────┘
                      │                ▲
     (2) policy_ack   │                │  (4) emergency_stop
     (3) behavioural  │                │      (priority channel,
         event        │                │       never queued)
     (5) telemetry    │                │
                      ▼                │
    ┌──────────────────────────────────┴───────────────┐
    │           Adapter (ROS 2 / VDA 5050 /            │
    │           Starship-style / custom)               │
    └──────────────────────────────────────────────────┘

Research synthesis — why this shape
The SDK borrows from five existing patterns:
| Pattern source | What we adopted |
|---|---|
| ROS 2 managed-node lifecycle | Five-state machine; ERROR must convalesce through SUSPENDED (ISO 10218). |
| VDA 5050 (Toyota / Linde / BMW) | Priority channel for emergency stops; our send_immediate() is the instantActions equivalent. |
| Starship Technologies (sidewalk delivery) | REST + WebSocket telemetry @ 1–5 Hz; remote-operator handoff via state transition. |
| Boston Dynamics Spot SDK | E-Stop never queued behind other traffic. |
| Nuro / Refraction AI / Cobalt | Rules-based geofencing + remote oversight ⇒ exclusion-zone reporting + forced flag for safety-system overrides. |
Getting started
Install

    pip install enfinitos-sdk-robotics

Five-minute hello-world

    import asyncio
    from datetime import datetime, timezone

    from enfinitos_robotics import (
        EnfinitOSRobotClient,
        EnfinitOSRobotClientOptions,
        GeoCoordinate,
        HeartbeatOptions,
        TelemetryReport,
    )


    async def main() -> None:
        client = EnfinitOSRobotClient(
            EnfinitOSRobotClientOptions(
                org_id="org_acme",
                robot_id="robot_001",
                fleet_id="fleet_main",
                auth_token="<JWT>",
                api_base_url="https://api.enfinitos.com",
            )
        )

        # 1) wire policy receipt
        def on_policy(policy):
            print(f"policy v{policy.version} with {len(policy.rules)} rules")
            # The callback is synchronous, so schedule the async ack on the running loop.
            asyncio.create_task(client.acknowledge_policy(policy.version))

        client.subscribe_policy(on_policy)

        # 2) wire emergency stop receipt
        def on_estop(cmd):
            print(f"E-STOP from {cmd.issuedBy}: {cmd.reason}")
            # halt motion, transition to SUSPENDED.

        client.on_emergency_stop_received(on_estop)

        # 3) connect
        await client.connect()

        # 4) heartbeat: build_report is called once per interval
        client.start_telemetry_heartbeat(
            HeartbeatOptions(
                interval_ms=1000,
                build_report=lambda: TelemetryReport(
                    reportedAt=datetime.now(timezone.utc)
                    .isoformat()
                    .replace("+00:00", "Z"),
                    location=GeoCoordinate(lat=51.5, lng=-0.1),
                    batteryPct=87.0,
                    speedKph=5.4,
                    headingDeg=90.0,
                    state=client.get_state(),
                ),
            )
        )

        # Let the heartbeat run for 30 seconds, then shut down cleanly.
        await asyncio.sleep(30)
        await client.disconnect()


    if __name__ == "__main__":
        asyncio.run(main())

Lifecycle states
| State | What it means | Reachable from |
|---|---|---|
| CONFIGURED | Connected and authenticated; awaiting first policy. | (initial) |
| ACTIVE | Has a policy and is executing missions. | CONFIGURED, SUSPENDED |
| SUSPENDED | Paused (operator intervention, low battery, soft fault). | CONFIGURED, ACTIVE, ERROR |
| ERROR | Hard fault. Must transition through SUSPENDED before re-activating. | (any non-terminal) |
| TERMINATED | Terminal. Client must be reconstructed to reconnect. | (any non-terminal) |
The ERROR → SUSPENDED requirement is intentional: it matches ISO 10218 robotics-safety guidance, forcing an operator to acknowledge the fault before the robot can move again.
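
The "Reachable from" column can be read as a transition map. The sketch below is one way to encode it, not the SDK's actual implementation: RobotState, ALLOWED_TRANSITIONS, and can_transition are illustrative names, and it assumes self-transitions (for example ERROR to ERROR) are not allowed.

```python
from enum import Enum


class RobotState(Enum):
    CONFIGURED = "CONFIGURED"
    ACTIVE = "ACTIVE"
    SUSPENDED = "SUSPENDED"
    ERROR = "ERROR"
    TERMINATED = "TERMINATED"


# Allowed targets per source state, inverted from the "Reachable from" column.
# Assumption: a state cannot transition to itself.
ALLOWED_TRANSITIONS = {
    RobotState.CONFIGURED: {
        RobotState.ACTIVE,
        RobotState.SUSPENDED,
        RobotState.ERROR,
        RobotState.TERMINATED,
    },
    RobotState.ACTIVE: {
        RobotState.SUSPENDED,
        RobotState.ERROR,
        RobotState.TERMINATED,
    },
    RobotState.SUSPENDED: {
        RobotState.ACTIVE,
        RobotState.ERROR,
        RobotState.TERMINATED,
    },
    # ERROR can only recover through SUSPENDED, never straight to ACTIVE.
    RobotState.ERROR: {RobotState.SUSPENDED, RobotState.TERMINATED},
    # TERMINATED is terminal: no outgoing transitions.
    RobotState.TERMINATED: set(),
}


def can_transition(current: RobotState, target: RobotState) -> bool:
    """Return True if the lifecycle table permits current -> target."""
    return target in ALLOWED_TRANSITIONS[current]
```

With this map, can_transition(RobotState.ERROR, RobotState.ACTIVE) is False, which is the property the ISO 10218 note above relies on.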
Policy update flow

    PLATFORM                                   ROBOT (SDK)
       │                                            │
       │  policy_push { version: 7, rules: […] }    │
       │ ─────────────────────────────────────────► │
       │                                            │ ─┐ apply()
       │                                            │  │ fan out to
       │                                            │  │ subscribers
       │                                            │ ◄┘
       │                                            │
       │  policy_ack { policyVersion: 7,            │
       │    robotState: "ACTIVE", ackedAt: … }      │
       │ ◄───────────────────────────────────────── │

The platform considers a policy "live" only after the robot acks. Out-of-order pushes are silently dropped by the resolver; stale ack attempts are also dropped client-side.
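
The drop rules can be illustrated with a small version guard. This is a minimal sketch, not the SDK's resolver: Policy, PolicyResolver, apply, and should_ack are hypothetical names, and it assumes "out-of-order" means a version not strictly greater than the one already applied, and "stale ack" means an ack for any version other than the current one.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class Policy:
    version: int
    rules: list = field(default_factory=list)


class PolicyResolver:
    """Hypothetical resolver: keeps the newest policy and fans it out."""

    def __init__(self) -> None:
        self._current: Optional[Policy] = None
        self._subscribers: List[Callable[[Policy], None]] = []

    def subscribe(self, callback: Callable[[Policy], None]) -> None:
        self._subscribers.append(callback)

    def apply(self, policy: Policy) -> bool:
        """Apply a pushed policy; return False if it was dropped."""
        # Assumption: anything not strictly newer counts as out-of-order.
        if self._current is not None and policy.version <= self._current.version:
            return False
        self._current = policy
        for callback in list(self._subscribers):
            callback(policy)
        return True

    def should_ack(self, version: int) -> bool:
        """Only the currently applied version is worth acknowledging."""
        return self._current is not None and version == self._current.version
```

Pushing version 7 and then version 6 would notify subscribers once, and an ack attempt for version 6 would be rejected by should_ack.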
Behavioural rule kinds
The eleven BehaviourRule variants (mirroring the platform's operational schema at apps/api/src/modules/rights/contracts/scope.ts):
| Kind | Convenience helper |
|---|---|
| YIELD_TO_PEDESTRIANS | report_yield_event(...) |
| MAX_SPEED_KPH | report_speed_exceedance(...) |
| EXCLUSION_ZONE | report_exclusion_zone_entry(...) |
| OPERATING_HOURS | report_behavioural_event(...) (generic) |
| REQUIRE_HUMAN_OPERATOR | generic |
| DONT_DISPLAY_WHILE_MOVING / DRIVER_ATTENTION_REQUIRED / INTEGRATE_WITH_VEHICLE_DISTRACTION_SYSTEM / PASSENGER_DISPLAY_ONLY / REQUIRE_HANDS_FREE | generic (AUTOMOTIVE substrate cross-over) |
| CUSTOM | generic |
Event types: rule_fired, rule_violated, rule_skipped, rule_unknown.
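
An adapter that dispatches on rule kind might keep the helper mapping from the table in one place. The sketch below is only an illustration: HELPER_BY_RULE_KIND and helper_name_for are hypothetical names, and the method names are taken from the table rather than from the SDK source.

```python
# Rule kinds that have a dedicated convenience helper, per the table above.
HELPER_BY_RULE_KIND = {
    "YIELD_TO_PEDESTRIANS": "report_yield_event",
    "MAX_SPEED_KPH": "report_speed_exceedance",
    "EXCLUSION_ZONE": "report_exclusion_zone_entry",
}

# Every other kind falls back to the generic reporter.
GENERIC_HELPER = "report_behavioural_event"

# Event types listed in this section.
EVENT_TYPES = ("rule_fired", "rule_violated", "rule_skipped", "rule_unknown")


def helper_name_for(rule_kind: str) -> str:
    """Return the name of the client method to call for a rule kind."""
    return HELPER_BY_RULE_KIND.get(rule_kind, GENERIC_HELPER)
```

An adapter could then resolve the method with getattr(client, helper_name_for(kind)), assuming the client exposes methods under exactly these names.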
Adapter pattern
Each fleet ships an adapter that implements the four protocols defined in src/enfinitos_robotics/adapters/_template.py:
| Protocol | Responsibility |
|---|---|
| FleetCommandSink | apply EnfinitOS policy onto the fleet's native command vocabulary |
| FleetEventSource | translate native fleet events into BehaviouralEvents |
| FleetTelemetrySource | build TelemetryReports from native pose/battery/sensors |
| FleetLifecycleHook | optional: react to lifecycle transitions |
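
To show what implementing one of these protocols could look like, here is a sketch built on typing.Protocol. Only the protocol names come from the table; the method names apply_policy and build_report, their signatures, and the LoggingCommandSink class are assumptions, so check _template.py for the real definitions.

```python
from typing import Any, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class FleetCommandSink(Protocol):
    # Hypothetical method name; the real one lives in _template.py.
    def apply_policy(self, policy: Any) -> None: ...


@runtime_checkable
class FleetTelemetrySource(Protocol):
    # Hypothetical method name; the real one lives in _template.py.
    def build_report(self) -> Dict[str, Any]: ...


class LoggingCommandSink:
    """Toy adapter piece: records policies instead of commanding a fleet."""

    def __init__(self) -> None:
        self.applied: List[Any] = []

    def apply_policy(self, policy: Any) -> None:
        # A real adapter would translate rules into native fleet commands here.
        self.applied.append(policy)


sink = LoggingCommandSink()
sink.apply_policy({"version": 1, "rules": []})
```

Because the protocols are structural, LoggingCommandSink satisfies FleetCommandSink without inheriting from it, which keeps customer-owned adapters free of SDK base classes.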
Skeletons shipped:
- adapters/ros2.py: ROS 2 via rclpy, maps onto managed-node lifecycle, latched topics for policy publication.
- adapters/vda5050.py: VDA 5050 v2 via MQTT, instantActions for most policy rules.
Telemetry pattern
Recommended cadence: 1 Hz for sidewalk delivery and warehouse AGVs; up to 5 Hz for autonomous-vehicle telemetry. The platform throttles ingest above 10 Hz to protect the audit ledger.
TelemetryReport shape: location, battery, speed, heading, state, free-form sensorHealth. Keep sensorHealth keys stable across reports so the SRE dashboard's time series stay coherent.
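
Both recommendations are easy to enforce on the robot side. The helpers below are a sketch under stated assumptions: heartbeat_interval_ms, normalise_sensor_health, and the sensor key names are hypothetical, and the 10 Hz cap is simply the ingest threshold quoted above.

```python
from typing import Dict

# Ingest threshold quoted in this section; reporting faster gets throttled.
MAX_INGEST_HZ = 10.0

# Hypothetical sensor names; pick your own, but keep the set fixed.
SENSOR_HEALTH_KEYS = ("lidar", "camera_front", "imu")


def heartbeat_interval_ms(rate_hz: float) -> int:
    """Convert a desired rate into an interval, capped at the ingest limit."""
    if rate_hz <= 0:
        raise ValueError("rate_hz must be positive")
    capped_hz = min(rate_hz, MAX_INGEST_HZ)
    return round(1000 / capped_hz)


def normalise_sensor_health(raw: Dict[str, str]) -> Dict[str, str]:
    """Emit the same keys on every report, filling gaps with 'unknown'."""
    return {key: raw.get(key, "unknown") for key in SENSOR_HEALTH_KEYS}
```

The interval can be passed as interval_ms to HeartbeatOptions, and the normalised dictionary used as the sensorHealth value, so every report carries an identical key set.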
Emergency stop semantics
- Priority channel. send_immediate() bypasses the normal outbound queue. An emergency stop fires even if a policy ack or telemetry frame is mid-send.
- Idempotent. commandId is the dedup key.
- Never queued behind reconnect backoff. If the WS is down when emergency_stop() is called, the message sits at the head of the priority queue and is the FIRST thing flushed when the socket re-opens.
- Lifecycle. Robot-side emergency_stop() auto-transitions to SUSPENDED. Inbound emergency stops do NOT auto-transition.
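
The queueing and dedup rules can be sketched with two deques and a set. This is an illustration of the semantics, not the SDK's internals: OutboundQueue and its methods are hypothetical names, and the sketch only models the "socket is down, flush on reconnect" case, not interrupting a frame that is mid-send.

```python
from collections import deque
from typing import Callable, Deque, Set


class OutboundQueue:
    """Hypothetical outbound buffer with a separate emergency-stop lane."""

    def __init__(self) -> None:
        self._priority: Deque[dict] = deque()
        self._normal: Deque[dict] = deque()
        self._seen_command_ids: Set[str] = set()

    def enqueue(self, message: dict) -> None:
        """Ordinary traffic: policy acks, telemetry, behavioural events."""
        self._normal.append(message)

    def enqueue_emergency_stop(self, command_id: str, reason: str) -> bool:
        """Queue an e-stop once per commandId; return False on a duplicate."""
        if command_id in self._seen_command_ids:
            return False
        self._seen_command_ids.add(command_id)
        self._priority.append(
            {"type": "emergency_stop", "commandId": command_id, "reason": reason}
        )
        return True

    def flush(self, send: Callable[[dict], None]) -> None:
        """On (re)connect: drain the priority lane before anything else."""
        while self._priority:
            send(self._priority.popleft())
        while self._normal:
            send(self._normal.popleft())
```

Even if telemetry was queued first, a flush after reconnect sends the emergency stop ahead of it, and a repeated commandId is ignored.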
Sample integration — minimal viable robot

    import asyncio
    from datetime import datetime, timezone

    from enfinitos_robotics import (
        EnfinitOSRobotClient,
        EnfinitOSRobotClientOptions,
        GeoCoordinate,
        TelemetryReport,
        YieldEventDetail,
    )


    def utc_now_iso() -> str:
        """Current UTC time as an ISO 8601 string with a trailing 'Z'."""
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


    async def main() -> None:
        client = EnfinitOSRobotClient(
            EnfinitOSRobotClientOptions(
                org_id="org_acme",
                robot_id="robot_001",
                fleet_id="fleet_main",
                auth_token="<JWT>",
                api_base_url="https://api.enfinitos.com",
            )
        )

        def on_policy(policy):
            print(f"got policy v{policy.version}")
            # Synchronous callback: schedule the async ack on the running loop.
            asyncio.create_task(client.acknowledge_policy(policy.version))

        client.subscribe_policy(on_policy)
        await client.connect()

        # one heartbeat
        await client.report_telemetry(
            TelemetryReport(
                reportedAt=utc_now_iso(),
                location=GeoCoordinate(lat=51.5, lng=-0.1),
                batteryPct=87.0,
                speedKph=0.0,
                headingDeg=90.0,
                state=client.get_state(),
            )
        )

        # one behavioural event
        await client.report_yield_event(
            YieldEventDetail(
                occurredAt=utc_now_iso(),
                location=GeoCoordinate(lat=51.5, lng=-0.1),
                detectedPedestrians=1,
                closestDistanceM=1.8,
            )
        )

        # Give pending sends a moment to go out before disconnecting.
        await asyncio.sleep(0.5)
        await client.disconnect()


    if __name__ == "__main__":
        asyncio.run(main())

What's SDK-side vs platform-side
| Concern | This SDK | Platform tranche (separate) |
|---|---|---|
| Wire protocol (WireMessage) | yes (defined here) | yes (implemented by platform-side server) |
| /v1/robotics/connect WS endpoint | no | pending (separate work item) |
| Audit-log persistence | no | pending (separate work item) |
| JWT issuance | no | reuses existing /auth/* plane |
| Policy authoring UI | no | already shipped in apps/web/.../compose |
| Adapters (ROS 2, VDA 5050, custom) | yes (skeletons) | (customer-owned) |
See also
- apps/api/src/modules/rights/contracts/scope.ts: BehaviourRule source-of-truth on the platform side.
- packages/sdks/robotics-ts/: TypeScript twin of this SDK.
- docs/launch/substrate-readiness-matrix.md: ROBOTICS substrate readiness tracker.