python 使用pyserial控制温箱(附源码)

目录

前言:

温箱介绍:

开始编码工作:

官方文档解读:

(1)发送的数据格式介绍:

(2)发送数据举例: 

构建转换进制的类:

构建读取温箱温度的方法:

(1)按照文档规则拼接命令:

(2)将温箱返回的值ASCII值转10进制:

(3)读取当前温度:

写在最后:

总结:

源码:


前言:

由于最近进了新公司,一个台资企业,做通信的硬件产品。在硬件产品的测试过程中需要用到温箱,故而需要远程控制温箱,实现全自动化测试;所以这边文章主要介绍一下如何使用python来控制 TEMI 1000这个型号的温箱。以下内容适用于TEMI 1000, TEMI 1500型号的温箱。

温箱介绍:

该温箱是我司从昆山振弘检测设备有限公司采购的TEMI 1000,但是控制器用的是韩国的,关于控温箱这个项目,可参考的资料仅有一份官方的manual,里面包含一些命令格式啥。只要你有温箱,就应该可以向厂家索取该文档。

该温箱的通讯接口只有一个RS232接口,要想实现远程控制,需要有串口转USB的线;类似于下图

所以,想依靠python来与之进行通信,还是要通过python 的pyserial库,关于怎么安装pyserial,过于简单,这里不做赘述,可自行百度。

开始编码工作:

官方文档解读:

(1)发送的数据格式介绍:

其实,在编码工作开始之前,必须熟读官方文档,首先要知道发送数据的格式,规则;比如,在温箱文档中这样描述发送数据的格式:

要想给温箱发送数据,必须遵循此格式,其中①②④⑦⑧都是固定的值,不用更改,②的设备地址默认是01,所以我们要拼凑的数据只有③⑤⑥了,下面我来一步一步分析。

③代表通信命令的类型,根据文档,命令有以下表中几种类型:常用的也就是RSD,WSD两种

 ⑤代表你要发送的数据

⑥是一个SUM值,需要计算后填充至命令中,根据文档的意思是将①之后,也就是“STX”之后的所有内容转十六进制相加,相加后转ASCII码,取ASCII码的后两位就是我们所需要的SUM值

(2)发送数据举例: 

由文档的示例可知,需拼接:STX + 设备地址(默认01)+命令(读用RSD,写用WSD)+ 逗号+寄存器个数+寄存器地址+计算后的SUM值+CR+LF。

拼接的结果用ASCII表示就是:

STX01RRD,02,0001,0002B2CRLF

转成十六进制表示就是:

02 30 31 52 53 44 2C 30 32 2C 30 30 30 32 42 32 0D 0A

上面的ASCII值中B2就是通过从STX后的值到B2前面的值转16进制相加,再转ASII取后两位的结果得出B2,这里的算法可参考文章末尾源码中的def _calculate_sum 函数。

到此,关于发送的数据格式和规则介绍完了,接收的数据也是差不多的原理。接下来开始上代码。

构建转换进制的类:

官方文档熟读后得知,与该温箱通信需要进行几种进制的转换,所以先封装转换方法为Converter类,具体见注释:

class Converter(object):  @staticmethoddef to_ascii(h):  # 转换str为ascii值list_s = []for i in range(0, len(h), 2):list_s.append(chr(int(h[i:i + 2], 16)))return ''.join(list_s)@staticmethoddef to_hex(s):  # 转换str为十六进制值list_h = []for c in s:list_h.append(str(hex(ord(c))[2:]))return ''.join(list_h)@staticmethoddef convert_hex_to_dec(hexTemp):  # 转换十六进制为10进制的温度值temp = int(hexTemp, 16)return -(temp & 0x8000) | (temp & 0x7fff)@staticmethoddef convert_dec_to_hex(decTem):  # 转10进制为16进制if decTem >= 0:return hex(decTem)[2:].upper().zfill(4)else:binStr = bin(decTem & 0xFFFF)[2:]hexStr = hex(int(binStr, 2))[2:].upper()return hexStr

构建读取温箱温度的方法:

(1)按照文档规则拼接命令:

	def _splicing_command(self, register, registerCount=1, cmdType=CommandType.RSD, data=None):register = Utils.to_enum(register, ModbusRegister).valueregisterCount = int(registerCount)cmdType = Utils.to_enum(cmdType, CommandType).namesumStr = self._calculate_sum(cmdType, registerCount, register, data)if register >= 1000:register = str(register)else:register = str(register).zfill(4)# convert to hexheader = '02'address = Converter.to_hex('01')cmdType = Converter.to_hex(cmdType)registerCount = Converter.to_hex('01')register = Converter.to_hex(register)comma = Converter.to_hex(',').upper()sumStr = Converter.to_hex(sumStr)CR = '0D'LF = '0A'if data is not None:data = Converter.to_hex(data)command = ''.join([header, address, cmdType, comma, registerCount, comma, register, comma, data, sumStr, CR, LF])else:command = ''.join([header, address, cmdType, comma, registerCount, comma, register, sumStr, CR, LF])return command

