Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 58051326

Přidáno uživatelem David Friesecký před více než 3 roky(ů)

Re #8578 - Implemented new functions
- set_certificate_revoked, clear_certificate_revocation, get_all_revoked_by, get_all_issued_by
- update returns of all functions

Zobrazit rozdíly:

src/dao/certificate_repository.py
34 34
                   f"{COL_VALID_FROM},"
35 35
                   f"{COL_VALID_TO},"
36 36
                   f"{COL_PEM_DATA},"
37
                   f"{COL_REVOCATION_DATE},"
38
                   f"{COL_REVOCATION_REASON}"
37 39
                   f"{COL_PRIVATE_KEY_ID},"
38 40
                   f"{COL_TYPE_ID},"
39 41
                   f"{COL_PARENT_ID})"
......
42 44
                      certificate.valid_from,
43 45
                      certificate.valid_to,
44 46
                      certificate.pem_data,
47
                      certificate.revocation_date,
48
                      certificate.revocation_reason,
45 49
                      certificate.private_key_id,
46 50
                      certificate.type_id,
47 51
                      certificate.parent_id]
......
65 69
                        self.cursor.execute(sql, values)
66 70
                        self.connection.commit()
67 71
        except Error as e:
68
            print(e)
69
            return None
72
            return e
70 73

  
71 74
        return last_id
72 75

  
......
103 106
                                                   certificate_row[2],
104 107
                                                   certificate_row[3],
105 108
                                                   certificate_row[4],
106
                                                   certificate_row[5],
107
                                                   certificate_row[6],
108 109
                                                   certificate_row[7],
109
                                                   usage_dict)
110
                                                   certificate_row[8],
111
                                                   certificate_row[9],
112
                                                   usage_dict,
113
                                                   certificate_row[5],
114
                                                   certificate_row[6])
110 115
        except Error as e:
111
            print(e)
112
            return None
116
            return e
113 117

  
114 118
        if len(certificate_row) > 0:
115 119
            return certificate
116 120
        else:
117 121
            return None
118 122

  
119
    def read_all(self, filter_type: int = None) -> List[Certificate]:
123
    def read_all(self, filter_type: int = None):
120 124
        """
121 125
        Reads (selects) all certificates (with type).
122 126

  
......
154 158
                                                certificate_row[2],
155 159
                                                certificate_row[3],
156 160
                                                certificate_row[4],
157
                                                certificate_row[5],
158
                                                certificate_row[6],
159 161
                                                certificate_row[7],
160
                                                usage_dict))
162
                                                certificate_row[8],
163
                                                certificate_row[9],
164
                                                usage_dict,
165
                                                certificate_row[5],
166
                                                certificate_row[6]))
161 167
        except Error as e:
162
            print(e)
163
            return None
168
            return e
164 169

  
165
        return certificates
170
        if len(certificates) > 0:
171
            return certificates
172
        else:
173
            return None
166 174

  
167
    def update(self, certificate_id: int, certificate: Certificate) -> bool:
175
    def update(self, certificate_id: int, certificate: Certificate):
168 176
        """
169 177
        Updates a certificate.
170 178
        If the parameter of certificate (Certificate object) is not to be changed,
......
182 190
                   f"{COL_VALID_FROM} = ?, "
183 191
                   f"{COL_VALID_TO} = ?, "
184 192
                   f"{COL_PEM_DATA} = ?, "
193
                   f"{COL_REVOCATION_DATE} = ?, "
194
                   f"{COL_REVOCATION_REASON} = ?, "
185 195
                   f"{COL_PRIVATE_KEY_ID} = ?, "
186 196
                   f"{COL_TYPE_ID} = ?, "
187 197
                   f"{COL_PARENT_ID} = ? "
......
190 200
                      certificate.valid_from,
191 201
                      certificate.valid_to,
192 202
                      certificate.pem_data,
203
                      certificate.revocation_date,
204
                      certificate.revocation_reason,
193 205
                      certificate.private_key_id,
194 206
                      certificate.type_id,
195 207
                      certificate.parent_id,
......
214 226
                    self.cursor.execute(sql, values)
215 227
                    self.connection.commit()
216 228
        except Error as e:
217
            print(e)
218
            return False
229
            return e
219 230

  
220 231
        return True
221 232

  
222
    def delete(self, certificate_id: int) -> bool:
233
    def delete(self, certificate_id: int):
223 234
        """
224 235
        Deletes a certificate
225 236

  
......
235 246
            self.cursor.execute(sql, values)
236 247
            self.connection.commit()
237 248
        except Error as e:
238
            print(e)
239
            return False
249
            return e
240 250

  
241 251
        return self.cursor.rowcount > 0
252

  
253
    def set_certificate_revoked(
254
            self, certificate_id: int, revocation_date: str, revocation_reason: str = "unspecified"):
255
        """
