fix: refactor logic

This commit is contained in:
2025-12-31 17:38:32 +08:00
parent 6ac50f7d2f
commit 35928c2484
17 changed files with 678 additions and 738 deletions

312
app/services/converter.py Normal file
View File

@@ -0,0 +1,312 @@
"""Markdown conversion and export service using pypandoc."""
import os
import re
import tempfile
from dataclasses import dataclass
from typing import Literal
import pypandoc
@dataclass
class ConvertResult:
    """Result of markdown conversion."""

    # LaTeX rendering of the source markdown.
    latex: str
    # HTML rendering of the source markdown, produced with pandoc's --mathml flag.
    mathml: str
@dataclass
class ExportResult:
    """Result of markdown export."""

    # Location of the exported file on disk.
    file_path: str
    # NOTE(review): presumably the MIME type sent with the download — confirm with callers.
    content_type: str
    # NOTE(review): presumably the file name suggested to the client — confirm with callers.
    download_name: str
# Closed set of export targets; used as the type of Converter.export_to_file's export_type.
ExportType = Literal["docx", "pdf"]
class Converter:
    """Service for conversion and export operations built on pypandoc.

    ``convert_to_formats`` renders markdown as LaTeX and as HTML with MathML.
    ``export_to_file`` renders markdown as a DOCX or PDF document after the
    math markup has been normalised by ``preprocess_for_export``.
    """

    # Pandoc input format with LaTeX math extensions
    INPUT_FORMAT = "markdown+raw_tex+tex_math_dollars+tex_math_double_backslash"

    # Export targets accepted by export_to_file.
    SUPPORTED_EXPORT_TYPES = ("docx", "pdf")

    # Bracketed length that follows a LaTeX line break, e.g. the "\[1mm]" in
    # "\\[1mm]". Group 1 is the length itself.
    _ROW_SKIP_RE = re.compile(r"\\\[(-?\d+(?:\.\d+)?(?:mm|cm|pt|em|ex|in))\]")

    # One complete single-dollar span on a single line. Both delimiters must be
    # lone, unescaped dollars, so "$$" block delimiters and "\$" are skipped and
    # a match always runs from an opening "$" to its closing "$".
    _INLINE_MATH_RE = re.compile(r"(?<![\\$])\$(?!\$)(.+?)(?<![\\$])\$(?!\$)")

    def __init__(self):
        """Initialize converter."""

    def convert_to_formats(self, md_text: str) -> ConvertResult:
        """Convert markdown to LaTeX and MathML formats.

        Args:
            md_text: Markdown text to convert.

        Returns:
            ConvertResult with latex and mathml fields. Empty or
            whitespace-only input yields two empty strings without
            invoking pandoc.

        Raises:
            RuntimeError: If conversion fails.
        """
        if not md_text or not md_text.strip():
            return ConvertResult(latex="", mathml="")

        try:
            # Convert to LaTeX
            latex_output = pypandoc.convert_text(
                md_text,
                "latex",
                format=self.INPUT_FORMAT,
            ).rstrip("\n")
            # Convert to HTML with MathML
            mathml_output = pypandoc.convert_text(
                md_text,
                "html",
                format=self.INPUT_FORMAT,
                extra_args=["--mathml"],
            ).rstrip("\n")
        except Exception as e:
            raise RuntimeError(f"Conversion failed: {e}") from e

        return ConvertResult(latex=latex_output, mathml=mathml_output)

    def preprocess_for_export(self, md_text: str) -> str:
        """Preprocess markdown text for export to docx/pdf.

        Handles LaTeX formula formatting, matrix environments, and
        other transformations needed for proper Word/PDF rendering.

        Args:
            md_text: Raw markdown text.

        Returns:
            Preprocessed markdown text.
        """
        # Replace the length after a line break with an explicit \vspace, e.g.
        # \[1mm] => \vspace{1mm}. This runs before the block-formula rules
        # below, whose patterns would otherwise match the "\[" of the line break.
        md_text = self._ROW_SKIP_RE.sub(r"\\vspace{\1}", md_text)

        # Add blank lines around \[...\] block formulas
        md_text = re.sub(
            r"([^\n])(\s*)\\\[(.*?)\\\]([^\n])",
            r"\1\n\n\\[\3\\]\n\n\4",
            md_text,
            flags=re.DOTALL,
        )
        md_text = re.sub(
            r"^(\s*)\\\[(.*?)\\\](\s*)(?=\n|$)",
            r"\n\\[\2\\]\n",
            md_text,
            flags=re.MULTILINE | re.DOTALL,
        )

        # Remove arithmatex span wrappers
        cleaned_md = re.sub(r'<span class="arithmatex">(.*?)</span>', r"\1", md_text)

        # Convert inline formulas: \( \) => $ $
        cleaned_md = re.sub(r"\\\(", r"$", cleaned_md)
        cleaned_md = re.sub(r"\\\)", r"$", cleaned_md)

        # Convert block formulas: \[ \] => $$ $$
        cleaned_md = re.sub(r"\\\[", r"$$", cleaned_md)
        cleaned_md = re.sub(r"\\\]", r"$$", cleaned_md)

        # Remove spaces between $ and formula content. Formulas are matched as
        # whole $...$ pairs so that the text between two neighbouring formulas
        # ("$a$ and $b$") is never mistaken for a formula and squeezed.
        cleaned_md = self._INLINE_MATH_RE.sub(self._trim_inline_math, cleaned_md)

        # Convert matrix environments for better Word rendering
        cleaned_md = self._convert_matrix_environments(cleaned_md)
        # Fix brace spacing for equation systems
        cleaned_md = self._fix_brace_spacing(cleaned_md)
        # Convert cases and aligned environments
        cleaned_md = self._convert_special_environments(cleaned_md)
        return cleaned_md

    @staticmethod
    def _trim_inline_math(match: re.Match) -> str:
        """Return the matched $...$ span with the spaces next to the dollars removed."""
        inner = match.group(1).strip(" ")
        # A span holding nothing but spaces is left exactly as it was.
        return f"${inner}$" if inner else match.group(0)

    def _convert_matrix_environments(self, md_text: str) -> str:
        """Convert vmatrix/Vmatrix to left/right delimited forms.

        This fixes the vertical line height issues in Word.
        """
        # vmatrix -> \left| \begin{matrix}...\end{matrix} \right|
        md_text = re.sub(
            r"\\begin\{vmatrix\}(.*?)\\end\{vmatrix\}",
            r"\\left| \\begin{matrix}\1\\end{matrix} \\right|",
            md_text,
            flags=re.DOTALL,
        )
        # Vmatrix -> \left\| \begin{matrix}...\end{matrix} \right\|
        md_text = re.sub(
            r"\\begin\{Vmatrix\}(.*?)\\end\{Vmatrix\}",
            r"\\left\\| \\begin{matrix}\1\\end{matrix} \\right\\|",
            md_text,
            flags=re.DOTALL,
        )
        return md_text

    def _fix_brace_spacing(self, md_text: str) -> str:
        """Fix spacing issues with braces in equation systems.

        Removes whitespace and adds negative space for proper alignment in Word/OMML.
        """
        # Fix \left\{ spacing
        md_text = re.sub(
            r"\\left\\\{\s+",
            r"\\left\\{\\!",
            md_text,
        )
        # Fix \right\} spacing
        md_text = re.sub(
            r"\s+\\right\\\}",
            r"\\!\\right\\}",
            md_text,
        )
        return md_text

    @staticmethod
    def _strip_leading_alignment(content: str) -> str:
        """Drop the & that opens the content or directly follows a row break."""
        return re.sub(r"(^|\\\\)\s*&", r"\1", content)

    def _convert_special_environments(self, md_text: str) -> str:
        """Convert cases and aligned environments to array format.

        These environments have better rendering support in Word/OMML.
        """

        def convert_cases(match: re.Match) -> str:
            content = match.group(1)
            return r"\left\{\begin{array}{ll}" + content + r"\end{array}\right."

        md_text = re.sub(
            r"\\begin\{cases\}(.*?)\\end\{cases\}",
            convert_cases,
            md_text,
            flags=re.DOTALL,
        )

        def convert_aligned_to_array(match: re.Match) -> str:
            # Leading & alignment markers are not needed in array{l}
            content = self._strip_leading_alignment(match.group(1))
            return r"\left\{\begin{array}{l}" + content + r"\end{array}\right."

        # Brace-delimited aligned blocks are handled first so the brace and the
        # closing "\right." are rewritten together with the environment.
        md_text = re.sub(
            r"\\left\\\{\\begin\{aligned\}(.*?)\\end\{aligned\}\\right\.",
            convert_aligned_to_array,
            md_text,
            flags=re.DOTALL,
        )

        def convert_standalone_aligned(match: re.Match) -> str:
            content = self._strip_leading_alignment(match.group(1))
            return r"\begin{array}{l}" + content + r"\end{array}"

        md_text = re.sub(
            r"\\begin\{aligned\}(.*?)\\end\{aligned\}",
            convert_standalone_aligned,
            md_text,
            flags=re.DOTALL,
        )
        return md_text

    def export_to_file(self, md_text: str, export_type: ExportType = "docx") -> bytes:
        """Export markdown to docx or pdf file.

        The markdown is preprocessed, written to a temporary ``.md`` file and
        converted by pandoc into a sibling temporary file, whose bytes are
        returned. Both temporary files are removed before returning or raising.

        Args:
            md_text: Markdown text to export.
            export_type: Export format, either 'docx' or 'pdf'.

        Returns:
            bytes of the exported file.

        Raises:
            ValueError: If export_type is not supported.
            RuntimeError: If export fails.
        """
        # Reject unknown formats up front instead of silently exporting a PDF.
        if export_type not in self.SUPPORTED_EXPORT_TYPES:
            raise ValueError(f"Unsupported export type: {export_type!r}")

        # Preprocess markdown
        cleaned_md = self.preprocess_for_export(md_text)

        # Create temp file for input; delete=False because pandoc reopens it by path.
        with tempfile.NamedTemporaryFile(suffix=".md", delete=False) as f_in:
            f_in.write(cleaned_md.encode("utf-8"))
            md_path = f_in.name
        output_file = md_path + "." + export_type

        try:
            if export_type == "docx":
                self._export_docx(md_path, output_file)
            else:  # pdf
                self._export_pdf(md_path, output_file)
            with open(output_file, "rb") as f:
                return f.read()
        except Exception as e:
            raise RuntimeError(f"Export failed: {e}") from e
        finally:
            # The result is returned as bytes, so neither temp file is needed afterwards.
            self._cleanup_files(md_path, output_file)

    def _export_docx(self, input_path: str, output_path: str) -> None:
        """Export to DOCX format using pypandoc."""
        extra_args = [
            "--highlight-style=pygments",
            # NOTE(review): relative path; presumably resolved against the
            # process working directory — confirm for non-root launches.
            "--reference-doc=app/pkg/reference.docx",
        ]
        pypandoc.convert_file(
            input_path,
            "docx",
            format=self.INPUT_FORMAT,
            outputfile=output_path,
            extra_args=extra_args,
        )

    def _export_pdf(self, input_path: str, output_path: str) -> None:
        """Export to PDF format using pypandoc with XeLaTeX."""
        extra_args = [
            "--pdf-engine=xelatex",
            "-V",
            "mainfont=Noto Sans CJK SC",
            "--highlight-style=pygments",
        ]
        pypandoc.convert_file(
            input_path,
            "pdf",
            format=self.INPUT_FORMAT,
            outputfile=output_path,
            extra_args=extra_args,
        )

    def _cleanup_files(self, *paths: str) -> None:
        """Remove files if they exist."""
        for path in paths:
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone (or never created): nothing to clean up.
                pass

    def cleanup_export_file(self, file_path: str) -> None:
        """Cleanup exported file after sending response.

        Call this after sending the file to the client.

        Args:
            file_path: Path to the exported file.
        """
        self._cleanup_files(file_path)

