induction_admin.py 4.3 KB
# codinf:utf-8
from flask import request,render_template
# from flask import render_template,
from app import app
import json
import time
import timeit
import pexpect
import os
import csv
import re
import logging
from config import Config
from werkzeug.utils import secure_filename
from app.utils.serial_communication import SerialCommunication
from app.induction_config import read_addrs
from serial_config import ReadConfig
from app.utils.location import Location

config_dict,convert_dict = Location().read_location_file()

# ser = SerialCommunication()

@app.route('/adminaddr_test', methods=['POST'])
def adminaddr_test():
    rec_data = []
    addrs = read_addrs()
    addrs = addrs.split('@')
    ser = SerialCommunication()
    for addr in addrs:
        command = addr + '\n'
        ser.send_data(command)
        line_status=ser.read_Line()
        if "sensor Time out" in line_status:
            rec_data.append(line_status)
        time.sleep(0.1)
    if rec_data:
        c_dict = {'msg':rec_data}
    else:
        c_dict = {'msg':'所有传感器通信成功'}
    return json.dumps(c_dict)

@app.route('/adminled_test', methods=['POST'])
def adminled_test():
    addrs = read_addrs()
    addrs = addrs.split('@')
    ser = SerialCommunication()
    rece_data = []
    for addr in addrs:
        leds = [1,1,100]
        # leds.insert(0,1)
        number = re.findall(r"\d+",addr)[0]
        command = str(json.dumps({"ADDR":number,"ledrange":leds})) + '\n'
        ser.send_data(command)
        line_response = ser.read_Line()
        rece_data.append(line_response)
        time.sleep(0.1)
    msg = "结果:{}".format(rece_data)
    return json.dumps({'msg':msg})

# @app.route('/check_single', methods=['POST'])
# def check_single():
#   data = request.get_json()
#   addr = data['addr']
#   led_index = data['led_index']
#   ser = SerialCommunication()
#     texts = addr + '\n'
#     print (texts)
#     ser.send_data(texts)
#     reponse=ser.read_alldata()
#     # print (reponse)
#     rec = response.decode().split("\r\n")
#     for line in rec:
#       if line.startswith("A") and line.endswith("]")
#           led = re.search("\[.*?\]", line)
#           c_list = list(led.group())
#           # print ("-------------------c_list",c_list)
#           c_list.pop(0)
#           c_list.pop(-1)
#   convert_dict.get()
#   
@app.route('/auto_calibrate', methods=['POST'])
def auto_calibrate():
    data = request.get_json()
    calibrate = data['calibrate']
    addr = data['addr']
    command = str(json.dumps({"ADDR":addr,"calibrate":calibrate})) + '\n'
    ser = SerialCommunication()
    ser.send_data(command)
    print (command)
    msg = "auto calibrate:{}已发送".format(calibrate)
    return json.dumps({'msg':msg})

@app.route('/hancalibrate',methods=['POST'])
def hancalibrate():
    data = request.get_json()
    addr = data.split('@')[0]
    start = int(data.split('@')[1])
    end = int(data.split('@')[2])
    maxv = int(data.split('@')[3])
    minv = int(data.split('@')[4])
    threshold = int(data.split('@')[5])
    setcalibrate = [start,end,maxv,minv,threshold]
    command = str(json.dumps({"ADDR":addr,"setcalibrate":setcalibrate})) + '\n'
    ser = SerialCommunication()
    ser.send_data(command)
    print (command)
    msg = "handle calibrate:{}已发送".format(command)
    return json.dumps({'msg':msg})


@app.route('/getcalibrate',methods=['POST'])
def getcalibrate():
    data = request.get_json()
    addr = data.split('@')[0]
    start = int(data.split('@')[1])
    end = int(data.split('@')[2])
    v1 = 1
    v2 = 1
    v3 = 1
    getcalibrate = [start,end,v1,v2,v3]
    command = str(json.dumps({"ADDR":addr,"getcalibrate":getcalibrate})) + '\n'
    ser = SerialCommunication()
    ser.send_data(command)
    print (command)
    calibrate_value=ser.read_Line()
    print (calibrate_value,type(calibrate_value))
    msg = "get calibrate:{}".format(calibrate_value)
    return json.dumps({'msg':msg})

@app.route('/threshold', methods=['POST'])
def threshold():
    data = request.get_json()
    threshold = int(data['threshold'])
    addr = data['addr']
    command = str(json.dumps({"ADDR":addr,"threshold":threshold})) + '\n'
    ser = SerialCommunication()
    ser.send_data(command)
    print (command)
    msg = "threshold:{}已发送".format(command)
    return json.dumps({'msg':msg})