添加新实验操作(Protocol)

Uni-Lab 中,实验操作(Protocol)指的是对实验有意义的单个完整动作——加入某种液体多少量;萃取分液;洗涤仪器;机械+末端执行器等等,就像实验步骤文字书写的那样。

而这些对实验有意义的单个完整动作,一般需要多个设备的协同,或者同一设备连续动作,还依赖于他们的物理连接关系(管道相连;机械臂可转运)Protocol 根据实验操作目标和设备物理连接关系,通过 unilabos/compile 中的“编译”过程产生硬件可执行的机器指令,并依次执行。

开发一个 Protocol 一般共需要修改6个文件:

  1. unilabos_msgs/action 中新建实验操作名和参数列表,如 PumpTransfer.action。一个 Action 定义由三个部分组成,分别是目标(Goal)、结果(Result)和反馈(Feedback),之间使用 --- 分隔:

# Organic
Resource from_vessel
Resource to_vessel
float64 volume
string amount
float64 time
bool viscous
string rinsing_solvent
float64 rinsing_volume
int32 rinsing_repeats
bool solid
float64 flowrate
float64 transfer_flowrate
string rate_spec
string event
string through
---
string return_info
bool success
---
string status
string current_device
builtin_interfaces/Duration time_spent
builtin_interfaces/Duration time_remaining
  1. unilabos_msgs/CMakeLists.txt 中添加新定义的 action 因为在指令集中新建了指令,因此调试时需要编译,并在终端环境中加载临时路径:

cd unilabos_msgs
colcon build
source ./install/local_setup.sh
cd ..

调试成功后,发起 pull request,Uni-Lab 的 CI/CD 系统会自动将新的指令集编译打包,mamba执行升级即可永久生效:

mamba update ros-humble-unilabos-msgs -c http://quetz.dp.tech:8088/get/unilab -c robostack-humble -c robostack-staging
  1. unilabos/messages/__init__.py 中添加 Pydantic 定义的实验操作名和参数列表


class PumpTransferProtocol(BaseModel):
    # === 核心参数(保持必需) ===
    from_vessel: dict
    to_vessel: dict
    
    # === 所有其他参数都改为可选,添加默认值 ===
    volume: float = 0.0  # 🔧 改为-1,表示转移全部体积
    amount: str = ""
    time: float = 0.0
    viscous: bool = False
    rinsing_solvent: str = ""
    rinsing_volume: float = 0.0
    rinsing_repeats: int = 0
    solid: bool = False
    flowrate: float = 2.5
    transfer_flowrate: float = 0.5
    
    # === 新版XDL兼容参数(可选) ===
    rate_spec: str = ""
    event: str = ""
    through: str = ""
    
    def model_post_init(self, __context):
        """后处理:智能参数处理和兼容性调整"""
            
        # 如果指定了 amount 但volume是默认值,尝试解析 amount
        if self.amount and self.volume == 0.0:
            parsed_volume = self._parse_amount_to_volume(self.amount)
            if parsed_volume > 0:
                self.volume = parsed_volume
        
        # 如果指定了 time 但没有明确设置流速,根据时间计算流速
        if self.time > 0 and self.volume > 0:
            if self.flowrate == 2.5 and self.transfer_flowrate == 0.5:
                calculated_flowrate = self.volume / self.time
                self.flowrate = min(calculated_flowrate, 10.0)
                self.transfer_flowrate = min(calculated_flowrate, 5.0)
        
        # 🔧 核心修复:如果flowrate为0(ROS2传入),使用默认值
        if self.flowrate <= 0:
            self.flowrate = 2.5
        if self.transfer_flowrate <= 0:
            self.transfer_flowrate = 0.5
            
        # 根据 rate_spec 调整流速
        if self.rate_spec == "dropwise":
            self.flowrate = min(self.flowrate, 0.1)
            self.transfer_flowrate = min(self.transfer_flowrate, 0.1)
        elif self.rate_spec == "slowly":
            self.flowrate = min(self.flowrate, 0.5)
            self.transfer_flowrate = min(self.transfer_flowrate, 0.3)
        elif self.rate_spec == "quickly":
            self.flowrate = max(self.flowrate, 5.0)
            self.transfer_flowrate = max(self.transfer_flowrate, 2.0)
    
    def _parse_amount_to_volume(self, amount: str) -> float:
        """解析 amount 字符串为体积"""
        if not amount:
            return 0.0
        
        amount = amount.lower().strip()
        
        # 处理特殊关键词
        if amount == "all":
            return 0.0  # 🔧 "all"也表示转移全部
        
        # 提取数字
        import re
        numbers = re.findall(r'[\d.]+', amount)
        if numbers:
            volume = float(numbers[0])
            
            # 单位转换
            if 'ml' in amount or 'milliliter' in amount:
                return volume
            elif 'l' in amount and 'ml' not in amount:
                return volume * 1000
            elif 'μl' in amount or 'microliter' in amount:
                return volume / 1000
            else:
                return volume
        
        return 0.0


