添加设备:编写驱动
在 Uni-Lab 中,设备(Device)是实验操作的基础单元。Uni-Lab 使用注册表机制来兼容管理种类繁多的设备驱动程序。抽象的设备对外拥有【话题】【服务】【动作】三种通信机制,因此将设备添加进 Uni-Lab,实际上是将设备驱动中的这三种机制映射到 Uni-Lab 标准指令集上。
💡 提示: 本文档介绍如何使用已有的设备驱动(SDK)。若设备没有现成的驱动程序,需要自己开发驱动,请参考 设备 Driver 开发(无 SDK 设备)。
支持的驱动类型
Uni-Lab 支持以下两种驱动程序:
1. Python Class(推荐)
Python 类设备驱动在完成注册表后可以直接在 Uni-Lab 中使用,无需额外编译。
示例:
class MockGripper:
def __init__(self):
self._position: float = 0.0
self._velocity: float = 2.0
self._torque: float = 0.0
self._status = "Idle"
@property
def position(self) -> float:
return self._position
@property
def velocity(self) -> float:
return self._velocity
@property
def torque(self) -> float:
return self._torque
# 会被自动识别的设备属性,接入 Uni-Lab 时会定时对外广播
@property
def status(self) -> str:
return self._status
@status.setter
def status(self, target):
self._status = target
# 会被自动识别的设备动作,接入 Uni-Lab 时会作为 ActionServer 接受任意控制者的指令
def push_to(self, position: float, torque: float, velocity: float = 0.0):
self._status = "Running"
current_pos = self.position
if velocity == 0.0:
velocity = self.velocity
move_time = abs(position - current_pos) / velocity
for i in range(20):
self._position = current_pos + (position - current_pos) / 20 * (i+1)
self._torque = torque / (20 - i)
self._velocity = velocity
time.sleep(move_time / 20)
self._torque = torque
self._status = "Idle"
2. C# Class
C# 驱动设备在完成注册表后,需要调用 Uni-Lab C# 编译后才能使用(仅需一次)。
示例:
using System;
using System.Threading.Tasks;
public class MockGripper
{
// 会被自动识别的设备属性,接入 Uni-Lab 时会定时对外广播
public double position { get; private set; } = 0.0;
public double velocity { get; private set; } = 2.0;
public double torque { get; private set; } = 0.0;
public string status { get; private set; } = "Idle";
// 需要在注册表添加的设备动作,接入 Uni-Lab 时会作为 ActionServer 接受任意控制者的指令
public async Task PushToAsync(double Position, double Torque, double Velocity = 0.0)
{
status = "Running";
double currentPos = Position;
if (Velocity == 0.0)
{
velocity = Velocity;
}
double moveTime = Math.Abs(Position - currentPos) / velocity;
for (int i = 0; i < 20; i++)
{
position = currentPos + (Position - currentPos) / 20 * (i + 1);
torque = Torque / (20 - i);
velocity = Velocity;
await Task.Delay((int)(moveTime * 1000 / 20));
}
torque = Torque;
status = "Idle";
}
}
快速开始:两种方式添加设备
方式 1:使用注册表编辑器(推荐)
推荐使用 Uni-Lab-OS 自带的可视化编辑器,它能自动分析您的设备驱动并生成大部分配置:
步骤:
启动 Uni-Lab-OS
在浏览器中打开"注册表编辑器"页面
选择您的 Python 设备驱动文件
点击"分析文件",让系统读取类信息
填写基本信息(设备描述、图标等)
点击"生成注册表",复制生成的内容
保存到
devices/目录下
优点:
自动识别设备属性和方法
可视化界面,易于操作
自动生成完整配置
减少手动配置错误
方式 2:手动编写注册表(简化版)
如果需要手动编写,只需要提供两个必需字段,系统会自动补全其余内容:
最小配置示例:
my_device: # 设备唯一标识符
class:
module: unilabos.devices.your_module.my_device:MyDevice # Python 类路径
type: python # 驱动类型
注册表文件位置:
默认路径:
unilabos/registry/devices自定义路径:启动时使用
--registry_path参数指定可将多个设备写在同一个 YAML 文件中
系统自动生成的内容:
系统会自动分析您的 Python 驱动类并生成:
status_types:从@property装饰的方法自动识别状态属性action_value_mappings:从类方法自动生成动作映射init_param_schema:从__init__方法分析初始化参数schema:前端显示用的属性类型定义
完整结构概览:
my_device:
class:
module: unilabos.devices.your_module.my_device:MyDevice
type: python
status_types: {} # 自动生成
action_value_mappings: {} # 自动生成
description: '' # 可选:设备描述
icon: '' # 可选:设备图标
init_param_schema: {} # 自动生成
schema: {} # 自动生成
💡 提示: 详细的注册表编写指南和高级配置,请参考 03_add_device_registry。
Python 类结构要求
Uni-Lab 设备驱动是一个 Python 类,需要遵循以下结构:
from typing import Dict, Any
class MyDevice:
"""设备类文档字符串
说明设备的功能、连接方式等
"""
def __init__(self, config: Dict[str, Any]):
"""初始化设备
Args:
config: 配置字典,来自图文件或注册表
"""
self.port = config.get('port', '/dev/ttyUSB0')
self.baudrate = config.get('baudrate', 9600)
self._status = "idle"
# 初始化硬件连接
@property
def status(self) -> str:
"""设备状态(会自动广播)"""
return self._status
def my_action(self, param: float) -> Dict[str, Any]:
"""执行动作
Args:
param: 参数说明
Returns:
{"success": True, "result": ...}
"""
# 执行设备操作
return {"success": True}
状态属性 vs 动作方法
状态属性(@property)
状态属性会被自动识别并定期广播:
@property
def temperature(self) -> float:
"""当前温度"""
return self._read_temperature()
@property
def status(self) -> str:
"""设备状态: idle, running, error"""
return self._status
@property
def is_ready(self) -> bool:
"""设备是否就绪"""
return self._status == "idle"
特点:
使用
@property装饰器只读,不能有参数
自动添加到注册表的
status_types定期发布到 ROS2 topic
动作方法
动作方法是设备可以执行的操作:
def start_heating(self, target_temp: float, rate: float = 1.0) -> Dict[str, Any]:
"""开始加热
Args:
target_temp: 目标温度(°C)
rate: 升温速率(°C/min)
Returns:
{"success": bool, "message": str}
"""
self._status = "heating"
self._target_temp = target_temp
# 发送命令到硬件
return {"success": True, "message": f"Heating to {target_temp}°C"}
async def async_operation(self, duration: float) -> Dict[str, Any]:
"""异步操作(长时间运行)
Args:
duration: 持续时间(秒)
"""
# 使用 self.sleep 而不是 asyncio.sleep(ROS2 异步机制)
await self.sleep(duration)
return {"success": True}
特点:
普通方法或 async 方法
返回 Dict 类型的结果
自动注册为 ROS2 Action
支持参数和返回值
返回值设计指南
⚠️ 重要:返回值会自动显示在前端
动作方法的返回值(字典)会自动显示在 Web 界面的工作流执行结果中。因此,强烈建议设计结构化、可读的返回值字典。
推荐的返回值结构:
def my_action(self, param: float) -> Dict[str, Any]:
"""执行操作"""
try:
# 执行操作...
result = self._do_something(param)
return {
"success": True, # 必需:操作是否成功
"message": "操作完成", # 推荐:用户友好的消息
"result": result, # 可选:具体结果数据
"param_used": param, # 可选:记录使用的参数
# 其他有用的信息...
}
except Exception as e:
return {
"success": False,
"error": str(e),
"message": "操作失败"
}
最佳实践示例(参考 host_node.test_latency):
def test_latency(self) -> Dict[str, Any]:
"""测试网络延迟
返回值会在前端显示,包含详细的测试结果
"""
# 执行测试...
avg_rtt_ms = 25.5
avg_time_diff_ms = 10.2
test_count = 5
# 返回结构化的测试结果
return {
"status": "success", # 状态标识
"avg_rtt_ms": avg_rtt_ms, # 平均往返时间
"avg_time_diff_ms": avg_time_diff_ms, # 平均时间差
"max_time_error_ms": 5.3, # 最大误差
"task_delay_ms": 15.7, # 任务延迟
"test_count": test_count, # 测试次数
}
前端显示效果:
当用户在 Web 界面执行工作流时,返回的字典会以 JSON 格式显示在结果面板中:
{
"status": "success",
"avg_rtt_ms": 25.5,
"avg_time_diff_ms": 10.2,
"max_time_error_ms": 5.3,
"task_delay_ms": 15.7,
"test_count": 5
}
返回值设计建议:
始终包含
success字段:布尔值,表示操作是否成功包含
message字段:字符串,提供用户友好的描述使用有意义的键名:使用描述性的键名(如
avg_rtt_ms而不是v1)包含单位:在键名中包含单位(如
_ms、_ml、_celsius)记录重要参数:返回使用的关键参数值,便于追溯
错误信息详细:失败时包含
error字段和详细的错误描述避免返回大数据:不要返回大型数组或二进制数据,这会影响前端性能
错误处理示例:
def risky_operation(self, param: float) -> Dict[str, Any]:
"""可能失败的操作"""
if param < 0:
return {
"success": False,
"error": "参数不能为负数",
"message": f"无效参数: {param}",
"param": param
}
try:
result = self._execute(param)
return {
"success": True,
"message": "操作成功",
"result": result,
"param": param
}
except IOError as e:
return {
"success": False,
"error": "通信错误",
"message": str(e),
"device_status": self._status
}
特殊参数类型:ResourceSlot 和 DeviceSlot
Uni-Lab 提供特殊的参数类型,用于在方法中声明需要选择资源或设备。
导入类型
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from typing import List
ResourceSlot - 资源选择
用于需要选择物料资源的场景:
def pipette_liquid(
self,
source: ResourceSlot, # 单个源容器
target: ResourceSlot, # 单个目标容器
volume: float
) -> Dict[str, Any]:
"""从源容器吸取液体到目标容器
Args:
source: 源容器(前端会显示资源选择下拉框)
target: 目标容器(前端会显示资源选择下拉框)
volume: 体积(μL)
"""
print(f"Pipetting {volume}μL from {source.id} to {target.id}")
return {"success": True}
多选示例:
def mix_multiple(
self,
containers: List[ResourceSlot], # 多个容器选择
speed: float
) -> Dict[str, Any]:
"""混合多个容器
Args:
containers: 容器列表(前端会显示多选下拉框)
speed: 混合速度
"""
for container in containers:
print(f"Mixing {container.name}")
return {"success": True}
DeviceSlot - 设备选择
用于需要选择其他设备的场景:
def coordinate_with_device(
self,
other_device: DeviceSlot, # 单个设备选择
command: str
) -> Dict[str, Any]:
"""与另一个设备协同工作
Args:
other_device: 协同设备(前端会显示设备选择下拉框)
command: 命令
"""
print(f"Coordinating with {other_device.name}")
return {"success": True}
多设备示例:
def sync_devices(
self,
devices: List[DeviceSlot], # 多个设备选择
sync_signal: str
) -> Dict[str, Any]:
"""同步多个设备
Args:
devices: 设备列表(前端会显示多选下拉框)
sync_signal: 同步信号
"""
for dev in devices:
print(f"Syncing {dev.name}")
return {"success": True}
完整示例:液体处理工作站
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from typing import List, Dict, Any
class LiquidHandler:
"""液体处理工作站"""
def __init__(self, config: Dict[str, Any]):
self.simulation = config.get('simulation', False)
self._status = "idle"
@property
def status(self) -> str:
return self._status
def transfer_liquid(
self,
source: ResourceSlot, # 源容器选择
target: ResourceSlot, # 目标容器选择
volume: float,
tip: ResourceSlot = None # 可选的枪头选择
) -> Dict[str, Any]:
"""转移液体
前端效果:
- source: 下拉框,列出所有可用容器
- target: 下拉框,列出所有可用容器
- volume: 数字输入框
- tip: 下拉框(可选),列出所有枪头
"""
self._status = "transferring"
# source和target会被解析为实际的资源对象
print(f"Transferring {volume}μL")
print(f" From: {source.id} ({source.name})")
print(f" To: {target.id} ({target.name})")
if tip:
print(f" Using tip: {tip.id}")
# 执行实际的液体转移
# ...
self._status = "idle"
return {
"success": True,
"volume_transferred": volume,
"source_id": source.id,
"target_id": target.id
}
def multi_dispense(
self,
source: ResourceSlot, # 单个源
targets: List[ResourceSlot], # 多个目标
volumes: List[float]
) -> Dict[str, Any]:
"""从一个源分配到多个目标
前端效果:
- source: 单选下拉框
- targets: 多选下拉框(可选择多个容器)
- volumes: 数组输入(每个目标对应一个体积)
"""
results = []
for target, vol in zip(targets, volumes):
print(f"Dispensing {vol}μL to {target.name}")
results.append({
"target": target.id,
"volume": vol
})
return {
"success": True,
"dispense_results": results
}
def test_with_balance(
self,
target: ResourceSlot, # 容器
balance: DeviceSlot # 天平设备
) -> Dict[str, Any]:
"""使用天平测量容器
前端效果:
- target: 容器选择下拉框
- balance: 设备选择下拉框(仅显示天平类型)
"""
print(f"Weighing {target.name} on {balance.name}")
# 可以调用balance的方法
# weight = balance.get_weight()
return {
"success": True,
"container": target.id,
"balance_used": balance.id
}
工作原理
1. 类型识别
注册表扫描方法签名时:
def my_method(self, resource: ResourceSlot, device: DeviceSlot):
pass
系统识别到ResourceSlot和DeviceSlot类型。
2. 自动添加 placeholder_keys
在注册表中自动生成:
my_device:
class:
action_value_mappings:
my_method:
goal:
resource: resource
device: device
placeholder_keys:
resource: unilabos_resources # 自动添加!
device: unilabos_devices # 自动添加!
3. 前端 UI 生成
unilabos_resources: 渲染为资源选择下拉框unilabos_devices: 渲染为设备选择下拉框
4. 运行时解析
用户选择资源/设备后,实际调用时会传入完整的资源/设备对象:
# 用户在前端选择了 plate_1
# 运行时,source参数会收到完整的Resource对象
source.id # "plate_1"
source.name # "96孔板"
source.type # "resource"
source.class_ # "corning_96_wellplate_360ul_flat"
支持的通信方式
1. 串口(Serial)
import serial
class SerialDevice:
def __init__(self, config: Dict[str, Any]):
self.port = config['port']
self.baudrate = config.get('baudrate', 9600)
self.ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=1
)
def send_command(self, cmd: str) -> str:
"""发送命令并读取响应"""
self.ser.write(f"{cmd}\r\n".encode())
response = self.ser.readline().decode().strip()
return response
def __del__(self):
if hasattr(self, 'ser') and self.ser.is_open:
self.ser.close()
2. TCP/IP Socket
import socket
class TCPDevice:
def __init__(self, config: Dict[str, Any]):
self.host = config['host']
self.port = config['port']
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
def send_command(self, cmd: str) -> str:
self.sock.sendall(cmd.encode())
response = self.sock.recv(1024).decode()
return response
3. Modbus
from pymodbus.client import ModbusTcpClient
class ModbusDevice:
def __init__(self, config: Dict[str, Any]):
self.host = config['host']
self.port = config.get('port', 502)
self.client = ModbusTcpClient(self.host, port=self.port)
self.client.connect()
def read_register(self, address: int) -> int:
result = self.client.read_holding_registers(address, 1)
return result.registers[0]
def write_register(self, address: int, value: int):
self.client.write_register(address, value)
4. OPC UA
from opcua import Client
class OPCUADevice:
def __init__(self, config: Dict[str, Any]):
self.url = config['url']
self.client = Client(self.url)
self.client.connect()
def read_node(self, node_id: str):
node = self.client.get_node(node_id)
return node.get_value()
def write_node(self, node_id: str, value):
node = self.client.get_node(node_id)
node.set_value(value)
5. HTTP/RPC
import requests
class HTTPDevice:
def __init__(self, config: Dict[str, Any]):
self.base_url = config['url']
self.auth_token = config.get('token')
def send_command(self, endpoint: str, data: Dict) -> Dict:
url = f"{self.base_url}/{endpoint}"
headers = {'Authorization': f'Bearer {self.auth_token}'}
response = requests.post(url, json=data, headers=headers)
return response.json()
异步 vs 同步方法
同步方法(适合快速操作)
def quick_operation(self, param: float) -> Dict[str, Any]:
"""快速操作,立即返回"""
result = self._do_something(param)
return {"success": True, "result": result}
异步方法(适合耗时操作)
async def long_operation(self, duration: float) -> Dict[str, Any]:
"""长时间运行的操作"""
self._status = "running"
# 使用 ROS2 提供的 sleep 方法(而不是 asyncio.sleep)
await self.sleep(duration)
# 可以在过程中发送feedback
# 需要配合ROS2 Action的feedback机制
self._status = "idle"
return {"success": True, "duration": duration}
⚠️ 重要提示:ROS2 异步机制 vs Python asyncio
Uni-Lab 的设备驱动虽然使用
async def语法,但底层是 ROS2 的异步机制,而不是 Python 的 asyncio。不能使用的 asyncio 功能:
❌
asyncio.sleep()- 会导致 ROS2 事件循环阻塞❌
asyncio.create_task()- 任务不会被 ROS2 正确调度❌
asyncio.gather()- 无法与 ROS2 集成❌ 其他 asyncio 标准库函数
应该使用的方法(继承自 BaseROS2DeviceNode):
✅
await self.sleep(seconds)- ROS2 兼容的睡眠✅
await self.create_task(func, **kwargs)- ROS2 兼容的任务创建✅ ROS2 的 Action/Service 回调机制
示例:
async def complex_operation(self, duration: float) -> Dict[str, Any]: """正确使用 ROS2 异步方法""" self._status = "processing" # ✅ 正确:使用 self.sleep await self.sleep(duration) # ✅ 正确:创建并发任务 task = await self.create_task(self._background_work) # ❌ 错误:不要使用 asyncio # await asyncio.sleep(duration) # 这会导致问题! # task = asyncio.create_task(...) # 这也不行! self._status = "idle" return {"success": True} async def _background_work(self): """后台任务""" await self.sleep(1.0) self.lab_logger().info("Background work completed")为什么不能混用?
ROS2 使用
rclpy的事件循环来管理所有异步操作。如果使用asyncio的函数,这些操作会在不同的事件循环中运行,导致:
ROS2 回调无法正确执行
任务可能永远不会完成
程序可能死锁或崩溃
参考实现:
BaseROS2DeviceNode提供的方法定义(base_device_node.py:563-572):async def sleep(self, rel_time: float, callback_group=None): """ROS2 兼容的异步睡眠""" if callback_group is None: callback_group = self.callback_group await ROS2DeviceNode.async_wait_for(self, rel_time, callback_group) @classmethod async def create_task(cls, func, trace_error=True, **kwargs) -> Task: """ROS2 兼容的任务创建""" return ROS2DeviceNode.run_async_func(func, trace_error, **kwargs)
错误处理
基本错误处理
def operation_with_error_handling(self, param: float) -> Dict[str, Any]:
"""带错误处理的操作"""
try:
result = self._risky_operation(param)
return {
"success": True,
"result": result
}
except ValueError as e:
return {
"success": False,
"error": "Invalid parameter",
"message": str(e)
}
except IOError as e:
self._status = "error"
return {
"success": False,
"error": "Communication error",
"message": str(e)
}
自定义异常
class DeviceError(Exception):
"""设备错误基类"""
pass
class DeviceNotReadyError(DeviceError):
"""设备未就绪"""
pass
class DeviceTimeoutError(DeviceError):
"""设备超时"""
pass
class MyDevice:
def operation(self) -> Dict[str, Any]:
if self._status != "idle":
raise DeviceNotReadyError(f"Device is {self._status}")
# 执行操作
return {"success": True}
最佳实践
1. 类型注解
from typing import Dict, Any, Optional, List
def method(
self,
param1: float,
param2: str,
optional_param: Optional[int] = None
) -> Dict[str, Any]:
"""完整的类型注解有助于自动生成注册表"""
pass
2. 文档字符串
def method(self, param: float) -> Dict[str, Any]:
"""方法简短描述
更详细的说明...
Args:
param: 参数说明,包括单位和范围
Returns:
Dict包含:
- success (bool): 是否成功
- result (Any): 结果数据
Raises:
DeviceError: 错误情况说明
"""
pass
3. 配置验证
def __init__(self, config: Dict[str, Any]):
# 验证必需参数
required = ['port', 'baudrate']
for key in required:
if key not in config:
raise ValueError(f"Missing required config: {key}")
self.port = config['port']
self.baudrate = config['baudrate']
4. 资源清理
def __del__(self):
"""析构函数,清理资源"""
if hasattr(self, 'connection') and self.connection:
self.connection.close()
5. 设计前端友好的返回值
记住:返回值会直接显示在 Web 界面
import time
def measure_temperature(self) -> Dict[str, Any]:
"""测量温度
✅ 好的返回值设计:
- 包含 success 状态
- 使用描述性键名
- 在键名中包含单位
- 记录测量时间
"""
temp = self._read_temperature()
return {
"success": True,
"temperature_celsius": temp, # 键名包含单位
"timestamp": time.time(), # 记录时间
"sensor_status": "normal", # 额外状态信息
"message": f"温度测量完成: {temp}°C" # 用户友好的消息
}
def bad_example(self) -> Dict[str, Any]:
"""❌ 不好的返回值设计"""
return {
"s": True, # ❌ 键名不明确
"v": 25.5, # ❌ 没有说明单位
"t": 1234567890, # ❌ 不清楚是什么时间戳
}
参考 host_node.test_latency 方法(第 1216-1340 行),它返回详细的测试结果,在前端清晰显示:
return {
"status": "success",
"avg_rtt_ms": 25.5, # 有意义的键名 + 单位
"avg_time_diff_ms": 10.2,
"max_time_error_ms": 5.3,
"task_delay_ms": 15.7,
"test_count": 5, # 记录重要信息
}
下一步
看完本文档后,建议继续阅读:
添加新动作指令(Action) - 了解如何添加新的动作指令
yaml 注册表编写指南 - 学习如何编写和完善 YAML 注册表
进阶主题:
03_add_device_registry - 了解如何配置注册表
04_add_device_testing - 学习如何测试设备
设备 Driver 开发(无 SDK 设备) - 没有 SDK 时如何开发设备驱动
参考
ROS2 rclpy 异步编程 - Uni-Lab 使用 ROS2 的异步机制
注意: 虽然设备驱动使用
async def语法,但请不要参考 Python 标准的 asyncio 文档。Uni-Lab 使用的是 ROS2 的异步机制,两者不兼容。请使用self.sleep()和self.create_task()等 BaseROS2DeviceNode 提供的方法。