View File

@@ -1,335 +0,0 @@
"""Markdown to DOCX conversion service.
Reference implementation based on https://github.com/YogeLiu/markdown_2_docx
"""
import io
import re
from dataclasses import dataclass
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.shared import Inches, Pt
@dataclass
class MarkdownElement:
    """Parsed markdown element."""

    # One of: heading, paragraph, list_item, ordered_list_item, code_block, table, math
    type: str
    content: str
    level: int = 0  # For headings and lists
    language: str = ""  # For code blocks


class DocxConverter:
    """Converts markdown content to DOCX format."""

    def __init__(self):
        """Initialize the converter."""
        self.heading_pattern = re.compile(r"^(#{1,6})\s+(.+)$")
        self.list_pattern = re.compile(r"^(\s*)[-*+]\s+(.+)$")
        self.ordered_list_pattern = re.compile(r"^(\s*)\d+\.\s+(.+)$")
        self.code_block_pattern = re.compile(r"^```(\w*)$")
        self.inline_code_pattern = re.compile(r"`([^`]+)`")
        self.bold_pattern = re.compile(r"\*\*([^*]+)\*\*")
        self.italic_pattern = re.compile(r"\*([^*]+)\*")
        self.math_block_pattern = re.compile(r"\$\$(.+?)\$\$", re.DOTALL)
        self.inline_math_pattern = re.compile(r"\$([^$]+)\$")

    def convert(self, markdown: str) -> bytes:
        """Convert markdown content to DOCX.

        Args:
            markdown: Markdown content to convert.

        Returns:
            DOCX file as bytes.
        """
        doc = Document()
        for element in self._parse_markdown(markdown):
            self._add_element_to_doc(doc, element)

        # Save to bytes
        buffer = io.BytesIO()
        doc.save(buffer)
        return buffer.getvalue()

    def _parse_markdown(self, markdown: str) -> list[MarkdownElement]:
        """Parse markdown into elements.

        The input is scanned line by line. Fenced code, ``$$`` math blocks and
        tables consume several lines; every other non-blank line becomes one
        element.

        Args:
            markdown: Markdown content.

        Returns:
            List of parsed elements.
        """
        elements: list[MarkdownElement] = []
        lines = markdown.split("\n")
        i = 0
        in_code_block = False
        code_content: list[str] = []
        code_language = ""

        while i < len(lines):
            line = lines[i]

            # Code block handling
            code_match = self.code_block_pattern.match(line)
            if code_match:
                if in_code_block:
                    elements.append(
                        MarkdownElement(
                            type="code_block",
                            content="\n".join(code_content),
                            language=code_language,
                        )
                    )
                    code_content = []
                    in_code_block = False
                else:
                    in_code_block = True
                    code_language = code_match.group(1)
                i += 1
                continue

            if in_code_block:
                code_content.append(line)
                i += 1
                continue

            # Math block ($$...$$)
            if line.strip().startswith("$$"):
                math_content = []
                if line.strip() == "$$":
                    # Delimiters on their own lines: collect up to the closing $$.
                    i += 1
                    while i < len(lines) and lines[i].strip() != "$$":
                        math_content.append(lines[i])
                        i += 1
                else:
                    # Single line $$...$$ or start
                    content = line.strip()[2:]
                    if content.endswith("$$"):
                        math_content.append(content[:-2])
                    else:
                        math_content.append(content)
                        i += 1
                        while i < len(lines):
                            if lines[i].strip().endswith("$$"):
                                math_content.append(lines[i].strip()[:-2])
                                break
                            math_content.append(lines[i])
                            i += 1
                elements.append(
                    MarkdownElement(type="math", content="\n".join(math_content))
                )
                # Step past the line that closed the block.
                i += 1
                continue

            # Heading
            heading_match = self.heading_pattern.match(line)
            if heading_match:
                elements.append(
                    MarkdownElement(
                        type="heading",
                        content=heading_match.group(2),
                        level=len(heading_match.group(1)),
                    )
                )
                i += 1
                continue

            # Unordered list
            list_match = self.list_pattern.match(line)
            if list_match:
                indent = len(list_match.group(1))
                elements.append(
                    MarkdownElement(
                        type="list_item",
                        content=list_match.group(2),
                        level=indent // 2,
                    )
                )
                i += 1
                continue

            # Ordered list
            ordered_match = self.ordered_list_pattern.match(line)
            if ordered_match:
                indent = len(ordered_match.group(1))
                elements.append(
                    MarkdownElement(
                        type="ordered_list_item",
                        content=ordered_match.group(2),
                        level=indent // 2,
                    )
                )
                i += 1
                continue

            # Table (simple detection)
            if "|" in line and i + 1 < len(lines) and "---" in lines[i + 1]:
                table_lines = [line]
                i += 1
                while i < len(lines) and "|" in lines[i]:
                    table_lines.append(lines[i])
                    i += 1
                elements.append(
                    MarkdownElement(type="table", content="\n".join(table_lines))
                )
                continue

            # Regular paragraph
            if line.strip():
                elements.append(MarkdownElement(type="paragraph", content=line))
            i += 1

        # A fence that is never closed still carries content: emit it rather
        # than dropping everything after the opening marker.
        if in_code_block:
            elements.append(
                MarkdownElement(
                    type="code_block",
                    content="\n".join(code_content),
                    language=code_language,
                )
            )
        return elements

    def _add_element_to_doc(self, doc: Document, element: MarkdownElement) -> None:
        """Add a markdown element to the document.

        Elements of an unrecognised type are ignored.

        Args:
            doc: Word document.
            element: Parsed markdown element.
        """
        if element.type == "heading":
            self._add_heading(doc, element.content, element.level)
        elif element.type == "paragraph":
            self._add_paragraph(doc, element.content)
        elif element.type == "list_item":
            self._add_list_item(doc, element.content, element.level, ordered=False)
        elif element.type == "ordered_list_item":
            self._add_list_item(doc, element.content, element.level, ordered=True)
        elif element.type == "code_block":
            self._add_code_block(doc, element.content)
        elif element.type == "table":
            self._add_table(doc, element.content)
        elif element.type == "math":
            self._add_math(doc, element.content)

    def _add_heading(self, doc: Document, content: str, level: int) -> None:
        """Add a heading to the document."""
        # Map markdown levels to Word heading styles
        heading_level = min(level, 9)  # Word supports up to Heading 9
        doc.add_heading(content, level=heading_level)

    def _add_paragraph(self, doc: Document, content: str) -> None:
        """Add a paragraph with inline formatting."""
        para = doc.add_paragraph()
        self._add_formatted_text(para, content)

    def _add_formatted_text(self, para, content: str) -> None:
        """Add text with inline formatting (bold, italic, code, math).

        The text is consumed left to right: the earliest inline match is
        emitted as a formatted run, preceded by the plain text before it.
        """
        remaining = content
        while remaining:
            # Find next formatting marker
            candidates = [
                (self.bold_pattern.search(remaining), "bold"),
                (self.italic_pattern.search(remaining), "italic"),
                (self.inline_code_pattern.search(remaining), "code"),
                (self.inline_math_pattern.search(remaining), "math"),
            ]
            candidates = [(m, t) for m, t in candidates if m]
            if not candidates:
                para.add_run(remaining)
                break

            # Find earliest match; on a tie the order of the list above decides.
            match, match_type = min(candidates, key=lambda x: x[0].start())

            # Add text before match
            if match.start() > 0:
                para.add_run(remaining[: match.start()])

            # Add formatted text
            run = para.add_run(match.group(1))
            if match_type == "bold":
                run.bold = True
            elif match_type == "italic":
                run.italic = True
            elif match_type == "code":
                run.font.name = "Courier New"
                run.font.size = Pt(10)
            elif match_type == "math":
                run.italic = True
            remaining = remaining[match.end():]

    def _add_list_item(
        self, doc: Document, content: str, level: int, ordered: bool
    ) -> None:
        """Add a list item."""
        para = doc.add_paragraph(style="List Bullet" if not ordered else "List Number")
        para.paragraph_format.left_indent = Inches(0.25 * level)
        self._add_formatted_text(para, content)

    def _add_code_block(self, doc: Document, content: str) -> None:
        """Add a code block."""
        para = doc.add_paragraph()
        para.paragraph_format.left_indent = Inches(0.5)
        run = para.add_run(content)
        run.font.name = "Courier New"
        run.font.size = Pt(9)
        # Add shading
        shading = OxmlElement("w:shd")
        shading.set(qn("w:val"), "clear")
        shading.set(qn("w:fill"), "F0F0F0")
        para._p.get_or_add_pPr().append(shading)

    @staticmethod
    def _split_table_row(line: str) -> list[str]:
        """Split one markdown table row into stripped cell texts.

        Only the outer pipes are discarded, so an empty cell stays in place
        and the cells after it keep their column.
        """
        row = line.strip()
        if row.startswith("|"):
            row = row[1:]
        if row.endswith("|"):
            row = row[:-1]
        return [cell.strip() for cell in row.split("|")]

    def _add_table(self, doc: Document, content: str) -> None:
        """Add a table from markdown table format.

        The first line is the header and the second the separator; every
        further line is a data row. Cells beyond the header width are dropped.
        """
        lines = [row.strip() for row in content.split("\n") if row.strip()]
        if len(lines) < 2:
            return

        # Parse header
        header = self._split_table_row(lines[0])
        # Skip separator line
        data_lines = lines[2:]

        # Create table
        table = doc.add_table(rows=1, cols=len(header))
        table.style = "Table Grid"

        # Add header
        header_cells = table.rows[0].cells
        for i, text in enumerate(header):
            header_cells[i].text = text
            # Bold whatever runs the cell has; avoids indexing runs[0] directly.
            for run in header_cells[i].paragraphs[0].runs:
                run.bold = True

        # Add data rows
        for line in data_lines:
            row_cells = table.add_row().cells
            for cell, text in zip(row_cells, self._split_table_row(line)):
                cell.text = text

    def _add_math(self, doc: Document, content: str) -> None:
        """Add a math block.

        For proper OMML rendering, this would need more complex conversion.
        Currently renders as italic text with the LaTeX source.
        """
        para = doc.add_paragraph()
        para.alignment = WD_ALIGN_PARAGRAPH.CENTER
        run = para.add_run(content)
        run.italic = True
        run.font.name = "Cambria Math"
        run.font.size = Pt(12)

