# --- recognizer.py ---
import asyncio
import logging
import time
from typing import List, Dict, Tuple, Optional, Any

import numpy as np

from database.face import FaceDatabaseManager
from .session_utils import init_face_recognizer_session

class FaceRecognizer:
    def __init__(
        self,
        model_path: str,
        input_size: Tuple[int, int],
        similarity_threshold: float,
        providers: Optional[List[str]],
    ):
        ...  # constructor body elided
    async def _extract_embeddings(
        self, image: np.ndarray, face_data_list: List[Dict]
    ) -> List[np.ndarray]:
        # ...
        aligned_faces = align_faces_batch(image, face_data_list, self.input_size)
        # ...
        batch_input = preprocess_batch(aligned_faces, self.INPUT_MEAN, self.INPUT_STD)
        # ...
        # Run ONNX inference in a worker thread so the event loop is not blocked.
        outputs = await loop.run_in_executor(
            None, lambda: self.session.run(None, feeds)
        )
        # ...
        return normalize_embeddings_batch(embeddings)

    async def recognize_face(
        self,
        image: np.ndarray,
        landmarks_5: List,
        allowed_person_ids: Optional[List[str]] = None,
    ) -> Dict:
        # ...
        embeddings = await self._extract_embeddings(image, face_data)
        # ...
        person_id, similarity = await self._find_best_match(
            embedding, allowed_person_ids, organization_id
        )
        # ...
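
# Illustrative call site, not part of the module: the model path, input size,
# threshold, and provider list below are assumptions chosen for the sketch;
# only the recognize_face signature comes from the code above.
async def _demo_recognize(image: np.ndarray, landmarks_5: List) -> None:
    recognizer = FaceRecognizer(
        model_path="models/recognizer.onnx",  # hypothetical path
        input_size=(112, 112),                # hypothetical; typical for face embedders
        similarity_threshold=0.4,             # hypothetical
        providers=["CPUExecutionProvider"],
    )
    result = await recognizer.recognize_face(image, landmarks_5)
    print(result)  # a Dict; its exact keys are defined by the elided body

# e.g. asyncio.run(_demo_recognize(frame, landmarks))
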
# --- detector.py ---
import numpy as np
import logging as log
from typing import List
from .session_utils import init_face_detector_session
from .postprocess import process_detection

class FaceDetector:
    def __init__(
        self,
        model_path: str,
        input_size: tuple,
        conf_threshold: float,
        nms_threshold: float,
        top_k: int,
        min_face_size: int,
    ):
        # ...
        self.detector = init_face_detector_session(
            model_path, input_size, conf_threshold, nms_threshold, top_k
        )

    def detect_faces(
        self, image: np.ndarray, enable_liveness: bool = False
    ) -> List[dict]:
        # ...
        orig_height, orig_width = image.shape[:2]
        # ...
        # The detector's input size must match the frame it is about to process.
        self.detector.setInputSize((orig_width, orig_height))
        # detect() returns (retval, faces); faces is None when nothing is found.
        faces = self.detector.detect(image)[1]
        # ...
        for face in faces:
            # Columns 4-13 of each row hold the five facial landmarks as (x, y) pairs.
            landmarks_5 = face[4:14].reshape(5, 2)
            detection = process_detection(
                face, min_size, landmarks_5,
                orig_width, orig_height, margin,
            )
            # ...
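
# Illustrative usage, not part of the module; every constructor argument and
# file path below is an assumption chosen for the sketch.
if __name__ == "__main__":
    import cv2

    detector = FaceDetector(
        model_path="models/detector.onnx",  # hypothetical path
        input_size=(320, 320),              # hypothetical
        conf_threshold=0.8,
        nms_threshold=0.3,
        top_k=50,
        min_face_size=40,
    )
    frame = cv2.imread("sample.jpg")  # hypothetical test image
    if frame is not None:
        detections = detector.detect_faces(frame)
        print(f"{len(detections)} face(s) found")
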
# --- cipher.py ---
"""
AES-256-GCM encryption for .facenox backup files.

Blob layout: MAGIC(9) | SALT(16) | IV(12) | CIPHERTEXT | TAG(16)
(AESGCM appends the 16-byte authentication tag to the ciphertext.)
Key derivation: PBKDF2-HMAC-SHA256, 480k iterations.
"""

import os
import platform
import hashlib

from cryptography.hazmat.primitives.ciphers.aead import AESGCM

SALT_SIZE = 16
IV_SIZE = 12
KEY_SIZE = 32
PBKDF2_ITERS = 480_000
FACENOX_MAGIC = b"FACENOX\x00\x01"  # 9 bytes: 7-byte name + 2-byte version

def _derive_key(password: str, salt: bytes) -> bytes:
    # Stretch the password into a 32-byte AES key; 480k PBKDF2 iterations
    # keep offline brute force expensive.
    return hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt, PBKDF2_ITERS, dklen=KEY_SIZE
    )

def encrypt_backup(plaintext: bytes, password: str) -> bytes:
    # Fresh random salt and IV per backup; a GCM IV must never be reused
    # with the same key.
    salt = os.urandom(SALT_SIZE)
    iv = os.urandom(IV_SIZE)
    encrypted = AESGCM(_derive_key(password, salt)).encrypt(iv, plaintext, None)
    return FACENOX_MAGIC + salt + iv + encrypted

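# The matching decryptor is not shown in this excerpt; a minimal sketch,
# assuming only the blob layout from the module docstring (the name
# decrypt_backup is hypothetical):
def decrypt_backup(blob: bytes, password: str) -> bytes:
    if not blob.startswith(FACENOX_MAGIC):
        raise ValueError("not a .facenox backup")
    body = blob[len(FACENOX_MAGIC):]
    salt = body[:SALT_SIZE]
    iv = body[SALT_SIZE:SALT_SIZE + IV_SIZE]
    ciphertext = body[SALT_SIZE + IV_SIZE:]  # ciphertext with the 16-byte tag appended
    # decrypt() verifies the tag, raising cryptography.exceptions.InvalidTag
    # on a wrong password or a tampered blob.
    return AESGCM(_derive_key(password, salt)).decrypt(iv, ciphertext, None)
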
# ... (get_machine_key and other helpers elided)

def encrypt_local_data(plaintext: bytes) -> bytes:
    # Encrypt with a machine-bound key; the fresh IV is prepended so the
    # decrypt side can recover it.
    key = get_machine_key()
    iv = os.urandom(IV_SIZE)
    encrypted = AESGCM(key).encrypt(iv, plaintext, None)
    return iv + encrypted

# --- liveness_detector.py ---
import cv2
import numpy as np
from typing import List, Dict, Optional
from .session_utils import init_onnx_session
from .preprocess import crop, extract_face_crops_from_detections
from .postprocess import validate_detection, run_batch_inference

def probability_to_logit_threshold(p: float) -> float:
    # Clamp away from 0 and 1 to avoid log(0), then map probability to
    # logit space: logit(p) = log(p / (1 - p)).
    p = max(1e-6, min(1 - 1e-6, p))
    return np.log(p / (1 - p))

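# Illustrative check, not part of the module: thresholding raw logits at
# logit(p) is equivalent to thresholding sigmoid(logits) at p, so the
# detector below never has to apply a sigmoid to the model output.
if __name__ == "__main__":
    t = probability_to_logit_threshold(0.8)
    assert abs(1.0 / (1.0 + np.exp(-t)) - 0.8) < 1e-9
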
class LivenessDetector:
    def __init__(self, model_path, model_img_size, confidence_threshold, bbox_inc):
        # ...
        self.model_img_size = model_img_size
        # ...
        # Keep the threshold in logit space so raw model outputs can be
        # compared directly, without applying a sigmoid per frame.
        self.logit_threshold = probability_to_logit_threshold(confidence_threshold)
        # ...
        self.ort_session, self.input_name = self._init_session_(model_path)
        self.track_memory = TrackLivenessMemory()

    def detect_faces(
        self,
        image: np.ndarray,
        face_detections: List[Dict],
        tracking_namespace: Optional[str] = None,
    ) -> List[Dict]:
        # ...
        # The anti-spoofing model expects RGB input; OpenCV frames arrive BGR.
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # ...
        raw_logits = run_batch_inference(
            face_crops, self.ort_session, self.input_name, self.model_img_size
        )
        # ...
        results = assemble_liveness_results(
            valid_detections, raw_logits, self.logit_threshold, results
        )
        # ...