# routes.py 11.8 KB
# coding:utf-8
from flask import render_template, Response,request,redirect,url_for,send_from_directory
# from flask_socketio import SocketIO, emit, disconnect
from werkzeug.utils import secure_filename
from random import choice
import json
import time
import csv
import threading
import os
import re
from time import sleep
from config import Config
from rpi_ws281x import Adafruit_NeoPixel, Color,PixelStrip
from app import app
# from app import socketio
from app.led_strip import get_strip
import logging
import RPi.GPIO as GPIO

# Module-level state shared by every route below.
# config_dict: slot name (CSV column 0) -> "<column 7>@<column 6>". The code
# further down splits this on '@' and uses part 0 as the LED index list
# (';'-separated integers) and part 1 as the channel id.
config_dict = {}
# status_lights: holds the CSV row whose first column is 'status', stored in
# the same "<column 7>@<column 6>" form.
status_lights = {}
# option_list: slot names in file order; handed to the ledtest.html template.
option_list = []
basepath = os.path.dirname(__file__)
# Plain string concatenation, not os.path.join — presumably the Config values
# start with a path separator; confirm in config.py.
uploads_path = basepath + Config.UPLOAD_FOLDER
# state_path is not referenced anywhere else in the visible part of this file.
state_path = basepath + Config.STATE_PATH
try:
    # The configuration file is read as GBK-encoded CSV.
    with open(uploads_path +'/linePositions.csv', 'r',encoding='gbk') as f:
        reader = csv.reader(f)
        for row in reader:
            if row[0] == 'status':
                status_lights[row[0]] = row[7] + '@' +row[6]
            else:
                config_dict[row[0]] = row[7]+'@'+row[6]
                option_list.append(row[0])
    logging.warning("配置文件加载成功...")
    # config_state is the status text rendered into the page templates.
    config_state = '加载成功...'
except Exception as e:
    # Any failure (missing file, row with fewer than 8 columns, decode error)
    # ends up here; rows processed before the failure stay in the dicts above.
    logging.warning("配置文件加载失败,请上传配置文件")
    config_state = '加载失败,请在配置页面上传配置文件'
# SET_CHANNEL is not referenced anywhere else in the visible part of this file.
SET_CHANNEL = {'green':16,'red':5,'yellow':6}
# Channel id -> value passed to get_strip() (the callers name it `pin`;
# presumably a GPIO pin number — confirm against app.led_strip).
SET_LED_CHANNEL = {'1':12,'2':21}

# Currently lit LEDs per channel: LED index string -> colour value.
led_lights1 = {}
led_lights2 = {}

# @app.route('/taillog',methods=['POST'])
# def taillog():
#     a=os.popen("tail -n 20 /prog/smartshelf/logs/smart.log").readlines()
#     s = '<br>'
#     p = s.join(a)
#     jsondata = {'msg':p}
#     return json.dumps(jsondata)

@app.route('/')
@app.route('/index')
def index():
    """Serve the control page (``base.html``) with the current config status."""
    logging.warning("控制界面")
    context = {'config_state': config_state}
    return render_template('base.html', **context)


@app.route('/ledtest')
def ledtest():
    """Serve the LED test page with the slot names and the config status."""
    logging.warning("测试界面")
    context = {
        'option_list': option_list,
        'config_state': config_state,
    }
    return render_template('ledtest.html', **context)

@app.route('/shelfconfig')
def shelfconfig():
    """Serve the shelf configuration page with the current config status."""
    logging.warning("配置界面")
    context = {'config_state': config_state}
    return render_template('shelfconfig.html', **context)

@app.route('/workinglight', methods=['POST'])
def workinglight():
    """Light the status LED in the colour requested by the client.

    Expects a JSON body with a ``workcolor`` key holding a colour name
    understood by ``set_indexcolor``. The status LED address comes from
    ``status_lights['status']`` in ``"<index>@<channel>"`` form.

    Returns:
        JSON text of a one-element list ``[{"msg": <status message>}]``.

    Raises:
        KeyError: if ``workcolor`` is missing or the channel id is not a key
            of ``SET_LED_CHANNEL``.
    """
    data = request.get_json()
    color = data['workcolor']
    work_light = status_lights.get('status', '')
    if work_light:
        parts = work_light.split('@')
        workindex = int(parts[0])
        channel = parts[1]
        strip = get_strip(SET_LED_CHANNEL[channel])
        # Re-apply the LEDs already lit on the SAME channel as the status
        # light, matching how ledopen/ledoff pick the dict by channel.
        # Repainting channel 1's indices onto channel 2's strip would light
        # unrelated LEDs.
        lit_leds = led_lights1 if channel == '1' else led_lights2
        for key, value in lit_leds.items():
            # A key may hold several ';'-separated LED indices.
            for on_led in [int(part) for part in re.split(';', key)]:
                strip.setPixelColor(on_led, value)
        strip.setPixelColor(workindex, set_indexcolor(color))
        strip.show()
        msg = "状态灯点亮"
    else:
        msg = "未配置状态灯地址"
    logging.warning(msg)
    return json.dumps([{'msg': msg}])

