Production-ready scaffold

Robotics SDK — Python

The robotics SDK most fleets will start with: ROS 2, Starship, Nuro, and Refraction are all Python-heavy.

enfinitos-sdk-robotics · Substrate: ROBOTICS · Python
Install

Get the SDK

pip install enfinitos-sdk-robotics

About this status badge

Typed, tested, documented, and grounded in the 2026 platform reality. Awaiting first customer-integration validation.

README

The developer-facing documentation in full

Rendered from packages/sdks/robotics-py/README.md at build time — the same source the package ships with.

enfinitos-sdk-robotics (Python)

EnfinitOS reference SDK for the ROBOTICS substrate — Python edition.

Python is the most-used language in real robotic fleets — Starship, Nuro, Refraction, and ROS 2 nodes are all Python-heavy. This is the SDK most customers will start with.

The Python SDK has the same surface and contract as the TypeScript SDK at packages/sdks/robotics-ts/; the two are kept in sync. See the TS README for the long-form research synthesis, architecture diagram, and adapter pattern — duplicated below in condensed form so a Python-only reader has everything in one place.

Platform-side counterpart. This SDK consumes the platform's /v1/robotics/connect WebSocket endpoint. That endpoint is a separate engineering item. The SDK targets the documented wire contract in src/enfinitos_robotics/types.py; the platform-side service to accept these connections is its own tranche.

Architecture

                          ┌──────────────────────────┐
                          │   EnfinitOS Platform     │
                          │  (rights, policy, audit) │
                          └────────────┬─────────────┘
                                       │  WebSocket (JWT in handshake)
                                       │
   ┌────────────── (1) policy_push ────▼─────────────┐
   │                                                  │
   │    ┌────────────────────────────────────┐        │
   │    │      EnfinitOSRobotClient          │        │
   │    │  Lifecycle | Policy | Reporter | E-stop     │
   │    └────────────────────────────────────┘        │
   │                                                  │
   └──────────────────────────────────────────────────┘
                   │                  ▲
   (2) policy_ack  │                  │ (4) emergency_stop
   (3) behavioural │                  │     (priority channel —
       event       │                  │      never queued)
   (5) telemetry   │                  │
                   ▼                  │
   ┌──────────────────────────────────┴───────────────┐
   │           Adapter (ROS 2 / VDA 5050 /            │
   │            Starship-style / custom)              │
   └──────────────────────────────────────────────────┘

Research synthesis — why this shape

The SDK borrows from five existing patterns:

| Pattern source | What we adopted |
| --- | --- |
| ROS 2 managed-node lifecycle | Five-state machine; ERROR must convalesce through SUSPENDED (ISO 10218). |
| VDA 5050 (Toyota / Linde / BMW) | Priority channel for emergency stops; our send_immediate() is the instantActions equivalent. |
| Starship Technologies (sidewalk delivery) | REST + WebSocket telemetry @ 1–5 Hz; remote-operator handoff via state transition. |
| Boston Dynamics Spot SDK | E-Stop never queued behind other traffic. |
| Nuro / Refraction AI / Cobalt | Rules-based geofencing + remote oversight ⇒ exclusion-zone reporting + forced flag for safety-system overrides. |

Getting started

Install

pip install enfinitos-sdk-robotics

Five-minute hello-world

import asyncio
from datetime import datetime, timezone

from enfinitos_robotics import (
    EnfinitOSRobotClient,
    EnfinitOSRobotClientOptions,
    GeoCoordinate,
    HeartbeatOptions,
    TelemetryReport,
)


async def main() -> None:
    client = EnfinitOSRobotClient(
        EnfinitOSRobotClientOptions(
            org_id="org_acme",
            robot_id="robot_001",
            fleet_id="fleet_main",
            auth_token="<JWT>",
            api_base_url="https://api.enfinitos.com",
        )
    )

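    # 1) wire policy receipt
    # Hold a reference to each ack task; asyncio keeps only weak references,
    # so an unreferenced task can be garbage-collected before it finishes.
    ack_tasks = set()

    def on_policy(policy):
        print(f"policy v{policy.version} with {len(policy.rules)} rules")
        task = asyncio.create_task(client.acknowledge_policy(policy.version))
        ack_tasks.add(task)
        task.add_done_callback(ack_tasks.discard)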

    client.subscribe_policy(on_policy)

    # 2) wire emergency stop receipt
    def on_estop(cmd):
        print(f"E-STOP from {cmd.issuedBy}: {cmd.reason}")
        # halt motion, transition to SUSPENDED.

    client.on_emergency_stop_received(on_estop)

    # 3) connect
    await client.connect()

    # 4) heartbeat
    client.start_telemetry_heartbeat(
        HeartbeatOptions(
            interval_ms=1000,
            build_report=lambda: TelemetryReport(
                reportedAt=datetime.now(timezone.utc)
                .isoformat()
                .replace("+00:00", "Z"),
                location=GeoCoordinate(lat=51.5, lng=-0.1),
                batteryPct=87.0,
                speedKph=5.4,
                headingDeg=90.0,
                state=client.get_state(),
            ),
        )
    )

    await asyncio.sleep(30)
    await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
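
The examples in this README build their reportedAt and occurredAt values with the same inline expression. A small helper keeps that formatting in one place. This is a sketch only: utc_timestamp is a hypothetical name and is not part of the SDK.

```python
from datetime import datetime, timezone
from typing import Optional


def utc_timestamp(moment: Optional[datetime] = None) -> str:
    """Return an ISO 8601 UTC timestamp with a trailing 'Z'.

    Hypothetical helper (not an SDK function). It wraps the same
    isoformat().replace("+00:00", "Z") expression used in the examples.
    """
    moment = moment or datetime.now(timezone.utc)
    # Normalise any timezone-aware input to UTC before formatting.
    return moment.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
```

With this in scope, reportedAt=utc_timestamp() could stand in for the longer inline expression.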

Lifecycle states

| State | What it means | Reachable from |
| --- | --- | --- |
| CONFIGURED | Connected and authenticated; awaiting first policy. | (initial) |
| ACTIVE | Has a policy and is executing missions. | CONFIGURED, SUSPENDED |
| SUSPENDED | Paused (operator intervention, low battery, soft fault). | CONFIGURED, ACTIVE, ERROR |
| ERROR | Hard fault. Must transition through SUSPENDED before re-activating. | (any non-terminal) |
| TERMINATED | Terminal. Client must be reconstructed to reconnect. | (any non-terminal) |

The ERROR → SUSPENDED requirement is intentional: it matches ISO 10218 robotics-safety guidance, forcing an operator to acknowledge the fault before the robot can move again.
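
The "Reachable from" column can be read as a transition table. The sketch below encodes it directly so the ERROR → SUSPENDED rule is checkable. The State enum, REACHABLE_FROM, and can_transition are illustrative names rather than the SDK's own types, and it assumes "any non-terminal" does not include a self-transition.

```python
from enum import Enum


class State(Enum):
    CONFIGURED = "CONFIGURED"
    ACTIVE = "ACTIVE"
    SUSPENDED = "SUSPENDED"
    ERROR = "ERROR"
    TERMINATED = "TERMINATED"


NON_TERMINAL = {State.CONFIGURED, State.ACTIVE, State.SUSPENDED, State.ERROR}

# Target state -> states it may be entered from (the "Reachable from" column).
# CONFIGURED is the initial state, so nothing transitions into it.
REACHABLE_FROM = {
    State.ACTIVE: {State.CONFIGURED, State.SUSPENDED},
    State.SUSPENDED: {State.CONFIGURED, State.ACTIVE, State.ERROR},
    # Assumption: "any non-terminal" excludes ERROR -> ERROR.
    State.ERROR: NON_TERMINAL - {State.ERROR},
    State.TERMINATED: NON_TERMINAL,
}


def can_transition(current: State, target: State) -> bool:
    """True if the lifecycle table allows moving from current to target."""
    return current in REACHABLE_FROM.get(target, set())
```

Under this reading, can_transition(State.ERROR, State.ACTIVE) is False, while the two-step path ERROR → SUSPENDED → ACTIVE is allowed.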

Policy update flow

   PLATFORM                                     ROBOT (SDK)
       │                                            │
       │   policy_push { version: 7, rules: […] }   │
       │ ─────────────────────────────────────────► │
       │                                            │ ─┐  apply()
       │                                            │  │  fan out to
       │                                            │  │  subscribers
       │                                            │ ◄┘
       │                                            │
       │     policy_ack { policyVersion: 7,         │
       │       robotState: "ACTIVE", ackedAt: … }   │
       │ ◄───────────────────────────────────────── │

The platform considers a policy "live" only after the robot acks. Out-of-order pushes are silently dropped by the resolver; stale ack attempts are also dropped client-side.
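
One way to picture the drop rules is a resolver that only accepts strictly newer versions. This is a minimal sketch, not the SDK's resolver: Policy, PolicyResolver, and should_ack are hypothetical names. It assumes that a push with an equal or lower version counts as out of order, and that an ack is stale when its version is no longer the current one.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class Policy:
    version: int
    rules: list = field(default_factory=list)


class PolicyResolver:
    """Hypothetical stand-in for the SDK's resolver: newest version wins."""

    def __init__(self) -> None:
        self.current: Optional[Policy] = None
        self._subscribers: List[Callable[[Policy], None]] = []

    def subscribe(self, callback: Callable[[Policy], None]) -> None:
        self._subscribers.append(callback)

    def apply(self, policy: Policy) -> bool:
        """Apply a pushed policy; silently drop it if it is not newer."""
        if self.current is not None and policy.version <= self.current.version:
            return False  # out-of-order push: dropped, subscribers not called
        self.current = policy
        for callback in self._subscribers:  # fan out to subscribers
            callback(policy)
        return True

    def should_ack(self, version: int) -> bool:
        """Only the currently applied version is worth acknowledging."""
        return self.current is not None and version == self.current.version
```

After applying version 7, a late push of version 6 returns False without notifying subscribers, and should_ack(6) is False.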

Behavioural rule kinds

The eleven BehaviourRule variants (mirroring the platform's operational schema at apps/api/src/modules/rights/contracts/scope.ts):

| Kind | Convenience helper |
| --- | --- |
| YIELD_TO_PEDESTRIANS | report_yield_event(...) |
| MAX_SPEED_KPH | report_speed_exceedance(...) |
| EXCLUSION_ZONE | report_exclusion_zone_entry(...) |
| OPERATING_HOURS | report_behavioural_event(...) (generic) |
| REQUIRE_HUMAN_OPERATOR | generic |
| DONT_DISPLAY_WHILE_MOVING / DRIVER_ATTENTION_REQUIRED / INTEGRATE_WITH_VEHICLE_DISTRACTION_SYSTEM / PASSENGER_DISPLAY_ONLY / REQUIRE_HANDS_FREE | generic (AUTOMOTIVE substrate cross-over) |
| CUSTOM | generic |

Event types: rule_fired, rule_violated, rule_skipped, rule_unknown.
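
The kind-to-helper mapping above can be kept as a small lookup inside an adapter. The sketch below uses only the helper names and event types listed in this section. EventType, DEDICATED_HELPERS, and helper_for are illustrative names, not SDK exports, and the helpers' argument lists are deliberately left out because this section does not specify them.

```python
from enum import Enum


class EventType(str, Enum):
    RULE_FIRED = "rule_fired"
    RULE_VIOLATED = "rule_violated"
    RULE_SKIPPED = "rule_skipped"
    RULE_UNKNOWN = "rule_unknown"


# Rule kinds that have a dedicated convenience helper on the client.
DEDICATED_HELPERS = {
    "YIELD_TO_PEDESTRIANS": "report_yield_event",
    "MAX_SPEED_KPH": "report_speed_exceedance",
    "EXCLUSION_ZONE": "report_exclusion_zone_entry",
}

# Every other kind goes through the generic reporter.
GENERIC_HELPER = "report_behavioural_event"


def helper_for(kind: str) -> str:
    """Return the name of the client method to use for a rule kind."""
    return DEDICATED_HELPERS.get(kind, GENERIC_HELPER)
```

An adapter could then resolve the method with getattr(client, helper_for(kind)) and pass whatever payload that helper expects.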

Adapter pattern

Each fleet ships an adapter that implements the four protocols defined in src/enfinitos_robotics/adapters/_template.py (FleetLifecycleHook is optional):

| Protocol | Responsibility |
| --- | --- |
| FleetCommandSink | apply EnfinitOS policy onto the fleet's native command vocabulary |
| FleetEventSource | translate native fleet events into BehaviouralEvents |
| FleetTelemetrySource | build TelemetryReports from native pose/battery/sensors |
| FleetLifecycleHook | optional: react to lifecycle transitions |

Skeletons shipped:

  • adapters/ros2.py — ROS 2 via rclpy, maps onto managed-node lifecycle, latched topics for policy publication.
  • adapters/vda5050.py — VDA 5050 v2 via MQTT, instantActions for most policy rules.
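
To make the shape of an adapter concrete, here is a sketch of the four protocols as typing.Protocol classes, with a toy adapter that satisfies three of them. The protocol names come from the table above. The method names and signatures (apply_policy, on_event, build_report, on_transition) are assumptions for illustration; the authoritative definitions are in adapters/_template.py.

```python
from typing import Any, Callable, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class FleetCommandSink(Protocol):
    def apply_policy(self, policy: Any) -> None: ...


@runtime_checkable
class FleetEventSource(Protocol):
    def on_event(self, callback: Callable[[Dict[str, Any]], None]) -> None: ...


@runtime_checkable
class FleetTelemetrySource(Protocol):
    def build_report(self) -> Dict[str, Any]: ...


@runtime_checkable
class FleetLifecycleHook(Protocol):  # optional per the table above
    def on_transition(self, old_state: str, new_state: str) -> None: ...


class LoggingAdapter:
    """Toy adapter: records what it is asked to do instead of moving a robot."""

    def __init__(self) -> None:
        self.applied: List[Any] = []
        self._callbacks: List[Callable[[Dict[str, Any]], None]] = []

    def apply_policy(self, policy: Any) -> None:
        self.applied.append(policy)

    def on_event(self, callback: Callable[[Dict[str, Any]], None]) -> None:
        self._callbacks.append(callback)

    def build_report(self) -> Dict[str, Any]:
        # Placeholder values; a real adapter reads pose and battery here.
        return {"batteryPct": 87.0, "speedKph": 0.0}
```

Because the protocols are structural, LoggingAdapter does not need to inherit from anything. An isinstance check against FleetLifecycleHook returns False for it, which matches that hook being optional.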

Telemetry pattern

Recommended cadence: 1 Hz for sidewalk delivery and warehouse AGVs; up to 5 Hz for autonomous-vehicle telemetry. The platform throttles ingest above 10 Hz to protect the audit ledger.

TelemetryReport shape: location, battery, speed, heading, state, free-form sensorHealth. Keep sensorHealth keys stable across reports so the SRE dashboard's time series stay coherent.
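
Two small helpers illustrate the guidance above: turning a target rate into a heartbeat interval that stays at or below the 10 Hz throttle, and emitting sensorHealth with a fixed key set. Both functions and the SENSOR_HEALTH_KEYS tuple are hypothetical examples, not SDK features. The sensor names and the "unknown" placeholder are invented for illustration.

```python
from typing import Any, Dict

PLATFORM_THROTTLE_HZ = 10.0  # from the text: ingest is throttled above this


def interval_ms_for(rate_hz: float) -> int:
    """Convert a telemetry rate in Hz to an interval in ms, capped at 10 Hz."""
    if rate_hz <= 0:
        raise ValueError("rate_hz must be positive")
    return round(1000 / min(rate_hz, PLATFORM_THROTTLE_HZ))


# Hypothetical sensor names; pick the set once and keep it fixed per fleet.
SENSOR_HEALTH_KEYS = ("lidar", "camera_front", "imu")


def stable_sensor_health(readings: Dict[str, Any]) -> Dict[str, Any]:
    """Return sensorHealth with exactly the same keys on every report.

    Missing sensors are reported as "unknown"; keys outside the fixed
    set are dropped so the time series does not gain ad-hoc columns.
    """
    return {key: readings.get(key, "unknown") for key in SENSOR_HEALTH_KEYS}
```

For example, interval_ms_for(1) gives the 1000 ms used in the hello-world heartbeat, and interval_ms_for(5) gives 200 ms.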

Emergency stop semantics

  • Priority channel. send_immediate() bypasses the normal outbound queue. An emergency stop fires even if a policy ack or telemetry frame is mid-send.
  • Idempotent. commandId is the dedup key.
  • Never queued behind reconnect backoff. If the WS is down when emergency_stop() is called, the message sits at the head of the priority queue and is the FIRST thing flushed when the socket re-opens.
  • Lifecycle. Robot-side emergency_stop() auto-transitions to SUSPENDED. Inbound emergency stops do NOT auto-transition.
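
The first three properties above (priority lane, dedup on commandId, flushed first after reconnect) can be sketched as a two-lane outbound buffer. This is an illustration of the semantics under stated assumptions, not the SDK's transport code: the Outbox class, its send and flush methods, and the dict message shape are hypothetical. Only send_immediate() and commandId are names taken from the text.

```python
from collections import deque
from typing import Any, Deque, Dict, List, Set


class Outbox:
    """Hypothetical two-lane outbound buffer; not the SDK's actual transport."""

    def __init__(self) -> None:
        self._priority: Deque[Dict[str, Any]] = deque()
        self._normal: Deque[Dict[str, Any]] = deque()
        self._seen_command_ids: Set[str] = set()

    def send(self, message: Dict[str, Any]) -> None:
        """Queue ordinary traffic (telemetry, policy acks)."""
        self._normal.append(message)

    def send_immediate(self, message: Dict[str, Any]) -> bool:
        """Queue an emergency stop on the priority lane, deduped by commandId."""
        command_id = message["commandId"]
        if command_id in self._seen_command_ids:
            return False  # idempotent: a repeated commandId is ignored
        self._seen_command_ids.add(command_id)
        self._priority.append(message)
        return True

    def flush(self) -> List[Dict[str, Any]]:
        """Drain when the socket (re)opens: priority lane first, then normal."""
        drained = list(self._priority) + list(self._normal)
        self._priority.clear()
        self._normal.clear()
        return drained
```

If telemetry and a policy ack are queued while the socket is down and an emergency stop is then sent, flush() returns the emergency stop first even though it was enqueued last.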

Sample integration — minimal viable robot

import asyncio
from datetime import datetime, timezone

from enfinitos_robotics import (
    EnfinitOSRobotClient,
    EnfinitOSRobotClientOptions,
    GeoCoordinate,
    TelemetryReport,
    YieldEventDetail,
)


async def main() -> None:
    client = EnfinitOSRobotClient(
        EnfinitOSRobotClientOptions(
            org_id="org_acme",
            robot_id="robot_001",
            fleet_id="fleet_main",
            auth_token="<JWT>",
            api_base_url="https://api.enfinitos.com",
        )
    )

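    # Hold a reference to each ack task; asyncio keeps only weak references,
    # so an unreferenced task can be garbage-collected before it finishes.
    ack_tasks = set()

    def on_policy(policy):
        print(f"got policy v{policy.version}")
        task = asyncio.create_task(client.acknowledge_policy(policy.version))
        ack_tasks.add(task)
        task.add_done_callback(ack_tasks.discard)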

    client.subscribe_policy(on_policy)
    await client.connect()

    # one heartbeat
    await client.report_telemetry(
        TelemetryReport(
            reportedAt=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            location=GeoCoordinate(lat=51.5, lng=-0.1),
            batteryPct=87.0,
            speedKph=0.0,
            headingDeg=90.0,
            state=client.get_state(),
        )
    )

    # one behavioural event
    await client.report_yield_event(
        YieldEventDetail(
            occurredAt=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            location=GeoCoordinate(lat=51.5, lng=-0.1),
            detectedPedestrians=1,
            closestDistanceM=1.8,
        )
    )

    await asyncio.sleep(0.5)
    await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())

What's SDK-side vs platform-side

| Concern | This SDK | Platform tranche (separate) |
| --- | --- | --- |
| Wire protocol (WireMessage) | yes, defined here | yes, implemented by platform-side server |
| /v1/robotics/connect WS endpoint | no | pending, separate work item |
| Audit-log persistence | no | pending, separate work item |
| JWT issuance | no | reuses existing /auth/* plane |
| Policy authoring UI | no | already shipped in apps/web/.../compose |
| Adapters (ROS 2, VDA 5050, custom) | yes, skeletons | (customer-owned) |

See also

  • apps/api/src/modules/rights/contracts/scope.ts: BehaviourRule source of truth on the platform side.
  • packages/sdks/robotics-ts/ — TypeScript twin of this SDK.
  • docs/launch/substrate-readiness-matrix.md — ROBOTICS substrate readiness tracker.

API reference

Hit the HTTP surface directly

The Robotics SDK — Python is a thin client over the same governed HTTP API every other SDK calls. The full OpenAPI 3.1 reference lives on the docs site.

Sandbox

Run this SDK against a real tenant

The hosted sandbox is the fastest way to verify Robotics SDK — Python against a real EnfinitOS tenant before committing to a pilot. Launching Q4 2026.