ion_calibration.py
5.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from flask import jsonify, request,Response
from app.models import Product,Testdata
from app.utils.ad_convter import read
from app.utils import driver_gpio
# from app.saved_model.train import IonsModel
# from app.api.auths import Auth
from app import Config
from app import common
from app import db
from app.api import bp
from app.api.errors import bad_request
import random
from werkzeug.utils import secure_filename
from datetime import datetime
import os
import threading
import time
import numpy as np
import logging
# Latest measurement snapshot shared between the routes and the background
# thread. calibrationstart() replaces it with a dict holding 'voltage',
# 'conductivity', 'stage', 'stage_change' and 'times'.
value = {}
# True while a calibration run is active; back_calibration() keeps looping
# until this becomes False.
cal_start = False
# Raw ADC readings appended by back_calibration() while
# value['stage_change'] is falsy; reset to [] when the loop exits.
history_value = []
# Calibration points recorded so far: calibrationchange() appends dicts with
# 'solution' and 'table_conductivity' keys to the 'cols' list.
record_solution = {"cols":[]}
# Start calibration
@bp.route("/calibrationstart", methods=['POST'])
def calibrationstart():
    """Begin a calibration run unless one is already active.

    On success the shared ``value`` snapshot is reset, the background
    sampling thread (``back_calibration``) is started, and the 'filter' and
    'pumb' GPIO outputs from ``Config.SET_GPIO`` are switched on.

    Returns:
        A JSON response built from ``common.trueReturn`` on success, or from
        ``common.falseReturn`` when a run is already in progress. Both carry
        the payload ``{'code': 200}``.
    """
    global value
    global cal_start

    # Guard: refuse to start a second run while one is active.
    if cal_start:
        return jsonify(common.falseReturn({'code': 200}, "start calibration failed"))

    # The parsed body is not used, but the call is kept so that request
    # parsing behaves exactly as before.
    request.get_json()

    cal_start = True
    value = dict(
        voltage=0,
        conductivity=0,
        stage='filter',
        stage_change=False,
        times=0,
    )

    sampler = threading.Thread(target=back_calibration)
    sampler.start()

    for pin_name in ('filter', 'pumb'):
        driver_io(Config.SET_GPIO[pin_name], sign=True)

    return jsonify(common.trueReturn({'code': 200}, "start calibration success"))
# Stop calibration
@bp.route("/calibrationstop", methods=['POST'])
def calibrationstop():
    """Stop the calibration run and hand back the recorded points.

    Clears ``cal_start`` (which lets the background loop exit), replaces
    ``record_solution`` with an empty record, and switches the 'clean',
    'filter' and 'pumb' GPIO outputs off.

    Returns:
        A JSON response built from ``common.trueReturn`` whose payload is the
        record that was collected before the reset.
    """
    global cal_start
    global record_solution

    cal_start = False

    # Keep a reference to the collected record, then start a fresh one.
    collected, record_solution = record_solution, {"cols": []}

    for pin_name in ('clean', 'filter', 'pumb'):
        driver_io(Config.SET_GPIO[pin_name], sign=False)

    return jsonify(common.trueReturn(collected, "stop calibration success"))
# Drive the IO of the control valve, filter valve, cleaning valve and water pump
def driver_io(pin, sign):
    """Initialise ``pin`` and drive it high or low.

    Args:
        pin: Pin identifier passed through to ``driver_gpio``.
        sign: Truthy drives the pin high, falsy drives it low.

    Returns:
        Always ``True``.
    """
    # Both paths initialise the pin first, so do it once up front.
    driver_gpio.init(pin)
    set_level = driver_gpio.gpio_high if sign else driver_gpio.gpio_low
    set_level(pin)
    return True
def back_calibration():
    """Background sampling loop for a calibration run.

    While ``cal_start`` is truthy, samples ``ad_value()`` roughly once per
    second and publishes the reading into the shared ``value`` dict:
    'voltage' is the sampled voltage, 'conductivity' is that voltage divided
    by 10, and 'times' is the iteration counter. The raw reading is appended
    to ``history_value`` as long as ``value['stage_change']`` is falsy.

    When the loop ends, ``history_value`` is reset to an empty list.

    Returns:
        Always ``True``.
    """
    global history_value

    tick = 0
    while cal_start:
        volts, raw = ad_value()
        # Compute everything first, then publish in the original key order.
        reading = {
            'voltage': volts,
            'conductivity': volts / 10,
            'times': tick,
        }
        value.update(reading)

        # Only collect history while no stage change is pending.
        if not value['stage_change']:
            history_value.append(raw)

        time.sleep(1)
        tick += 1

    history_value = []
    return True