@app.route('/workingoff', methods=['POST'])
def workingoff():
    """Switch the status LED off while keeping the other lit LEDs on.

    The status LED address comes from ``status_lights['status']`` in
    ``"<index>@<channel>"`` form.

    Returns:
        JSON text of a one-element list ``[{"msg": <status message>}]``.

    Raises:
        KeyError: if the channel id is not a key of ``SET_LED_CHANNEL``.
    """
    work_light = status_lights.get('status', '')
    if work_light:
        parts = work_light.split('@')
        workindex = int(parts[0])
        channel = parts[1]
        strip = get_strip(SET_LED_CHANNEL[channel])
        # Re-apply the LEDs already lit on the SAME channel as the status
        # light, matching how ledopen/ledoff pick the dict by channel.
        lit_leds = led_lights1 if channel == '1' else led_lights2
        for key, value in lit_leds.items():
            # A key may hold several ';'-separated LED indices.
            for on_led in [int(part) for part in re.split(';', key)]:
                strip.setPixelColor(on_led, value)
        # 'off' is not a known colour name, so set_indexcolor yields black.
        strip.setPixelColor(workindex, set_indexcolor('off'))
        strip.show()
        msg = "状态灯关闭"
    else:
        msg = "未配置状态灯地址"
    logging.warning(msg)
    return json.dumps([{'msg': msg}])


def set_indexcolor(light_led_color):
    """Translate a colour name into a strip colour value.

    Recognised names are '红', '黄', '蓝' and '绿'. Anything else, including
    the 'off' passed by workingoff, yields ``Color(0, 0, 0)``.

    NOTE(review): '红' maps to Color(0, 255, 0) and '绿' to Color(255, 0, 0),
    presumably because the strip uses GRB channel order — confirm.
    """
    palette = (
        ('红', (0, 255, 0)),
        ('黄', (255, 255, 0)),
        ('蓝', (0, 0, 255)),
        ('绿', (255, 0, 0)),
    )
    # Equality scan rather than a dict lookup, so any input type is accepted.
    for name, channels in palette:
        if light_led_color == name:
            return Color(*channels)
    return Color(0, 0, 0)

# Slot test helper: register an LED that is about to be switched on
def process_lightindex(data):
    """Record the requested slot as lit and return its channel and strip.

    ``data`` must provide ``light_led_color`` (a colour name) and
    ``light_led`` (a slot name present in ``config_dict``). The slot's LED
    index string is added to the lit-LED dict of its channel only if it is
    not already there, so an existing entry keeps its colour.

    Returns:
        ``(channel, strip)``: the channel id string and the strip object
        from ``get_strip``. Any channel other than '1' is handled as
        channel 2.
    """
    requested = set_indexcolor(data['light_led_color'])
    slot = data['light_led']
    # Stored as "<led index>@<channel>".
    channel = config_dict.get(slot).split('@')[1]
    led_index = config_dict.get(slot).split('@')[0]
    on_first = channel == '1'
    pin = SET_LED_CHANNEL['1'] if on_first else SET_LED_CHANNEL['2']
    lit = led_lights1 if on_first else led_lights2
    # Insert only when absent; an existing entry is left unchanged.
    lit.setdefault(led_index, requested)
    return channel, get_strip(pin)

# Slot test helper: unregister an LED that is about to be switched off
def process_offindex(data):
    """Drop the requested slot from the lit-LED dict and return channel and strip.

    ``data`` must provide ``off_led``, a slot name present in
    ``config_dict``. A slot that is not currently recorded as lit is
    ignored.

    Returns:
        ``(channel, strip)``: the channel id string and the strip object
        from ``get_strip``. Any channel other than '1' is handled as
        channel 2.
    """
    slot = data['off_led']
    # Stored as "<led index>@<channel>".
    channel = config_dict.get(slot).split('@')[1]
    led_index = config_dict.get(slot).split('@')[0]
    on_first = channel == '1'
    pin = SET_LED_CHANNEL['1'] if on_first else SET_LED_CHANNEL['2']
    lit = led_lights1 if on_first else led_lights2
    # Remove the entry if present; no error when it is missing.
    lit.pop(led_index, None)
    return channel, get_strip(pin)