(2)将温箱返回的值ASCII值转10进制:

	def _calculate_temp(self, response):resp = str(response).split(',')hexTemp = resp[-1][:4]temp = Converter.convert_hex_to_dec(hexTemp) / 100return round(temp, 2)

(3)读取当前温度:

def get_current_temperature(self, register=ModbusRegister.TEMP_NPV):command = self._splicing_command(register)self._ser.open_port_connection(self._ser.get_usb_to_serial_port(), baudrate=9600)self._ser.write_data('{:s}'.format(command))self._common.pause(1, reason='Wait TEMI1000 response')resp = self._ser.read_data()self._ser.close_port_connection()self._check_response(resp)temp = self._calculate_temp(resp)return temp

写在最后:

总结:

由此可见,关于温箱的控制,难点在于了解数据通信的规则,以及各种进制的转换,后期我打算用tkinter模块写一个GUI程序,将温箱的控制集成进去,感兴趣的伙伴可以持续关注哈! 

对了,附一下项目整体的源码吧,源码中没有中文注释,就这几句也是现加的,因为,好的代码不需要注释别人也能看懂!感兴趣的可私聊!该博客会持续更新

源码:

# 以下是导入一些现有的模块,枚举类,以及util类,等等
import timefrom ..enums.tempbox import CommandType
from ..enums.tempbox import RespStatusCode
from ..enums.tempbox import ModbusRegister
from ..enums.tempbox import RunningStatus
from .serialext import _SerialKeywords
from ._context import _Context
from .common import _CommonKeywords
from .seleniumext import _SeleniumExtKeywordsfrom robot.api import logger
from robot.utils import secs_to_timestr
from robot.utils import timestr_to_secsfrom ..utils import Utils# 以下是封装的进制转换的类class Converter(object):@staticmethoddef to_ascii(h):list_s = []for i in range(0, len(h), 2):list_s.append(chr(int(h[i:i + 2], 16)))return ''.join(list_s)@staticmethoddef to_hex(s):list_h = []for c in s:list_h.append(str(hex(ord(c))[2:]))return ''.join(list_h)@staticmethoddef convert_hex_to_dec(hexTemp):temp = int(hexTemp, 16)return -(temp & 0x8000) | (temp & 0x7fff)@staticmethoddef convert_dec_to_hex(decTem):if decTem >= 0:return hex(decTem)[2:].upper().zfill(4)else:binStr = bin(decTem & 0xFFFF)[2:]hexStr = hex(int(binStr, 2))[2:].upper()return hexStrclass _TempKeywords(_Context):  # 这里是所有温箱实现的功能def __init__(self, *args):super().__init__(*args)self._common = _CommonKeywords(*args)self._ser = _SerialKeywords(*args)self._sel = _SeleniumExtKeywords(*args)# region Data Acquisitiondef get_current_temperature(self, register=ModbusRegister.TEMP_NPV):  # 读取温箱现在的温度,并打印出来resp = self._read_content(register)temp = self._calculate_temp(resp)logger.info('The temperature of the TEMI1000 is {:.2f} C now'.format(temp))return tempdef get_the_configured_temperature(self, register=ModbusRegister.TEMP_NSV):# 获取温箱当前设定的定值是多少resp = self._read_content(register)temp = self._calculate_temp(resp)logger.info('The TEMI1000 configured temperature is {:.2f} C'.format(temp))return tempdef get_running_state(self, register=ModbusRegister.NOWSTS):  # 获取温箱现在的运行状态,是正在升降温还是在待机resp = self._read_content(register)code = str(resp).split(',')[-1].strip()[:-2]state = Utils.to_enum(int(code), RunningStatus)logger.info('The TEMI1000 running status is: {:s}'.format(state))return state# endregion# region Functiondef set_temperature_output(self, temp, register=ModbusRegister.FIX_TEMP_TSP):  # 设置温度输出,并激活输出temp = int(temp) * 100targetTemp = Converter.convert_dec_to_hex(temp)state = self.get_running_state()if state is RunningStatus.ON:self.start_stop_trigger()self._write_content(data=targetTemp, register=register)self.activate_output()def start_stop_trigger(self, data='0001', register=ModbusRegister.COM_OPMODE):  # 开始激活输出的底层函数self._write_content(data=data, register=register)def deactivate_temperature_output(self): # 停止温度输出,回到待机状态state = self.get_running_state()if state is RunningStatus.ON:self.start_stop_trigger()else:logger.info('Running status is already {}'.format(RunningStatus.ON.name))def activate_temperature_output(self): # 激活温度输出的函数state = self.get_running_state()if state is RunningStatus.ON:logger.info('Running status is already {}'.format(RunningStatus.ON.name))else:self.start_stop_trigger()def wait_until_temperature_is(self, temp, timeout=3600, interval=5): # 等待,等待温箱升降温到设定的值temp = int(temp)timeout = timestr_to_secs(timeout)interval = timestr_to_secs(interval)timeoutAbs = time.time() + timeoutwhile time.time() < timeoutAbs:self._sel.fragment_sleep(interval)currentTemp = self.get_current_temperature()if Utils.floatEquals(currentTemp, temp, uncertainty=0.2):logger.info('The current temperature reaches the target temperature: {}'.format(temp))return currentTempraise AssertionError('The temperature not reaches the target temperature after {:s}'.format(secs_to_timestr(timeout)))# endregiondef _check_response(self, response): # 对温箱的返回内容进行检查,异常的话则报错response = str(response)if 'OK' in response:logger.info('Returned response is OK')elif 'NG' in response:errCode = int(response.split('NG')[-1][:2])errInfo = Utils.to_enum(errCode, RespStatusCode)raise AssertionError('Response failed :{:s}'.format(errInfo.name))else:raise AssertionError('Command send failed, response :{:s}'.format(response))def _splicing_command(self, register, registerCount=1, cmdType=CommandType.RSD, data=None): # 拼接读写数据的命令register = Utils.to_enum(register, ModbusRegister).valueregisterCount = int(registerCount)cmdType = Utils.to_enum(cmdType, CommandType).namesumStr = self._calculate_sum(cmdType, registerCount, register, data)if register >= 1000:register = str(register)else:register = str(register).zfill(4)# convert to hexheader = '02'address = Converter.to_hex('01')cmdType = Converter.to_hex(cmdType)registerCount = Converter.to_hex('01')register = Converter.to_hex(register)comma = Converter.to_hex(',').upper()sumStr = Converter.to_hex(sumStr)CR = '0D'LF = '0A'if data is not None:data = Converter.to_hex(data)command = ''.join([header, address, cmdType, comma, registerCount, comma, register, comma, data, sumStr, CR, LF])else:command = ''.join([header, address, cmdType, comma, registerCount, comma, register, sumStr, CR, LF])return commanddef _calculate_sum(self, cmdType, registerCount, register, data=None):  # 温箱发送的数据中需要对部分数据先处理,计算出一个SUM值,然后将要发送的数据拼接上SUM值一起发送给温箱,温箱才能正确相应,这部分也是一个难点"""1) Add the ASCII code of characters from the character next to STX one by one up to the character prior to SUM2) Represent the lowest one byte of the sum as a hexadecimal notation (2 characters)"""registerCount = int(registerCount)cmdType = Utils.to_enum(cmdType, CommandType).nameregister = Utils.to_enum(register, ModbusRegister).valueif register >= 1000:register = str(register)else:register = str(register).zfill(4)comma = ['2C']address = [Converter.to_hex(i) for i in '01']registerCount = [Converter.to_hex(i) for i in str(registerCount).zfill(2)]register = [Converter.to_hex(i) for i in register]cmdType = [Converter.to_hex(i) for i in str(cmdType)]if data is not None:data = [Converter.to_hex(i) for i in data]if data is not None:cmdStr = address + cmdType + comma + registerCount + comma + register + comma + dataelse:cmdStr = address + cmdType + comma + registerCount + comma + registersumStr = ','.join(cmdStr)SUM = hex(sum([int(v, 16) for v in sumStr.split(',')]))[-2:]return SUM.upper()def _calculate_temp(self, response): # 将温箱返回的内容转换成十进制温度resp = str(response).split(',')hexTemp = resp[-1][:4]temp = Converter.convert_hex_to_dec(hexTemp) / 100return round(temp, 2)def _write_content(self, data, register):  # 封装写数据的通用方法command = self._splicing_command(register, cmdType=CommandType.WSD, data=data)port = self._ser.get_usb_to_serial_port()self._ser.open_port_connection(port, baudrate=9600)self._ser.write_data('{:s}'.format(command))self._common.pause(1, reason='Wait TEMI1000 response')resp = self._ser.read_data()self._ser.close_port_connection()self._check_response(resp)return respdef _read_content(self, register): # 封装读取数据的通用方法command = self._splicing_command(register)self._ser.open_port_connection(self._ser.get_usb_to_serial_port(), baudrate=9600)self._ser.write_data('{:s}'.format(command))self._common.pause(1, reason='Wait TEMI1000 response')resp = self._ser.read_data()self._ser.close_port_connection()self._check_response(resp)return resp

Published by

风君子

独自遨游何稽首 揭天掀地慰生平