Commit 8b38abfa 刘韬

1

1 个父辈 010e6efc
此文件太大,无法显示。
...@@ -9,6 +9,8 @@ ...@@ -9,6 +9,8 @@
"type": "python", "type": "python",
"request": "launch", "request": "launch",
"module": "flask", "module": "flask",
//"justMyCode": false,
"env": { "env": {
"FLASK_APP": "app", "FLASK_APP": "app",
"FLASK_ENV": "development" "FLASK_ENV": "development"
......
...@@ -16,9 +16,10 @@ def setup_log(): ...@@ -16,9 +16,10 @@ def setup_log():
file_log_handler = RotatingFileHandler(Config.LOG_FOLDER, maxBytes=1024 * 1024 * 100, backupCount=10) file_log_handler = RotatingFileHandler(Config.LOG_FOLDER, maxBytes=1024 * 1024 * 100, backupCount=10)
# 创建日志记录的格式 日志等级 输入日志信息的文件名 行数 日志信息 # 创建日志记录的格式 日志等级 输入日志信息的文件名 行数 日志信息
# log_file_handler = TimedRotatingFileHandler(filename='logs/smartshelf.log', when="D", interval=1, backupCount=3) # log_file_handler = TimedRotatingFileHandler(filename='logs/smartshelf.log', when="D", interval=1, backupCount=3)
formatter = logging.Formatter('%(asctime)s : %(message)s') formatter = logging.Formatter('%(asctime)s - %(levelname)s : %(message)s')
# 为刚创建的日志记录器设置日志记录格式 # 为刚创建的日志记录器设置日志记录格式
file_log_handler.setFormatter(formatter) file_log_handler.setFormatter(formatter)
file_log_handler.setLevel(logging.WARNING)
# 为全局的日志工具对象(flask app使用的)添加日志记录器 # 为全局的日志工具对象(flask app使用的)添加日志记录器
logging.getLogger().addHandler(file_log_handler) logging.getLogger().addHandler(file_log_handler)
......
...@@ -38,18 +38,10 @@ def read_ip(): ...@@ -38,18 +38,10 @@ def read_ip():
ip = current[0][0] ip = current[0][0]
return ip return ip
from app.utils.g import ver
def read_version(): def read_version():
uploads_path = basepath + Config.STATE_PATH return ver()
with open(uploads_path + "version.txt","r") as f:
s_version=f.readline()
return s_version
def confirm_version(version):
logging.warning("写入最新版本号:{}".format(version))
uploads_path = basepath + Config.STATE_PATH
with open(uploads_path + "version.txt","w") as f:
f.write(str(version))
f.close()
@app.route('/upgrade',methods=['POST']) @app.route('/upgrade',methods=['POST'])
def upgrade(): def upgrade():
...@@ -72,7 +64,6 @@ def upgrade(): ...@@ -72,7 +64,6 @@ def upgrade():
time.sleep(2) time.sleep(2)
process_file() process_file()
time.sleep(2) time.sleep(2)
confirm_version(version)
reboot() reboot()
info = {'operation':'已升级到最新版本'} info = {'operation':'已升级到最新版本'}
else: else:
......
...@@ -11,11 +11,12 @@ import csv ...@@ -11,11 +11,12 @@ import csv
import re import re
import logging import logging
import requests import requests
from app.induction_post import COLORS
from config import Config from config import Config
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from app.utils.serial_communication import SerialCommunication from app.utils.serial_communication import SerialCommunication
from app.induction_config import read_addrs,ip_config,testCidExists from app.induction_config import read_addrs,ip_config,testCidExists
from serial_config import ReadConfig #from serial_config import ReadConfig
from app.utils.location import Location from app.utils.location import Location
config_dict,convert_dict = Location().read_location_file() config_dict,convert_dict = Location().read_location_file()
...@@ -159,16 +160,19 @@ def writeserial(): ...@@ -159,16 +160,19 @@ def writeserial():
if current_state=='on': if current_state=='on':
return json.dumps({'msg':"请先停止系统"}) return json.dumps({'msg':"请先停止系统"})
data = request.get_json() data = request.get_json()
command = data['command']+'\r\n' command = data['command']+'\n'
command = command.replace('\\n','\n')
#ser = SerialCommunication() #ser = SerialCommunication()
ser.send_data(command) ser.send_data(command)
time.sleep(0.1)
msg="" msg=""
for i in range(0,20): #for i in range(0,20):
time.sleep(0.1) # time.sleep(0.1)
a = ser.read_alldata() # a = ser.read_alldata()
if a is None: # if a is None:
break # break
msg +=bytes.decode(a) # msg +=bytes.decode(a)
msg=bytes.decode(ser.read_alldata())
if msg is not None: if msg is not None:
return json.dumps({'msg':"{}".format(msg)}) return json.dumps({'msg':"{}".format(msg)})
...@@ -195,7 +199,113 @@ def sitereport(): ...@@ -195,7 +199,113 @@ def sitereport():
time.sleep(1.5) time.sleep(1.5)
msg = ser.read_alldata() msg = ser.read_alldata()
return json.dumps({'msg':bytes.decode(msg)}) return json.dumps({'msg':bytes.decode(msg)})
@app.route('/getledcount', methods=['POST','GET'])
def getledcount():
if read_state()=='on':
return json.dumps({'msg':"请先停止系统"})
ser.read_alldata()
time.sleep(0.1)
ser.read_alldata()
time.sleep(0.1)
ser.read_alldata()
addrs = read_addrs()
addrs = addrs.split('@')
ledcountdict={}
ledcountdict={"A1":{'ledcount':50,'Version':'-DEMO'},"A2":{'ledcount':80,'Version':'-DEMO'}}
for addr in addrs:
command = '{"ADDR":"'+ addr[1:] +'","action":"info"}'+'\r\n'
ser.send_data(command)
time.sleep(0.2)
msg = ser.read_alldata()
print(msg)
addr=re.sub('A1(\d)',r'B\1',addr)
if msg is not None and len(msg)>0:
msg=bytes.decode(msg)
ledinfo={}
ledinfo['ledcount']=getinfo(msg,'Led Count=')
ledinfo['DeviceUniqueID']=getinfo(msg,'DeviceUniqueID=')
ledinfo['Version']=getinfo(msg,'Version=')
ledcountdict[addr]=ledinfo
return json.dumps(ledcountdict)
@app.route('/setledcolor', methods=['POST'])
def setledcolor():
data = request.get_json()
if data is not None:
addr = data["addr"][1:]
color=data["color"]
ledlist = data["ledlist"]
list=[]
if len(ledlist)>0:
ledlist.insert(0,COLORS.get(color)[0])
list.append(ledlist)
#command = '{{"ADDR":{},"color":[{},{}]}}'.format(addr,ledlist,closeledlist)
command = '{{"ADDR":{},"color":{}}}\n'.format(addr,list)
ser.send_data(command)
return json.dumps({'msg':command})
from app.scan_collection import transfer
last_sensor_dict={}
@app.route('/getshelfstate', methods=['POST','GET'])
def getshelfstate():
global last_sensor_dict
if read_state()=='on':
return json.dumps({'msg':"请先停止系统"})
dataaddrs = request.get_json()
command="";
if dataaddrs is None:
dataaddrs = read_addrs()
else:
dataaddrs = dataaddrs['addrs']
addrs=dataaddrs.split('@')
for addr in addrs:
command +=addr + '\n'
while ser.read_alldata():
continue
ser.send_data(command)
time.sleep(0.05)
#time.sleep(0.1)
location_status=ser.read_alldata()
#print("location_status",location_status)
#location_status="A1[1000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000]\r\nA2[0020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000]\r\nA11[0000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000]"
#location_status = bytes(location_status.encode('utf-8'))
# print (location_status)
cur_dict = transfer(location_status)
ProcessLastSensor(cur_dict,last_sensor_dict)
last_sensor_dict=cur_dict.copy()
return json.dumps(cur_dict)
def ProcessLastSensor(cur_dict,last_sensor_dict):
if len(last_sensor_dict)>0:
for k,v in cur_dict.items():
lightlist=[]
closelist=[]
lv = last_sensor_dict.get(k)
if lv:
for i in range(0,len(lv)):
if v[i]=='1' and v[i]!=lv[i]:
lightlist.append(i+1)
elif v[i]=='0' and v[i]!=lv[i]:
closelist.append(i+1)
list=[]
if len(lightlist)>0:
lightlist.insert(0,COLORS.get('white')[0])
list.append(lightlist)
if len(closelist)>0:
closelist.insert(0,COLORS.get('off')[0])
list.append(closelist)
if len(list)>0:
command = '{{"ADDR":{},"color":{}}}\n'.format(k[1:],list)
print("ProcessLastSensor",command)
ser.send_data(command,False)
return True
else:
return True
return False
#16mm 放8mm料盘;25mm放16mm料盘;35放24mm料盘;45放32mm料盘;54门放44mm料盘 #16mm 放8mm料盘;25mm放16mm料盘;35放24mm料盘;45放32mm料盘;54门放44mm料盘
#plateh={'1':8,'2':16,'3':24,'4':32,'5':44} #plateh={'1':8,'2':16,'3':24,'4':32,'5':44}
plateh={'20':8,'14':16,'10':24,'8':32,'5':44} plateh={'20':8,'14':16,'10':24,'8':32,'5':44}
...@@ -213,6 +323,7 @@ def buildstore(): ...@@ -213,6 +323,7 @@ def buildstore():
#data = request.get_json() #data = request.get_json()
addrs = read_addrs() addrs = read_addrs()
addrs = addrs.split('@') addrs = addrs.split('@')
ledcountdict={}
title = '位置,address,led,sensor,优先级,高度,宽度' title = '位置,address,led,sensor,优先级,高度,宽度'
contents=title+'\r\n' contents=title+'\r\n'
body=[] body=[]
...@@ -228,6 +339,8 @@ def buildstore(): ...@@ -228,6 +339,8 @@ def buildstore():
#msg='ADDR=1//ShelfPartName=NLP-00030-10-330-01//BoardIsOK=1//ReadLedType 0=2//ReadLedType 1=2//ReadLedType 2=0//ReadLedType 3=0//ReadLedType 4=0//LedType=7,7,7,6,0,0,0,0,0,0,//Led Count=27//Led List=1,2,3,4,5,6,7,11,12,13,14,15,16,17,21,22,23,24,25,26,27,31,32,33,34,35,36,//RowWidth=13//' #msg='ADDR=1//ShelfPartName=NLP-00030-10-330-01//BoardIsOK=1//ReadLedType 0=2//ReadLedType 1=2//ReadLedType 2=0//ReadLedType 3=0//ReadLedType 4=0//LedType=7,7,7,6,0,0,0,0,0,0,//Led Count=27//Led List=1,2,3,4,5,6,7,11,12,13,14,15,16,17,21,22,23,24,25,26,27,31,32,33,34,35,36,//RowWidth=13//'
if msg is not None: if msg is not None:
msg=bytes.decode(msg) msg=bytes.decode(msg)
if msg.find("timeout"):
return json.dumps({'msg':'库位建立失败,地址:'+addr+' 通讯失败,请检查后重试.'})
ledcount=getinfo(msg,'Led Count=') ledcount=getinfo(msg,'Led Count=')
_LedType = getinfo(msg,'LedType=').split(',') _LedType = getinfo(msg,'LedType=').split(',')
ShelfPartName=getinfo(msg,'ShelfPartName=') ShelfPartName=getinfo(msg,'ShelfPartName=')
...@@ -239,6 +352,7 @@ def buildstore(): ...@@ -239,6 +352,7 @@ def buildstore():
LedType.append(ll) LedType.append(ll)
LedTypeCopy.append(ll) LedTypeCopy.append(ll)
LedTypeindex=0 LedTypeindex=0
ledcountdict[addr]=ledcount
print(addr+' ledcount:'+ledcount); print(addr+' ledcount:'+ledcount);
for num in range(1,int(ledcount)+1): for num in range(1,int(ledcount)+1):
#ledh=plateh[getinfo(msg,'ReadLedType '+str(LedTypeindex)+'=')] #ledh=plateh[getinfo(msg,'ReadLedType '+str(LedTypeindex)+'=')]
......
...@@ -14,7 +14,7 @@ import requests ...@@ -14,7 +14,7 @@ import requests
from config import Config from config import Config
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from app.utils.serial_communication import SerialCommunication from app.utils.serial_communication import SerialCommunication
from serial_config import ReadConfig #from serial_config import ReadConfig
from flask_babel import Babel, gettext from flask_babel import Babel, gettext
basepath = os.path.dirname(__file__) basepath = os.path.dirname(__file__)
...@@ -81,11 +81,9 @@ def read_state(): ...@@ -81,11 +81,9 @@ def read_state():
state=f.readline() state=f.readline()
return state return state
from app.utils.g import ver,shelfconfig
def read_version(): def read_version():
uploads_path = basepath + Config.STATE_PATH return ver()
with open(uploads_path + "version.txt","r") as f:
s_version=f.readline()
return s_version
def confirm_ip(data): def confirm_ip(data):
uploads_path = basepath + Config.STATE_PATH uploads_path = basepath + Config.STATE_PATH
...@@ -100,6 +98,9 @@ def confirm_ip(data): ...@@ -100,6 +98,9 @@ def confirm_ip(data):
writer.writerow(data) writer.writerow(data)
ip_config['ip'] = data['ip'] ip_config['ip'] = data['ip']
ip_config['cid'] = data['cid'] ip_config['cid'] = data['cid']
shelfconfig.cid=ip_config['cid']
shelfconfig.server_addr = ip_config['ip']
shelfconfig.Save()
confirm_autostart() confirm_autostart()
...@@ -197,18 +198,16 @@ def read_com(): ...@@ -197,18 +198,16 @@ def read_com():
uploads_path = basepath + Config.STATE_PATH uploads_path = basepath + Config.STATE_PATH
with open(uploads_path + "serialcom.txt","r") as f: with open(uploads_path + "serialcom.txt","r") as f:
serialcom=f.readline() serialcom=f.readline()
shelfconfig.serial_port=serialcom
return serialcom return serialcom
def read_addrs(): def read_addrs():
uploads_path = basepath + Config.STATE_PATH uploads_path = basepath + Config.STATE_PATH
with open(uploads_path + "addrs.txt","r") as f: with open(uploads_path + "addrs.txt","r") as f:
addrs=f.readline() addrs=f.readline()
shelfconfig.addrs = addrs
addrs=addrs.replace('B','A1') addrs=addrs.replace('B','A1')
addrs=addrs.replace('b','A1') addrs=addrs.replace('b','A1')
#for addr in addrs:
# if addr.upper().startswith('B'):
# addr='A1'+addr[1:]
return addrs return addrs
def confirm_com(state): def confirm_com(state):
...@@ -217,29 +216,16 @@ def confirm_com(state): ...@@ -217,29 +216,16 @@ def confirm_com(state):
comnum="/dev/ttyUSB{}".format(state) comnum="/dev/ttyUSB{}".format(state)
f.write(comnum) f.write(comnum)
f.close() f.close()
shelfconfig.serial_port=comnum
shelfconfig.Save()
def confirm_addrs(state): def confirm_addrs(state):
uploads_path = basepath + Config.STATE_PATH uploads_path = basepath + Config.STATE_PATH
with open(uploads_path + "addrs.txt","w") as f: with open(uploads_path + "addrs.txt","w") as f:
f.write(state) f.write(state)
f.close() f.close()
# def update_serial(): shelfconfig.addrs=state
# data = request.get_json() shelfconfig.Save()
# serial_num = data.get('serial_num','')
# addrs = data.get('addrs','')
# serial_cf = ReadConfig()
# if serial_num:
# serial_num = "/dev/ttyUSB{}".format(serial_num)
# serial_cf.update_serial("default_com",serial_num)
# print (serial_num)
# if addrs:
# print (addrs)
# serial_cf.update_serial("addrs",addrs)
# addrs = serial_cf.get_serial("addrs")
# serial_num = serial_cf.get_serial("default_com")
# ad_serial = "串口号:{},地址:{}".format(serial_num,addrs)
# return json.dumps({"addr":ad_serial})
@app.route('/sendcolor',methods=['POST']) @app.route('/sendcolor',methods=['POST'])
def sendcolor(): def sendcolor():
...@@ -265,6 +251,26 @@ def sendcolor(): ...@@ -265,6 +251,26 @@ def sendcolor():
logging.warning("颜色集合同步状态:{}".format(status_list)) logging.warning("颜色集合同步状态:{}".format(status_list))
return json.dumps({"msg":"颜色集合同步状态:{}".format(status_list)}) return json.dumps({"msg":"颜色集合同步状态:{}".format(status_list)})
from app.utils.g import shelfconfig
@app.route('/setconfig',methods=['POST'])
def setconfig():
data = request.get_json()
cf={}
for k,v in shelfconfig.__dict__.items():
if data.get(k):
setattr(shelfconfig,k,data.get(k))
cf[k]=data.get(k)
if len(cf)>0:
shelfconfig.Save()
return json.dumps(cf)
@app.route('/setMC',methods=['POST'])
def setMC():
data = request.data.decode('UTF-8')+'\n'
ser = SerialCommunication()
ser.send_data(data)
return "ok"
def remove_a(s): def remove_a(s):
number = re.findall("\d+",s) number = re.findall("\d+",s)
if number: if number:
......
...@@ -72,6 +72,9 @@ def induction_config(): ...@@ -72,6 +72,9 @@ def induction_config():
@app.route('/induction_admin') @app.route('/induction_admin')
def induction_admin(): def induction_admin():
return render_template('induction_admin.html') return render_template('induction_admin.html')
@app.route('/induction_debug')
def induction_debug():
return render_template('induction_debug.html')
# 下载日志文件 # 下载日志文件
@app.route('/download/', methods=['GET'], strict_slashes=False) @app.route('/download/', methods=['GET'], strict_slashes=False)
......
import numpy as np
import re import re
import time import time
import timeit
import functools import functools
# from app.induction_test import config_dict,convert_dict # from app.induction_test import config_dict,convert_dict
......
/dev/ttyUSBCOM9
\ No newline at end of file \ No newline at end of file
/dev/ttyUSBneotel
\ No newline at end of file \ No newline at end of file
此文件类型无法预览
...@@ -73,6 +73,7 @@ ...@@ -73,6 +73,7 @@
<option>B1</option> <option>B1</option>
<option>B2</option> <option>B2</option>
<option>B3</option> <option>B3</option>
<option>B4</option>
<option>B5</option> <option>B5</option>
<option>B6</option> <option>B6</option>
<option>B7</option> <option>B7</option>
...@@ -126,8 +127,18 @@ ...@@ -126,8 +127,18 @@
<div class="panel-body"> <div class="panel-body">
<!-- <label for="a3" class="col-sm-1">地址检测</label> --> <!-- <label for="a3" class="col-sm-1">地址检测</label> -->
<div class="col-sm-12"> <div class="col-sm-12">
<input type="text" id="command" /> <input type="button" value="{{ _('发送') }}" <select class="form-control" onchange="$('#command').val(this.value)" style="width:300px">
class="btn btn-warning" onclick="command_test()" /> <option>快捷指令</option>
<option>report</option>
<option>A1</option>
<option>{"ADDR":"1","action":"info"}</option>
<option>{"ADDR":"1","action":"PrintStatus"}</option>
</select>
</div>
<div class="col-sm-12">
<input type="text" id="command" class="form-control" style="width:300px;float: left;margin: 5px;"/>
<input type="button" value="{{ _('发送') }}" class="btn btn-warning" onclick="command_test()" style="float: left;;margin: 5px"/>
<input type="checkbox" id="autoclear" class="form-control checkbox" value="123" style="height: 20px;width:20px;float: left;margin: 5px" />自动清除历史数据
</div> </div>
<div class="col-sm-12"></div> <div class="col-sm-12"></div>
<!-- <label for="a4" class="col-sm-1">阈值</label> <!-- <label for="a4" class="col-sm-1">阈值</label>
...@@ -315,7 +326,9 @@ ...@@ -315,7 +326,9 @@
var data = { var data = {
"command": document.getElementById("command").value "command": document.getElementById("command").value
} }
if ($("#autoclear")[0].checked){
$("#commandrecv").val("");
}
$.ajax({ $.ajax({
url: "/writeserial", url: "/writeserial",
type: "post", type: "post",
......
...@@ -220,7 +220,7 @@ ...@@ -220,7 +220,7 @@
console.log(data) console.log(data)
}, },
error:function(e){ error:function(e){
alert("{{_('未获取到状态信息')}}"); //alert("{{_('未获取到状态信息')}}");
} }
}) })
} }
......
Serl = None
\ No newline at end of file \ No newline at end of file
import pickle
import os
from config import Config
Serl = None
basepath = os.path.dirname(__file__)
shelfconfigfile = basepath+"/../" + Config.STATE_PATH+'shelfconfig.pkl'
def ver():
return "1.6"
class _shelfconfig:
def __init__(self):
self.server_addr ="not_config"
self.serial_port="not_config"
self.cid="not_config"
self.addrs="not_config"
self.after_instore_close_light=False
self.after_instore_light_color='orange'
def Save(self):
with open(shelfconfigfile, 'wb') as f:
pickle.dump(self, f)
shelfconfig = _shelfconfig()
if os.path.exists(shelfconfigfile):
with open(shelfconfigfile, 'rb') as f:
shelfconfig = pickle.load(f)
print("shelfconfig",shelfconfig.__dict__.items())
...@@ -8,7 +8,7 @@ import serial ...@@ -8,7 +8,7 @@ import serial
import serial.tools.list_ports import serial.tools.list_ports
import logging import logging
from config import Config from config import Config
from serial_config import ReadConfig #from serial_config import ReadConfig
from flask_babel import Babel, gettext as _ from flask_babel import Babel, gettext as _
class Communication(): class Communication():
......
'''
@ author: jie
@ tools: pycharm
@ content: 串口通讯实现类
@ date: 2021.5.10
'''
import serial
import serial.tools.list_ports
import logging
from config import Config
from serial_config import ReadConfig
from flask_babel import Babel, gettext as _
class Communication():
def __init__(self):
serial_cf = ReadConfig()
self.port = serial_cf.get_serial("DEFAULT_COM")
self.bps = int(serial_cf.get_serial("DEFAULT_BAUDRATE"))
# self.timeout = float(serial_cf.get_serial("DEFAULT_TIMEOUT"))
self.timeout = None
try:
# 打开串口,并得到串口对象
self.main_engine= serial.Serial(port=self.port,baudrate=self.bps,timeout=self.timeout)
# 判断是否打开成功
# if (self.main_engine.is_open):
# print ("串口已打开")
except Exception as e:
print("---异常---:", e)
# 打印设备基本信息
def print_name(self):
print(self.main_engine.name) #设备名字
print(self.main_engine.port)#读或者写端口
print(self.main_engine.baudrate)#波特率
print(self.main_engine.bytesize)#字节大小
print(self.main_engine.parity)#校验位
print(self.main_engine.stopbits)#停止位
print(self.main_engine.timeout)#读超时设置
print(self.main_engine.writeTimeout)#写超时
print(self.main_engine.xonxoff)#软件流控
print(self.main_engine.rtscts)#软件流控
print(self.main_engine.dsrdtr)#硬件流控
print(self.main_engine.interCharTimeout)#字符间隔超时
#打开串口
def Open_Engine(self):
self.main_engine.open()
#关闭串口
def Close_Engine(self):
self.main_engine.close()
print(self.main_engine.is_open) # 检验串口是否打开
# 打印可用串口列表
def get_used_port(self):
port_list = list(serial.tools.list_ports.comports())
# print(port_list)
return port_list
# 检查串口连接状态
def check_serial(self):
try:
if (self.main_engine.is_open):
return _('串口连接正常')
else:
return _('串口连接失败')
except Exception as e:
error_msg = 'Error,{}'.format(e)
return error_msg
#接收指定大小的数据
#从串口读size个字节。如果指定超时,则可能在超时后返回较少的字节;如果没有指定超时,则会一直等到收完指定的字节数。
def read_Size(self,size):
return self.main_engine.read(size=size)
#接收一行数据
# 使用readline()时应该注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
# 如果没有超时,readline会报异常。
def read_Line(self):
return self.main_engine.readline()
def read_alldata(self):
if self.main_engine.in_waiting:
return self.main_engine.read_all()
#发数据
def send_data(self,data):
# text = '{"ADDR":"99","colorset:[[1,255,0,0],[2,0,255,255],[5,132,142,110]]}\n{"ADDR":"1","color":[[1,10,20,30],[2,31,37,49],[5,1,3,5,7,9,11,14,18]]}\n{"ADDR":"3","color":[[1,10,20,30],[2,31,37,49],[5,1,3,5,7,9,11,14,18]]}\n'
data = bytes(data.encode('utf-8'))
self.main_engine.write(data)
#更多示例
# self.main_engine.write(chr(0x06).encode("utf-8")) # 十六制发送一个数据
# print(self.main_engine.read().hex()) # # 十六进制的读取读一个字节
# print(self.main_engine.read())#读一个字节
# print(self.main_engine.read(10).decode("gbk"))#读十个字节
# print(self.main_engine.readline().decode("gbk"))#读一行
# print(self.main_engine.readlines())#读取多行,返回列表,必须匹配超时(timeout)使用
# print(self.main_engine.in_waiting)#获取输入缓冲区的剩余字节数
# print(self.main_engine.out_waiting)#获取输出缓冲区的字节数
# print(self.main_engine.readall())#读取全部字符。
#
def Recive_data(self,way):
# 循环接收数据,此为死循环,可用线程实现
print("开始接收数据:")
while True:
try:
# 一个字节一个字节的接收
if self.main_engine.in_waiting:
if(way == 0):
for i in range(self.main_engine.in_waiting):
print("接收ascii数据:"+str(self.Read_Size(1)))
data1 = self.Read_Size(1).hex()#转为十六进制
data2 = int(data1,16)#转为十进制print("收到数据十六进制:"+data1+" 收到数据十进制:"+str(data2))
if(way == 1):
#整体接收
# data = self.main_engine.read(self.main_engine.in_waiting).decode("utf-8")#方式一
data = self.main_engine.read_all()#方式二print("接收ascii数据:", data)
except Exception as e:
print("异常报错:",e)
# if __name__ == '__main__':
# myser = Communication("/dev/ttyUSB0",115200,1)
# myser.print_name()
# myser.get_used_port()
# myser.delete_port()
# myser.show_port()
...@@ -35,8 +35,8 @@ class SerialCommunication(): ...@@ -35,8 +35,8 @@ class SerialCommunication():
self.TryConnectSerial() self.TryConnectSerial()
else: else:
gg.Serl = serial.Serial(port=self.port, baudrate=self.bps, timeout=self.timeout) gg.Serl = serial.Serial(port=self.port, baudrate=self.bps, timeout=self.timeout)
if not gg.Serl.is_open: #if not gg.Serl.is_open:
gg.Serl.open() # gg.Serl.open()
log("串口已打开1") log("串口已打开1")
except Exception as e: except Exception as e:
log("---异常---:", e) log("---异常---:", e)
...@@ -62,8 +62,8 @@ class SerialCommunication(): ...@@ -62,8 +62,8 @@ class SerialCommunication():
currentport = portpre+str(i) currentport = portpre+str(i)
log("尝试打开串口:{}".format(currentport)) log("尝试打开串口:{}".format(currentport))
gg.Serl = serial.Serial(port=currentport, baudrate=self.bps, timeout=self.timeout) gg.Serl = serial.Serial(port=currentport, baudrate=self.bps, timeout=self.timeout)
if not gg.Serl.is_open: #if not gg.Serl.is_open:
gg.Serl.open() # gg.Serl.open()
# 判断是否打开成功 # 判断是否打开成功
gg.Serl.write(bytes("A1\n".encode('utf-8'))) gg.Serl.write(bytes("A1\n".encode('utf-8')))
self.main_engine=gg.Serl self.main_engine=gg.Serl
...@@ -136,8 +136,12 @@ class SerialCommunication(): ...@@ -136,8 +136,12 @@ class SerialCommunication():
def read_alldata(self): def read_alldata(self):
try: try:
if self.main_engine.in_waiting: print("in_waiting",self.main_engine.in_waiting)
return self.main_engine.read_all() data=bytes(bytes("".encode('utf-8')))
while self.main_engine.in_waiting:
data=data+ self.main_engine.read_all()
time.sleep(0.05)
return data
except Exception as e: except Exception as e:
log('串口读取数据错误,{}'.format(e)) log('串口读取数据错误,{}'.format(e))
self.TryConnectSerial() self.TryConnectSerial()
...@@ -150,14 +154,16 @@ class SerialCommunication(): ...@@ -150,14 +154,16 @@ class SerialCommunication():
# data= str(binascii.b2a_hex(t.read(num)))[2:-1] #十六进制显示方法2 # data= str(binascii.b2a_hex(t.read(num)))[2:-1] #十六进制显示方法2
# 发数据 # 发数据
def send_data(self, data): def send_data(self, data, wait=True):
# self.main_engine.flushInput() # self.main_engine.flushInput()
# self.main_engine.flushOutput() # self.main_engine.flushOutput()
# text = '{"ADDR":"99","colorset:[[1,255,0,0],[2,0,255,255],[5,132,142,110]]}\n{"ADDR":"1","color":[[1,10,20,30],[2,31,37,49],[5,1,3,5,7,9,11,14,18]]}\n{"ADDR":"3","color":[[1,10,20,30],[2,31,37,49],[5,1,3,5,7,9,11,14,18]]}\n' # text = '{"ADDR":"99","colorset:[[1,255,0,0],[2,0,255,255],[5,132,142,110]]}\n{"ADDR":"1","color":[[1,10,20,30],[2,31,37,49],[5,1,3,5,7,9,11,14,18]]}\n{"ADDR":"3","color":[[1,10,20,30],[2,31,37,49],[5,1,3,5,7,9,11,14,18]]}\n'
try: try:
print('send_data:'+data) print("out_waiting",self.main_engine.out_waiting,'send_data:'+data.replace('\n','\\n')+"EOF")
data = bytes(data.encode('utf-8')) data = bytes(data.encode('utf-8'))
self.main_engine.write(data) self.main_engine.write(data)
if wait:
self.main_engine.flush()
except Exception as e: except Exception as e:
log('串口发送数据错误,{}'.format(e)) log('串口发送数据错误,{}'.format(e))
self.TryConnectSerial() self.TryConnectSerial()
......
此文件类型无法预览
此文件的差异太大,无法显示。
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!