# models.py
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime, timedelta
from flask import url_for

from app import db

class PaginatedAPIMixin(object):
    """Mixin that renders one page of a query as a JSON-ready dict."""

    @staticmethod
    def to_collection_dict(query, page, per_page, endpoint, **kwargs):
        """Build the paginated collection representation of ``query``.

        Args:
            query: Query object exposing ``paginate()``; every item on the
                resulting page must provide ``to_dict()``.
            page: Page number to fetch.
            per_page: Number of items per page.
            endpoint: Endpoint name handed to ``url_for`` for the page links.
            **kwargs: Extra values forwarded unchanged to ``url_for``.

        Returns:
            A dict with three keys: ``items`` (list of item dicts),
            ``_meta`` (page, per_page, total_pages, total_items) and
            ``_links`` (self / next / prev URLs; next and prev are ``None``
            when there is no such page).
        """
        # paginate() aborts with a 404 by default when the requested page is
        # out of range. NOTE(review): the original comment says a blueprint
        # level @bp.app_errorhandler(404) turns that into the JSON body
        # { error: "Not Found" } -- that handler is not visible in this file.
        #
        # Keyword arguments are used on purpose: Flask-SQLAlchemy 3.x made
        # paginate() keyword-only, while 2.x accepts the same keywords.
        resources = query.paginate(page=page, per_page=per_page)

        def page_url(number):
            # Single place that builds a link to a given page of `endpoint`.
            return url_for(endpoint, page=number, per_page=per_page, **kwargs)

        return {
            'items': [item.to_dict() for item in resources.items],
            '_meta': {
                'page': page,
                'per_page': per_page,
                'total_pages': resources.pages,
                'total_items': resources.total,
            },
            '_links': {
                'self': page_url(page),
                'next': page_url(page + 1) if resources.has_next else None,
                'prev': page_url(page - 1) if resources.has_prev else None,
            },
        }