@app.route('/ledopen', methods=['POST'])
def ledopen():
    """Light the slot named in the JSON body and repaint its channel.

    The body is passed to ``process_lightindex``, which records the slot.
    Every LED recorded as lit on that channel is then written to the strip.

    Returns:
        JSON text of a one-element list ``[{"msg": <status message>}]``.
    """
    data = request.get_json()
    channel, strip = process_lightindex(data)
    # Any channel other than '1' is handled as channel 2.
    lit = led_lights1 if channel == '1' else led_lights2
    for key, value in lit.items():
        # A key may hold several ';'-separated LED indices.
        indices = [int(part) for part in re.split(';', key)]
        for on_led in indices:
            strip.setPixelColor(on_led, value)
    strip.show()
    logging.warning("当前通道1ON_LED:{},当前通道2ON_LED:{}".format(led_lights1,led_lights2))
    msg = '灯地址:{},已点亮'.format(data['light_led'])
    logging.warning(msg)
    return json.dumps([{'msg': msg}])

@app.route('/ledoff', methods=['POST'])
def ledoff():
    """Switch off the slot named in the JSON body and repaint its channel.

    The body is passed to ``process_offindex``, which forgets the slot.
    The LEDs still recorded as lit on that channel are then written to the
    strip.

    Returns:
        JSON text of a one-element list ``[{"msg": <status message>}]``.
    """
    data = request.get_json()
    channel, strip = process_offindex(data)
    # Any channel other than '1' is handled as channel 2.
    lit = led_lights1 if channel == '1' else led_lights2
    for key, value in lit.items():
        # A key may hold several ';'-separated LED indices.
        indices = [int(part) for part in re.split(';', key)]
        for on_led in indices:
            strip.setPixelColor(on_led, value)
    strip.show()
    msg = '灯地址:{},已关闭'.format(data['off_led'])
    logging.warning(msg)
    return json.dumps([{'msg': msg}])

@app.route('/resetled', methods=['POST'])
def resetled():
    """Start one background thread per strip to blank it, then reply.

    The threads are not joined, so the response can be sent before the
    strips have finished resetting.

    Returns:
        JSON text of a one-element list ``[{"msg": <status message>}]``.
    """
    workers = [
        threading.Thread(target=task)
        for task in (reset_strip1, reset_strip2)
    ]
    for worker in workers:
        worker.start()
    msg = '灯条已重置'
    logging.warning(msg)
    return json.dumps([{'msg': msg}])

def reset_strip1():
    """Blank every pixel on channel 1 and forget its lit LEDs."""
    global led_lights1
    strip = get_strip(SET_LED_CHANNEL['1'])
    # One pass over every pixel the strip reports.
    for pixel in range(strip.numPixels()):
        strip.setPixelColor(pixel, Color(0, 0, 0))
    strip.show()
    # Rebind to a fresh dict; the old dict object is not cleared in place.
    led_lights1 = dict()

def reset_strip2():
    """Blank every pixel on channel 2 and forget its lit LEDs."""
    global led_lights2
    strip = get_strip(SET_LED_CHANNEL['2'])
    # One pass over every pixel the strip reports.
    for pixel in range(strip.numPixels()):
        strip.setPixelColor(pixel, Color(0, 0, 0))
    strip.show()
    # Rebind to a fresh dict; the old dict object is not cleared in place.
    led_lights2 = dict()



# Upload the shelf configuration file
@app.route('/upload/', methods=['GET', 'POST'])
def upload_file():
    """Store an uploaded file in the uploads folder and reload the config.

    POST expects a multipart form with a ``file`` part. The file is saved
    under its sanitised name and ``reload_config`` is called. Note that
    ``reload_config`` always reads ``linePositions.csv``, whatever name was
    uploaded.

    Returns:
        POST, success: JSON text ``{"filename":"<saved name>"}``.
        POST, no usable file: JSON text ``{"msg": ...}`` with status 400.
        GET: a redirect to the ``shelfconfig`` page.
    """
    if request.method != 'POST':
        # A view must return a valid response and GET has nothing to upload,
        # so send the browser to the configuration page (presumably where
        # the upload form lives — confirm against the templates).
        return redirect(url_for('shelfconfig'))

    # `flash` is not imported in this module, so failures are reported as a
    # JSON error instead of a flashed message.
    if 'file' not in request.files:
        return json.dumps({'msg': 'No file part'}), 400

    upload = request.files['file']
    # When no file is selected the browser still submits an empty part
    # without a filename.
    if upload.filename == '':
        return json.dumps({'msg': 'No selected file'}), 400

    filename = secure_filename(upload.filename)
    if not filename:
        # secure_filename can strip a name down to nothing; saving would
        # then target the uploads directory itself.
        return json.dumps({'msg': 'Invalid file name'}), 400

    # Save where reload_config reads from.
    upload.save(os.path.join(uploads_path, filename))
    reload_config()
    # Compact separators keep the output identical to the previous
    # '{"filename":"%s"}' text.
    return json.dumps({'filename': filename}, separators=(',', ':'))

