serial_communication.py
9.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
'''
@ author: jie
@ tools: pycharm
@ content: 串口通讯实现类
@ date: 2021.5.10
'''
import imp
import time
import serial
#import serial.tools.list_ports
import logging
from config import Config
import app.utils.g as gg
from app.utils.g import shelfconfig
from flask_babel import Babel, gettext as _
class SerialCommunication():
def __init__(self):
# print(gg.Serl)
#global serl
# serial_cf = ReadConfig()
# self.port = serial_cf.get_serial("DEFAULT_COM")
# self.bps = int(serial_cf.get_serial("DEFAULT_BAUDRATE"))
# self.timeout = float(serial_cf.get_serial("DEFAULT_TIMEOUT"))
# self.port = read_com()
# self.uploads_path = Config.UPLOAD_FOLDER
self.serialstate="串口连接正常"
with open(Config.IP_PATH + "/serialcom.txt", "r") as f:
serialcom = f.read()
self.port = serialcom.strip("\n")
self.bps = 115200
self.timeout = 1
if gg.Serl is None:
try:
if self.port.lower().endswith("n"):
self.TryConnectSerial()
else:
gg.Serl = serial.Serial(port=self.port, baudrate=self.bps, timeout=self.timeout)
#if not gg.Serl.is_open:
# gg.Serl.open()
log("串口已打开1")
except Exception as e:
log("---异常---:", e)
self.main_engine = gg.Serl
def TryConnectSerial(self):
if not self.port.lower().endswith("n"):
return
try:
self.main_engine.close()
except:
log("main_engine 关闭失败")
if not self.findSerial(self.port.strip('n').strip('N')):
log("无法定位串口,系统重启.")
shelfconfig.comport_carsh_reboot=True
shelfconfig.Save()
time.sleep(1)
import os
os.system('sudo reboot')
def findSerial(self, portpre):
trytimes = 0
while trytimes < shelfconfig.comport_carsh_timeout /5:
for i in range(0, 5):
try:
currentport = portpre+str(i)
log("尝试打开串口:{}".format(currentport))
gg.Serl = serial.Serial(port=currentport, baudrate=self.bps, timeout=self.timeout)
#if not gg.Serl.is_open:
# gg.Serl.open()
# 判断是否打开成功
gg.Serl.write(bytes("A1\n".encode('utf-8')))
self.main_engine=gg.Serl
log("成功打开串口:{}".format(currentport))
self.serialstate="串口连接正常"
return True
except Exception as e:
log("串口打开失败:", currentport, e)
self.serialstate=e
trytimes += 1
time.sleep(5)
log("串口打开失败,第{}次重试".format(trytimes))
return False
# 打印设备基本信息
def print_name(self):
print(self.main_engine.name) # 设备名字
print(self.main_engine.port) # 读或者写端口
print(self.main_engine.baudrate) # 波特率
print(self.main_engine.bytesize) # 字节大小
print(self.main_engine.parity) # 校验位
print(self.main_engine.stopbits) # 停止位
print(self.main_engine.timeout) # 读超时设置
print(self.main_engine.writeTimeout) # 写超时
print(self.main_engine.xonxoff) # 软件流控
print(self.main_engine.rtscts) # 软件流控
print(self.main_engine.dsrdtr) # 硬件流控
print(self.main_engine.interCharTimeout) # 字符间隔超时
# 打开串口
def Open_Engine(self):
self.main_engine.open()
# 关闭串口
def Close_Engine(self):
self.main_engine.close()
print(self.main_engine.is_open) # 检验串口是否打开
# 打印可用串口列表
def get_used_port(self):
port_list = list(serial.tools.list_ports.comports())
# print(port_list)
return port_list
# 检查串口连接状态
def check_serial(self):
try:
if (self.main_engine.is_open):
return _(self.serialstate)
else:
return _('串口连接失败')
except Exception as e:
error_msg = 'Error,{}'.format(e)
return error_msg
# 接收指定大小的数据
# 从串口读size个字节。如果指定超时,则可能在超时后返回较少的字节;如果没有指定超时,则会一直等到收完指定的字节数。
def read_Size(self, size):
return self.main_engine.read(size=size)
# 接收一行数据
# 使用readline()时应该注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
# 如果没有超时,readline会报异常。
def read_Line(self):
line = self.main_engine.readline().decode('utf-8').rstrip()
return line
# 未使用
def read_Lines(self):
line = self.main_engine.readlines()
return line
def read_alldata(self):
try:
print("in_waiting",self.main_engine.in_waiting)
data=bytes(bytes("".encode('utf-8')))
while self.main_engine.in_waiting:
data=data+ self.main_engine.read_all()
time.sleep(0.06)
return data
except Exception as e:
log('串口读取数据错误,{}'.format(e))
self.TryConnectSerial()
self.serialstate=e
return bytes(bytes("".encode('utf-8')))
# 未使用
def read_hex_data(self):
if self.main_engine.in_waiting:
data = self.main_engine.read_all()
# data= str(binascii.b2a_hex(t.read(num)))[2:-1] #十六进制显示方法2
# 发数据
def send_data(self, data, wait=True):
# self.main_engine.flushInput()
# self.main_engine.flushOutput()
# text = '{"ADDR":"99","colorset:[[1,255,0,0],[2,0,255,255],[5,132,142,110]]}\n{"ADDR":"1","color":[[1,10,20,30],[2,31,37,49],[5,1,3,5,7,9,11,14,18]]}\n{"ADDR":"3","color":[[1,10,20,30],[2,31,37,49],[5,1,3,5,7,9,11,14,18]]}\n'
try:
print("out_waiting",self.main_engine.out_waiting,'send_data:'+data.replace('\n','\\n')+"EOF")
commands = data.split("\n")
for cmd in commands:
if len(cmd)==0:
continue;
cmd=cmd+"\n"
cmd = bytes(cmd.encode('utf-8'))
self.main_engine.write(cmd)
time.sleep(0.03)
if wait:
self.main_engine.flush()
except Exception as e:
log('串口发送数据错误,{}'.format(e))
self.TryConnectSerial()
# 十六进制数据发送
# 未使用
def send_hex_data(self, data):
data = bytes(data.encode('utf-8')) # 先将输入的字符串转化成字节码
hexstring = data.hex()
self.main_engine.write(hexstring)
# 更多示例
# self.main_engine.write(chr(0x06).encode("utf-8")) # 十六制发送一个数据
# print(self.main_engine.read().hex()) # # 十六进制的读取读一个字节
# print(self.main_engine.read())#读一个字节
# print(self.main_engine.read(10).decode("gbk"))#读十个字节
# print(self.main_engine.readline().decode("gbk"))#读一行
# print(self.main_engine.readlines())#读取多行,返回列表,必须匹配超时(timeout)使用
# print(self.main_engine.in_waiting)#获取输入缓冲区的剩余字节数
# print(self.main_engine.out_waiting)#获取输出缓冲区的字节数
# print(self.main_engine.readall())#读取全部字符。
# 未使用
def Recive_data(self, way):
# 循环接收数据,此为死循环,可用线程实现
print("开始接收数据:")
while True:
try:
# 一个字节一个字节的接收
if self.main_engine.in_waiting:
if(way == 0):
for i in range(self.main_engine.in_waiting):
print("接收ascii数据:"+str(self.Read_Size(1)))
data1 = self.Read_Size(1).hex() # 转为十六进制
# 转为十进制print("收到数据十六进制:"+data1+" 收到数据十进制:"+str(data2))
data2 = int(data1, 16)
if(way == 1):
# 整体接收
# data = self.main_engine.read(self.main_engine.in_waiting).decode("utf-8")#方式一
data = self.main_engine.read_all() # 方式二print("接收ascii数据:", data)
except Exception as e:
print("异常报错:", e)
def log(*values: object):
print(values)
logging.warning(values)
# if __name__ == '__main__':
# myser = Communication("/dev/ttyUSB0",115200,1)
# myser.print_name()
# myser.get_used_port()
# myser.delete_port()
# myser.show_port()