models.py
5.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime, timedelta
from flask import url_for
from app import db
class PaginatedAPIMixin(object):
    """Mixin that renders a paginated query as a JSON-ready collection dict."""

    @staticmethod
    def to_collection_dict(query, page, per_page, endpoint, **kwargs):
        """Build the paginated representation of ``query``.

        Args:
            query: Query object exposing ``paginate()``; every item on the
                returned page must provide a ``to_dict()`` method.
            page: Page number to return.
            per_page: Number of items per page.
            endpoint: Endpoint name handed to ``url_for`` to build the links.
            **kwargs: Extra values forwarded unchanged to ``url_for``.

        Returns:
            A dict with three keys: ``'items'`` (list of item dicts),
            ``'_meta'`` (page, per_page, total_pages, total_items) and
            ``'_links'`` (self / next / prev URLs; next and prev are ``None``
            when there is no such page).
        """
        # Original author's note (translated): when the requested page is out
        # of range a 404 is raised, and it is handled by
        # @bp.app_errorhandler(404), which responds with the JSON data
        # { error: "Not Found" }.
        # NOTE(review): that handler is not visible in this file -- confirm.
        #
        # page/per_page are passed by keyword because Flask-SQLAlchemy 3.x
        # made paginate() keyword-only; keywords are accepted by 2.x as well.
        resources = query.paginate(page=page, per_page=per_page)
        data = {
            'items': [item.to_dict() for item in resources.items],
            '_meta': {
                'page': page,
                'per_page': per_page,
                'total_pages': resources.pages,
                'total_items': resources.total
            },
            '_links': {
                'self': url_for(endpoint, page=page, per_page=per_page,
                                **kwargs),
                'next': url_for(endpoint, page=page + 1, per_page=per_page,
                                **kwargs) if resources.has_next else None,
                'prev': url_for(endpoint, page=page - 1, per_page=per_page,
                                **kwargs) if resources.has_prev else None
            }
        }
        return data