class CleanProtocol(BaseModel):
    vessel: dict
    solvent: str
    volume: float
    temp: float
    repeats: int = 1


class SeparateProtocol(BaseModel):
    purpose: str
    product_phase: str
    from_vessel: dict
    separation_vessel: dict
    to_vessel: dict
    waste_phase_to_vessel: dict
    solvent: str
    solvent_volume: float
    through: str
    repeats: int
    stir_time: float
    stir_speed: float
    settling_time: float


class EvaporateProtocol(BaseModel):
    # === 核心参数(必需) ===
    vessel: dict = Field(..., description="蒸发容器名称")
    
    # === 所有其他参数都改为可选,添加默认值 ===
    pressure: float = Field(0.1, description="真空度 (bar),默认0.1 bar")
    temp: float = Field(60.0, description="加热温度 (°C),默认60°C")
    time: float = Field(180.0, description="蒸发时间 (秒),默认1800s (30分钟)")
    stir_speed: float = Field(100.0, description="旋转速度 (RPM),默认100 RPM")
    
    # === 新版XDL兼容参数(可选) ===
    solvent: str = Field("", description="溶剂名称(用于识别蒸发的溶剂类型)")
    
    def model_post_init(self, __context):
        """后处理:智能参数处理和兼容性调整"""
        
        # 参数范围验证和修正
        if self.pressure <= 0 or self.pressure > 1.0:
            logger.warning(f"真空度 {self.pressure} bar 超出范围,修正为 0.1 bar")
            self.pressure = 0.1
        
        if self.temp < 10.0 or self.temp > 200.0:
            logger.warning(f"温度 {self.temp}°C 超出范围,修正为 60°C")
            self.temp = 60.0
        
        if self.time <= 0:
            logger.warning(f"时间 {self.time}s 无效,修正为 1800s")
            self.time = 1800.0
        
        if self.stir_speed < 10.0 or self.stir_speed > 300.0:
            logger.warning(f"旋转速度 {self.stir_speed} RPM 超出范围,修正为 100 RPM")
            self.stir_speed = 100.0
        
        # 根据溶剂类型调整参数
        if self.solvent:
            self._adjust_parameters_by_solvent()
    
    def _adjust_parameters_by_solvent(self):
        """根据溶剂类型调整蒸发参数"""
        solvent_lower = self.solvent.lower()
        
        # 水系溶剂:较高温度,较低真空度
        if any(s in solvent_lower for s in ['water', 'aqueous', 'h2o']):
            if self.temp == 60.0:  # 如果是默认值,则调整
                self.temp = 80.0
            if self.pressure == 0.1:
                self.pressure = 0.2
        
        # 有机溶剂:根据沸点调整
        elif any(s in solvent_lower for s in ['ethanol', 'methanol', 'acetone']):
            if self.temp == 60.0:
                self.temp = 50.0
            if self.pressure == 0.1:
                self.pressure = 0.05
        
        # 高沸点溶剂:更高温度
        elif any(s in solvent_lower for s in ['dmso', 'dmi', 'toluene']):
            if self.temp == 60.0:
                self.temp = 100.0
            if self.pressure == 0.1:
                self.pressure = 0.01


