Switch PDF page rendering from external pdftoppm/pdftocairo subprocess calls to github.com/gen2brain/go-fitz (MuPDF wrapper), eliminating the poppler-utils runtime dependency. Enable CGO in Dockerfile builder stage and install gcc/musl-dev for the static MuPDF link; runtime image remains unchanged.
339 lines
9.6 KiB
Go
339 lines
9.6 KiB
Go
package service
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"encoding/base64"
|
||
"encoding/json"
|
||
"fmt"
|
||
"image/png"
|
||
"io"
|
||
"net/http"
|
||
"time"
|
||
|
||
"github.com/gen2brain/go-fitz"
|
||
|
||
pdfmodel "gitea.com/texpixel/document_ai/internal/model/pdf"
|
||
"gitea.com/texpixel/document_ai/internal/storage/cache"
|
||
"gitea.com/texpixel/document_ai/internal/storage/dao"
|
||
"gitea.com/texpixel/document_ai/pkg/common"
|
||
"gitea.com/texpixel/document_ai/pkg/httpclient"
|
||
"gitea.com/texpixel/document_ai/pkg/log"
|
||
"gitea.com/texpixel/document_ai/pkg/oss"
|
||
"gitea.com/texpixel/document_ai/pkg/requestid"
|
||
"gitea.com/texpixel/document_ai/pkg/utils"
|
||
"gorm.io/gorm"
|
||
|
||
"gitea.com/texpixel/document_ai/internal/model/formula"
|
||
)
|
||
|
||
// pdfMaxPages is the page cap handed to renderPDFPages: only this many
// leading pages of a PDF are rendered and sent to OCR.
const pdfMaxPages = 10

