apply config correctly, adjust defaults

This commit is contained in:
mertalev 2025-06-16 16:24:26 -04:00
parent 22690fa096
commit 585d093baf
No known key found for this signature in database
GPG key ID: DF6ABC77AAD98C95
10 changed files with 43 additions and 35 deletions

View file

@ -57,7 +57,7 @@ class InferenceModel(ABC):
self.load()
if model_kwargs:
self.configure(**model_kwargs)
return self._predict(*inputs, **model_kwargs)
return self._predict(*inputs)
@abstractmethod
def _predict(self, *inputs: Any, **model_kwargs: Any) -> Any: ...

View file

@ -19,7 +19,7 @@ class BaseCLIPTextualEncoder(InferenceModel):
depends = []
identity = (ModelType.TEXTUAL, ModelTask.SEARCH)
def _predict(self, inputs: str, language: str | None = None, **kwargs: Any) -> str:
def _predict(self, inputs: str, language: str | None = None) -> str:
tokens = self.tokenize(inputs, language=language)
res: NDArray[np.float32] = self.session.run(None, tokens)[0][0]
return serialize_np_array(res)

View file

@ -26,7 +26,7 @@ class BaseCLIPVisualEncoder(InferenceModel):
depends = []
identity = (ModelType.VISUAL, ModelTask.SEARCH)
def _predict(self, inputs: Image.Image | bytes, **kwargs: Any) -> str:
def _predict(self, inputs: Image.Image | bytes) -> str:
image = decode_pil(inputs)
res: NDArray[np.float32] = self.session.run(None, self.transform(image))[0][0]
return serialize_np_array(res)

View file

@ -24,7 +24,7 @@ class FaceDetector(InferenceModel):
return session
def _predict(self, inputs: NDArray[np.uint8] | bytes, **kwargs: Any) -> FaceDetectionOutput:
def _predict(self, inputs: NDArray[np.uint8] | bytes) -> FaceDetectionOutput:
inputs = decode_cv2(inputs)
bboxes, landmarks = self._detect(inputs)

View file

@ -44,7 +44,7 @@ class FaceRecognizer(InferenceModel):
return session
def _predict(
self, inputs: NDArray[np.uint8] | bytes | Image.Image, faces: FaceDetectionOutput, **kwargs: Any
self, inputs: NDArray[np.uint8] | bytes | Image.Image, faces: FaceDetectionOutput
) -> FacialRecognitionOutput:
if faces["boxes"].shape[0] == 0:
return []

View file