class EvacuateAndRefillProtocol(BaseModel):
    # === 必需参数 ===
    vessel: dict = Field(..., description="目标容器名称")
    gas: str = Field(..., description="气体名称")
    
    # 🔧 删除 repeats 参数,直接在代码中硬编码为 3 次
    
    def model_post_init(self, __context):
        """后处理:参数验证和兼容性调整"""
        
        # 验证气体名称
        if not self.gas.strip():
            logger.warning("气体名称为空,使用默认值 'nitrogen'")
            self.gas = "nitrogen"
        
        # 标准化气体名称
        gas_aliases = {
            'n2': 'nitrogen',
            'ar': 'argon', 
            'air': 'air',
            'o2': 'oxygen',
            'co2': 'carbon_dioxide',
            'h2': 'hydrogen'
        }
        
        gas_lower = self.gas.lower().strip()
        if gas_lower in gas_aliases:
            self.gas = gas_aliases[gas_lower]


class AGVTransferProtocol(BaseModel):
    from_repo: dict
    to_repo: dict
    from_repo_position: str
    to_repo_position: str

#=============新添加的新的协议================
class AddProtocol(BaseModel):
    vessel: dict
    reagent: str
    volume: float
    mass: float
    amount: str
    time: float
    stir: bool
    stir_speed: float
    viscous: bool
    purpose: str

class CentrifugeProtocol(BaseModel):
    vessel: dict
    speed: float
    time: float
    temp: float

class FilterProtocol(BaseModel):
    # === 必需参数 ===
    vessel: dict = Field(..., description="过滤容器名称")
    
    # === 可选参数 ===
    filtrate_vessel: dict = Field("", description="滤液容器名称(可选,自动查找)")
    
    def model_post_init(self, __context):
        """后处理:参数验证"""
        # 验证容器名称
        if not self.vessel.strip():
            raise ValueError("vessel 参数不能为空")

class HeatChillProtocol(BaseModel):
    # === 必需参数 ===
    vessel: dict = Field(..., description="加热容器名称")
    
    # === 可选参数 - 温度相关 ===
    temp: float = Field(25.0, description="目标温度 (°C)")
    temp_spec: str = Field("", description="温度规格(如 'room temperature', 'reflux')")
    
    # === 可选参数 - 时间相关 ===
    time: float = Field(300.0, description="加热时间 (秒)")
    time_spec: str = Field("", description="时间规格(如 'overnight', '2 h')")
    
    # === 可选参数 - 其他XDL参数 ===
    pressure: str = Field("", description="压力规格(如 '1 mbar'),不做特殊处理")
    reflux_solvent: str = Field("", description="回流溶剂名称,不做特殊处理")
    
    # === 可选参数 - 搅拌相关 ===
    stir: bool = Field(False, description="是否搅拌")
    stir_speed: float = Field(300.0, description="搅拌速度 (RPM)")
    purpose: str = Field("", description="操作目的")
    
    def model_post_init(self, __context):
        """后处理:参数验证和解析"""
        
        # 验证必需参数
        if not self.vessel.strip():
            raise ValueError("vessel 参数不能为空")
        
        # 温度解析:优先使用 temp_spec,然后是 temp
        if self.temp_spec:
            self.temp = self._parse_temp_spec(self.temp_spec)
        
        # 时间解析:优先使用 time_spec,然后是 time
        if self.time_spec:
            self.time = self._parse_time_spec(self.time_spec)
        
        # 参数范围验证
        if self.temp < -50.0 or self.temp > 300.0:
            logger.warning(f"温度 {self.temp}°C 超出范围,修正为 25°C")
            self.temp = 25.0
        
        if self.time < 0:
            logger.warning(f"时间 {self.time}s 无效,修正为 300s")
            self.time = 300.0
        
        if self.stir_speed < 0 or self.stir_speed > 1500.0:
            logger.warning(f"搅拌速度 {self.stir_speed} RPM 超出范围,修正为 300 RPM")
            self.stir_speed = 300.0
    
    def _parse_temp_spec(self, temp_spec: str) -> float:
        """解析温度规格为具体温度"""
        
        temp_spec = temp_spec.strip().lower()
        
        # 特殊温度规格
        special_temps = {
            "room temperature": 25.0,      # 室温
            "reflux": 78.0,                 # 默认回流温度(乙醇沸点)
            "ice bath": 0.0,                # 冰浴
            "boiling": 100.0,               # 沸腾
            "hot": 60.0,                    # 热
            "warm": 40.0,                   # 温热
            "cold": 10.0,                   # 冷
        }
        
        if temp_spec in special_temps:
            return special_temps[temp_spec]
        
        # 解析带单位的温度(如 "256 °C")
        import re
        temp_pattern = r'(\d+(?:\.\d+)?)\s*°?[cf]?'
        match = re.search(temp_pattern, temp_spec)
        
        if match:
            return float(match.group(1))
        
        return 25.0  # 默认室温
    
    def _parse_time_spec(self, time_spec: str) -> float:
        """解析时间规格为秒数"""
        
        time_spec = time_spec.strip().lower()
        
        # 特殊时间规格
        special_times = {
            "overnight": 43200.0,           # 12小时
            "several hours": 10800.0,       # 3小时
            "few hours": 7200.0,            # 2小时
            "long time": 3600.0,            # 1小时
            "short time": 300.0,            # 5分钟
        }
        
        if time_spec in special_times:
            return special_times[time_spec]
        
        # 解析带单位的时间(如 "2 h")
        import re
        time_pattern = r'(\d+(?:\.\d+)?)\s*([a-zA-Z]+)'
        match = re.search(time_pattern, time_spec)
        
        if match:
            value = float(match.group(1))
            unit = match.group(2).lower()
            
            unit_multipliers = {
                's': 1.0,
                'sec': 1.0,
                'second': 1.0,
                'seconds': 1.0,
                'min': 60.0,
                'minute': 60.0,
                'minutes': 60.0,
                'h': 3600.0,
                'hr': 3600.0,
                'hour': 3600.0,
                'hours': 3600.0,
            }
            
            multiplier = unit_multipliers.get(unit, 3600.0)  # 默认按小时计算
            return value * multiplier
        
        return 300.0  # 默认5分钟


