Revize b6f0e019
Přidáno uživatelem Matěj Zeman před asi 2 roky(ů)
server/sql_app/crud.py | ||
---|---|---|
1 | 1 |
from datetime import datetime, date |
2 | 2 |
|
3 | 3 |
from sqlalchemy.orm import Session |
4 |
from sqlalchemy import and_ |
|
4 |
from sqlalchemy import and_, desc
|
|
5 | 5 |
from . import models, schemas |
6 | 6 |
|
7 | 7 |
|
... | ... | |
15 | 15 |
|
16 | 16 |
def find_device(db: Session, device: schemas.DeviceBase): |
17 | 17 |
return db.query(models.Device).filter(and_(models.Device.product_id == device.product_id, |
18 |
models.Device.vendor_id == device.vendor_id, |
|
19 |
models.Device.serial_number == device.serial_number)).first() |
|
18 |
models.Device.vendor_id == device.vendor_id,
|
|
19 |
models.Device.serial_number == device.serial_number)).first()
|
|
20 | 20 |
|
21 | 21 |
|
22 | 22 |
def create_device(db: Session, device: schemas.DeviceBase): |
... | ... | |
36 | 36 |
return db.query(models.License).offset(skip).limit(limit).all() |
37 | 37 |
|
38 | 38 |
|
39 |
def find_license(db: Session, name: str): |
|
40 |
return db.query(models.License).filter(models.License.name == name).first() |
|
41 |
|
|
42 |
|
|
39 | 43 |
def create_license(db: Session, name: str, expdate: date): |
40 | 44 |
db_license = models.License(name=name, expiration_date=expdate) |
41 | 45 |
db.add(db_license) |
... | ... | |
44 | 48 |
return db_license |
45 | 49 |
|
46 | 50 |
|
47 |
def create_device_license(db: Session, device_license: schemas.DeviceLicenseBase): |
|
48 |
db_device_license = models.DeviceLicense(device_id=device_license.device_id, license_id=device_license.license_id, |
|
49 |
assigned_datetime=device_license.assigned_datetime) |
|
51 |
def get_license_devices(db: Session, license_id: int): |
|
52 |
return db.query(models.DeviceLicense).filter(models.DeviceLicense.license_id == license_id).all() |
|
53 |
|
|
54 |
|
|
55 |
def create_device_license(db: Session, device: int, license: int, time: datetime): |
|
56 |
db_device_license = models.DeviceLicense(device_id=device, license_id=license, |
|
57 |
assigned_datetime=time) |
|
50 | 58 |
db.add(db_device_license) |
51 | 59 |
db.commit() |
52 | 60 |
db.refresh(db_device_license) |
53 | 61 |
return db_device_license |
54 | 62 |
|
55 | 63 |
|
64 |
def find_pc_by_username(db: Session, name: str): |
|
65 |
return db.query(models.PC).filter(models.PC.username == name).first() |
|
66 |
|
|
67 |
|
|
56 | 68 |
def get_pc(db: Session, pc_id: int): |
57 | 69 |
return db.query(models.PC).filter(models.PC.id == pc_id).first() |
58 | 70 |
|
59 | 71 |
|
72 |
def update_pc(db: Session, pc_id: int, team: str): |
|
73 |
old_pc = get_pc(db, pc_id) |
|
74 |
team = get_team(db, int(team)) |
|
75 |
new = {'id': old_pc.id, 'username': old_pc.username, 'hostname': old_pc.hostname, 'assigned': True, |
|
76 |
'team_id': team.id} |
|
77 |
for key, value in new.items(): |
|
78 |
setattr(old_pc, key, value) |
|
79 |
db.commit() |
|
80 |
db.refresh(old_pc) |
|
81 |
return old_pc |
|
82 |
|
|
83 |
|
|
60 | 84 |
def get_pcs(db: Session, skip: int = 0, limit: int = 100): |
61 | 85 |
return db.query(models.PC).offset(skip).limit(limit).all() |
62 | 86 |
|
63 | 87 |
|
64 | 88 |
def find_pc(db: Session, username: str, hostname: str): |
65 | 89 |
return db.query(models.PC).filter(and_(models.PC.username == username, |
66 |
models.PC.hostname == hostname)).first() |
|
90 |
models.PC.hostname == hostname)).first() |
|
91 |
|
|
92 |
|
|
93 |
def find_pc_by_name(db: Session, username: str): |
|
94 |
return db.query(models.PC).filter(models.PC.username == username).first() |
|
95 |
|
|
96 |
|
|
97 |
def find_pc_by_name_all(db: Session, username: str): |
|
98 |
return db.query(models.PC).filter(models.PC.username == username).offset(0).limit(100).all() |
|
67 | 99 |
|
68 | 100 |
|
69 | 101 |
def find_pcs(db: Session, pcs: []): |
70 | 102 |
return db.query(models.PC).filter(models.PC.id.in_(pcs)).all() |
71 | 103 |
|
72 | 104 |
|
105 |
def get_pcs_by_team(db: Session, team_id: int): |
|
106 |
return db.query(models.PC).filter(models.PC.team_id == team_id).all() |
|
107 |
|
|
108 |
|
|
73 | 109 |
def create_pc(db: Session, user: str, host: str): |
74 | 110 |
db_pc = models.PC(username=user, hostname=host, assigned=False) |
75 | 111 |
db.add(db_pc) |
... | ... | |
86 | 122 |
return db.query(models.Team).offset(skip).limit(limit).all() |
87 | 123 |
|
88 | 124 |
|
125 |
def find_team(db: Session, name: str): |
|
126 |
return db.query(models.Team).filter(models.Team.name == name).first() |
|
127 |
|
|
128 |
|
|
89 | 129 |
def create_team(db: Session, name: str): |
90 |
db_team = models.PC(name=name)
|
|
130 |
db_team = models.Team(name=name)
|
|
91 | 131 |
db.add(db_team) |
92 | 132 |
db.commit() |
93 | 133 |
db.refresh(db_team) |
... | ... | |
95 | 135 |
|
96 | 136 |
|
97 | 137 |
def get_logs(db: Session, skip: int = 0, limit: int = 100): |
98 |
return db.query(models.USBLog).offset(skip).limit(limit).all() |
|
138 |
return db.query(models.USBLog).order_by(desc(models.USBLog.timestamp)).offset(skip).limit(limit).all()
|
|
99 | 139 |
|
100 | 140 |
|
101 | 141 |
def get_log(db: Session, device_id: int, skip: int = 0, limit: int = 100): |
102 | 142 |
return db.query(models.USBLog).filter(models.USBLog.device_id == device_id).offset(skip).limit(limit).all() |
103 | 143 |
|
104 | 144 |
|
145 |
def find_filtered_logs(db: Session, logs: []): |
|
146 |
return db.query(models.USBLog).filter(models.USBLog.id.in_(logs)).order_by(desc(models.USBLog.timestamp)).all() |
|
147 |
|
|
148 |
|
|
149 |
def get_filtered_logs(db: Session, pc: str, tema: str, lic: str): |
|
150 |
execute_string = "SELECT * FROM usb_logs AS logs" |
|
151 |
if pc != "all": |
|
152 |
pcs = find_pc_by_username(db, pc) |
|
153 |
execute_string += " WHERE logs.pc_id = " + str(pcs.id) |
|
154 |
if tema != "all": |
|
155 |
team = find_team(db, tema) |
|
156 |
pcs = get_pcs_by_team(db, team.id) |
|
157 |
pc_ids = "(" |
|
158 |
for p in pcs: |
|
159 |
pc_ids += str(p.id) + ", " |
|
160 |
def_pc_ids = pc_ids[:-2] + ")" |
|
161 |
if pc != "all": |
|
162 |
execute_string += " AND logs.pc_id IN " + def_pc_ids |
|
163 |
else: |
|
164 |
execute_string += " WHERE logs.pc_id IN " + def_pc_ids |
|
165 |
if lic != "all": |
|
166 |
license = find_license(db, lic) |
|
167 |
device_licenses = get_license_devices(db, license.id) |
|
168 |
dev_ids = "(" |
|
169 |
for dev in device_licenses: |
|
170 |
dev_ids += str(dev.device_id) + ", " |
|
171 |
defin_ids = dev_ids[:-2] + ")" |
|
172 |
if pc != "all" or tema != "all": |
|
173 |
execute_string += " AND logs.device_id IN " + defin_ids |
|
174 |
else: |
|
175 |
execute_string += " WHERE logs.device_id IN " + defin_ids |
|
176 |
|
|
177 |
result = db.execute(execute_string) |
|
178 |
return result |
|
179 |
|
|
180 |
|
|
105 | 181 |
def create_device_logs(db: Session, item: schemas.USBTempBase, dev_id: int, pc_id: int, date: datetime): |
106 | 182 |
db_log = models.USBLog(pc_id=pc_id, timestamp=date, status=item.status, device_id=dev_id) |
107 | 183 |
db.add(db_log) |
Také k dispozici: Unified diff
re #9575 Filtering Over logs and devices.