@ -11,7 +11,8 @@ from rapidocr.utils.typings import ModelType as RapidModelType
from immich_ml.config import log
from immich_ml.models.base import InferenceModel
from immich_ml.models.transforms import decode_cv2
from immich_ml.schemas import ModelSession, ModelTask, ModelType
from immich_ml.schemas import ModelFormat, ModelSession, ModelTask, ModelType
from immich_ml.sessions.ort import OrtSession
from .schemas import OcrOptions, TextDetectionOutput
@ -21,14 +22,14 @@ class TextDetector(InferenceModel):
identity = (ModelType.DETECTION, ModelTask.OCR)
def __init__(self, model_name: str, **model_kwargs: Any) -> None:
super().__init__(model_name, **model_kwargs)
self.max_resolution = 1440
super().__init__(model_name, **model_kwargs, model_format=ModelFormat.ONNX)
self.max_resolution = 736
self.min_score = 0.5
self.score_mode = "fast"
self._empty: TextDetectionOutput = {
"resized": np.empty(0, dtype=np.float32),
"image": np.empty(0, dtype=np.float32),
"boxes": np.empty(0, dtype=np.float32),
"scores": (),
"scores": np.empty(0, dtype=np.float32),
}
def _download(self) -> None:
@ -50,7 +51,8 @@ class TextDetector(InferenceModel):
DownloadFile.run(download_params)
def _load(self) -> ModelSession:
session = self._make_session(self.model_path)
# TODO: support other runtime sessions
session = OrtSession(self.model_path)
self.model = RapidTextDetector(
OcrOptions(
session=session.session,
@ -62,17 +64,23 @@ class TextDetector(InferenceModel):
)
return session
def configure(self, **kwargs: Any) -> None:
self.max_resolution = kwargs.get("maxResolution", self.max_resolution)
self.min_score = kwargs.get("minScore", self.min_score)
self.score_mode = kwargs.get("scoreMode", self.score_mode)
def _predict(self, inputs: bytes | Image.Image, **kwargs: Any) -> TextDetectionOutput:
def _predict(self, inputs: bytes | Image.Image) -> TextDetectionOutput:
results = self.model(decode_cv2(inputs))
if results.boxes is None or results.scores is None or results.img is None:
return self._empty
return {
"resized": results.img,
"image": results.img,
"boxes": np.array(results.boxes, dtype=np.float32),
"scores": np.array(results.scores, dtype=np.float32),
}
def configure(self, **kwargs: Any) -> None:
if (max_resolution := kwargs.get("maxResolution")) is not None:
self.max_resolution = max_resolution
self.model.limit_side_len = max_resolution
if (min_score := kwargs.get("minScore")) is not None:
self.min_score = min_score
self.model.postprocess_op.box_thresh = min_score
if (score_mode := kwargs.get("scoreMode")) is not None:
self.score_mode = score_mode
self.model.postprocess_op.score_mode = score_mode

View file

@ -23,12 +23,12 @@ class TextRecognizer(InferenceModel):
identity = (ModelType.RECOGNITION, ModelTask.OCR)
def __init__(self, model_name: str, **model_kwargs: Any) -> None:
self.min_score = model_kwargs.get("minScore", 0.5)
self.min_score = model_kwargs.get("minScore", 0.9)
self._empty: TextRecognitionOutput = {
"box": np.empty(0, dtype=np.float32),
"boxScore": [],
"boxScore": np.empty(0, dtype=np.float32),
"text": [],
"textScore": [],
"textScore": np.empty(0, dtype=np.float32),
}
super().__init__(model_name, **model_kwargs, model_format=ModelFormat.ONNX)
@ -62,24 +62,20 @@ class TextRecognizer(InferenceModel):
)
return session
def configure(self, **kwargs: Any) -> None:
self.min_score = kwargs.get("minScore", self.min_score)
def _predict(self, _: Image, texts: TextDetectionOutput, **kwargs: Any) -> TextRecognitionOutput:
boxes, resized_img, box_scores = texts["boxes"], texts["resized"], texts["scores"]
def _predict(self, _: Image, texts: TextDetectionOutput) -> TextRecognitionOutput:
boxes, img, box_scores = texts["boxes"], texts["image"], texts["scores"]
if boxes.shape[0] == 0:
return self._empty
rec = self.model(TextRecInput(img=self.get_crop_img_list(resized_img, boxes)))
rec = self.model(TextRecInput(img=self.get_crop_img_list(img, boxes)))
if rec.txts is None:
return self._empty
height, width = resized_img.shape[0:2]
log.info(f"Image shape: width={width}, height={height}")
height, width = img.shape[0:2]
boxes[:, :, 0] /= width
boxes[:, :, 1] /= height
text_scores = np.array(rec.scores)
valid_text_score_idx = text_scores > 0.5
valid_text_score_idx = text_scores > self.min_score
valid_score_idx_list = valid_text_score_idx.tolist()
return {
"box": boxes.reshape(-1, 8)[valid_text_score_idx].reshape(-1),
@ -115,3 +111,6 @@ class TextRecognizer(InferenceModel):
dst_img = np.rot90(dst_img)
imgs.append(dst_img)
return imgs
def configure(self, **kwargs: Any) -> None:
self.min_score = kwargs.get("minScore", self.min_score)

View file

@ -7,16 +7,16 @@ from typing_extensions import TypedDict
class TextDetectionOutput(TypedDict):
resized: npt.NDArray[np.float32]
image: npt.NDArray[np.float32]
boxes: npt.NDArray[np.float32]
scores: npt.NDArray[np.float32]
class TextRecognitionOutput(TypedDict):
box: npt.NDArray[np.float32]
boxScore: Iterable[float]
boxScore: npt.NDArray[np.float32]
text: Iterable[str]
textScore: Iterable[float]
textScore: npt.NDArray[np.float32]
# RapidOCR expects engine_type to be an attribute

View file

@ -15,6 +15,7 @@ from ..config import log, settings
class OrtSession:
session: ort.InferenceSession
def __init__(
self,
model_path: Path | str,

View file

@ -254,8 +254,8 @@ export const defaults = Object.freeze<SystemConfig>({
enabled: true,
modelName: 'PP-OCRv5_server',
minDetectionScore: 0.5,
minRecognitionScore: 0.5,
maxResolution: 1440,
minRecognitionScore: 0.9,
maxResolution: 736,
},
},
map: {