class HeatChillStartProtocol(BaseModel):
    # === 必需参数 ===
    vessel: dict = Field(..., description="加热容器名称")

    # === 可选参数 - 温度相关 ===
    temp: float = Field(25.0, description="目标温度 (°C)")
    temp_spec: str = Field("", description="温度规格(如 'room temperature', 'reflux')")

    # === 可选参数 - 其他XDL参数 ===
    pressure: str = Field("", description="压力规格(如 '1 mbar'),不做特殊处理")
    reflux_solvent: str = Field("", description="回流溶剂名称,不做特殊处理")

    # === 可选参数 - 搅拌相关 ===
    stir: bool = Field(False, description="是否搅拌")
    stir_speed: float = Field(300.0, description="搅拌速度 (RPM)")
    purpose: str = Field("", description="操作目的")


class HeatChillStopProtocol(BaseModel):
    # === 必需参数 ===
    vessel: dict = Field(..., description="加热容器名称")


class StirProtocol(BaseModel):
    # === 必需参数 ===
    vessel: dict = Field(..., description="搅拌容器名称")
    
    # === 可选参数 ===
    time: str = Field("5 min", description="搅拌时间(如 '0.5 h', '30 min')")
    event: str = Field("", description="事件标识(如 'A', 'B')")
    time_spec: str = Field("", description="时间规格(如 'several minutes', 'overnight')")
    
    def model_post_init(self, __context):
        """后处理:参数验证和时间解析"""
        
        # 验证必需参数
        if not self.vessel.strip():
            raise ValueError("vessel 参数不能为空")
        
        # 优先使用 time_spec,然后是 time
        if self.time_spec:
            self.time = self.time_spec
        
        # 时间解析和验证
        if self.time:
            try:
                # 解析时间字符串为秒数
                parsed_time = self._parse_time_string(self.time)
                if parsed_time <= 0:
                    logger.warning(f"时间 '{self.time}' 解析结果无效,使用默认值 300s")
                    self.time = "5 min"
            except Exception as e:
                logger.warning(f"时间 '{self.time}' 解析失败: {e},使用默认值 300s")
                self.time = "5 min"
    
    def _parse_time_string(self, time_str: str) -> float:
        """解析时间字符串为秒数"""
        import re
        
        time_str = time_str.strip().lower()
        
        # 特殊时间规格
        special_times = {
            "several minutes": 300.0,    # 5分钟
            "few minutes": 180.0,        # 3分钟
            "overnight": 43200.0,        # 12小时
            "room temperature": 300.0,   # 默认5分钟
        }
        
        if time_str in special_times:
            return special_times[time_str]
        
        # 正则表达式匹配数字和单位
        pattern = r'(\d+\.?\d*)\s*([a-zA-Z]+)'
        match = re.match(pattern, time_str)
        
        if not match:
            return 300.0  # 默认5分钟
        
        value = float(match.group(1))
        unit = match.group(2).lower()
        
        # 时间单位转换
        unit_multipliers = {
            's': 1.0,
            'sec': 1.0,
            'second': 1.0,
            'seconds': 1.0,
            'min': 60.0,
            'minute': 60.0,
            'minutes': 60.0,
            'h': 3600.0,
            'hr': 3600.0,
            'hour': 3600.0,
            'hours': 3600.0,
            'd': 86400.0,
            'day': 86400.0,
            'days': 86400.0,
        }
        
        multiplier = unit_multipliers.get(unit, 60.0)  # 默认按分钟计算
        return value * multiplier
    
    def get_time_in_seconds(self) -> float:
        """获取时间(秒)"""
        return self._parse_time_string(self.time)