256
        Revoke a certificate
257

  
258
        :param certificate_id: ID of specific certificate
259
        :param revocation_date: Date, when the certificate is revoked
260
        :param revocation_reason: Reason of the revocation
261

  
262
        :return: the result of whether the revocation was successful
263
        """
264

  
265
        try:
266
            sql = (f"UPDATE {TAB_CERTIFICATES} "
267
                   f"SET {COL_REVOCATION_DATE} = ?, "
268
                   f"{COL_REVOCATION_REASON} = ?, "
269
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
270
            values = [revocation_date,
271
                      revocation_reason,
272
                      certificate_id]
273
            self.cursor.execute(sql, values)
274
            self.connection.commit()
275
        except Error as e:
276
            return e
277

  
278
        return True
279

  
280
    def clear_certificate_revocation(self, certificate_id: int):
281
        """
282
        Clear revocation of a certificate
283

  
284
        :param certificate_id: ID of specific certificate
285

  
286
        :return: the result of whether the clear revocation was successful
287
        """
288

  
289
        try:
290
            sql = (f"UPDATE {TAB_CERTIFICATES} "
291
                   f"SET {COL_REVOCATION_DATE} = '', "
292
                   f"{COL_REVOCATION_REASON} = '', "
293
                   f"WHERE {COL_ID} = ?")
294
            values = [certificate_id]
295
            self.cursor.execute(sql, values)
296
            self.connection.commit()
297
        except Error as e:
298
            return e
299

  
300
        return True
301

  
302
    def get_all_revoked_by(self, certificate_id: int):
303
        """
304
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
305

  
306
        :param certificate_id: ID of specific certificate
307

  
308
        :return:
309
            # list of the certificates
310
            # None if the list is empty
311
            # sqlite3.Error if an exception is thrown
312
        """
313

  
314
        try:
315
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
316
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
317
            values = [certificate_id]
318
            self.cursor.execute(sql, values)
319
            certificate_rows = self.cursor.fetchall()
320

  
321
            certificates: List[Certificate] = []
322
            for certificate_row in certificate_rows:
323
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
324
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
325
                values = [certificate_row[0]]
326
                self.cursor.execute(sql, values)
327
                usage_rows = self.cursor.fetchall()
328

  
329
                usage_dict: Dict[int, bool] = {}
330
                for usage_row in usage_rows:
331
                    usage_dict[usage_row[2]] = True
332

  
333
                certificates.append(Certificate(certificate_row[0],
334
                                                certificate_row[1],
335
                                                certificate_row[2],
336
                                                certificate_row[3],
337
                                                certificate_row[4],
338
                                                certificate_row[7],
339
                                                certificate_row[8],
340
                                                certificate_row[9],
341
                                                usage_dict,
342
                                                certificate_row[5],
343
                                                certificate_row[6]))
344
        except Error as e:
345
            return e
346

  
347
        if len(certificates) > 0:
348
            return certificates
349
        else:
350
            return None
351

  
352
    def get_all_issued_by(self, certificate_id: int):
353
        """
354
        Get list of the certificates that are direct descendants of the certificate with the ID
355

  
356
        :param certificate_id: ID of specific certificate
357

  
358
        :return:
359
            # list of the certificates
360
            # None if the list is empty
361
            # sqlite3.Error if an exception is thrown
362
        """
363

  
364
        try:
365
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
366
                   f"WHERE {COL_PARENT_ID} = ?")
367
            values = [certificate_id]
368
            self.cursor.execute(sql, values)
369
            certificate_rows = self.cursor.fetchall()
370

  
371
            certificates: List[Certificate] = []
372
            for certificate_row in certificate_rows:
373
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
374
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
375
                values = [certificate_row[0]]
376
                self.cursor.execute(sql, values)
377
                usage_rows = self.cursor.fetchall()
378

  
379
                usage_dict: Dict[int, bool] = {}
380
                for usage_row in usage_rows:
381
                    usage_dict[usage_row[2]] = True
382

  
383
                certificates.append(Certificate(certificate_row[0],
384
                                                certificate_row[1],
385
                                                certificate_row[2],
386
                                                certificate_row[3],
387
                                                certificate_row[4],
388
                                                certificate_row[7],
389
                                                certificate_row[8],
390
                                                certificate_row[9],
391
                                                usage_dict,
392
                                                certificate_row[5],
393
                                                certificate_row[6]))
394
        except Error as e:
395
            return e
396

  
397
        if len(certificates) > 0:
398
            return certificates
399
        else:
400
            return None

Také k dispozici: Unified diff