View File

@@ -116,7 +116,7 @@ class ImageProcessor:
else:
raise ValueError("Either image_url or image_base64 must be provided")
return self.add_padding(image)
return image
def image_to_base64(self, image: np.ndarray, format: str = "PNG") -> str:
"""Convert numpy image to base64 string.

View File

@@ -1,122 +1,157 @@
"""DocLayout-YOLO wrapper for document layout detection."""
"""PP-DocLayoutV2 wrapper for document layout detection."""
import numpy as np
from app.schemas.image import LayoutInfo, LayoutRegion
from app.core.config import get_settings
from paddleocr import LayoutDetection
from typing import Optional
settings = get_settings()
class LayoutDetector:
"""Wrapper for DocLayout-YOLO model."""
"""Layout detector for PP-DocLayoutV2."""
# Class names from DocLayout-YOLO
CLASS_NAMES = {
0: "title",
1: "plain_text",
2: "abandon",
3: "figure",
4: "figure_caption",
5: "table",
6: "table_caption",
7: "table_footnote",
8: "isolate_formula",
9: "formula_caption",
_layout_detector: Optional[LayoutDetection] = None
# PP-DocLayoutV2 class ID to label mapping
CLS_ID_TO_LABEL: dict[int, str] = {
0: "abstract",
1: "algorithm",
2: "aside_text",
3: "chart",
4: "content",
5: "display_formula",
6: "doc_title",
7: "figure_title",
8: "footer",
9: "footer_image",
10: "footnote",
11: "formula_number",
12: "header",
13: "header_image",
14: "image",
15: "inline_formula",
16: "number",
17: "paragraph_title",
18: "reference",
19: "reference_content",
20: "seal",
21: "table",
22: "text",
23: "vertical_text",
24: "vision_footnote",
}
# Classes considered as plain text
PLAIN_TEXT_CLASSES = {"title", "plain_text", "figure_caption", "table_caption", "table_footnote"}
# Mapping from raw labels to normalized region types
LABEL_TO_TYPE: dict[str, str] = {
# Text types
"abstract": "text",
"algorithm": "text",
"aside_text": "text",
"content": "text",
"doc_title": "text",
"footer": "text",
"footnote": "text",
"header": "text",
"number": "text",
"paragraph_title": "text",
"reference": "text",
"reference_content": "text",
"text": "text",
"vertical_text": "text",
"vision_footnote": "text",
# Formula types
"display_formula": "formula",
"inline_formula": "formula",
"formula_number": "formula",
# Table types
"table": "table",
# Figure types
"chart": "figure",
"figure_title": "figure",
"footer_image": "figure",
"header_image": "figure",
"image": "figure",
"seal": "figure",
}
# Classes considered as formula
FORMULA_CLASSES = {"isolate_formula", "formula_caption"}
def __init__(self, model_path: str, confidence_threshold: float = 0.2):
"""Initialize the layout detector.
def __init__(self):
"""Initialize layout detector.
Args:
model_path: Path to the DocLayout-YOLO model weights.
confidence_threshold: Minimum confidence for detections.
"""
self.model_path = model_path
self.confidence_threshold = confidence_threshold
self.model = None
_ = self._get_layout_detector()
def load_model(self) -> None:
"""Load the DocLayout-YOLO model.
def _get_layout_detector(self):
"""Get or create LayoutDetection instance."""
if LayoutDetector._layout_detector is None:
LayoutDetector._layout_detector = LayoutDetection(model_name="PP-DocLayoutV2")
return LayoutDetector._layout_detector
Raises:
RuntimeError: If model cannot be loaded.
"""
try:
from doclayout_yolo import YOLOv10
self.model = YOLOv10(self.model_path)
except Exception as e:
raise RuntimeError(f"Failed to load DocLayout-YOLO model: {e}") from e
def detect(self, image: np.ndarray, image_size: int = 1024) -> LayoutInfo:
"""Detect document layout regions.
def detect(self, image: np.ndarray) -> LayoutInfo:
"""Detect layout of the image using PP-DocLayoutV2.
Args:
image: Input image as numpy array in BGR format.
image_size: Image size for prediction.
image: Input image as numpy array.
Returns:
LayoutInfo with detected regions.
Raises:
RuntimeError: If model not loaded.
LayoutInfo with detected regions and flags.
"""
if self.model is None:
raise RuntimeError("Model not loaded. Call load_model() first.")
# Run prediction
results = self.model.predict(
image,
imgsz=image_size,
conf=self.confidence_threshold,
device=settings.device,
)
layout_detector = self._get_layout_detector()
result = layout_detector.predict(image)
# Parse the result
regions: list[LayoutRegion] = []
has_plain_text = False
has_formula = False
mixed_recognition = False
if results and len(results) > 0:
result = results[0]
if result.boxes is not None:
for box in result.boxes:
cls_id = int(box.cls[0].item())
confidence = float(box.conf[0].item())
bbox = box.xyxy[0].tolist()
# Handle result format: [{'input_path': ..., 'page_index': None, 'boxes': [...]}]
if isinstance(result, list) and len(result) > 0:
first_result = result[0]
if isinstance(first_result, dict) and "boxes" in first_result:
boxes = first_result.get("boxes", [])
else:
boxes = []
else:
boxes = []
class_name = self.CLASS_NAMES.get(cls_id, f"unknown_{cls_id}")
for box in boxes:
cls_id = box.get("cls_id")
label = box.get("label") or self.CLS_ID_TO_LABEL.get(cls_id, "other")
score = box.get("score", 0.0)
coordinate = box.get("coordinate", [0, 0, 0, 0])
# Map to simplified type
if class_name in self.PLAIN_TEXT_CLASSES:
region_type = "text"
has_plain_text = True
elif class_name in self.FORMULA_CLASSES:
region_type = "formula"
has_formula = True
elif class_name in {"figure"}:
region_type = "figure"
elif class_name in {"table"}:
region_type = "table"
else:
region_type = class_name
# Normalize label to region type
region_type = self.LABEL_TO_TYPE.get(label, "text")
regions.append(
LayoutRegion(
type=region_type,
bbox=bbox,
confidence=confidence,
)
)
regions.append(LayoutRegion(
type=region_type,
bbox=coordinate,
confidence=score,
score=score,
))
return LayoutInfo(
regions=regions,
has_plain_text=has_plain_text,
has_formula=has_formula,
)
mixed_recognition = any(region.type == "text" and region.score > 0.85 for region in regions)
return LayoutInfo(regions=regions, MixedRecognition=mixed_recognition)
if __name__ == "__main__":
import cv2
from app.services.image_processor import ImageProcessor
layout_detector = LayoutDetector()
image_path = "test/timeout.png"
image = cv2.imread(image_path)
image_processor = ImageProcessor(padding_ratio=0.15)
image = image_processor.add_padding(image)
# Save the padded image for debugging
cv2.imwrite("debug_padded_image.png", image)
layout_info = layout_detector.detect(image)
print(layout_info)

View File

@@ -1,14 +1,12 @@
"""PaddleOCR-VL client service for text and formula recognition."""
import io
import tempfile
from pathlib import Path
import cv2
import numpy as np
from app.core.config import get_settings
from app.schemas.image import LayoutInfo
from paddleocr import PaddleOCRVL
from typing import Optional
from app.services.layout_detector import LayoutDetector
from app.services.image_processor import ImageProcessor
from app.services.converter import Converter
settings = get_settings()
@@ -16,52 +14,40 @@ settings = get_settings()
class OCRService:
"""Service for OCR using PaddleOCR-VL."""
FORMULA_PROMPT = "Please recognize the mathematical formula in this image and output in LaTeX format."
_pipeline: Optional[PaddleOCRVL] = None
_layout_detector: Optional[LayoutDetector] = None
def __init__(
self,
vl_server_url: str | None = None,
pp_doclayout_model_dir: str | None = None,
vl_server_url: str,
layout_detector: LayoutDetector,
image_processor: ImageProcessor,
converter: Converter,
):
"""Initialize OCR service.
Args:
vl_server_url: URL of the vLLM server for PaddleOCR-VL.
pp_doclayout_model_dir: Path to PP-DocLayoutV2 model directory.
layout_detector: Layout detector instance.
image_processor: Image processor instance.
"""
self.vl_server_url = vl_server_url or settings.paddleocr_vl_url
self.pp_doclayout_model_dir = pp_doclayout_model_dir or settings.pp_doclayout_model_dir
self._pipeline = None
def _get_pipeline(self):
self.layout_detector = layout_detector
self.image_processor = image_processor
self.converter = converter
def _get_pipeline(self):
"""Get or create PaddleOCR-VL pipeline.
Returns:
PaddleOCRVL pipeline instance.
"""
if self._pipeline is None:
from paddleocr import PaddleOCRVL
self._pipeline = PaddleOCRVL(
if OCRService._pipeline is None:
OCRService._pipeline = PaddleOCRVL(
vl_rec_backend="vllm-server",
vl_rec_server_url=self.vl_server_url,
layout_detection_model_name="PP-DocLayoutV2",
layout_detection_model_dir=self.pp_doclayout_model_dir,
)
return self._pipeline
def _save_temp_image(self, image: np.ndarray) -> str:
"""Save image to a temporary file.
Args:
image: Image as numpy array in BGR format.
Returns:
Path to temporary file.
"""
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
cv2.imwrite(f.name, image)
return f.name
return OCRService._pipeline
def recognize_mixed(self, image: np.ndarray) -> dict:
"""Recognize mixed content (text + formulas) using PP-DocLayoutV2.
@@ -77,30 +63,21 @@ class OCRService:
"""
try:
pipeline = self._get_pipeline()
temp_path = self._save_temp_image(image)
try:
results = list(pipeline.predict(temp_path))
output = pipeline.predict(image, use_layout_detection=True)
markdown_content = ""
for result in results:
# PaddleOCR-VL results can be saved to markdown
md_buffer = io.StringIO()
result.save_to_markdown(save_path=md_buffer)
markdown_content += md_buffer.getvalue()
markdown_content = ""
# Convert markdown to other formats
latex = self._markdown_to_latex(markdown_content)
mathml = self._extract_mathml(markdown_content)
for res in output:
markdown_content += res.markdown.get("markdown_texts", "")
return {
"markdown": markdown_content,
"latex": latex,
"mathml": mathml,
}
finally:
Path(temp_path).unlink(missing_ok=True)
convert_result = self.converter.convert_to_formats(markdown_content)
return {
"markdown": markdown_content,
"latex": convert_result.latex,
"mathml": convert_result.mathml,
}
except Exception as e:
raise RuntimeError(f"Mixed recognition failed: {e}") from e
@@ -116,188 +93,49 @@ class OCRService:
Dict with 'latex', 'markdown', 'mathml' keys.
"""
try:
import httpx
pipeline = self._get_pipeline()
temp_path = self._save_temp_image(image)
output = pipeline.predict(image, use_layout_detection=False, prompt_label="formula")
try:
# Use vLLM API directly for formula recognition
import base64
markdown_content = ""
with open(temp_path, "rb") as f:
image_base64 = base64.b64encode(f.read()).decode("utf-8")
for res in output:
markdown_content += res.markdown.get("markdown_texts", "")
# Call vLLM server with formula prompt
response = httpx.post(
f"{self.vl_server_url}/chat/completions",
json={
"model": "paddleocr-vl",
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": self.FORMULA_PROMPT},
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{image_base64}"},
},
],
}
],
"max_tokens": 1024,
},
timeout=60.0,
)
response.raise_for_status()
result = response.json()
convert_result = self.converter.convert_to_formats(markdown_content)
latex = result["choices"][0]["message"]["content"].strip()
# Convert latex to other formats
markdown = self._latex_to_markdown(latex)
mathml = self._latex_to_mathml(latex)
return {
"latex": latex,
"markdown": markdown,
"mathml": mathml,
}
finally:
Path(temp_path).unlink(missing_ok=True)
except httpx.HTTPStatusError as e:
raise RuntimeError(f"Formula recognition failed: HTTP {e.response.status_code}") from e
return {
"latex": convert_result.latex,
"mathml": convert_result.mathml,
"markdown": markdown_content,
}
except Exception as e:
raise RuntimeError(f"Formula recognition failed: {e}") from e
def recognize(self, image: np.ndarray, layout_info: LayoutInfo) -> dict:
"""Recognize content based on layout detection results.
def recognize(self, image: np.ndarray) -> dict:
"""Recognize content using PaddleOCR-VL.
Args:
image: Input image as numpy array in BGR format.
layout_info: Layout detection results.
Returns:
Dict with recognition results including mode used.
Dict with 'latex', 'markdown', 'mathml' keys.
"""
# Decision logic:
# - If plain text exists -> use mixed_recognition (PP-DocLayoutV2)
# - Otherwise -> use formula_recognition (VL with prompt)
if layout_info.has_plain_text:
result = self.recognize_mixed(image)
result["recognition_mode"] = "mixed_recognition"
padded_image = self.image_processor.add_padding(image)
layout_info = self.layout_detector.detect(padded_image)
if layout_info.MixedRecognition:
return self.recognize_mixed(image)
else:
result = self.recognize_formula(image)
result["recognition_mode"] = "formula_recognition"
return self.recognize_formula(image)
return result
def _markdown_to_latex(self, markdown: str) -> str:
"""Convert markdown to LaTeX.
Simple conversion - wraps content in LaTeX document structure.
Args:
markdown: Markdown content.
Returns:
LaTeX representation.
"""
# Basic conversion: preserve math blocks, convert structure
lines = []
in_code_block = False
for line in markdown.split("\n"):
if line.startswith("```"):
in_code_block = not in_code_block
if in_code_block:
lines.append("\\begin{verbatim}")
else:
lines.append("\\end{verbatim}")
elif in_code_block:
lines.append(line)
elif line.startswith("# "):
lines.append(f"\\section{{{line[2:]}}}")
elif line.startswith("## "):
lines.append(f"\\subsection{{{line[3:]}}}")
elif line.startswith("### "):
lines.append(f"\\subsubsection{{{line[4:]}}}")
elif line.startswith("- "):
lines.append(f"\\item {line[2:]}")
elif line.startswith("$$"):
lines.append(line.replace("$$", "\\[").replace("$$", "\\]"))
elif "$" in line:
# Keep inline math as-is
lines.append(line)
else:
lines.append(line)
return "\n".join(lines)
def _latex_to_markdown(self, latex: str) -> str:
"""Convert LaTeX to markdown.
Args:
latex: LaTeX content.
Returns:
Markdown representation.
"""
# Wrap LaTeX in markdown math block
if latex.strip():
return f"$$\n{latex}\n$$"
return ""
def _latex_to_mathml(self, latex: str) -> str:
"""Convert LaTeX to MathML.
Args:
latex: LaTeX content.
Returns:
MathML representation.
"""
# Basic LaTeX to MathML conversion
# For production, consider using latex2mathml library
if not latex.strip():
return ""
try:
# Try to use latex2mathml if available
from latex2mathml.converter import convert
return convert(latex)
except ImportError:
# Fallback: wrap in basic MathML structure
return f'<math xmlns="http://www.w3.org/1998/Math/MathML"><mtext>{latex}</mtext></math>'
except Exception:
return f'<math xmlns="http://www.w3.org/1998/Math/MathML"><mtext>{latex}</mtext></math>'
def _extract_mathml(self, markdown: str) -> str:
"""Extract and convert math from markdown to MathML.
Args:
markdown: Markdown content.
Returns:
MathML for any math content found.
"""
import re
# Find all math blocks
math_blocks = re.findall(r"\$\$(.*?)\$\$", markdown, re.DOTALL)
inline_math = re.findall(r"\$([^$]+)\$", markdown)
all_math = math_blocks + inline_math
if not all_math:
return ""
# Convert each to MathML and combine
mathml_parts = []
for latex in all_math:
mathml = self._latex_to_mathml(latex.strip())
if mathml:
mathml_parts.append(mathml)
return "\n".join(mathml_parts)
if __name__ == "__main__":
import cv2
from app.services.image_processor import ImageProcessor
from app.services.layout_detector import LayoutDetector
image_processor = ImageProcessor(padding_ratio=0.15)
layout_detector = LayoutDetector()
ocr_service = OCRService(image_processor=image_processor, layout_detector=layout_detector)
image = cv2.imread("test/image.png")
ocr_result = ocr_service.recognize(image)
print(ocr_result)