// pdfOCREndpoint is the URL of the downstream image OCR endpoint that callOCR
// posts each rendered page to.
const pdfOCREndpoint = "https://cloud.texpixel.com:10443/doc_process/v1/image/ocr"
|
||
|
||
// PDFRecognitionService 处理 PDF 识别任务
|
||
type PDFRecognitionService struct {
|
||
db *gorm.DB
|
||
queueLimit chan struct{}
|
||
stopChan chan struct{}
|
||
httpClient *httpclient.Client
|
||
}
|
||
|
||
func NewPDFRecognitionService() *PDFRecognitionService {
|
||
s := &PDFRecognitionService{
|
||
db: dao.DB,
|
||
queueLimit: make(chan struct{}, 3),
|
||
stopChan: make(chan struct{}),
|
||
httpClient: httpclient.NewClient(nil),
|
||
}
|
||
|
||
utils.SafeGo(func() {
|
||
lock, err := cache.GetPDFDistributedLock(context.Background())
|
||
if err != nil || !lock {
|
||
log.Error(context.Background(), "func", "NewPDFRecognitionService", "msg", "获取PDF分布式锁失败")
|
||
return
|
||
}
|
||
s.processPDFQueue(context.Background())
|
||
})
|
||
|
||
return s
|
||
}
|
||
|
||
// CreatePDFTask 创建识别任务并入队
|
||
func (s *PDFRecognitionService) CreatePDFTask(ctx context.Context, req *pdfmodel.CreatePDFRecognitionRequest) (*dao.RecognitionTask, error) {
|
||
task := &dao.RecognitionTask{
|
||
UserID: req.UserID,
|
||
TaskUUID: utils.NewUUID(),
|
||
TaskType: dao.TaskTypePDF,
|
||
Status: dao.TaskStatusPending,
|
||
FileURL: req.FileURL,
|
||
FileName: req.FileName,
|
||
FileHash: req.FileHash,
|
||
IP: common.GetIPFromContext(ctx),
|
||
}
|
||
|
||
if err := dao.NewRecognitionTaskDao().Create(dao.DB.WithContext(ctx), task); err != nil {
|
||
log.Error(ctx, "func", "CreatePDFTask", "msg", "创建任务失败", "error", err)
|
||
return nil, common.NewError(common.CodeDBError, "创建任务失败", err)
|
||
}
|
||
|
||
if _, err := cache.PushPDFTask(ctx, task.ID); err != nil {
|
||
log.Error(ctx, "func", "CreatePDFTask", "msg", "推入队列失败", "error", err)
|
||
return nil, common.NewError(common.CodeSystemError, "推入队列失败", err)
|
||
}
|
||
|
||
return task, nil
|
||
}
|
||
|
||
// GetPDFTask 查询任务状态和结果
|
||
func (s *PDFRecognitionService) GetPDFTask(ctx context.Context, taskNo string, callerUserID int64) (*pdfmodel.GetPDFTaskResponse, error) {
|
||
sess := dao.DB.WithContext(ctx)
|
||
task, err := dao.NewRecognitionTaskDao().GetByTaskNo(sess, taskNo)
|
||
if err != nil {
|
||
if err == gorm.ErrRecordNotFound {
|
||
return nil, common.NewError(common.CodeNotFound, "任务不存在", err)
|
||
}
|
||
return nil, common.NewError(common.CodeDBError, "查询任务失败", err)
|
||
}
|
||
|
||
// 类型校验:防止公式任务被当成 PDF 解析
|
||
if task.TaskType != dao.TaskTypePDF {
|
||
return nil, common.NewError(common.CodeNotFound, "任务不存在", nil)
|
||
}
|
||
|
||
// 归属校验:已登录用户只能查询自己的任务
|
||
if callerUserID != 0 && task.UserID != 0 && callerUserID != task.UserID {
|
||
return nil, common.NewError(common.CodeNotFound, "任务不存在", nil)
|
||
}
|
||
|
||
resp := &pdfmodel.GetPDFTaskResponse{
|
||
TaskNo: taskNo,
|
||
Status: int(task.Status),
|
||
}
|
||
|
||
if task.Status != dao.TaskStatusCompleted {
|
||
return resp, nil
|
||
}
|
||
|
||
result, err := dao.NewRecognitionResultDao().GetByTaskID(sess, task.ID)
|
||
if err != nil || result == nil {
|
||
return nil, common.NewError(common.CodeDBError, "查询识别结果失败", err)
|
||
}
|
||
|
||
pages, err := result.GetPDFContent()
|
||
if err != nil {
|
||
return nil, common.NewError(common.CodeSystemError, "解析识别结果失败", err)
|
||
}
|
||
|
||
resp.TotalPages = len(pages)
|
||
for _, p := range pages {
|
||
resp.Pages = append(resp.Pages, pdfmodel.PDFPageResult{
|
||
PageNumber: p.PageNumber,
|
||
Markdown: p.Markdown,
|
||
})
|
||
}
|
||
|
||
return resp, nil
|
||
}
|
||
|
||
// processPDFQueue 持续消费队列
|
||
func (s *PDFRecognitionService) processPDFQueue(ctx context.Context) {
|
||
for {
|
||
select {
|
||
case <-s.stopChan:
|
||
return
|
||
default:
|
||
s.processOnePDFTask(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *PDFRecognitionService) processOnePDFTask(ctx context.Context) {
|
||
s.queueLimit <- struct{}{}
|
||
defer func() { <-s.queueLimit }()
|
||
|
||
taskID, err := cache.PopPDFTask(ctx)
|
||
if err != nil {
|
||
log.Error(ctx, "func", "processOnePDFTask", "msg", "获取任务失败", "error", err)
|
||
return
|
||
}
|
||
|
||
task, err := dao.NewRecognitionTaskDao().GetTaskByID(dao.DB.WithContext(ctx), taskID)
|
||
if err != nil || task == nil {
|
||
log.Error(ctx, "func", "processOnePDFTask", "msg", "任务不存在", "task_id", taskID)
|
||
return
|
||
}
|
||
|
||
ctx = context.WithValue(ctx, utils.RequestIDKey, task.TaskUUID)
|
||
requestid.SetRequestID(task.TaskUUID, func() {
|
||
if err := s.processPDFTask(ctx, taskID, task.FileURL); err != nil {
|
||
log.Error(ctx, "func", "processOnePDFTask", "msg", "处理PDF任务失败", "error", err)
|
||
}
|
||
})
|
||
}
|
||
|
||
// processPDFTask 核心处理:下载 → pre-hook → 逐页OCR → 写入DB
|
||
func (s *PDFRecognitionService) processPDFTask(ctx context.Context, taskID int64, fileURL string) error {
|
||
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
|
||
defer cancel()
|
||
|
||
taskDao := dao.NewRecognitionTaskDao()
|
||
resultDao := dao.NewRecognitionResultDao()
|
||
|
||
isSuccess := false
|
||
defer func() {
|
||
status, remark := dao.TaskStatusFailed, "任务处理失败"
|
||
if isSuccess {
|
||
status, remark = dao.TaskStatusCompleted, ""
|
||
}
|
||
_ = taskDao.Update(dao.DB.WithContext(context.Background()),
|
||
map[string]interface{}{"id": taskID},
|
||
map[string]interface{}{"status": status, "completed_at": time.Now(), "remark": remark},
|
||
)
|
||
}()
|
||
|
||
// 更新为处理中
|
||
if err := taskDao.Update(dao.DB.WithContext(ctx),
|
||
map[string]interface{}{"id": taskID},
|
||
map[string]interface{}{"status": dao.TaskStatusProcessing},
|
||
); err != nil {
|
||
return fmt.Errorf("更新任务状态失败: %w", err)
|
||
}
|
||
|
||
// 下载 PDF
|
||
reader, err := oss.DownloadFile(ctx, fileURL)
|
||
if err != nil {
|
||
return fmt.Errorf("下载PDF失败: %w", err)
|
||
}
|
||
defer reader.Close()
|
||
|
||
pdfBytes, err := io.ReadAll(reader)
|
||
if err != nil {
|
||
return fmt.Errorf("读取PDF数据失败: %w", err)
|
||
}
|
||
|
||
// pre-hook: 用 pdftoppm 渲染前 pdfMaxPages 页为 PNG
|
||
pageImages, err := renderPDFPages(ctx, pdfBytes, pdfMaxPages)
|
||
if err != nil {
|
||
return fmt.Errorf("渲染PDF页面失败: %w", err)
|
||
}
|
||
|
||
processPages := len(pageImages)
|
||
log.Info(ctx, "func", "processPDFTask", "msg", "开始处理PDF", "task_id", taskID, "process_pages", processPages)
|
||
|
||
// 逐页 OCR,结果收集
|
||
var pages []dao.PDFPageContent
|
||
for i, imgBytes := range pageImages {
|
||
ocrResult, err := s.callOCR(ctx, imgBytes)
|
||
if err != nil {
|
||
return fmt.Errorf("OCR第%d页失败: %w", i+1, err)
|
||
}
|
||
|
||
pages = append(pages, dao.PDFPageContent{
|
||
PageNumber: i + 1,
|
||
Markdown: ocrResult.Markdown,
|
||
})
|
||
log.Info(ctx, "func", "processPDFTask", "msg", "页面OCR完成", "page", i+1, "total", processPages)
|
||
}
|
||
|
||
// 序列化并写入 DB(单行)
|
||
contentJSON, err := dao.MarshalPDFContent(pages)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化PDF内容失败: %w", err)
|
||
}
|
||
|
||
dbResult := dao.RecognitionResult{
|
||
TaskID: taskID,
|
||
TaskType: dao.TaskTypePDF,
|
||
Content: contentJSON,
|
||
}
|
||
if err := dbResult.SetMetaData(dao.ResultMetaData{TotalNum: processPages}); err != nil {
|
||
return fmt.Errorf("序列化MetaData失败: %w", err)
|
||
}
|
||
if err := resultDao.Create(dao.DB.WithContext(ctx), dbResult); err != nil {
|
||
return fmt.Errorf("保存PDF结果失败: %w", err)
|
||
}
|
||
|
||
isSuccess = true
|
||
return nil
|
||
}
|
||
|
||
// renderPDFPages 使用 go-fitz 将 PDF 渲染为 PNG 字节切片,最多渲染 maxPages 页
|
||
func renderPDFPages(ctx context.Context, pdfBytes []byte, maxPages int) ([][]byte, error) {
|
||
doc, err := fitz.NewFromMemory(pdfBytes)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("打开PDF失败: %w", err)
|
||
}
|
||
defer doc.Close()
|
||
|
||
total := doc.NumPage()
|
||
if total == 0 {
|
||
return nil, fmt.Errorf("PDF不包含任何页面")
|
||
}
|
||
if maxPages > 0 && total > maxPages {
|
||
total = maxPages
|
||
}
|
||
|
||
pages := make([][]byte, 0, total)
|
||
for i := 0; i < total; i++ {
|
||
select {
|
||
case <-ctx.Done():
|
||
return nil, ctx.Err()
|
||
default:
|
||
}
|
||
|
||
img, err := doc.Image(i)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("渲染第%d页失败: %w", i+1, err)
|
||
}
|
||
|
||
var buf bytes.Buffer
|
||
if err := png.Encode(&buf, img); err != nil {
|
||
return nil, fmt.Errorf("编码第%d页PNG失败: %w", i+1, err)
|
||
}
|
||
pages = append(pages, buf.Bytes())
|
||
}
|
||
|
||
return pages, nil
|
||
}
|
||
|
||
// callOCR 调用与公式识别相同的下游 OCR 接口
|
||
func (s *PDFRecognitionService) callOCR(ctx context.Context, imgBytes []byte) (*formula.ImageOCRResponse, error) {
|
||
reqBody := map[string]string{
|
||
"image_base64": base64.StdEncoding.EncodeToString(imgBytes),
|
||
}
|
||
jsonData, err := json.Marshal(reqBody)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
headers := map[string]string{
|
||
"Content-Type": "application/json",
|
||
utils.RequestIDHeaderKey: utils.GetRequestIDFromContext(ctx),
|
||
}
|
||
|
||
resp, err := s.httpClient.RequestWithRetry(ctx, http.MethodPost, pdfOCREndpoint, bytes.NewReader(jsonData), headers)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("请求OCR接口失败: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
// 下游非 2xx 视为失败,避免把错误响应 body 当成识别结果存库
|
||
if resp.StatusCode != http.StatusOK {
|
||
body, _ := io.ReadAll(resp.Body)
|
||
return nil, fmt.Errorf("OCR接口返回非200状态: %d, body: %s", resp.StatusCode, string(body))
|
||
}
|
||
|
||
var ocrResp formula.ImageOCRResponse
|
||
if err := json.NewDecoder(resp.Body).Decode(&ocrResp); err != nil {
|
||
return nil, fmt.Errorf("解析OCR响应失败: %w", err)
|
||
}
|
||
|
||
return &ocrResp, nil
|
||
}
|
||
|
||
func (s *PDFRecognitionService) Stop() {
|
||
close(s.stopChan)
|
||
}
|