class Users(PaginatedAPIMixin, db.Model):
    """ORM model for rows of the ``users`` table."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    # email and username are both required and unique.
    email = db.Column(db.String(250), unique=True, nullable=False)
    username = db.Column(db.String(250), unique=True, nullable=False)
    password = db.Column(db.String(250))
    login_time = db.Column(db.Integer)

    def __init__(self, username, password, email):
        """Store the given username, password and email on the new row.

        The password is stored exactly as passed in; no hashing happens here.
        """
        self.username = username
        self.password = password
        self.email = email

    def __str__(self):
        """Return a short label containing the primary key."""
        label = "Users(id='%s')"
        return label % self.id

    def set_password(self, password):
        """Return the hash of ``password``.

        Despite the name, this does not assign ``self.password``; the caller
        receives the hash and decides what to do with it.
        """
        hashed = generate_password_hash(password)
        return hashed

    def check_password(self, hash, password):
        """Return whether ``password`` matches the stored ``hash``."""
        matches = check_password_hash(hash, password)
        return matches

    def get(self, id):
        """Return the first user whose primary key equals ``id``, or None."""
        lookup = self.query.filter_by(id=id)
        return lookup.first()

    def add(self, user):
        """Add ``user`` to the session and commit.

        Returns whatever ``session_commit()`` returns.
        """
        db.session.add(user)
        outcome = session_commit()
        return outcome

    def update(self):
        """Commit the session; returns whatever ``session_commit()`` returns."""
        outcome = session_commit()
        return outcome

    def delete(self, id):
        """Delete the rows whose primary key equals ``id`` and commit.

        Returns whatever ``session_commit()`` returns.
        """
        doomed = self.query.filter_by(id=id)
        doomed.delete()
        return session_commit()
class Product(PaginatedAPIMixin, db.Model):
    """ORM model for rows of the ``products`` table."""

    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(250), unique=True, nullable=False)
    length = db.Column(db.Float(10))
    width = db.Column(db.Float(10))
    area = db.Column(db.Float(10))
    ipc = db.Column(db.Float(10))
    # Filled with datetime.utcnow() when no value is supplied.
    testtime = db.Column(db.DateTime(), default=datetime.utcnow)
    note = db.Column(db.String(250))
    # Image file name; falls back to "default.jpg" on both the Python side
    # and the database side.
    img = db.Column(db.String(250),default="default.jpg",server_default="default.jpg")

    def __repr__(self):
        """Return a debug label containing the product name."""
        template = '<Product {}>'
        return template.format(self.name)

    def to_dict(self):
        """Return this product as a plain dict, including its API links."""
        data = {
            'id': self.id,
            'name': self.name,
            'length': self.length,
            'width': self.width,
            'area': self.area,
            'ipc': self.ipc,
            'testtime': self.testtime,
            'note': self.note,
            'img': self.img,
        }
        # 'author': self.author.to_dict(),
        # Links are attached last so '_links' stays the final key.
        data['_links'] = {
            'self': url_for('api.get_product', id=self.id),
            'frame': url_for('api.get_frame', filename=self.img),
        }
        return data

    def from_dict(self, data):
        """Copy the recognised fields present in ``data`` onto this product.

        Keys that are not in the whitelist below are ignored; whitelisted
        fields missing from ``data`` are left untouched.
        """
        writable = ('name', 'length', 'width', 'area', 'ipc', 'note', 'img')
        for field in writable:
            if field not in data:
                continue
            setattr(self, field, data[field])
# class Setting(db.Model):
# length = db.Column(db.Float(10))
# active = db.Column(db.Boolean(0))
#
class Basedata(PaginatedAPIMixin, db.Model):
    """ORM model for rows of the ``basedata`` table."""

    __tablename__ = 'basedata'

    id = db.Column(db.Integer, primary_key=True)
    device_model = db.Column(db.String(250), unique=True, nullable=False)
    sn = db.Column(db.Float(10))
    # Filled with datetime.utcnow() when no value is supplied.
    calibration_date = db.Column(db.DateTime(), default=datetime.utcnow)
    active = db.Column(db.Boolean(0))
    # Columns left disabled by the original author:
    # area = db.Column(db.Float(10))
    # ipc = db.Column(db.Float(10))
    # testtime = db.Column(db.DateTime(), default=datetime.utcnow)
    # note = db.Column(db.String(250))

    def __repr__(self):
        """Return a debug label containing the device model."""
        template = '<Basedata {}>'
        return template.format(self.device_model)

    def to_dict(self):
        """Return this record as a plain dict, including its API link."""
        data = {
            'id': self.id,
            'device_model': self.device_model,
            'sn': self.sn,
            'calibration_date': self.calibration_date,
            'active': self.active,
        }
        # 'author': self.author.to_dict(),
        # The link is attached last so '_links' stays the final key.
        data['_links'] = {
            'self': url_for('api.get_basedata', id=self.id),
        }
        return data

    def from_dict(self, data):
        """Copy the recognised fields present in ``data`` onto this record.

        Keys that are not in the whitelist below are ignored; whitelisted
        fields missing from ``data`` are left untouched.
        """
        writable = ('device_model', 'sn', 'calibration_date', 'active')
        for field in writable:
            if field not in data:
                continue
            setattr(self, field, data[field])
class Testdata(PaginatedAPIMixin, db.Model):
    """ORM model for rows of the ``testdata`` table."""

    __tablename__ = 'testdata'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(30))
    # References products.id.
    product_id = db.Column(db.Integer,db.ForeignKey('products.id'))
    test_type = db.Column(db.String(30))
    test_result = db.Column(db.String(30))
    # Filled with datetime.utcnow() when no value is supplied.
    testtime = db.Column(db.DateTime(), default=datetime.utcnow)
    continue_times = db.Column(db.Float(10))  # test duration
    datas = db.Column(db.String(30))

    def __repr__(self):
        """Return a debug label containing the record name."""
        template = '<Testdata {}>'
        return template.format(self.name)
def session_commit():
    """Commit ``db.session``, rolling back if SQLAlchemy reports an error.

    Returns:
        None when the commit succeeds; otherwise the string form of the
        ``SQLAlchemyError`` that was caught. Other exception types are not
        handled here and propagate to the caller.
    """
    try:
        db.session.commit()
    except SQLAlchemyError as exc:
        # Undo the failed transaction so the session can be used again.
        db.session.rollback()
        return str(exc)
    return None