# Download the log file
@app.route('/download/', methods=['GET'], strict_slashes=False)
def download():
    """Send ``/prog/smartshelf/logs/smart.log`` to the client as an attachment.

    ``send_from_directory`` answers with 404 Not Found when the file does
    not exist, so no separate existence check is needed. Without the
    method check, the HEAD requests Flask accepts on GET routes also get a
    valid response instead of ``None``.
    """
    log_path = '/prog/smartshelf/logs/'
    return send_from_directory(log_path, 'smart.log', as_attachment=True)

def reload_config():
    """Re-read ``linePositions.csv`` and replace the in-memory configuration.

    The file is parsed into temporary containers first. Only when the whole
    file parses are ``config_dict``, ``option_list`` and ``status_lights``
    replaced, so a broken upload leaves the previous configuration intact
    instead of half-filled. ``config_state`` is updated to the success or
    failure text in either case.

    Each non-empty row must have at least 8 columns. Column 0 is the slot
    name (or the literal ``status``); columns 7 and 6 are stored as
    ``"<column 7>@<column 6>"``.
    """
    global config_dict
    global config_state
    global option_list
    new_config = {}
    new_options = []
    new_status = {}
    try:
        with open(uploads_path + '/linePositions.csv', 'r', encoding='gbk') as f:
            for row in csv.reader(f):
                if not row:
                    # csv.reader yields [] for a blank line; skip it rather
                    # than failing the whole load on row[0].
                    continue
                address = row[7] + '@' + row[6]
                if row[0] == 'status':
                    new_status[row[0]] = address
                else:
                    new_config[row[0]] = address
                    new_options.append(row[0])
    except Exception:
        # Best-effort load: keep serving with the previous configuration,
        # but record the cause in the log.
        logging.warning("配置文件加载失败,请上传配置文件", exc_info=True)
        config_state = '重新加载失败,请在配置页面上传配置文件'
        return
    config_dict = new_config
    option_list = new_options
    # Updated in place so a status row removed from the file is forgotten.
    status_lights.clear()
    status_lights.update(new_status)
    logging.warning("配置文件加载成功")
    config_state = '加载成功'

def process_linetest(data):
    """Resolve a whole-strip test request into ``(pin, color)``.

    ``data['channel_num']`` equal to "通道1" selects channel 1; any other
    value selects channel 2. ``data['channel_color']`` is matched against
    '绿', '蓝', '红' and '黄'; any other value yields white,
    ``Color(255, 255, 255)``.
    """
    channel_key = '1' if data['channel_num'] == "通道1" else '2'
    LED_PIN = SET_LED_CHANNEL[channel_key]

    wanted = data['channel_color']
    palette = (
        ("绿", (255, 0, 0)),
        ("蓝", (0, 0, 255)),
        ("红", (0, 255, 0)),
        ("黄", (255, 255, 0)),
    )
    # Equality scan; the first matching name wins, white is the fallback.
    for name, channels in palette:
        if wanted == name:
            return LED_PIN, Color(*channels)
    return LED_PIN, Color(255, 255, 255)



@app.route('/lineledon', methods=['POST'])
def lineledon():
    """Light every pixel of the requested strip in the requested colour.

    The JSON body must provide ``channel_num`` and ``channel_color``; both
    are resolved by ``process_linetest``.

    Returns:
        JSON text of a one-element list ``[{"msg": <status message>}]``.
    """
    data = request.get_json()
    pin, color = process_linetest(data)
    strip = get_strip(pin)
    # One pass over every pixel the strip reports.
    for pixel in range(strip.numPixels()):
        strip.setPixelColor(pixel, color)
    strip.show()
    msg = "灯条:{},颜色:{},已开启".format(data['channel_num'],data['channel_color'])
    logging.warning(msg)
    return json.dumps([{'msg': msg}])

@app.route('/lineledoff', methods=['POST'])
def lineledoff():
    """Blank every pixel of the requested strip.

    The JSON body is resolved by ``process_linetest``, so it must provide
    both ``channel_num`` and ``channel_color`` even though only the pin is
    used here.

    Returns:
        JSON text of a one-element list ``[{"msg": <status message>}]``.
    """
    data = request.get_json()
    pin, _unused_color = process_linetest(data)
    strip = get_strip(pin)
    for pixel in range(strip.numPixels()):
        strip.setPixelColor(pixel, Color(0, 0, 0))
    strip.show()
    msg = "灯条:{}已关闭".format(data['channel_num'])
    logging.warning(msg)
    return json.dumps([{'msg': msg}])