class Users(PaginatedAPIMixin, db.Model):
    """User account row stored in the ``users`` table."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(250), unique=True, nullable=False)
    username = db.Column(db.String(250), unique=True, nullable=False)
    password = db.Column(db.String(250))
    login_time = db.Column(db.Integer)

    def __init__(self, username, password, email):
        """Store the given values unchanged; ``password`` is not hashed here."""
        self.username = username
        self.password = password
        self.email = email

    def __str__(self):
        """Return a short label containing the primary key."""
        return "Users(id='{}')".format(self.id)

    def set_password(self, password):
        """Return the hash of ``password``.

        Despite the name, nothing is assigned to the instance; the caller
        is responsible for storing the returned hash.
        """
        hashed = generate_password_hash(password)
        return hashed

    def check_password(self, hash, password):
        """Return whether ``password`` matches the stored ``hash``."""
        is_match = check_password_hash(hash, password)
        return is_match

    def get(self, id):
        """Return the first user whose primary key is ``id``, or ``None``."""
        candidates = self.query.filter_by(id=id)
        return candidates.first()

    def add(self, user):
        """Add ``user`` to the session and commit.

        Returns the result of ``session_commit()``: ``None`` on success,
        otherwise the error text.
        """
        db.session.add(user)
        outcome = session_commit()
        return outcome

    def update(self):
        """Commit pending changes; returns the result of ``session_commit()``."""
        outcome = session_commit()
        return outcome

    def delete(self, id):
        """Delete the rows matching ``id`` and commit.

        Returns the result of ``session_commit()``.
        """
        doomed = self.query.filter_by(id=id)
        doomed.delete()
        return session_commit()

class Product(PaginatedAPIMixin, db.Model):
    """Product record stored in the ``products`` table."""

    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(250), unique=True, nullable=False)
    length = db.Column(db.Float(10))
    width = db.Column(db.Float(10))
    area = db.Column(db.Float(10))
    ipc = db.Column(db.Float(10))
    testtime = db.Column(db.DateTime(), default=datetime.utcnow)
    note = db.Column(db.String(250))
    img = db.Column(
        db.String(250),
        default="default.jpg",
        server_default="default.jpg",
    )

    def __repr__(self):
        """Return a debug label containing the product name."""
        return '<Product {name}>'.format(name=self.name)

    def to_dict(self):
        """Return the column values plus ``_links`` as a plain dict.

        ``_links`` holds the URL of this product (``api.get_product``) and
        of its image (``api.get_frame``).
        """
        # Every exported key is named exactly like the attribute it mirrors.
        exported = (
            'id', 'name', 'length', 'width', 'area',
            'ipc', 'testtime', 'note', 'img',
        )
        payload = {key: getattr(self, key) for key in exported}
        payload['_links'] = {
            'self': url_for('api.get_product', id=self.id),
            'frame': url_for('api.get_frame', filename=self.img),
        }
        return payload

    def from_dict(self, data):
        """Copy the editable fields present in ``data`` onto this instance.

        Keys missing from ``data`` leave the attribute untouched; ``id`` and
        ``testtime`` are never taken from ``data``.
        """
        for attr in ('name', 'length', 'width', 'area', 'ipc', 'note', 'img'):
            if attr not in data:
                continue
            setattr(self, attr, data[attr])

# class Setting(db.Model):
#     length = db.Column(db.Float(10))
#     active = db.Column(db.Boolean(0))
#     
class Basedata(PaginatedAPIMixin, db.Model):
    """Device base-data record stored in the ``basedata`` table."""

    __tablename__ = 'basedata'

    id = db.Column(db.Integer, primary_key=True)
    device_model = db.Column(db.String(250), unique=True, nullable=False)
    sn = db.Column(db.Float(10))
    calibration_date = db.Column(db.DateTime(), default=datetime.utcnow)
    active = db.Column(db.Boolean(0))

    def __repr__(self):
        """Return a debug label containing the device model."""
        return '<Basedata {model}>'.format(model=self.device_model)

    def to_dict(self):
        """Return the column values plus ``_links`` as a plain dict.

        ``_links`` holds the URL of this record (``api.get_basedata``).
        """
        # Every exported key is named exactly like the attribute it mirrors.
        exported = ('id', 'device_model', 'sn', 'calibration_date', 'active')
        payload = {key: getattr(self, key) for key in exported}
        payload['_links'] = {
            'self': url_for('api.get_basedata', id=self.id),
        }
        return payload

    def from_dict(self, data):
        """Copy the editable fields present in ``data`` onto this instance.

        Keys missing from ``data`` leave the attribute untouched.
        """
        # NOTE(review): values are assigned as-is; ``calibration_date`` is a
        # DateTime column, so callers presumably pass a datetime -- confirm.
        for attr in ('device_model', 'sn', 'calibration_date', 'active'):
            if attr not in data:
                continue
            setattr(self, attr, data[attr])

class Testdata(PaginatedAPIMixin, db.Model):
    """Test record stored in the ``testdata`` table, linked to a product."""

    __tablename__ = 'testdata'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(30))
    product_id = db.Column(db.Integer, db.ForeignKey('products.id'))
    test_type = db.Column(db.String(30))
    test_result = db.Column(db.String(30))
    testtime = db.Column(db.DateTime(), default=datetime.utcnow)
    # Duration of the test.
    continue_times = db.Column(db.Float(10))
    datas = db.Column(db.String(30))

    def __repr__(self):
        """Return a debug label containing the record name."""
        return '<Testdata {name}>'.format(name=self.name)


def session_commit():
    """Commit the current database session.

    Returns:
        ``None`` when the commit succeeds; otherwise the text of the
        ``SQLAlchemyError``, after the session has been rolled back.
    """
    try:
        db.session.commit()
    except SQLAlchemyError as exc:
        # Roll back first so the session is usable again, then report why.
        db.session.rollback()
        return str(exc)
    return None