class StartStirProtocol(BaseModel):
    # === 必需参数 ===
    vessel: dict = Field(..., description="搅拌容器名称")
    
    # === 可选参数,添加默认值 ===
    stir_speed: float = Field(200.0, description="搅拌速度 (RPM),默认200 RPM")
    purpose: str = Field("", description="搅拌目的(可选)")
    
    def model_post_init(self, __context):
        """后处理:参数验证和修正"""
        
        # 验证必需参数
        if not self.vessel.strip():
            raise ValueError("vessel 参数不能为空")
        
        # 修正参数范围
        if self.stir_speed < 10.0:
            logger.warning(f"搅拌速度 {self.stir_speed} RPM 过低,修正为 100 RPM")
            self.stir_speed = 100.0
        elif self.stir_speed > 1500.0:
            logger.warning(f"搅拌速度 {self.stir_speed} RPM 过高,修正为 1000 RPM")
            self.stir_speed = 1000.0

class StopStirProtocol(BaseModel):
    # === 必需参数 ===
    vessel: dict = Field(..., description="搅拌容器名称")
    
    def model_post_init(self, __context):
        """后处理:参数验证"""
        
        # 验证必需参数
        if not self.vessel.strip():
            raise ValueError("vessel 参数不能为空")

class TransferProtocol(BaseModel):
    from_vessel: dict
    to_vessel: dict
    volume: float
    amount: str = ""
    time: float = 0
    viscous: bool = False
    rinsing_solvent: str = ""
    rinsing_volume: float = 0.0
    rinsing_repeats: int = 0
    solid: bool = False

class CleanVesselProtocol(BaseModel):
    vessel: dict
    solvent: str
    volume: float
    temp: float
    repeats: int = 1

class DissolveProtocol(BaseModel):
    vessel: dict
    solvent: str
    volume: float
    amount: str = ""
    temp: float = 25.0
    time: float = 0.0
    stir_speed: float = 0.0

class FilterThroughProtocol(BaseModel):
    from_vessel: dict
    to_vessel: dict
    filter_through: str
    eluting_solvent: str = ""
    eluting_volume: float = 0.0
    eluting_repeats: int = 0
    residence_time: float = 0.0

class RunColumnProtocol(BaseModel):
    from_vessel: dict
    to_vessel: dict
    column: str

