Revize 58051326
Přidáno uživatelem David Friesecký před více než 3 roky(ů)
src/dao/certificate_repository.py | ||
---|---|---|
34 | 34 |
f"{COL_VALID_FROM}," |
35 | 35 |
f"{COL_VALID_TO}," |
36 | 36 |
f"{COL_PEM_DATA}," |
37 |
f"{COL_REVOCATION_DATE}," |
|
38 |
f"{COL_REVOCATION_REASON}" |
|
37 | 39 |
f"{COL_PRIVATE_KEY_ID}," |
38 | 40 |
f"{COL_TYPE_ID}," |
39 | 41 |
f"{COL_PARENT_ID})" |
... | ... | |
42 | 44 |
certificate.valid_from, |
43 | 45 |
certificate.valid_to, |
44 | 46 |
certificate.pem_data, |
47 |
certificate.revocation_date, |
|
48 |
certificate.revocation_reason, |
|
45 | 49 |
certificate.private_key_id, |
46 | 50 |
certificate.type_id, |
47 | 51 |
certificate.parent_id] |
... | ... | |
65 | 69 |
self.cursor.execute(sql, values) |
66 | 70 |
self.connection.commit() |
67 | 71 |
except Error as e: |
68 |
print(e) |
|
69 |
return None |
|
72 |
return e |
|
70 | 73 |
|
71 | 74 |
return last_id |
72 | 75 |
|
... | ... | |
103 | 106 |
certificate_row[2], |
104 | 107 |
certificate_row[3], |
105 | 108 |
certificate_row[4], |
106 |
certificate_row[5], |
|
107 |
certificate_row[6], |
|
108 | 109 |
certificate_row[7], |
109 |
usage_dict) |
|
110 |
certificate_row[8], |
|
111 |
certificate_row[9], |
|
112 |
usage_dict, |
|
113 |
certificate_row[5], |
|
114 |
certificate_row[6]) |
|
110 | 115 |
except Error as e: |
111 |
print(e) |
|
112 |
return None |
|
116 |
return e |
|
113 | 117 |
|
114 | 118 |
if len(certificate_row) > 0: |
115 | 119 |
return certificate |
116 | 120 |
else: |
117 | 121 |
return None |
118 | 122 |
|
119 |
def read_all(self, filter_type: int = None) -> List[Certificate]:
|
|
123 |
def read_all(self, filter_type: int = None): |
|
120 | 124 |
""" |
121 | 125 |
Reads (selects) all certificates (with type). |
122 | 126 |
|
... | ... | |
154 | 158 |
certificate_row[2], |
155 | 159 |
certificate_row[3], |
156 | 160 |
certificate_row[4], |
157 |
certificate_row[5], |
|
158 |
certificate_row[6], |
|
159 | 161 |
certificate_row[7], |
160 |
usage_dict)) |
|
162 |
certificate_row[8], |
|
163 |
certificate_row[9], |
|
164 |
usage_dict, |
|
165 |
certificate_row[5], |
|
166 |
certificate_row[6])) |
|
161 | 167 |
except Error as e: |
162 |
print(e) |
|
163 |
return None |
|
168 |
return e |
|
164 | 169 |
|
165 |
return certificates |
|
170 |
if len(certificates) > 0: |
|
171 |
return certificates |
|
172 |
else: |
|
173 |
return None |
|
166 | 174 |
|
167 |
def update(self, certificate_id: int, certificate: Certificate) -> bool:
|
|
175 |
def update(self, certificate_id: int, certificate: Certificate): |
|
168 | 176 |
""" |
169 | 177 |
Updates a certificate. |
170 | 178 |
If the parameter of certificate (Certificate object) is not to be changed, |
... | ... | |
182 | 190 |
f"{COL_VALID_FROM} = ?, " |
183 | 191 |
f"{COL_VALID_TO} = ?, " |
184 | 192 |
f"{COL_PEM_DATA} = ?, " |
193 |
f"{COL_REVOCATION_DATE} = ?, " |
|
194 |
f"{COL_REVOCATION_REASON} = ?, " |
|
185 | 195 |
f"{COL_PRIVATE_KEY_ID} = ?, " |
186 | 196 |
f"{COL_TYPE_ID} = ?, " |
187 | 197 |
f"{COL_PARENT_ID} = ? " |
... | ... | |
190 | 200 |
certificate.valid_from, |
191 | 201 |
certificate.valid_to, |
192 | 202 |
certificate.pem_data, |
203 |
certificate.revocation_date, |
|
204 |
certificate.revocation_reason, |
|
193 | 205 |
certificate.private_key_id, |
194 | 206 |
certificate.type_id, |
195 | 207 |
certificate.parent_id, |
... | ... | |
214 | 226 |
self.cursor.execute(sql, values) |
215 | 227 |
self.connection.commit() |
216 | 228 |
except Error as e: |
217 |
print(e) |
|
218 |
return False |
|
229 |
return e |
|
219 | 230 |
|
220 | 231 |
return True |
221 | 232 |
|
222 |
def delete(self, certificate_id: int) -> bool:
|
|
233 |
def delete(self, certificate_id: int): |
|
223 | 234 |
""" |
224 | 235 |
Deletes a certificate |
225 | 236 |
|
... | ... | |
235 | 246 |
self.cursor.execute(sql, values) |
236 | 247 |
self.connection.commit() |
237 | 248 |
except Error as e: |
238 |
print(e) |
|
239 |
return False |
|
249 |
return e |
|
240 | 250 |
|
241 | 251 |
return self.cursor.rowcount > 0 |
252 |
|
|
253 |
def set_certificate_revoked( |
|
254 |
self, certificate_id: int, revocation_date: str, revocation_reason: str = "unspecified"): |
|
255 |
""" |
|
256 |
Revoke a certificate |
|
257 |
|
|
258 |
:param certificate_id: ID of specific certificate |
|
259 |
:param revocation_date: Date, when the certificate is revoked |
|
260 |
:param revocation_reason: Reason of the revocation |
|
261 |
|
|
262 |
:return: the result of whether the revocation was successful |
|
263 |
""" |
|
264 |
|
|
265 |
try: |
|
266 |
sql = (f"UPDATE {TAB_CERTIFICATES} " |
|
267 |
f"SET {COL_REVOCATION_DATE} = ?, " |
|
268 |
f"{COL_REVOCATION_REASON} = ?, " |
|
269 |
f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')") |
|
270 |
values = [revocation_date, |
|
271 |
revocation_reason, |
|
272 |
certificate_id] |
|
273 |
self.cursor.execute(sql, values) |
|
274 |
self.connection.commit() |
|
275 |
except Error as e: |
|
276 |
return e |
|
277 |
|
|
278 |
return True |
|
279 |
|
|
280 |
def clear_certificate_revocation(self, certificate_id: int): |
|
281 |
""" |
|
282 |
Clear revocation of a certificate |
|
283 |
|
|
284 |
:param certificate_id: ID of specific certificate |
|
285 |
|
|
286 |
:return: the result of whether the clear revocation was successful |
|
287 |
""" |
|
288 |
|
|
289 |
try: |
|
290 |
sql = (f"UPDATE {TAB_CERTIFICATES} " |
|
291 |
f"SET {COL_REVOCATION_DATE} = '', " |
|
292 |
f"{COL_REVOCATION_REASON} = '', " |
|
293 |
f"WHERE {COL_ID} = ?") |
|
294 |
values = [certificate_id] |
|
295 |
self.cursor.execute(sql, values) |
|
296 |
self.connection.commit() |
|
297 |
except Error as e: |
|
298 |
return e |
|
299 |
|
|
300 |
return True |
|
301 |
|
|
302 |
def get_all_revoked_by(self, certificate_id: int): |
|
303 |
""" |
|
304 |
Get list of the revoked certificates that are direct descendants of the certificate with the ID |
|
305 |
|
|
306 |
:param certificate_id: ID of specific certificate |
|
307 |
|
|
308 |
:return: |
|
309 |
# list of the certificates |
|
310 |
# None if the list is empty |
|
311 |
# sqlite3.Error if an exception is thrown |
|
312 |
""" |
|
313 |
|
|
314 |
try: |
|
315 |
sql = (f"SELECT * FROM {TAB_CERTIFICATES} " |
|
316 |
f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''") |
|
317 |
values = [certificate_id] |
|
318 |
self.cursor.execute(sql, values) |
|
319 |
certificate_rows = self.cursor.fetchall() |
|
320 |
|
|
321 |
certificates: List[Certificate] = [] |
|
322 |
for certificate_row in certificate_rows: |
|
323 |
sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} " |
|
324 |
f"WHERE {COL_CERTIFICATE_ID} = ?") |
|
325 |
values = [certificate_row[0]] |
|
326 |
self.cursor.execute(sql, values) |
|
327 |
usage_rows = self.cursor.fetchall() |
|
328 |
|
|
329 |
usage_dict: Dict[int, bool] = {} |
|
330 |
for usage_row in usage_rows: |
|
331 |
usage_dict[usage_row[2]] = True |
|
332 |
|
|
333 |
certificates.append(Certificate(certificate_row[0], |
|
334 |
certificate_row[1], |
|
335 |
certificate_row[2], |
|
336 |
certificate_row[3], |
|
337 |
certificate_row[4], |
|
338 |
certificate_row[7], |
|
339 |
certificate_row[8], |
|
340 |
certificate_row[9], |
|
341 |
usage_dict, |
|
342 |
certificate_row[5], |
|
343 |
certificate_row[6])) |
|
344 |
except Error as e: |
|
345 |
return e |
|
346 |
|
|
347 |
if len(certificates) > 0: |
|
348 |
return certificates |
|
349 |
else: |
|
350 |
return None |
|
351 |
|
|
352 |
def get_all_issued_by(self, certificate_id: int): |
|
353 |
""" |
|
354 |
Get list of the certificates that are direct descendants of the certificate with the ID |
|
355 |
|
|
356 |
:param certificate_id: ID of specific certificate |
|
357 |
|
|
358 |
:return: |
|
359 |
# list of the certificates |
|
360 |
# None if the list is empty |
|
361 |
# sqlite3.Error if an exception is thrown |
|
362 |
""" |
|
363 |
|
|
364 |
try: |
|
365 |
sql = (f"SELECT * FROM {TAB_CERTIFICATES} " |
|
366 |
f"WHERE {COL_PARENT_ID} = ?") |
|
367 |
values = [certificate_id] |
|
368 |
self.cursor.execute(sql, values) |
|
369 |
certificate_rows = self.cursor.fetchall() |
|
370 |
|
|
371 |
certificates: List[Certificate] = [] |
|
372 |
for certificate_row in certificate_rows: |
|
373 |
sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} " |
|
374 |
f"WHERE {COL_CERTIFICATE_ID} = ?") |
|
375 |
values = [certificate_row[0]] |
|
376 |
self.cursor.execute(sql, values) |
|
377 |
usage_rows = self.cursor.fetchall() |
|
378 |
|
|
379 |
usage_dict: Dict[int, bool] = {} |
|
380 |
for usage_row in usage_rows: |
|
381 |
usage_dict[usage_row[2]] = True |
|
382 |
|
|
383 |
certificates.append(Certificate(certificate_row[0], |
|
384 |
certificate_row[1], |
|
385 |
certificate_row[2], |
|
386 |
certificate_row[3], |
|
387 |
certificate_row[4], |
|
388 |
certificate_row[7], |
|
389 |
certificate_row[8], |
|
390 |
certificate_row[9], |
|
391 |
usage_dict, |
|
392 |
certificate_row[5], |
|
393 |
certificate_row[6])) |
|
394 |
except Error as e: |
|
395 |
return e |
|
396 |
|
|
397 |
if len(certificates) > 0: |
|
398 |
return certificates |
|
399 |
else: |
|
400 |
return None |
Také k dispozici: Unified diff
Re #8578 - Implemented new functions
- set_certificate_revoked, clear_certificate_revocation, get_all_revoked_by, get_all_issued_by
- update returns of all functions