"""
@description: Extract subtitles from a video.
"""
from functools import partial
import cv2
from cnocr import CnOcr
import easyocr
from paddleocr import PaddleOCR
import pandas as pd
import Levenshtein


class VideoProcessor:
    """Extract burned-in subtitles from a video by OCR-ing a selected region.

    Intended call order: ``first_frame()`` (drag a rectangle around the
    subtitle area), ``read_video()`` (OCR the sampled frames),
    ``write_result()`` (merge repeated captions and write the caption file)
    and finally ``close()``.
    """

    # Frame rate assumed when the capture does not report a positive one.
    DEFAULT_FPS = 30.0
    # Title of the region-selection window.
    WINDOW_NAME = "videoFirst"
    # File that write_result() produces.
    OUTPUT_PATH = "files/captions.txt"

    def __init__(self, video_path):
        """Open the video and plan which frames will be sampled.

        Args:
            video_path: Path handed to ``cv2.VideoCapture``.
        """
        self.cap = cv2.VideoCapture(video_path)
        self.ocr_results = []  # one (begin_frame, end_frame, text) per OCR-ed frame
        self.fps_show = True  # whether the selection window still has to be shown
        self.fps = 1  # index of the frame currently being processed
        self.drawing = False  # True while the left mouse button is held down
        self.frame = None  # pristine first frame
        self.temp_frame = None  # scratch copy the rectangle is drawn on
        self.rectangle_data = {
            'top_left': (0, 0),
            'bottom_right': (0, 0),
        }
        # Kept so the attribute stays available; read_video() itself uses PaddleOCR.
        self.ocr = CnOcr()
        frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        video_fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        # Sample roughly three frames per second. The step is clamped to 1
        # because range() raises ValueError for a step of 0, which happened
        # whenever the reported frame rate was below 3 (or missing).
        step = max(1, video_fps // 3)
        self.frame_list = list(range(1, frame_count + 1, step))

    @staticmethod
    def preprocess_image(image):
        """Return a grayscale, contrast-boosted, blurred copy of *image*.

        Args:
            image: BGR image, or ``None``.

        Returns:
            The processed single-channel image, or ``None`` when *image* is
            ``None``.
        """
        if image is None:
            return None
        # Convert to grayscale.
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # Raise contrast and brightness.
        gray = cv2.convertScaleAbs(gray, alpha=1.5, beta=20)
        # Suppress noise.
        gray = cv2.GaussianBlur(gray, (3, 3), 0)

        return gray

    def draw_rectangle(self, event, x, y, flags, param):
        """Mouse callback that tracks the rectangle being dragged.

        Args:
            event: OpenCV mouse event code.
            x: Mouse x position.
            y: Mouse y position.
            flags: Unused; required by the callback signature.
            param: Unused; required by the callback signature.
        """
        if event == cv2.EVENT_LBUTTONDOWN:
            # Start from a clean copy so a previous rectangle disappears.
            self.temp_frame = self.frame.copy()
            self.drawing = True
            self.rectangle_data['top_left'] = (x, y)
            cv2.imshow(self.WINDOW_NAME, self.temp_frame)

        elif event == cv2.EVENT_MOUSEMOVE:
            if self.drawing:
                # Redraw the preview rectangle on a fresh copy each move.
                self.temp_frame = self.frame.copy()
                cv2.rectangle(self.temp_frame, self.rectangle_data["top_left"], (x, y), (0, 255, 0), 1)
                cv2.imshow(self.WINDOW_NAME, self.temp_frame)

        elif event == cv2.EVENT_LBUTTONUP:
            self.drawing = False
            self.rectangle_data['bottom_right'] = (x, y)
            # Reset the scratch frame (not self.frame) so the final rectangle
            # is not stacked on top of the last preview rectangle.
            self.temp_frame = self.frame.copy()
            cv2.rectangle(self.temp_frame, self.rectangle_data["top_left"],
                          self.rectangle_data["bottom_right"], (0, 255, 0), 1)
            cv2.imshow(self.WINDOW_NAME, self.temp_frame)

    def first_frame(self):
        """Read the first frame and let the user select the subtitle region.

        The selection window is shown only once per instance and stays open
        until ESC is pressed. Nothing is shown when the frame cannot be read.
        """
        # TODO: use the first frame that actually contains a subtitle.
        print("区域框选~~~")
        ret, self.frame = self.cap.read()
        if self.fps_show and ret:
            cv2.namedWindow(self.WINDOW_NAME)
            cv2.setMouseCallback(self.WINDOW_NAME, self.draw_rectangle)
            # Draw on a copy so the original frame stays untouched.
            self.temp_frame = self.frame.copy()
            while True:
                cv2.imshow(self.WINDOW_NAME, self.temp_frame)
                key = cv2.waitKey(20) & 0xFF
                if key == 27:  # ESC confirms the selection
                    break
            self.fps_show = False
            cv2.destroyWindow(self.WINDOW_NAME)

    def _crop_region(self):
        """Return the selected rectangle as (x1, y1, x2, y2) clamped to the frame."""
        ax, ay = self.rectangle_data["top_left"]
        bx, by = self.rectangle_data["bottom_right"]
        height, width = self.frame.shape[:2]
        # The drag may have gone in any direction, so order the corners first.
        x1, x2 = max(0, min(ax, bx)), min(width, max(ax, bx))
        y1, y2 = max(0, min(ay, by)), min(height, max(ay, by))
        return x1, y1, x2, y2

    @staticmethod
    def _join_ocr_text(result):
        """Join the texts of a PaddleOCR result, or return None if it is empty."""
        if not result or not result[0]:
            return None
        return " ".join(line[1][0] for block in result if block for line in block if line)

    def read_video(self):
        """OCR the selected region of every sampled frame.

        Appends ``(frame_index, frame_index, text)`` to ``self.ocr_results``
        for each frame and consumes ``self.frame_list``. When a frame yields
        no text, the most recent recognised text is reused; frames before the
        first recognised text are skipped.

        Raises:
            RuntimeError: If no first frame is available.
        """
        print("帧读取~~~")
        if self.frame is None:
            raise RuntimeError("未读取到首帧,请先对可读取的视频调用 first_frame()")

        x1, y1, x2, y2 = self._crop_region()
        if x2 <= x1 or y2 <= y1:
            # The region is identical for every frame, so an empty crop can
            # never become valid; looping over it used to spin forever.
            print("裁剪区域无效,跳过当前帧")
            self.frame_list.clear()
            return

        # Build the OCR model once instead of once per frame.
        ocr = PaddleOCR(use_angle_cls=True, lang='ch')
        out_text = None  # last recognised text, reused for frames without text

        while self.frame_list:
            frame_index = self.frame_list.pop(0)
            # Record the index before processing so results and error
            # messages refer to this frame rather than the previous one.
            self.fps = frame_index
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
            ret, frame = self.cap.read()
            if not ret or frame is None:
                print(f"视频读取失败或结束,第 {self.fps} 帧")
                continue

            cropped_image = frame[y1:y2, x1:x2]
            try:
                result = ocr.ocr(cropped_image, cls=True)
                recognised = self._join_ocr_text(result)
            except Exception as e:
                print(f"第 {self.fps} 帧OCR处理失败: {e}")
                continue

            if recognised is not None:
                out_text = recognised
            if out_text is None:
                # Nothing recognised so far, hence nothing to carry forward.
                continue
            print("----:" + out_text)
            self.ocr_results.append((frame_index, frame_index, out_text))

    @staticmethod
    def convert_seconds_to_hms(seconds):
        """Format a duration in seconds as ``hh:mm:ss.mmm``.

        The value is rounded to the nearest millisecond first; truncating the
        fractional part turned inputs such as ``1.001`` into ``.000`` because
        of binary floating-point error.

        Args:
            seconds: Non-negative duration in seconds.

        Returns:
            The formatted timestamp string.
        """
        total_ms = int(round(seconds * 1000))
        total_seconds, milliseconds = divmod(total_ms, 1000)
        total_minutes, secs = divmod(total_seconds, 60)
        hours, minutes = divmod(total_minutes, 60)
        return f"{hours:02}:{minutes:02}:{secs:02}.{milliseconds:03}"

    @staticmethod
    def _is_same_caption(text_1, text_2):
        """Return True when two OCR texts differ by at most a small edit distance."""
        longest = max(len(text_1), len(text_2))
        if longest == 0:
            # Both empty: identical, and the ratio below would divide by zero.
            return True
        distance = Levenshtein.distance(text_1, text_2)
        similarity = round(1 - distance / longest, 2)
        return distance <= 2 and similarity >= 0.85

    def write_result(self):
        """Merge consecutive near-identical captions and write them to disk.

        Each output line is ``begin,end,text`` with both times formatted as
        ``hh:mm:ss.mmm``. A merged caption keeps the begin frame of its first
        sample and the end frame and text of its last sample. The output
        directory is created when missing.
        """
        import os

        # Use the real frame rate; a fixed 30 gave wrong times for other videos.
        video_fps = self.cap.get(cv2.CAP_PROP_FPS)
        if not video_fps > 0:
            video_fps = self.DEFAULT_FPS

        merged = []
        for begin, end, text in self.ocr_results:
            if merged and self._is_same_caption(merged[-1][2], text):
                merged[-1][1] = end
                merged[-1][2] = text
            else:
                merged.append([begin, end, text])

        out_dir = os.path.dirname(self.OUTPUT_PATH)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        with open(file=self.OUTPUT_PATH, mode="w", encoding="utf-8") as f:
            for begin, end, text in merged:
                start = self.convert_seconds_to_hms(begin / video_fps)
                stop = self.convert_seconds_to_hms(end / video_fps)
                f.write(f"{start},{stop},{text}\n")

    def close(self):
        """Release the underlying video capture."""
        self.cap.release()


def run(video_path="video/1.mp4"):
    """Run the full subtitle-extraction pipeline on one video.

    Args:
        video_path: Video to process; defaults to the path the script has
            always used.
    """
    video_processor = VideoProcessor(video_path)
    try:
        video_processor.first_frame()
        video_processor.read_video()
        video_processor.write_result()
    finally:
        # Release the capture and close any window even when a step fails.
        video_processor.close()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    import sys
    import traceback

    try:
        run()
    except Exception:
        # Show the full traceback on stderr and exit non-zero so that a
        # failure is visible to the user and to calling scripts.
        traceback.print_exc()
        sys.exit(1)