Files
doc_processer/app/services/ocr_service.py
2026-02-05 21:26:23 +08:00

659 lines
23 KiB
Python

"""PaddleOCR-VL client service for text and formula recognition."""
import re
import numpy as np
import cv2
import requests
from io import BytesIO
import base64
from app.core.config import get_settings
from paddleocr import PaddleOCRVL
from typing import Optional
from app.services.layout_detector import LayoutDetector
from app.services.image_processor import ImageProcessor
from app.services.converter import Converter
from abc import ABC, abstractmethod
from openai import OpenAI
settings = get_settings()
_COMMANDS_NEED_SPACE = {
# operators / calculus
"cdot",
"times",
"div",
"pm",
"mp",
"int",
"iint",
"iiint",
"oint",
"sum",
"prod",
"lim",
# common functions
"sin",
"cos",
"tan",
"cot",
"sec",
"csc",
"log",
"ln",
"exp",
# misc
"partial",
"nabla",
}
_MATH_SEGMENT_PATTERN = re.compile(r"\$\$.*?\$\$|\$.*?\$", re.DOTALL)
_COMMAND_TOKEN_PATTERN = re.compile(r"\\[a-zA-Z]+")
# stage2: differentials inside math segments
# IMPORTANT: Very conservative pattern to avoid breaking LaTeX commands and variables
# Only match differentials in specific contexts (after integrals, in fractions)
# (?<!\\) - not preceded by backslash (not a LaTeX command)
# (?<![a-zA-Z]) - not preceded by any letter (not inside a word/command)
# (?![a-zA-Z]) - not followed by another letter (avoid matching "dx" in "dxyz")
_DIFFERENTIAL_UPPER_PATTERN = re.compile(r"(?<!\\)(?<![a-zA-Z])d([A-Z])(?![a-zA-Z])")
_DIFFERENTIAL_LOWER_PATTERN = re.compile(r"(?<!\\)(?<![a-zA-Z])d([a-z])(?![a-zA-Z])")
def _split_glued_command_token(token: str) -> str:
"""Split OCR-glued LaTeX command token by whitelist longest-prefix.
Examples:
- \\cdotdS -> \\cdot dS
- \\intdx -> \\int dx
"""
if not token.startswith("\\"):
return token
body = token[1:]
if len(body) < 2:
return token
best = None
# longest prefix that is in whitelist
for i in range(1, len(body)):
prefix = body[:i]
if prefix in _COMMANDS_NEED_SPACE:
best = prefix
if not best:
return token
suffix = body[len(best) :]
if not suffix:
return token
return f"\\{best} {suffix}"
def _clean_latex_syntax_spaces(expr: str) -> str:
"""Clean unwanted spaces in LaTeX syntax (common OCR errors).
OCR often adds spaces in LaTeX syntax structures where they shouldn't be:
- Subscripts: a _ {i 1} -> a_{i1}
- Superscripts: x ^ {2 3} -> x^{23}
- Fractions: \\frac { a } { b } -> \\frac{a}{b}
- Commands: \\ alpha -> \\alpha
- Braces: { a b } -> {ab} (within subscripts/superscripts)
This is safe because these spaces are always OCR errors - LaTeX doesn't
need or want spaces in these positions.
Args:
expr: LaTeX math expression.
Returns:
Expression with LaTeX syntax spaces cleaned.
"""
# Pattern 1: Spaces around _ and ^ (subscript/superscript operators)
# a _ {i} -> a_{i}, x ^ {2} -> x^{2}
expr = re.sub(r"\s*_\s*", "_", expr)
expr = re.sub(r"\s*\^\s*", "^", expr)
# Pattern 2: Spaces inside braces that follow _ or ^
# _{i 1} -> _{i1}, ^{2 3} -> ^{23}
# This is safe because spaces inside subscript/superscript braces are usually OCR errors
def clean_subscript_superscript_braces(match):
operator = match.group(1) # _ or ^
content = match.group(2) # content inside braces
# Remove spaces but preserve LaTeX commands (e.g., \alpha, \beta)
# Only remove spaces between non-backslash characters
cleaned = re.sub(r"(?<!\\)\s+(?!\\)", "", content)
return f"{operator}{{{cleaned}}}"
# Match _{ ... } or ^{ ... }
expr = re.sub(r"([_^])\{([^}]+)\}", clean_subscript_superscript_braces, expr)
# Pattern 3: Spaces inside \frac arguments
# \frac { a } { b } -> \frac{a}{b}
# \frac{ a + b }{ c } -> \frac{a+b}{c}
def clean_frac_braces(match):
numerator = match.group(1).strip()
denominator = match.group(2).strip()
return f"\\frac{{{numerator}}}{{{denominator}}}"
expr = re.sub(r"\\frac\s*\{\s*([^}]+?)\s*\}\s*\{\s*([^}]+?)\s*\}", clean_frac_braces, expr)
# Pattern 4: Spaces after backslash in LaTeX commands
# \ alpha -> \alpha, \ beta -> \beta
expr = re.sub(r"\\\s+([a-zA-Z]+)", r"\\\1", expr)
# Pattern 5: Spaces before/after braces in general contexts (conservative)
# Only remove if the space is clearly wrong (e.g., after operators)
# { x } in standalone context is kept as-is to avoid breaking valid spacing
# But after operators like \sqrt{ x } -> \sqrt{x}
expr = re.sub(r"(\\[a-zA-Z]+)\s*\{\s*", r"\1{", expr) # \sqrt { -> \sqrt{
return expr
def _postprocess_math(expr: str) -> str:
"""Postprocess a *math* expression (already inside $...$ or $$...$$).
Processing stages:
0. Fix OCR number errors (spaces in numbers)
1. Split glued LaTeX commands (e.g., \\cdotdS -> \\cdot dS)
2. Clean LaTeX syntax spaces (e.g., a _ {i 1} -> a_{i1})
3. Normalize differentials (DISABLED by default to avoid breaking variables)
Args:
expr: LaTeX math expression without delimiters.
Returns:
Processed LaTeX expression.
"""
# stage0: fix OCR number errors (digits with spaces)
expr = _fix_ocr_number_errors(expr)
# stage1: split glued command tokens (e.g. \cdotdS)
expr = _COMMAND_TOKEN_PATTERN.sub(lambda m: _split_glued_command_token(m.group(0)), expr)
# stage2: clean LaTeX syntax spaces (OCR often adds unwanted spaces)
expr = _clean_latex_syntax_spaces(expr)
# stage3: normalize differentials - DISABLED
# This feature is disabled because it's too aggressive and can break:
# - LaTeX commands containing 'd': \vdots, \lambda (via subscripts), \delta, etc.
# - Variable names: dx, dy, dz might be variable names, not differentials
# - Subscripts: x_{dx}, y_{dy}
# - Function names or custom notation
#
# The risk of false positives (breaking valid LaTeX) outweighs the benefit
# of normalizing differentials for OCR output.
#
# If differential normalization is needed, implement a context-aware version:
# expr = _normalize_differentials_contextaware(expr)
return expr
def _normalize_differentials_contextaware(expr: str) -> str:
"""Context-aware differential normalization (optional, not used by default).
Only normalizes differentials in specific mathematical contexts:
1. After integral symbols: \\int dx, \\iint dA, \\oint dr
2. In fraction denominators: \\frac{dy}{dx}
3. In explicit differential notation: f(x)dx (function followed by differential)
This avoids false positives like variable names, subscripts, or LaTeX commands.
Args:
expr: LaTeX math expression.
Returns:
Expression with differentials normalized in safe contexts only.
"""
# Pattern 1: After integral commands
# \int dx -> \int d x
integral_pattern = re.compile(r"(\\i+nt|\\oint)\s*([^\\]*?)\s*d([a-zA-Z])(?![a-zA-Z])")
expr = integral_pattern.sub(r"\1 \2 d \3", expr)
# Pattern 2: In fraction denominators
# \frac{...}{dx} -> \frac{...}{d x}
frac_pattern = re.compile(r"(\\frac\{[^}]*\}\{[^}]*?)d([a-zA-Z])(?![a-zA-Z])([^}]*\})")
expr = frac_pattern.sub(r"\1d \2\3", expr)
return expr
def _fix_ocr_number_errors(expr: str) -> str:
"""Fix common OCR errors in LaTeX math expressions.
OCR often splits numbers incorrectly, especially decimals:
- "2 2. 2" should be "22.2"
- "3 0. 4" should be "30.4"
- "1 5 0" should be "150"
This function merges digit sequences that are separated by spaces.
Args:
expr: LaTeX math expression.
Returns:
LaTeX expression with number errors fixed.
"""
# Fix pattern 1: "digit space digit(s). digit(s)" → "digit digit(s).digit(s)"
# Example: "2 2. 2" → "22.2"
expr = re.sub(r"(\d)\s+(\d+)\.\s*(\d+)", r"\1\2.\3", expr)
# Fix pattern 2: "digit(s). space digit(s)" → "digit(s).digit(s)"
# Example: "22. 2" → "22.2"
expr = re.sub(r"(\d+)\.\s+(\d+)", r"\1.\2", expr)
# Fix pattern 3: "digit space digit" (no decimal point, within same number context)
# Be careful: only merge if followed by decimal point or comma/end
# Example: "1 5 0" → "150" when followed by comma or end
expr = re.sub(r"(\d)\s+(\d)(?=\s*[,\)]|$)", r"\1\2", expr)
# Fix pattern 4: Multiple spaces in decimal numbers
# Example: "2 2 . 2" → "22.2"
expr = re.sub(r"(\d)\s+(\d)(?=\s*\.)", r"\1\2", expr)
return expr
def _postprocess_markdown(markdown_content: str) -> str:
"""Apply LaTeX postprocessing only within $...$ / $$...$$ segments."""
if not markdown_content:
return markdown_content
def _fix_segment(m: re.Match) -> str:
seg = m.group(0)
if seg.startswith("$$") and seg.endswith("$$"):
return f"$${_postprocess_math(seg[2:-2])}$$"
if seg.startswith("$") and seg.endswith("$"):
return f"${_postprocess_math(seg[1:-1])}$"
return seg
markdown_content = _MATH_SEGMENT_PATTERN.sub(_fix_segment, markdown_content)
# Apply markdown-level postprocessing (after LaTeX processing)
markdown_content = _remove_false_heading_from_single_formula(markdown_content)
return markdown_content
def _remove_false_heading_from_single_formula(markdown_content: str) -> str:
"""Remove false heading markers from single-formula content.
OCR sometimes incorrectly identifies a single formula as a heading by adding '#' prefix.
This function detects and removes the heading marker when:
1. The content contains only one formula (display or inline)
2. The formula line starts with '#' (heading marker)
3. No other non-formula text content exists
Examples:
Input: "# $$E = mc^2$$"
Output: "$$E = mc^2$$"
Input: "# $x = y$"
Output: "$x = y$"
Input: "# Introduction\n$$E = mc^2$$" (has text, keep heading)
Output: "# Introduction\n$$E = mc^2$$"
Args:
markdown_content: Markdown text with potential false headings.
Returns:
Markdown text with false heading markers removed.
"""
if not markdown_content or not markdown_content.strip():
return markdown_content
lines = markdown_content.split("\n")
# Count formulas and heading lines
formula_count = 0
heading_lines = []
has_non_formula_text = False
for i, line in enumerate(lines):
line_stripped = line.strip()
if not line_stripped:
continue
# Check if line starts with heading marker
heading_match = re.match(r"^(#{1,6})\s+(.+)$", line_stripped)
if heading_match:
heading_level = heading_match.group(1)
content = heading_match.group(2)
# Check if the heading content is a formula
if re.fullmatch(r"\$\$?.+\$\$?", content):
# This is a heading with a formula
heading_lines.append((i, heading_level, content))
formula_count += 1
else:
# This is a real heading with text
has_non_formula_text = True
elif re.fullmatch(r"\$\$?.+\$\$?", line_stripped):
# Standalone formula line (not in a heading)
formula_count += 1
elif line_stripped and not re.match(r"^#+\s*$", line_stripped):
# Non-empty, non-heading, non-formula line
has_non_formula_text = True
# Only remove heading markers if:
# 1. There's exactly one formula
# 2. That formula is in a heading line
# 3. There's no other text content
if formula_count == 1 and len(heading_lines) == 1 and not has_non_formula_text:
# Remove the heading marker from the formula
line_idx, heading_level, formula_content = heading_lines[0]
lines[line_idx] = formula_content
return "\n".join(lines)
class OCRServiceBase(ABC):
@abstractmethod
def recognize(self, image: np.ndarray) -> dict:
pass
class OCRService(OCRServiceBase):
"""Service for OCR using PaddleOCR-VL."""
_pipeline: Optional[PaddleOCRVL] = None
_layout_detector: Optional[LayoutDetector] = None
def __init__(
self,
vl_server_url: str,
layout_detector: LayoutDetector,
image_processor: ImageProcessor,
converter: Converter,
):
"""Initialize OCR service.
Args:
vl_server_url: URL of the vLLM server for PaddleOCR-VL.
layout_detector: Layout detector instance.
image_processor: Image processor instance.
"""
self.vl_server_url = vl_server_url or settings.paddleocr_vl_url
self.layout_detector = layout_detector
self.image_processor = image_processor
self.converter = converter
def _get_pipeline(self):
"""Get or create PaddleOCR-VL pipeline.
Returns:
PaddleOCRVL pipeline instance.
"""
if OCRService._pipeline is None:
OCRService._pipeline = PaddleOCRVL(
vl_rec_backend="vllm-server",
vl_rec_server_url=self.vl_server_url,
layout_detection_model_name="PP-DocLayoutV2",
)
return OCRService._pipeline
def _recognize_mixed(self, image: np.ndarray) -> dict:
"""Recognize mixed content (text + formulas) using PP-DocLayoutV2.
This mode uses PaddleOCR-VL with PP-DocLayoutV2 for document-aware
recognition of mixed content.
Args:
image: Input image as numpy array in BGR format.
Returns:
Dict with 'markdown', 'latex', 'mathml' keys.
"""
try:
pipeline = self._get_pipeline()
output = pipeline.predict(image, use_layout_detection=True)
markdown_content = ""
for res in output:
markdown_content += res.markdown.get("markdown_texts", "")
markdown_content = _postprocess_markdown(markdown_content)
convert_result = self.converter.convert_to_formats(markdown_content)
return {
"markdown": markdown_content,
"latex": convert_result.latex,
"mathml": convert_result.mathml,
"mml": convert_result.mml,
}
except Exception as e:
raise RuntimeError(f"Mixed recognition failed: {e}") from e
def _recognize_formula(self, image: np.ndarray) -> dict:
"""Recognize formula/math content using PaddleOCR-VL with prompt.
This mode uses PaddleOCR-VL directly with a formula recognition prompt.
Args:
image: Input image as numpy array in BGR format.
Returns:
Dict with 'latex', 'markdown', 'mathml' keys.
"""
try:
pipeline = self._get_pipeline()
output = pipeline.predict(image, use_layout_detection=False, prompt_label="formula")
markdown_content = ""
for res in output:
markdown_content += res.markdown.get("markdown_texts", "")
markdown_content = _postprocess_markdown(markdown_content)
convert_result = self.converter.convert_to_formats(markdown_content)
return {
"latex": convert_result.latex,
"mathml": convert_result.mathml,
"mml": convert_result.mml,
"markdown": markdown_content,
}
except Exception as e:
raise RuntimeError(f"Formula recognition failed: {e}") from e
def recognize(self, image: np.ndarray) -> dict:
"""Recognize content using PaddleOCR-VL.
Args:
image: Input image as numpy array in BGR format.
Returns:
Dict with 'latex', 'markdown', 'mathml' keys.
"""
padded_image = self.image_processor.add_padding(image)
layout_info = self.layout_detector.detect(padded_image)
if layout_info.MixedRecognition:
return self._recognize_mixed(image)
else:
return self._recognize_formula(image)
class MineruOCRService(OCRServiceBase):
"""Service for OCR using local file_parse API."""
def __init__(
self,
api_url: str = "http://127.0.0.1:8000/file_parse",
image_processor: Optional[ImageProcessor] = None,
converter: Optional[Converter] = None,
paddleocr_vl_url: str = "http://localhost:8001/v1",
):
"""Initialize Local API service.
Args:
api_url: URL of the local file_parse API endpoint.
converter: Optional converter instance for format conversion.
paddleocr_vl_url: URL of the PaddleOCR-VL vLLM server.
"""
self.api_url = api_url
self.image_processor = image_processor
self.converter = converter
self.paddleocr_vl_url = paddleocr_vl_url
self.openai_client = OpenAI(api_key="EMPTY", base_url=paddleocr_vl_url, timeout=3600)
def _recognize_formula_with_paddleocr_vl(self, image: np.ndarray, prompt: str = "Formula Recognition:") -> str:
"""Recognize formula using PaddleOCR-VL API.
Args:
image: Input image as numpy array in BGR format.
prompt: Recognition prompt (default: "Formula Recognition:")
Returns:
Recognized formula text (LaTeX format).
"""
try:
# Encode image to base64
success, encoded_image = cv2.imencode(".png", image)
if not success:
raise RuntimeError("Failed to encode image")
image_base64 = base64.b64encode(encoded_image.tobytes()).decode("utf-8")
image_url = f"data:image/png;base64,{image_base64}"
# Call OpenAI-compatible API
messages = [{"role": "user", "content": [{"type": "image_url", "image_url": {"url": image_url}}, {"type": "text", "text": prompt}]}]
response = self.openai_client.chat.completions.create(
model="PaddleOCR-VL-0.9B", # Use exact model name from vLLM server
messages=messages,
temperature=0.0,
)
return response.choices[0].message.content
except Exception as e:
raise RuntimeError(f"PaddleOCR-VL formula recognition failed: {e}") from e
def _extract_and_recognize_formulas(self, markdown_content: str, original_image: np.ndarray) -> str:
"""Extract image references from markdown and recognize formulas.
Args:
markdown_content: Markdown content with potential image references.
original_image: Original input image.
Returns:
Markdown content with formulas recognized by PaddleOCR-VL.
"""
# Pattern to match image references: ![](images/xxx.png) or ![](images/xxx.jpg)
image_pattern = re.compile(r"!\[\]\(images/[^)]+\)")
if not image_pattern.search(markdown_content):
return markdown_content
formula_text = self._recognize_formula_with_paddleocr_vl(original_image)
if formula_text.startswith("\[") or formula_text.startswith("\("):
formula_text = formula_text.replace("\[", "$$").replace("\(", "$$")
formula_text = formula_text.replace("\]", "$$").replace("\)", "$$")
else:
formula_text = f"$${formula_text}$$"
return formula_text
def recognize(self, image: np.ndarray) -> dict:
"""Recognize content using local file_parse API.
Args:
image: Input image as numpy array in BGR format.
Returns:
Dict with 'markdown', 'latex', 'mathml' keys.
"""
try:
if self.image_processor:
image = self.image_processor.add_padding(image)
# Convert numpy array to image bytes
success, encoded_image = cv2.imencode(".png", image)
if not success:
raise RuntimeError("Failed to encode image")
image_bytes = BytesIO(encoded_image.tobytes())
# Prepare multipart form data
files = {"files": ("image.png", image_bytes, "image/png")}
data = {
"return_middle_json": "false",
"return_model_output": "false",
"return_md": "true",
"return_images": "false",
"end_page_id": "99999",
"start_page_id": "0",
"lang_list": "en",
"server_url": "string",
"return_content_list": "false",
"backend": "hybrid-auto-engine",
"table_enable": "true",
"response_format_zip": "false",
"formula_enable": "true",
"parse_method": "ocr",
}
# Make API request
response = requests.post(self.api_url, files=files, data=data, headers={"accept": "application/json"}, timeout=30)
response.raise_for_status()
result = response.json()
# Extract markdown content from response
markdown_content = ""
if "results" in result and "image" in result["results"]:
markdown_content = result["results"]["image"].get("md_content", "")
print(f"[DEBUG] Markdown content from Mineru: {markdown_content[:200]}...")
# Check if markdown contains formula image references
if "![](images/" in markdown_content:
print(f"[DEBUG] Detected image reference, calling PaddleOCR-VL...")
# Use PaddleOCR-VL to recognize the formula
markdown_content = self._extract_and_recognize_formulas(markdown_content, image)
else:
print(f"[DEBUG] No image reference found in markdown")
# Apply postprocessing to fix OCR errors
markdown_content = _postprocess_markdown(markdown_content)
# Convert to other formats if converter is available
latex = ""
mathml = ""
mml = ""
if self.converter and markdown_content:
convert_result = self.converter.convert_to_formats(markdown_content)
latex = convert_result.latex
mathml = convert_result.mathml
mml = convert_result.mml
return {
"markdown": markdown_content,
"latex": latex,
"mathml": mathml,
"mml": mml,
}
except requests.RequestException as e:
raise RuntimeError(f"Local API request failed: {e}") from e
except Exception as e:
raise RuntimeError(f"Recognition failed: {e}") from e
if __name__ == "__main__":
mineru_service = MineruOCRService()
image = cv2.imread("test/complex_formula.png")
image_numpy = np.array(image)
ocr_result = mineru_service.recognize(image_numpy)
print(ocr_result)