import os import json import logging import requests import base64 from flask import Flask, request, jsonify, send_from_directory from flask_cors import CORS import openai import PyPDF2 import time from dotenv import load_dotenv # 加载环境变量 load_dotenv() # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 获取API密钥 #openai_api_key = os.getenv("OPENAI_API_KEY") #openai_api_key = "sk-95ab48a1e0754ad39c13e2987f73fe37" #openai_base_url = "https://api.deepseek.com" openai_api_key = "sk-iVgiSZeNbLbTtp0lCvpIz2P0TpBGFLrcWdp5vDFtUFGfXCOs" openai_base_url = "https://api.chatanywhere.tech" llm_model = "gpt-4o-mini-2024-07-18" # TTS API地址 TTS_BASE_URL = "http://feng-arch.cn:31006" if not openai_api_key: logger.warning("OpenAI API key not found. AI explanation will use fallback mode.") # 加载设置 try: with open('setting.json', 'r') as f: settings = json.load(f) port = settings.get('websocket_port', 6006) except Exception as e: logger.error(f"Error loading settings: {e}") port = 6006 app = Flask(__name__, static_url_path='') CORS(app) # 存储当前加载的PDF路径 current_pdf_path = None pdfpages = None def extract_page_text(pdf_path, page_num): """提取PDF文档指定页面的文本内容""" try: with open(pdf_path, 'rb') as file: reader = PyPDF2.PdfReader(file) # 检查页码是否有效 if page_num < 1 or page_num > len(reader.pages): return { "success": False, "error": f"无效的页码: {page_num},PDF共有 {len(reader.pages)} 页" } # 提取指定页面的文本 page = reader.pages[page_num - 1] # 页码从1开始,但索引从0开始 page_text = page.extract_text() return { "success": True, "page_text": page_text, "page_count": len(reader.pages) } except Exception as e: logger.error(f"Error extracting PDF text: {e}") return { "success": False, "error": str(e) } def generate_explanation(page_num,page_text): """为单个页面生成讲解内容""" if not openai_api_key: return "这是一个示例讲解。请设置OpenAI API密钥以获取真实的AI讲解内容。" try: client = openai.OpenAI(api_key=openai_api_key, base_url=openai_base_url) response = client.chat.completions.create( model= llm_model, messages=[ {"role": "system", "content": "你是一位幽默的教师,正在为学生讲解PDF文档内容。请提供清晰、简洁的解释,重点突出关键概念。"}, {"role": "user", "content": f"请讲解第{page_num}页(总页数{pdfpages})ppt的内容:{page_text},首先判断是否要详细或者简略,比如标题页只需要简略,示例稍微展开,记住ppt不宜讲得太长。你的输出应符合老师的风格,句子间连贯,幽默风趣。"} ] ) return response.choices[0].message.content.strip() except Exception as e: logger.error(f"Error generating explanation: {e}") return f"生成讲解时出错: {str(e)}" def split_text_to_sentences(text): """将文本分割为句子""" # 使用正则表达式分割句子 import re # 匹配中文和英文的句子结束标志 sentence_endings = r'(?<=[。!?.!?])\s*' sentences = re.split(sentence_endings, text) # 过滤空句子 sentences = [s.strip() for s in sentences if s.strip()] return sentences def text_to_speech(text, voice="zf_xiaoxiao", speed=1.5): """将文本转换为语音,返回每两句话的音频数据和时间戳""" try: # 分割文本为句子 sentences = split_text_to_sentences(text) if not sentences: return { "success": False, "error": "无法分割文本为句子" } # 将句子按2句一组进行分组 sentence_pairs = [] i = 0 while i < len(sentences): if i + 1 < len(sentences) and len(sentences[i]) + len(sentences[i+1]) < 40: sentence_pairs.append({ "text": sentences[i] + " " + sentences[i+1], "sentences": [sentences[i], sentences[i+1]], "indices": [i, i+1] }) i += 2 # 正确跳过已合并的句子 else: sentence_pairs.append({ "text": sentences[i], "sentences": [sentences[i]], "indices": [i] }) i += 1 # 正常递增 # 存储所有音频段和时间戳 audio_segments = [] timestamps = [] # 为每组句子生成音频 for pair_idx, pair_data in enumerate(sentence_pairs): # 构建请求数据 url = f"{TTS_BASE_URL}/tts" payload = { "text": pair_data["text"], "voice": voice, "speed": speed, "return_type": "base64" } response = requests.post(url, json=payload) if response.status_code != 200: logger.error(f"TTS API error for pair {pair_idx}: {response.status_code} - {response.text}") continue data = response.json() audio_base64 = data.get("audio_base64") if not audio_base64: logger.error(f"No audio data returned for pair {pair_idx}") continue # 添加音频段 audio_segments.append({ "audio_base64": audio_base64, "sentences": pair_data["sentences"], "indices": pair_data["indices"] }) return { "success": True, "audio_segments": audio_segments, "sentences": sentences } except Exception as e: logger.error(f"Error in text_to_speech: {e}") return { "success": False, "error": str(e) } @app.route('/') def index(): return send_from_directory('', 'index.html') @app.route('/') def serve_static(path): return send_from_directory('', path) @app.route('/api/explain', methods=['POST']) def explain(): data = request.json text = data.get('text', '') page_num = data.get('page', None) # 如果提供了页码但没有提供文本,尝试从PDF中提取 if page_num and not text and current_pdf_path: result = extract_page_text(current_pdf_path, page_num) if result["success"]: text = result["page_text"] else: return jsonify({ 'success': False, 'explanation': f"无法提取页面文本: {result['error']}" }) explanation = generate_explanation(page_num,text) return jsonify({ 'success': True, 'explanation': explanation }) @app.route('/api/tts', methods=['POST']) def tts(): data = request.json text = data.get('text', '') voice = data.get('voice', 'zf_xiaoxiao') speed = data.get('speed', 1.0) if not text: return jsonify({ 'success': False, 'error': '文本不能为空' }) # 将文本转换为语音 result = text_to_speech(text, voice, speed) if result["success"]: return jsonify({ 'success': True, 'audio_segments': result["audio_segments"], 'sentences': result.get("sentences", []) }) else: return jsonify({ 'success': False, 'error': result["error"] }) @app.route('/api/explain_with_audio', methods=['POST']) def explain_with_audio(): data = request.json text = data.get('text', '') page_num = data.get('page', None) voice = data.get('voice', 'zf_xiaoxiao') speed = data.get('speed', 1.0) # 如果提供了页码但没有提供文本,尝试从PDF中提取 if page_num and not text and current_pdf_path: result = extract_page_text(current_pdf_path, page_num) if result["success"]: text = result["page_text"] else: return jsonify({ 'success': False, 'explanation': f"无法提取页面文本: {result['error']}", 'error': result["error"] }) # 生成讲解 explanation = generate_explanation(page_num,text) # 将讲解转换为语音 tts_result = text_to_speech(explanation, voice, speed) if tts_result["success"]: return jsonify({ 'success': True, 'explanation': explanation, 'audio_segments': tts_result["audio_segments"], 'sentences': tts_result.get("sentences", []) }) else: return jsonify({ 'success': True, 'explanation': explanation, 'audio_segments': None, 'tts_error': tts_result["error"] }) @app.route('/api/load_pdf', methods=['POST']) def load_pdf(): global current_pdf_path data = request.json pdf_path = data.get('path', './public/pdf/test.pdf') try: # 检查PDF是否存在 if not os.path.exists(pdf_path): return jsonify({ 'success': False, 'message': f'PDF文件不存在: {pdf_path}' }) # 尝试打开PDF以验证其有效性 with open(pdf_path, 'rb') as file: reader = PyPDF2.PdfReader(file) page_count = len(reader.pages) # 更新当前PDF路径 current_pdf_path = pdf_path pdfpages = page_count return jsonify({ 'success': True, 'message': '已成功加载PDF', 'page_count': page_count }) except Exception as e: logger.error(f"Error loading PDF: {e}") return jsonify({ 'success': False, 'message': f'加载PDF失败: {str(e)}' }) @app.route('/api/voices', methods=['GET']) def get_voices(): """获取可用的TTS声音列表""" voices = [ {"id": "zf_xiaoxiao", "name": "小小", "gender": "female", "lang": "zh"}, {"id": "zf_xiaoni", "name": "小妮", "gender": "female", "lang": "zh"}, {"id": "zf_xiaoyi", "name": "小怡", "gender": "female", "lang": "zh"}, {"id": "zf_xiaobei", "name": "小贝", "gender": "female", "lang": "zh"}, {"id": "zm_yunxi", "name": "云熙", "gender": "male", "lang": "zh"}, {"id": "zm_yunyang", "name": "云扬", "gender": "male", "lang": "zh"}, {"id": "zm_yunxia", "name": "云夏", "gender": "male", "lang": "zh"}, {"id": "zm_yunjian", "name": "云健", "gender": "male", "lang": "zh"}, {"id": "af_heart", "name": "Heart", "gender": "female", "lang": "en"}, {"id": "af_bella", "name": "Bella", "gender": "female", "lang": "en"}, {"id": "am_michael", "name": "Michael", "gender": "male", "lang": "en"}, {"id": "am_puck", "name": "Puck", "gender": "male", "lang": "en"} ] return jsonify({ 'success': True, 'voices': voices }) if __name__ == '__main__': # 设置默认PDF路径 default_pdf_path = './public/pdf/test.pdf' if os.path.exists(default_pdf_path): current_pdf_path = default_pdf_path logger.info(f"默认PDF已设置: {default_pdf_path}") app.run(host='0.0.0.0', port=port, debug=True)