def ad_value():
    """Take one reading via ``read(0)`` and convert it to a voltage.

    The raw reading is scaled by ``5.00 / 255``.
    NOTE(review): this looks like an 8-bit ADC against a 5 V reference;
    confirm against the hardware.

    Returns:
        A ``(voltage, raw)`` tuple, where ``raw`` is the value returned by
        ``read(0)``.
    """
    raw = read(0)
    standard_v = 5.00
    volts = (standard_v / 255) * raw
    logging.info("原始值:{}----电压:{}V".format(raw, volts))
    return volts, raw
def check_stable():
    """Flag a stage change once the recent readings have settled.

    Needs more than 30 entries in ``history_value``. The variance of the
    last 30 entries is printed, and if it is below 1 while
    ``value['stage_change']`` is still falsy, ``value['stage_change']`` is
    set to ``True`` and ``history_value`` is cleared.

    Returns:
        Always ``True``.
    """
    global history_value

    # Not enough samples yet: nothing to evaluate.
    if len(history_value) <= 30:
        return True

    recent = history_value[-30:]
    spread = np.var(recent)
    print ("方差:{}".format(spread))

    if spread < 1 and not value['stage_change']:
        value['stage_change'] = True
        history_value = []
    return True
# Dialog route: switch the calibration stage and optionally record a point
@bp.route("/calibrationchange", methods=['POST'])
def calibrationchange():
    """Switch the calibration stage and optionally record a calibration point.

    Expects a JSON object with:
        stage: The new stage, stored in ``value['stage']``.
        solution (optional): Amount to add. When truthy, it is converted to
            ``float`` and added to the 'solution' of the last recorded
            point (if any). The running total is then appended to
            ``record_solution['cols']`` together with the current
            ``value['voltage']`` as 'table_conductivity'.

    After a successful update, ``value['stage_change']`` is reset to
    ``False``, the 'filter' GPIO output is switched off and the 'clean'
    output is switched on.

    All input is validated before any shared state is touched, so a rejected
    request leaves ``value`` and ``record_solution`` unchanged.

    Returns:
        A JSON response built from ``common.trueReturn`` carrying ``value``
        on success, or the result of ``bad_request`` when the body is
        malformed.
    """
    data = request.get_json()
    print ("----------data:{}".format(data))

    # NOTE(review): assumes bad_request(message) builds a 400 response;
    # confirm against app.api.errors.
    if not isinstance(data, dict) or 'stage' not in data:
        return bad_request("request body must be a JSON object with a 'stage' field")

    # Build the new point (if requested) before mutating anything, so that
    # invalid input cannot leave the stage half-updated.
    point = None
    if data.get('solution'):
        try:
            added = float(data['solution'])
        except (TypeError, ValueError):
            return bad_request("'solution' must be a number")
        if 'voltage' not in value:
            # No reading exists until calibrationstart() has initialised value.
            return bad_request("no voltage reading available; start calibration first")
        cols = record_solution['cols']
        # 'solution' is cumulative: each point stores the running total.
        total = (added + cols[-1]['solution']) if cols else added
        point = {
            "solution": total,
            "table_conductivity": value['voltage'],
        }

    value['stage'] = data['stage']
    value['stage_change'] = False
    if point is not None:
        record_solution['cols'].append(point)

    driver_io(Config.SET_GPIO['filter'], sign=False)
    driver_io(Config.SET_GPIO['clean'], sign=True)
    return jsonify(common.trueReturn(value, "update stage success"))
# Dialog: fetch the calibration points recorded so far
@bp.route("/getrecord", methods=['POST'])
def getrecord():
    """Return the current ``record_solution`` wrapped by ``common.trueReturn``."""
    body = common.trueReturn(record_solution, "record data")
    return jsonify(body)
# Endpoint for fetching the live voltage data
@bp.route("/calibrationvalue", methods=['POST'])
def calibrationvalue():
    """Return the shared ``value`` snapshot, whether or not a run is active.

    The snapshot is also printed to stdout before the response is built.
    """
    print (value)
    body = common.trueReturn(value, "success")
    return jsonify(body)
# Report whether calibration is currently running
@bp.route("/calibrationstatus", methods=['POST'])
def calibrationstatus():
    """Return the current ``cal_start`` flag as 'calibration_status'."""
    status = dict(code=200, calibration_status=cal_start)
    return jsonify(common.trueReturn(status, "success"))