init repo
This commit is contained in:
0
app/__init__.py
Normal file
0
app/__init__.py
Normal file
0
app/api/__init__.py
Normal file
0
app/api/__init__.py
Normal file
0
app/api/v1/__init__.py
Normal file
0
app/api/v1/__init__.py
Normal file
0
app/api/v1/endpoints/__init__.py
Normal file
0
app/api/v1/endpoints/__init__.py
Normal file
37
app/api/v1/endpoints/convert.py
Normal file
37
app/api/v1/endpoints/convert.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""Markdown to DOCX conversion endpoint."""
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from fastapi.responses import Response
|
||||
|
||||
from app.core.dependencies import get_docx_converter
|
||||
from app.schemas.convert import MarkdownToDocxRequest
|
||||
from app.services.docx_converter import DocxConverter
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/docx")
async def convert_markdown_to_docx(
    request: MarkdownToDocxRequest,
    converter: DocxConverter = Depends(get_docx_converter),
) -> Response:
    """Convert markdown content to DOCX file.

    Args:
        request: Validated body carrying the markdown text and an optional
            output filename (without extension).
        converter: Injected DOCX conversion service.

    Returns:
        The generated DOCX file as a binary download.

    Raises:
        HTTPException: 500 if the conversion itself fails.
    """
    try:
        docx_bytes = converter.convert(request.markdown)
    except Exception as e:
        # Chain the original exception so the root cause survives in tracebacks.
        raise HTTPException(status_code=500, detail=f"Conversion failed: {e}") from e

    # Determine the download filename, guaranteeing a .docx extension.
    filename = request.filename or "output"
    if not filename.endswith(".docx"):
        filename = f"{filename}.docx"

    return Response(
        content=docx_bytes,
        media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        # Quote the filename so names containing spaces survive the header.
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
|
||||
|
||||
59
app/api/v1/endpoints/image.py
Normal file
59
app/api/v1/endpoints/image.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""Image OCR endpoint."""
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
|
||||
from app.core.dependencies import get_image_processor, get_layout_detector, get_ocr_service
|
||||
from app.schemas.image import ImageOCRRequest, ImageOCRResponse
|
||||
from app.services.image_processor import ImageProcessor
|
||||
from app.services.layout_detector import LayoutDetector
|
||||
from app.services.ocr_service import OCRService
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/ocr", response_model=ImageOCRResponse)
async def process_image_ocr(
    request: ImageOCRRequest,
    image_processor: ImageProcessor = Depends(get_image_processor),
    layout_detector: LayoutDetector = Depends(get_layout_detector),
    ocr_service: OCRService = Depends(get_ocr_service),
) -> ImageOCRResponse:
    """Run the image-to-markup pipeline and return LaTeX/Markdown/MathML.

    Pipeline:
      1. Load the image (URL or base64) and pad it with whitespace
         (30% total expansion).
      2. Detect layout regions with DocLayout-YOLO.
      3. Recognize content based on the layout: mixed recognition
         (PP-DocLayoutV2) when plain text is present, otherwise
         formula-prompted PaddleOCR-VL.
      4. Package the recognized output into the response model.
    """
    # Step 1: load + pad. A bad/undecodable image is the client's fault -> 400.
    try:
        image = image_processor.preprocess(
            image_url=request.image_url,
            image_base64=request.image_base64,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    # Step 2: layout detection failures are model/server-side -> 500.
    try:
        layout_info = layout_detector.detect(image)
    except RuntimeError as e:
        raise HTTPException(status_code=500, detail=f"Layout detection failed: {e}")

    # Step 3: OCR backend trouble is reported as 503 so clients may retry.
    try:
        ocr_result = ocr_service.recognize(image, layout_info)
    except RuntimeError as e:
        raise HTTPException(status_code=503, detail=str(e))

    # Step 4: missing keys in the OCR result default to empty strings.
    return ImageOCRResponse(
        latex=ocr_result.get("latex", ""),
        markdown=ocr_result.get("markdown", ""),
        mathml=ocr_result.get("mathml", ""),
        layout_info=layout_info,
        recognition_mode=ocr_result.get("recognition_mode", ""),
    )
|
||||
13
app/api/v1/router.py
Normal file
13
app/api/v1/router.py
Normal file
@@ -0,0 +1,13 @@
|
||||
"""API v1 router combining all endpoints."""
|
||||
|
||||
from fastapi import APIRouter
|
||||
|
||||
from app.api.v1.endpoints import convert, image
|
||||
|
||||
api_router = APIRouter()

# Mount endpoint groups: image OCR under /image, markdown->DOCX under /convert.
api_router.include_router(image.router, prefix="/image", tags=["Image OCR"])
api_router.include_router(convert.router, prefix="/convert", tags=["Conversion"])
|
||||
0
app/core/__init__.py
Normal file
0
app/core/__init__.py
Normal file
52
app/core/config.py
Normal file
52
app/core/config.py
Normal file
@@ -0,0 +1,52 @@
|
||||
"""Application configuration using Pydantic Settings."""
|
||||
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class Settings(BaseSettings):
    """Runtime configuration, sourced from environment variables / .env."""

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    # --- API ---
    api_prefix: str = "/doc_process/v1"
    debug: bool = False

    # --- PaddleOCR-VL backend ---
    paddleocr_vl_url: str = "http://localhost:8080/v1"

    # --- Model locations ---
    doclayout_model_path: str = "app/model/DocLayout"
    pp_doclayout_model_dir: str = "app/model/PP-DocLayout"

    # --- Image preprocessing ---
    max_image_size_mb: int = 10
    image_padding_ratio: float = 0.15  # 15% on each side = 30% total expansion

    # --- Server ---
    host: str = "0.0.0.0"
    port: int = 8053

    @property
    def doclayout_model_file(self) -> Path:
        """The DocLayout model file as a Path object."""
        return Path(self.doclayout_model_path)

    @property
    def pp_doclayout_dir(self) -> Path:
        """The PP-DocLayout model directory as a Path object."""
        return Path(self.pp_doclayout_model_dir)
|
||||
|
||||
|
||||
@lru_cache
def get_settings() -> Settings:
    """Return the process-wide Settings instance (created once, then cached)."""
    return Settings()
|
||||
|
||||
42
app/core/dependencies.py
Normal file
42
app/core/dependencies.py
Normal file
@@ -0,0 +1,42 @@
|
||||
"""Application dependencies."""
|
||||
|
||||
from app.services.image_processor import ImageProcessor
|
||||
from app.services.layout_detector import LayoutDetector
|
||||
from app.services.ocr_service import OCRService
|
||||
from app.services.docx_converter import DocxConverter
|
||||
|
||||
# Global instances (initialized on startup)
_layout_detector: LayoutDetector | None = None


def init_layout_detector(model_path: str) -> None:
    """Initialize the global layout detector.

    Called during application startup.

    Args:
        model_path: Path to the DocLayout-YOLO model weights.
    """
    global _layout_detector
    # Build and load locally first: if load_model() raises, the global stays
    # None instead of pointing at a half-initialized (unloaded) detector.
    detector = LayoutDetector(model_path=model_path)
    detector.load_model()
    _layout_detector = detector
|
||||
|
||||
|
||||
def get_layout_detector() -> LayoutDetector:
    """Return the shared layout detector, failing fast if startup never ran."""
    detector = _layout_detector
    if detector is None:
        raise RuntimeError("Layout detector not initialized. Call init_layout_detector() first.")
    return detector
|
||||
|
||||
|
||||
def get_image_processor() -> ImageProcessor:
    """Build a fresh image processor (stateless; cheap to construct per request)."""
    return ImageProcessor()
|
||||
|
||||
|
||||
def get_ocr_service() -> OCRService:
    """Build a fresh OCR service instance for the current request."""
    return OCRService()
|
||||
|
||||
|
||||
def get_docx_converter() -> DocxConverter:
    """Build a fresh DOCX converter instance for the current request."""
    return DocxConverter()
|
||||
|
||||
39
app/main.py
Normal file
39
app/main.py
Normal file
@@ -0,0 +1,39 @@
|
||||
"""FastAPI application entry point."""
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastapi import FastAPI
|
||||
|
||||
from app.api.v1.router import api_router
|
||||
from app.core.config import get_settings
|
||||
from app.core.dependencies import init_layout_detector
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
|
||||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup/shutdown hook: load the layout model before serving requests."""
    init_layout_detector(model_path=settings.doclayout_model_path)
    # The application serves requests while suspended here; there is no
    # explicit teardown — cleanup happens automatically on shutdown.
    yield
|
||||
|
||||
|
||||
# Application instance; the lifespan handler loads models on startup.
app = FastAPI(
    title="DocProcesser API",
    description="Document processing API - Image to LaTeX/Markdown/MathML and Markdown to DOCX",
    version="0.1.0",
    lifespan=lifespan,
)

# Include API router (all v1 endpoints under the configured prefix).
app.include_router(api_router, prefix=settings.api_prefix)
|
||||
|
||||
|
||||
@app.get("/health")
async def health_check():
    """Liveness probe: reports healthy whenever the process is serving."""
    return {"status": "healthy"}
|
||||
0
app/model/__init__.py
Normal file
0
app/model/__init__.py
Normal file
0
app/schemas/__init__.py
Normal file
0
app/schemas/__init__.py
Normal file
19
app/schemas/convert.py
Normal file
19
app/schemas/convert.py
Normal file
@@ -0,0 +1,19 @@
|
||||
"""Request and response schemas for markdown to DOCX conversion endpoint."""
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
|
||||
class MarkdownToDocxRequest(BaseModel):
    """Payload accepted by the markdown -> DOCX conversion endpoint."""

    markdown: str = Field(..., description="Markdown content to convert")
    filename: str | None = Field(None, description="Optional output filename (without extension)")

    @field_validator("markdown")
    @classmethod
    def validate_markdown_not_empty(cls, v: str) -> str:
        """Reject missing or whitespace-only markdown input."""
        if not (v and v.strip()):
            raise ValueError("Markdown content cannot be empty")
        return v
|
||||
|
||||
48
app/schemas/image.py
Normal file
48
app/schemas/image.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Request and response schemas for image OCR endpoint."""
|
||||
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
|
||||
|
||||
class LayoutRegion(BaseModel):
    """One detected layout region of the document page."""

    type: str = Field(..., description="Region type: text, formula, table, figure")
    bbox: list[float] = Field(..., description="Bounding box [x1, y1, x2, y2]")
    confidence: float = Field(..., description="Detection confidence score")
|
||||
|
||||
|
||||
class LayoutInfo(BaseModel):
    """Aggregate result of layout detection over a page."""

    regions: list[LayoutRegion] = Field(default_factory=list)
    has_plain_text: bool = Field(False, description="Whether plain text was detected")
    has_formula: bool = Field(False, description="Whether formulas were detected")
|
||||
|
||||
|
||||
class ImageOCRRequest(BaseModel):
    """Payload accepted by the image OCR endpoint."""

    image_url: str | None = Field(None, description="URL to fetch the image from")
    image_base64: str | None = Field(None, description="Base64-encoded image data")

    @model_validator(mode="after")
    def validate_input(self):
        """Require exactly one image source: URL xor base64."""
        url_given = self.image_url is not None
        b64_given = self.image_base64 is not None
        if not url_given and not b64_given:
            raise ValueError("Either image_url or image_base64 must be provided")
        if url_given and b64_given:
            raise ValueError("Only one of image_url or image_base64 should be provided")
        return self
|
||||
|
||||
|
||||
class ImageOCRResponse(BaseModel):
    """Result returned by the image OCR endpoint."""

    latex: str = Field("", description="LaTeX representation of the content")
    markdown: str = Field("", description="Markdown representation of the content")
    mathml: str = Field("", description="MathML representation (empty if no math detected)")
    layout_info: LayoutInfo = Field(default_factory=LayoutInfo)
    recognition_mode: str = Field(
        "", description="Recognition mode used: mixed_recognition or formula_recognition"
    )
|
||||
|
||||
0
app/services/__init__.py
Normal file
0
app/services/__init__.py
Normal file
335
app/services/docx_converter.py
Normal file
335
app/services/docx_converter.py
Normal file
@@ -0,0 +1,335 @@
|
||||
"""Markdown to DOCX conversion service.
|
||||
|
||||
Reference implementation based on https://github.com/YogeLiu/markdown_2_docx
|
||||
"""
|
||||
|
||||
import io
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
|
||||
from docx import Document
|
||||
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
||||
from docx.oxml import OxmlElement
|
||||
from docx.oxml.ns import qn
|
||||
from docx.shared import Inches, Pt
|
||||
|
||||
|
||||
@dataclass
class MarkdownElement:
    """One structural unit parsed out of a markdown document."""

    type: str  # heading, paragraph, list_item, code_block, table, math
    content: str
    level: int = 0  # heading depth / list indentation level
    language: str = ""  # language tag for code blocks
|
||||
|
||||
|
||||
class DocxConverter:
    """Converts markdown content to DOCX format.

    Line-oriented parser: markdown is split into lines and classified into
    MarkdownElement records, which are then rendered into a python-docx
    Document one element at a time.
    """

    def __init__(self):
        """Initialize the converter by pre-compiling all markdown patterns."""
        self.heading_pattern = re.compile(r"^(#{1,6})\s+(.+)$")
        self.list_pattern = re.compile(r"^(\s*)[-*+]\s+(.+)$")
        self.ordered_list_pattern = re.compile(r"^(\s*)\d+\.\s+(.+)$")
        self.code_block_pattern = re.compile(r"^```(\w*)$")
        self.inline_code_pattern = re.compile(r"`([^`]+)`")
        self.bold_pattern = re.compile(r"\*\*([^*]+)\*\*")
        self.italic_pattern = re.compile(r"\*([^*]+)\*")
        # NOTE(review): math_block_pattern is compiled but block math is
        # actually handled line-wise in _parse_markdown; only
        # inline_math_pattern is used for matching.
        self.math_block_pattern = re.compile(r"\$\$(.+?)\$\$", re.DOTALL)
        self.inline_math_pattern = re.compile(r"\$([^$]+)\$")

    def convert(self, markdown: str) -> bytes:
        """Convert markdown content to DOCX.

        Args:
            markdown: Markdown content to convert.

        Returns:
            DOCX file as bytes.
        """
        doc = Document()
        elements = self._parse_markdown(markdown)

        for element in elements:
            self._add_element_to_doc(doc, element)

        # Save to an in-memory buffer and return the raw bytes.
        buffer = io.BytesIO()
        doc.save(buffer)
        buffer.seek(0)
        return buffer.getvalue()

    def _parse_markdown(self, markdown: str) -> list[MarkdownElement]:
        """Parse markdown into elements.

        Args:
            markdown: Markdown content.

        Returns:
            List of parsed elements.
        """
        elements: list[MarkdownElement] = []
        lines = markdown.split("\n")
        i = 0
        in_code_block = False
        code_content = []
        code_language = ""

        while i < len(lines):
            line = lines[i]

            # Code block handling: a ``` fence toggles code mode; the closing
            # fence flushes the accumulated lines as one element.
            code_match = self.code_block_pattern.match(line)
            if code_match:
                if in_code_block:
                    elements.append(
                        MarkdownElement(
                            type="code_block",
                            content="\n".join(code_content),
                            language=code_language,
                        )
                    )
                    code_content = []
                    in_code_block = False
                else:
                    in_code_block = True
                    code_language = code_match.group(1)
                i += 1
                continue

            # Inside a fence, every line is verbatim code — no other rules apply.
            if in_code_block:
                code_content.append(line)
                i += 1
                continue

            # Math block ($$...$$): handled line-wise, supports both a bare
            # "$$" open/close pair and content on the delimiter lines.
            if line.strip().startswith("$$"):
                math_content = []
                if line.strip() == "$$":
                    # Bare opening delimiter: collect until the bare closing "$$".
                    i += 1
                    while i < len(lines) and lines[i].strip() != "$$":
                        math_content.append(lines[i])
                        i += 1
                else:
                    # Single line $$...$$ or start
                    content = line.strip()[2:]
                    if content.endswith("$$"):
                        math_content.append(content[:-2])
                    else:
                        math_content.append(content)
                        i += 1
                        while i < len(lines):
                            if lines[i].strip().endswith("$$"):
                                math_content.append(lines[i].strip()[:-2])
                                break
                            math_content.append(lines[i])
                            i += 1

                elements.append(
                    MarkdownElement(type="math", content="\n".join(math_content))
                )
                # Skip past the closing delimiter line.
                i += 1
                continue

            # Heading: "#"-count determines the level.
            heading_match = self.heading_pattern.match(line)
            if heading_match:
                level = len(heading_match.group(1))
                content = heading_match.group(2)
                elements.append(
                    MarkdownElement(type="heading", content=content, level=level)
                )
                i += 1
                continue

            # Unordered list: nesting level derived from 2-space indents.
            list_match = self.list_pattern.match(line)
            if list_match:
                indent = len(list_match.group(1))
                content = list_match.group(2)
                elements.append(
                    MarkdownElement(type="list_item", content=content, level=indent // 2)
                )
                i += 1
                continue

            # Ordered list
            ordered_match = self.ordered_list_pattern.match(line)
            if ordered_match:
                indent = len(ordered_match.group(1))
                content = ordered_match.group(2)
                elements.append(
                    MarkdownElement(
                        type="ordered_list_item", content=content, level=indent // 2
                    )
                )
                i += 1
                continue

            # Table (simple detection): a pipe row followed by a "---" separator.
            if "|" in line and i + 1 < len(lines) and "---" in lines[i + 1]:
                table_lines = [line]
                i += 1
                while i < len(lines) and "|" in lines[i]:
                    table_lines.append(lines[i])
                    i += 1
                elements.append(
                    MarkdownElement(type="table", content="\n".join(table_lines))
                )
                continue

            # Regular paragraph (blank lines produce no element).
            if line.strip():
                elements.append(MarkdownElement(type="paragraph", content=line))

            i += 1

        return elements

    def _add_element_to_doc(self, doc: Document, element: MarkdownElement) -> None:
        """Add a markdown element to the document.

        Args:
            doc: Word document.
            element: Parsed markdown element.
        """
        # Dispatch on element type; unknown types are silently ignored.
        if element.type == "heading":
            self._add_heading(doc, element.content, element.level)
        elif element.type == "paragraph":
            self._add_paragraph(doc, element.content)
        elif element.type == "list_item":
            self._add_list_item(doc, element.content, element.level, ordered=False)
        elif element.type == "ordered_list_item":
            self._add_list_item(doc, element.content, element.level, ordered=True)
        elif element.type == "code_block":
            self._add_code_block(doc, element.content)
        elif element.type == "table":
            self._add_table(doc, element.content)
        elif element.type == "math":
            self._add_math(doc, element.content)

    def _add_heading(self, doc: Document, content: str, level: int) -> None:
        """Add a heading to the document."""
        # Map markdown levels to Word heading styles
        heading_level = min(level, 9)  # Word supports up to Heading 9
        doc.add_heading(content, level=heading_level)

    def _add_paragraph(self, doc: Document, content: str) -> None:
        """Add a paragraph with inline formatting."""
        para = doc.add_paragraph()
        self._add_formatted_text(para, content)

    def _add_formatted_text(self, para, content: str) -> None:
        """Add text with inline formatting (bold, italic, code)."""
        # Simple approach: repeatedly find the earliest inline marker,
        # emit the plain text before it, then the styled run, then continue
        # with the remainder of the string.
        remaining = content

        while remaining:
            # Find next formatting marker
            bold_match = self.bold_pattern.search(remaining)
            italic_match = self.italic_pattern.search(remaining)
            code_match = self.inline_code_pattern.search(remaining)
            math_match = self.inline_math_pattern.search(remaining)

            matches = [
                (bold_match, "bold"),
                (italic_match, "italic"),
                (code_match, "code"),
                (math_match, "math"),
            ]
            matches = [(m, t) for m, t in matches if m]

            if not matches:
                para.add_run(remaining)
                break

            # Find earliest match. For "**x**" bold wins over italic because
            # its match starts one character earlier.
            earliest = min(matches, key=lambda x: x[0].start())
            match, match_type = earliest

            # Add text before match
            if match.start() > 0:
                para.add_run(remaining[: match.start()])

            # Add formatted text
            run = para.add_run(match.group(1))
            if match_type == "bold":
                run.bold = True
            elif match_type == "italic":
                run.italic = True
            elif match_type == "code":
                run.font.name = "Courier New"
                run.font.size = Pt(10)
            elif match_type == "math":
                # Inline math is approximated as italic text (no OMML).
                run.italic = True

            remaining = remaining[match.end() :]

    def _add_list_item(
        self, doc: Document, content: str, level: int, ordered: bool
    ) -> None:
        """Add a list item."""
        para = doc.add_paragraph(style="List Bullet" if not ordered else "List Number")
        # Indent nested items by a quarter inch per nesting level.
        para.paragraph_format.left_indent = Inches(0.25 * level)
        self._add_formatted_text(para, content)

    def _add_code_block(self, doc: Document, content: str) -> None:
        """Add a code block."""
        para = doc.add_paragraph()
        para.paragraph_format.left_indent = Inches(0.5)

        run = para.add_run(content)
        run.font.name = "Courier New"
        run.font.size = Pt(9)

        # Add shading (light-grey background) via raw OOXML on the paragraph.
        shading = OxmlElement("w:shd")
        shading.set(qn("w:val"), "clear")
        shading.set(qn("w:fill"), "F0F0F0")
        para._p.get_or_add_pPr().append(shading)

    def _add_table(self, doc: Document, content: str) -> None:
        """Add a table from markdown table format."""
        lines = [l.strip() for l in content.split("\n") if l.strip()]
        if len(lines) < 2:
            return

        # Parse header. NOTE(review): filtering on c.strip() drops empty
        # cells, so columns shift when a row has blank cells — acceptable
        # for the simple tables this converter targets.
        header = [c.strip() for c in lines[0].split("|") if c.strip()]

        # Skip separator line
        data_lines = lines[2:] if len(lines) > 2 else []

        # Create table
        table = doc.add_table(rows=1, cols=len(header))
        table.style = "Table Grid"

        # Add header
        header_cells = table.rows[0].cells
        for i, text in enumerate(header):
            header_cells[i].text = text
            header_cells[i].paragraphs[0].runs[0].bold = True

        # Add data rows
        for line in data_lines:
            cells = [c.strip() for c in line.split("|") if c.strip()]
            row_cells = table.add_row().cells
            for i, text in enumerate(cells):
                if i < len(row_cells):
                    row_cells[i].text = text

    def _add_math(self, doc: Document, content: str) -> None:
        """Add a math block.

        For proper OMML rendering, this would need more complex conversion.
        Currently renders as italic text with the LaTeX source.
        """
        para = doc.add_paragraph()
        para.alignment = WD_ALIGN_PARAGRAPH.CENTER

        run = para.add_run(content)
        run.italic = True
        run.font.name = "Cambria Math"
        run.font.size = Pt(12)
|
||||
|
||||
139
app/services/image_processor.py
Normal file
139
app/services/image_processor.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""Image preprocessing service using OpenCV."""
|
||||
|
||||
import base64
|
||||
import io
|
||||
from urllib.request import urlopen
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from app.core.config import get_settings
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
|
||||
class ImageProcessor:
|
||||
"""Service for image preprocessing operations."""
|
||||
|
||||
def __init__(self, padding_ratio: float | None = None):
|
||||
"""Initialize with padding ratio.
|
||||
|
||||
Args:
|
||||
padding_ratio: Ratio for padding on each side (default from settings).
|
||||
0.15 means 15% padding on each side = 30% total expansion.
|
||||
"""
|
||||
self.padding_ratio = padding_ratio or settings.image_padding_ratio
|
||||
|
||||
def load_image_from_url(self, url: str) -> np.ndarray:
|
||||
"""Load image from URL.
|
||||
|
||||
Args:
|
||||
url: Image URL to fetch.
|
||||
|
||||
Returns:
|
||||
Image as numpy array in BGR format.
|
||||
|
||||
Raises:
|
||||
ValueError: If image cannot be loaded from URL.
|
||||
"""
|
||||
try:
|
||||
with urlopen(url, timeout=30) as response:
|
||||
image_data = response.read()
|
||||
image = Image.open(io.BytesIO(image_data))
|
||||
return cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to load image from URL: {e}") from e
|
||||
|
||||
def load_image_from_base64(self, base64_str: str) -> np.ndarray:
|
||||
"""Load image from base64 string.
|
||||
|
||||
Args:
|
||||
base64_str: Base64-encoded image data.
|
||||
|
||||
Returns:
|
||||
Image as numpy array in BGR format.
|
||||
|
||||
Raises:
|
||||
ValueError: If image cannot be decoded.
|
||||
"""
|
||||
try:
|
||||
# Handle data URL format
|
||||
if "," in base64_str:
|
||||
base64_str = base64_str.split(",", 1)[1]
|
||||
|
||||
image_data = base64.b64decode(base64_str)
|
||||
image = Image.open(io.BytesIO(image_data))
|
||||
return cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to decode base64 image: {e}") from e
|
||||
|
||||
def add_padding(self, image: np.ndarray) -> np.ndarray:
|
||||
"""Add whitespace padding around the image.
|
||||
|
||||
Adds padding equal to padding_ratio * max(height, width) on each side.
|
||||
This expands the image by approximately 30% total (15% on each side).
|
||||
|
||||
Args:
|
||||
image: Input image as numpy array in BGR format.
|
||||
|
||||
Returns:
|
||||
Padded image as numpy array.
|
||||
"""
|
||||
height, width = image.shape[:2]
|
||||
padding = int(max(height, width) * self.padding_ratio)
|
||||
|
||||
# Add white padding on all sides
|
||||
padded_image = cv2.copyMakeBorder(
|
||||
image,
|
||||
top=padding,
|
||||
bottom=padding,
|
||||
left=padding,
|
||||
right=padding,
|
||||
borderType=cv2.BORDER_CONSTANT,
|
||||
value=[255, 255, 255], # White
|
||||
)
|
||||
|
||||
return padded_image
|
||||
|
||||
def preprocess(self, image_url: str | None, image_base64: str | None) -> np.ndarray:
|
||||
"""Load and preprocess image with padding.
|
||||
|
||||
Args:
|
||||
image_url: URL to fetch image from (optional).
|
||||
image_base64: Base64-encoded image (optional).
|
||||
|
||||
Returns:
|
||||
Preprocessed image with padding.
|
||||
|
||||
Raises:
|
||||
ValueError: If neither input is provided or loading fails.
|
||||
"""
|
||||
if image_url:
|
||||
image = self.load_image_from_url(image_url)
|
||||
elif image_base64:
|
||||
image = self.load_image_from_base64(image_base64)
|
||||
else:
|
||||
raise ValueError("Either image_url or image_base64 must be provided")
|
||||
|
||||
return self.add_padding(image)
|
||||
|
||||
def image_to_base64(self, image: np.ndarray, format: str = "PNG") -> str:
|
||||
"""Convert numpy image to base64 string.
|
||||
|
||||
Args:
|
||||
image: Image as numpy array in BGR format.
|
||||
format: Output format (PNG, JPEG).
|
||||
|
||||
Returns:
|
||||
Base64-encoded image string.
|
||||
"""
|
||||
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
||||
pil_image = Image.fromarray(image_rgb)
|
||||
|
||||
buffer = io.BytesIO()
|
||||
pil_image.save(buffer, format=format)
|
||||
buffer.seek(0)
|
||||
|
||||
return base64.b64encode(buffer.getvalue()).decode("utf-8")
|
||||
|
||||
119
app/services/layout_detector.py
Normal file
119
app/services/layout_detector.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""DocLayout-YOLO wrapper for document layout detection."""
|
||||
|
||||
import numpy as np
|
||||
|
||||
from app.schemas.image import LayoutInfo, LayoutRegion
|
||||
|
||||
|
||||
class LayoutDetector:
    """Wrapper for DocLayout-YOLO model."""

    # Class names from DocLayout-YOLO
    CLASS_NAMES = {
        0: "title",
        1: "plain_text",
        2: "abandon",
        3: "figure",
        4: "figure_caption",
        5: "table",
        6: "table_caption",
        7: "table_footnote",
        8: "isolate_formula",
        9: "formula_caption",
    }

    # Classes considered as plain text
    PLAIN_TEXT_CLASSES = {"title", "plain_text", "figure_caption", "table_caption", "table_footnote"}

    # Classes considered as formula
    FORMULA_CLASSES = {"isolate_formula", "formula_caption"}

    def __init__(self, model_path: str, confidence_threshold: float = 0.2):
        """Initialize the layout detector.

        Args:
            model_path: Path to the DocLayout-YOLO model weights.
            confidence_threshold: Minimum confidence for detections.
        """
        self.model_path = model_path
        self.confidence_threshold = confidence_threshold
        self.model = None

    def load_model(self) -> None:
        """Load the DocLayout-YOLO model.

        Raises:
            RuntimeError: If model cannot be loaded.
        """
        try:
            # Imported lazily so the service can start without the package
            # until the model is actually needed.
            from doclayout_yolo import YOLOv10

            self.model = YOLOv10(self.model_path)
        except Exception as e:
            raise RuntimeError(f"Failed to load DocLayout-YOLO model: {e}") from e

    def detect(
        self, image: np.ndarray, image_size: int = 1024, device: str = "cuda:0"
    ) -> LayoutInfo:
        """Detect document layout regions.

        Args:
            image: Input image as numpy array in BGR format.
            image_size: Image size for prediction.
            device: Inference device passed to the model (default keeps the
                previous hard-coded "cuda:0"; pass e.g. "cpu" to run without
                a GPU).

        Returns:
            LayoutInfo with detected regions.

        Raises:
            RuntimeError: If model not loaded.
        """
        if self.model is None:
            raise RuntimeError("Model not loaded. Call load_model() first.")

        # Run prediction
        results = self.model.predict(
            image,
            imgsz=image_size,
            conf=self.confidence_threshold,
            device=device,
        )

        regions: list[LayoutRegion] = []
        has_plain_text = False
        has_formula = False

        if results and len(results) > 0:
            result = results[0]
            if result.boxes is not None:
                for box in result.boxes:
                    cls_id = int(box.cls[0].item())
                    confidence = float(box.conf[0].item())
                    bbox = box.xyxy[0].tolist()

                    class_name = self.CLASS_NAMES.get(cls_id, f"unknown_{cls_id}")

                    # Map to simplified type and track what kinds of content
                    # appear, so the caller can choose a recognition mode.
                    if class_name in self.PLAIN_TEXT_CLASSES:
                        region_type = "text"
                        has_plain_text = True
                    elif class_name in self.FORMULA_CLASSES:
                        region_type = "formula"
                        has_formula = True
                    elif class_name in {"figure"}:
                        region_type = "figure"
                    elif class_name in {"table"}:
                        region_type = "table"
                    else:
                        region_type = class_name

                    regions.append(
                        LayoutRegion(
                            type=region_type,
                            bbox=bbox,
                            confidence=confidence,
                        )
                    )

        return LayoutInfo(
            regions=regions,
            has_plain_text=has_plain_text,
            has_formula=has_formula,
        )
|
||||
303
app/services/ocr_service.py
Normal file
303
app/services/ocr_service.py
Normal file
@@ -0,0 +1,303 @@
|
||||
"""PaddleOCR-VL client service for text and formula recognition."""
|
||||
|
||||
import io
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from app.core.config import get_settings
|
||||
from app.schemas.image import LayoutInfo
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
|
||||
class OCRService:
|
||||
"""Service for OCR using PaddleOCR-VL."""
|
||||
|
||||
FORMULA_PROMPT = "Please recognize the mathematical formula in this image and output in LaTeX format."
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
vl_server_url: str | None = None,
|
||||
pp_doclayout_model_dir: str | None = None,
|
||||
):
|
||||
"""Initialize OCR service.
|
||||
|
||||
Args:
|
||||
vl_server_url: URL of the vLLM server for PaddleOCR-VL.
|
||||
pp_doclayout_model_dir: Path to PP-DocLayoutV2 model directory.
|
||||
"""
|
||||
self.vl_server_url = vl_server_url or settings.paddleocr_vl_url
|
||||
self.pp_doclayout_model_dir = pp_doclayout_model_dir or settings.pp_doclayout_model_dir
|
||||
self._pipeline = None
|
||||
|
||||
def _get_pipeline(self):
|
||||
"""Get or create PaddleOCR-VL pipeline.
|
||||
|
||||
Returns:
|
||||
PaddleOCRVL pipeline instance.
|
||||
"""
|
||||
if self._pipeline is None:
|
||||
from paddleocr import PaddleOCRVL
|
||||
|
||||
self._pipeline = PaddleOCRVL(
|
||||
vl_rec_backend="vllm-server",
|
||||
vl_rec_server_url=self.vl_server_url,
|
||||
layout_detection_model_name="PP-DocLayoutV2",
|
||||
layout_detection_model_dir=self.pp_doclayout_model_dir,
|
||||
)
|
||||
return self._pipeline
|
||||
|
||||
def _save_temp_image(self, image: np.ndarray) -> str:
|
||||
"""Save image to a temporary file.
|
||||
|
||||
Args:
|
||||
image: Image as numpy array in BGR format.
|
||||
|
||||
Returns:
|
||||
Path to temporary file.
|
||||
"""
|
||||
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
|
||||
cv2.imwrite(f.name, image)
|
||||
return f.name
|
||||
|
||||
def recognize_mixed(self, image: np.ndarray) -> dict:
|
||||
"""Recognize mixed content (text + formulas) using PP-DocLayoutV2.
|
||||
|
||||
This mode uses PaddleOCR-VL with PP-DocLayoutV2 for document-aware
|
||||
recognition of mixed content.
|
||||
|
||||
Args:
|
||||
image: Input image as numpy array in BGR format.
|
||||
|
||||
Returns:
|
||||
Dict with 'markdown', 'latex', 'mathml' keys.
|
||||
"""
|
||||
try:
|
||||
pipeline = self._get_pipeline()
|
||||
temp_path = self._save_temp_image(image)
|
||||
|
||||
try:
|
||||
results = list(pipeline.predict(temp_path))
|
||||
|
||||
markdown_content = ""
|
||||
for result in results:
|
||||
# PaddleOCR-VL results can be saved to markdown
|
||||
md_buffer = io.StringIO()
|
||||
result.save_to_markdown(save_path=md_buffer)
|
||||
markdown_content += md_buffer.getvalue()
|
||||
|
||||
# Convert markdown to other formats
|
||||
latex = self._markdown_to_latex(markdown_content)
|
||||
mathml = self._extract_mathml(markdown_content)
|
||||
|
||||
return {
|
||||
"markdown": markdown_content,
|
||||
"latex": latex,
|
||||
"mathml": mathml,
|
||||
}
|
||||
finally:
|
||||
Path(temp_path).unlink(missing_ok=True)
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Mixed recognition failed: {e}") from e
|
||||
|
||||
def recognize_formula(self, image: np.ndarray) -> dict:
|
||||
"""Recognize formula/math content using PaddleOCR-VL with prompt.
|
||||
|
||||
This mode uses PaddleOCR-VL directly with a formula recognition prompt.
|
||||
|
||||
Args:
|
||||
image: Input image as numpy array in BGR format.
|
||||
|
||||
Returns:
|
||||
Dict with 'latex', 'markdown', 'mathml' keys.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
|
||||
temp_path = self._save_temp_image(image)
|
||||
|
||||
try:
|
||||
# Use vLLM API directly for formula recognition
|
||||
import base64
|
||||
|
||||
with open(temp_path, "rb") as f:
|
||||
image_base64 = base64.b64encode(f.read()).decode("utf-8")
|
||||
|
||||
# Call vLLM server with formula prompt
|
||||
response = httpx.post(
|
||||
f"{self.vl_server_url}/chat/completions",
|
||||
json={
|
||||
"model": "paddleocr-vl",
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "text", "text": self.FORMULA_PROMPT},
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": f"data:image/png;base64,{image_base64}"},
|
||||
},
|
||||
],
|
||||
}
|
||||
],
|
||||
"max_tokens": 1024,
|
||||
},
|
||||
timeout=60.0,
|
||||
)
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
|
||||
latex = result["choices"][0]["message"]["content"].strip()
|
||||
|
||||
# Convert latex to other formats
|
||||
markdown = self._latex_to_markdown(latex)
|
||||
mathml = self._latex_to_mathml(latex)
|
||||
|
||||
return {
|
||||
"latex": latex,
|
||||
"markdown": markdown,
|
||||
"mathml": mathml,
|
||||
}
|
||||
finally:
|
||||
Path(temp_path).unlink(missing_ok=True)
|
||||
|
||||
except httpx.HTTPStatusError as e:
|
||||
raise RuntimeError(f"Formula recognition failed: HTTP {e.response.status_code}") from e
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Formula recognition failed: {e}") from e
|
||||
|
||||
def recognize(self, image: np.ndarray, layout_info: LayoutInfo) -> dict:
|
||||
"""Recognize content based on layout detection results.
|
||||
|
||||
Args:
|
||||
image: Input image as numpy array in BGR format.
|
||||
layout_info: Layout detection results.
|
||||
|
||||
Returns:
|
||||
Dict with recognition results including mode used.
|
||||
"""
|
||||
# Decision logic:
|
||||
# - If plain text exists -> use mixed_recognition (PP-DocLayoutV2)
|
||||
# - Otherwise -> use formula_recognition (VL with prompt)
|
||||
if layout_info.has_plain_text:
|
||||
result = self.recognize_mixed(image)
|
||||
result["recognition_mode"] = "mixed_recognition"
|
||||
else:
|
||||
result = self.recognize_formula(image)
|
||||
result["recognition_mode"] = "formula_recognition"
|
||||
|
||||
return result
|
||||
|
||||
def _markdown_to_latex(self, markdown: str) -> str:
|
||||
"""Convert markdown to LaTeX.
|
||||
|
||||
Simple conversion - wraps content in LaTeX document structure.
|
||||
|
||||
Args:
|
||||
markdown: Markdown content.
|
||||
|
||||
Returns:
|
||||
LaTeX representation.
|
||||
"""
|
||||
# Basic conversion: preserve math blocks, convert structure
|
||||
lines = []
|
||||
in_code_block = False
|
||||
|
||||
for line in markdown.split("\n"):
|
||||
if line.startswith("```"):
|
||||
in_code_block = not in_code_block
|
||||
if in_code_block:
|
||||
lines.append("\\begin{verbatim}")
|
||||
else:
|
||||
lines.append("\\end{verbatim}")
|
||||
elif in_code_block:
|
||||
lines.append(line)
|
||||
elif line.startswith("# "):
|
||||
lines.append(f"\\section{{{line[2:]}}}")
|
||||
elif line.startswith("## "):
|
||||
lines.append(f"\\subsection{{{line[3:]}}}")
|
||||
elif line.startswith("### "):
|
||||
lines.append(f"\\subsubsection{{{line[4:]}}}")
|
||||
elif line.startswith("- "):
|
||||
lines.append(f"\\item {line[2:]}")
|
||||
elif line.startswith("$$"):
|
||||
lines.append(line.replace("$$", "\\[").replace("$$", "\\]"))
|
||||
elif "$" in line:
|
||||
# Keep inline math as-is
|
||||
lines.append(line)
|
||||
else:
|
||||
lines.append(line)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _latex_to_markdown(self, latex: str) -> str:
|
||||
"""Convert LaTeX to markdown.
|
||||
|
||||
Args:
|
||||
latex: LaTeX content.
|
||||
|
||||
Returns:
|
||||
Markdown representation.
|
||||
"""
|
||||
# Wrap LaTeX in markdown math block
|
||||
if latex.strip():
|
||||
return f"$$\n{latex}\n$$"
|
||||
return ""
|
||||
|
||||
def _latex_to_mathml(self, latex: str) -> str:
|
||||
"""Convert LaTeX to MathML.
|
||||
|
||||
Args:
|
||||
latex: LaTeX content.
|
||||
|
||||
Returns:
|
||||
MathML representation.
|
||||
"""
|
||||
# Basic LaTeX to MathML conversion
|
||||
# For production, consider using latex2mathml library
|
||||
if not latex.strip():
|
||||
return ""
|
||||
|
||||
try:
|
||||
# Try to use latex2mathml if available
|
||||
from latex2mathml.converter import convert
|
||||
|
||||
return convert(latex)
|
||||
except ImportError:
|
||||
# Fallback: wrap in basic MathML structure
|
||||
return f'<math xmlns="http://www.w3.org/1998/Math/MathML"><mtext>{latex}</mtext></math>'
|
||||
except Exception:
|
||||
return f'<math xmlns="http://www.w3.org/1998/Math/MathML"><mtext>{latex}</mtext></math>'
|
||||
|
||||
def _extract_mathml(self, markdown: str) -> str:
|
||||
"""Extract and convert math from markdown to MathML.
|
||||
|
||||
Args:
|
||||
markdown: Markdown content.
|
||||
|
||||
Returns:
|
||||
MathML for any math content found.
|
||||
"""
|
||||
import re
|
||||
|
||||
# Find all math blocks
|
||||
math_blocks = re.findall(r"\$\$(.*?)\$\$", markdown, re.DOTALL)
|
||||
inline_math = re.findall(r"\$([^$]+)\$", markdown)
|
||||
|
||||
all_math = math_blocks + inline_math
|
||||
|
||||
if not all_math:
|
||||
return ""
|
||||
|
||||
# Convert each to MathML and combine
|
||||
mathml_parts = []
|
||||
for latex in all_math:
|
||||
mathml = self._latex_to_mathml(latex.strip())
|
||||
if mathml:
|
||||
mathml_parts.append(mathml)
|
||||
|
||||
return "\n".join(mathml_parts)
|
||||
Reference in New Issue
Block a user