class WashSolidProtocol(BaseModel):
    # === 必需参数 ===
    vessel: dict = Field(..., description="装有固体的容器名称")
    solvent: str = Field(..., description="清洗溶剂名称")
    volume: float = Field(..., description="清洗溶剂体积 (mL)")
    
    # === 可选参数,添加默认值 ===
    filtrate_vessel: dict = Field("", description="滤液收集容器(可选,自动查找)")
    temp: float = Field(25.0, description="清洗温度 (°C),默认25°C")
    stir: bool = Field(False, description="是否搅拌,默认False")
    stir_speed: float = Field(0.0, description="搅拌速度 (RPM),默认0")
    time: float = Field(0.0, description="清洗时间 (秒),默认0")
    repeats: int = Field(1, description="重复次数,默认1")
    
    def model_post_init(self, __context):
        """后处理:参数验证和修正"""
        
        # 验证必需参数
        if not self.vessel.strip():
            raise ValueError("vessel 参数不能为空")
        
        if not self.solvent.strip():
            raise ValueError("solvent 参数不能为空")
        
        if self.volume <= 0:
            raise ValueError("volume 必须大于0")
        
        # 修正参数范围
        if self.temp < 0 or self.temp > 200:
            logger.warning(f"温度 {self.temp}°C 超出范围,修正为 25°C")
            self.temp = 25.0
        
        if self.stir_speed < 0 or self.stir_speed > 500:
            logger.warning(f"搅拌速度 {self.stir_speed} RPM 超出范围,修正为 0")
            self.stir_speed = 0.0
        
        if self.time < 0:
            logger.warning(f"时间 {self.time}s 无效,修正为 0")
            self.time = 0.0
        
        if self.repeats < 1:
            logger.warning(f"重复次数 {self.repeats} 无效,修正为 1")
            self.repeats = 1
        elif self.repeats > 10:
            logger.warning(f"重复次数 {self.repeats} 过多,修正为 10")
            self.repeats = 10
            
class AdjustPHProtocol(BaseModel):
    vessel: dict = Field(..., description="目标容器")
    ph_value: float = Field(..., description="目标pH值")  # 改为 ph_value
    reagent: str = Field(..., description="酸碱试剂名称")
    # 移除其他可选参数,使用默认值

class ResetHandlingProtocol(BaseModel):
    solvent: str = Field(..., description="溶剂名称")

class DryProtocol(BaseModel):
    compound: str = Field(..., description="化合物名称")
    vessel: dict = Field(..., description="目标容器")

class RecrystallizeProtocol(BaseModel):
    ratio: str = Field(..., description="溶剂比例(如 '1:1', '3:7')")
    solvent1: str = Field(..., description="第一种溶剂名称")
    solvent2: str = Field(..., description="第二种溶剂名称")
    vessel: dict = Field(..., description="目标容器")
    volume: float = Field(..., description="总体积 (mL)")

class HydrogenateProtocol(BaseModel):
    temp: str = Field(..., description="反应温度(如 '45 °C')")
    time: str = Field(..., description="反应时间(如 '2 h')")
    vessel: dict = Field(..., description="反应容器")

__all__ = [
    "Point3D", "PumpTransferProtocol", "CleanProtocol", "SeparateProtocol", 
    "EvaporateProtocol", "EvacuateAndRefillProtocol", "AGVTransferProtocol", 
    "CentrifugeProtocol", "AddProtocol", "FilterProtocol", 
    "HeatChillProtocol",
    "HeatChillStartProtocol", "HeatChillStopProtocol",
    "StirProtocol", "StartStirProtocol", "StopStirProtocol", 
    "TransferProtocol", "CleanVesselProtocol", "DissolveProtocol", 
    "FilterThroughProtocol", "RunColumnProtocol", "WashSolidProtocol",
    "AdjustPHProtocol", "ResetHandlingProtocol", "DryProtocol", 
    "RecrystallizeProtocol", "HydrogenateProtocol"
]
  1. unilabos/compile 中新建编译为机器指令的函数,函数入参为设备连接图 G 和实验操作参数。

  1. 将该函数加入 unilabos/compile/__init__.pyaction_protocol_generators 中:

  1. 记得将新开发的 Protocol 添加至启动时的 devices.json 中。