-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmcp2brave.py
474 lines (396 loc) · 17.1 KB
/
mcp2brave.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
import os
import logging
import sys
import requests
from datetime import datetime
from dotenv import load_dotenv
from fastmcp import FastMCP
from logging.handlers import RotatingFileHandler
# Force UTF-8 on stdout so non-ASCII search output (e.g. Chinese queries/results)
# prints correctly regardless of the terminal's default encoding.
if sys.stdout.encoding != 'utf-8':
    try:
        sys.stdout.reconfigure(encoding='utf-8')
    except AttributeError:
        # reconfigure() exists only on Python 3.7+ TextIOWrapper; keep the
        # default encoding when unavailable.
        pass
# Load environment variables (e.g. BRAVE_API_KEY) from a .env file if present.
load_dotenv()
# Logging setup
def setup_logger(name):
    """Create a logger that writes DEBUG output both to the console and to a
    size-rotated file at logs/<name>.log.

    Args:
        name (str): Logger name; also used as the log file's base name.

    Returns:
        logging.Logger: The configured logger.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    # logging.getLogger() returns a cached instance, so calling this function
    # twice for the same name would stack duplicate handlers and emit every
    # message multiple times. Return the already-configured logger instead.
    if logger.handlers:
        return logger

    # Create the logs directory if it does not exist.
    log_dir = "logs"
    os.makedirs(log_dir, exist_ok=True)

    # File handler - RotatingFileHandler caps the file at 1MB with 5 backups.
    log_file = os.path.join(log_dir, f"{name}.log")
    file_handler = RotatingFileHandler(
        log_file,
        maxBytes=1024 * 1024,  # 1MB
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.DEBUG)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)

    # Same format on both handlers.
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # Attach handlers
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
# Module-level logger shared by every function in this file.
logger = setup_logger("mcp2brave")
logger.info("Logger initialized - outputs to both console and file in logs directory")
# Create an MCP server
mcp = FastMCP("mcp2brave", dependencies=["python-dotenv", "requests"])
# Brave Search subscription token; every API call below requires it, so
# fail fast at import time when it is missing.
API_KEY = os.getenv("BRAVE_API_KEY")
if not API_KEY:
    logger.error("BRAVE_API_KEY environment variable not found")
    raise ValueError("BRAVE_API_KEY environment variable required")
def _detect_language(text: str) -> str:
"""检测文本语言并返回对应的语言代码"""
# 定义语言检测规则
LANGUAGE_PATTERNS = {
# 中文 (简体和繁体)
'zh-hans': ('\u4e00', '\u9fff'), # 简体中文
'zh-hant': ('\u4e00', '\u9fff'), # 繁体中文
# 日文
'jp': ('\u3040', '\u309f', '\u30a0', '\u30ff'), # 平假名和片假名
# 韩文
'ko': ('\uac00', '\ud7af'), # 谚文
# 俄文
'ru': ('\u0400', '\u04ff'), # 西里尔字母
# 阿拉伯文
'ar': ('\u0600', '\u06ff'),
# 希伯来文
'he': ('\u0590', '\u05ff'),
# 泰文
'th': ('\u0e00', '\u0e7f'),
# 越南文 (使用扩展拉丁字母)
'vi': ('àáạảãâầấậẩẫăằắặẳẵèéẹẻẽêềếệểễìíịỉĩòóọỏõôồốộổỗơờớợởỡùúụủũưừứựửữỳýỵỷỹđ'),
# 印地文
'hi': ('\u0900', '\u097f'),
# 泰米尔文
'ta': ('\u0b80', '\u0bff'),
# 特卢固文
'te': ('\u0c00', '\u0c7f'),
}
def contains_chars_in_range(text, *ranges):
"""检查文本是否包含指定Unicode范围内的字符"""
if len(ranges) % 2 == 0: # 范围对
for i in range(0, len(ranges), 2):
start, end = ranges[i:i+2]
if any(start <= char <= end for char in text):
return True
else: # 字符列表
return any(char in ranges[0] for char in text)
return False
# 检测常见的非拉丁文字系统
for lang, pattern in LANGUAGE_PATTERNS.items():
if contains_chars_in_range(text, *pattern):
# 对中文进行简繁体识别(这里使用简单规则,实际应用可能需要更复杂的逻辑)
if lang in ['zh-hans', 'zh-hant']:
# 这里可以添加更复杂的简繁体识别逻辑
return 'zh-hans' # 默认返回简体中文
return lang
# 检测拉丁字母语言(简单示例)
# 注意:这是一个非常简化的实现,实际应用可能需要更复杂的语言检测
LATIN_PATTERNS = {
'es': ['ñ', 'á', 'é', 'í', 'ó', 'ú', '¿', '¡'],
'fr': ['é', 'è', 'ê', 'à', 'ç', 'ù', 'û', 'ï'],
'de': ['ä', 'ö', 'ü', 'ß'],
'pt-pt': ['ã', 'õ', 'á', 'é', 'í', 'ó', 'ú', 'â', 'ê', 'ô'],
'it': ['à', 'è', 'é', 'ì', 'ò', 'ó', 'ù'],
}
for lang, patterns in LATIN_PATTERNS.items():
if any(pattern in text.lower() for pattern in patterns):
return lang
# 默认返回英语
return "en"
def _extract_text_from_html(html_content: str) -> str:
    """Extract the meaningful article text from raw HTML.

    Args:
        html_content (str): Raw HTML markup.

    Returns:
        str: Cleaned text, trimmed to at most ~1000 characters (cut at a
        sentence boundary when possible). Falls back to a crudely
        tag-stripped 500-char prefix when parsing fails (including when
        BeautifulSoup is not installed).
    """
    try:
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')
        # Drop boilerplate. soup([...]) matches tag names only; class-based
        # selectors must go through select() — the original passed
        # '.advertisement' as a tag name, which never matched anything.
        for element in soup(['script', 'style', 'header', 'footer', 'nav', 'aside', 'iframe', 'ad']):
            element.decompose()
        for element in soup.select('.advertisement'):
            element.decompose()
        # Prefer the <article> element as the main content.
        content = soup.find('article')
        if not content:
            # find() accepts tag names only, so the class/id containers need
            # select_one(); the original's find(['main', '.content', ...])
            # could only ever match <main>.
            content = soup.find('main') or soup.select_one(
                '.content, #content, .post-content, .article-content'
            )
        if not content:
            content = soup
        # Flatten to text, one candidate line per element.
        text = content.get_text(separator='\n')
        # Keep only substantial lines — drops menus, button labels, etc.
        lines = []
        for line in text.split('\n'):
            line = line.strip()
            if line and len(line) > 30:
                lines.append(line)
        # Combine and cap at 1000 characters, preferring a sentence boundary.
        cleaned_text = ' '.join(lines)
        if len(cleaned_text) > 1000:
            end_pos = cleaned_text.rfind('. ', 0, 1000)
            if end_pos > 0:
                cleaned_text = cleaned_text[:end_pos + 1]
            else:
                cleaned_text = cleaned_text[:1000]
        return cleaned_text
    except Exception as e:
        logger.error(f"Error extracting text from HTML: {str(e)}")
        # Fallback when HTML cannot be parsed: naive tag spacing, first 500 chars.
        text = html_content.replace('<', ' <').replace('>', '> ').split()
        return ' '.join(text)[:500]
def _do_search_with_summary(query: str) -> str:
    """Internal function to handle the search logic with summary support

    Queries the Brave web-search API, then prefixes the formatted results
    with either the API's official summarizer text or, failing that, a
    summary scraped from the top two result pages. Returns an error string
    (never raises) on failure.
    """
    try:
        # Round-trip through UTF-8 to normalize the query encoding.
        query = query.encode('utf-8').decode('utf-8')
        url = "https://api.search.brave.com/res/v1/web/search"
        headers = {
            "Accept": "application/json",
            "X-Subscription-Token": API_KEY
        }
        params = {
            "q": query,
            "count": 5,
            "result_filter": "web",
            "enable_summarizer": True,  # ask Brave for an official summary
            "format": "json"
        }
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        logger.debug("API Response Structure:")
        logger.debug(f"Response Keys: {list(data.keys())}")
        # Process search results
        summary_text = ""
        search_results = []
        # Web search results
        if 'web' in data and 'results' in data['web']:
            results = data['web']['results']
            # Summary: prefer Brave's official summarizer when present.
            if 'summarizer' in data:
                logger.debug("Found official summarizer data")
                summary = data.get('summarizer', {})
                summary_text = summary.get('text', '')
            else:
                logger.debug("No summarizer found, generating summary from top results")
                # Build a summary from the content of the top results.
                try:
                    summaries = []
                    for result in results[:2]:  # only the first two results
                        # NOTE: reuses the name `url`, shadowing the API URL
                        # (which is no longer needed at this point).
                        url = result.get('url')
                        if url:
                            logger.debug(f"Fetching content from: {url}")
                            content = _get_url_content_direct(url)
                            # Strip the metadata header that
                            # _get_url_content_direct prepends before '---\n\n'.
                            raw_content = content.split('---\n\n')[-1]
                            text_content = _extract_text_from_html(raw_content)
                            if text_content:
                                # Add title and source/date information.
                                title = result.get('title', 'No title')
                                date = result.get('age', '') or result.get('published_time', '')
                                summaries.append(f"### {title}")
                                if date:
                                    summaries.append(f"Published: {date}")
                                summaries.append(text_content)
                    if summaries:
                        summary_text = "\n\n".join([
                            "Generated summary from top results:",
                            *summaries
                        ])
                        logger.debug("Successfully generated summary from content")
                    else:
                        # Nothing scraped — fall back to the first description.
                        summary_text = results[0].get('description', '')
                except Exception as e:
                    logger.error(f"Error generating summary from content: {str(e)}")
                    summary_text = results[0].get('description', '')
            # Format each search result for display.
            for result in results:
                title = result.get('title', 'No title').encode('utf-8').decode('utf-8')
                url = result.get('url', 'No URL')
                description = result.get('description', 'No description').encode('utf-8').decode('utf-8')
                search_results.append(f"- {title}\n URL: {url}\n Description: {description}\n")
        # Assemble the combined output.
        output = []
        if summary_text:
            output.append(f"Summary:\n{summary_text}\n")
        if search_results:
            output.append("Search Results:\n" + "\n".join(search_results))
        logger.debug(f"Has summary: {bool(summary_text)}")
        logger.debug(f"Number of results: {len(search_results)}")
        return "\n".join(output) if output else "No results found for your query."
    except Exception as e:
        logger.error(f"Search error: {str(e)}")
        logger.exception("Detailed error trace:")
        return f"Error performing search: {str(e)}"
def _get_url_content_direct(url: str) -> str:
    """Internal function to get content directly using requests.

    Fetches *url*, extracts the main readable text (capped at ~1000 chars),
    and returns it prefixed by a small metadata header separated by
    '---\\n\\n'. Returns an error string (never raises) on failure.

    Args:
        url (str): The URL to fetch.

    Returns:
        str: "<metadata>---\\n\\n<cleaned text>" or an error message.
    """
    try:
        logger.debug(f"Directly fetching content from URL: {url}")
        response = requests.get(url, timeout=10, headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        response.raise_for_status()
        # Encoding: when the server does NOT declare a charset, requests
        # defaults to ISO-8859-1, so fall back to the content-sniffed
        # apparent_encoding. (The original condition was inverted — it
        # overrode a *declared* charset with the guess and kept the
        # ISO-8859-1 default otherwise, garbling e.g. Chinese pages.)
        if 'charset' not in response.headers.get('content-type', '').lower():
            response.encoding = response.apparent_encoding
        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')
            # Drop boilerplate elements.
            for element in soup(['script', 'style', 'header', 'footer', 'nav', 'aside', 'iframe', 'ad']):
                element.decompose()
            for element in soup.select('.advertisement'):
                element.decompose()
            # Try to locate the main content region, most specific first.
            main_content = None
            possible_content_elements = [
                soup.find('article'),
                soup.find('main'),
                soup.find(class_='content'),
                soup.find(id='content'),
                soup.find(class_='post-content'),
                soup.find(class_='article-content'),
                soup.find(class_='entry-content'),
                soup.find(class_='main-content'),
                soup.select_one('div[class*="content"]'),  # any class containing "content"
            ]
            for element in possible_content_elements:
                if element:
                    main_content = element
                    break
            if not main_content:
                main_content = soup
            text = main_content.get_text(separator='\n')
            # Keep only substantial lines (drops menus, captions, etc.).
            lines = []
            for line in text.split('\n'):
                line = line.strip()
                if line and len(line) > 30:
                    lines.append(line)
            # Cap at 1000 characters, preferring a sentence boundary.
            cleaned_text = ' '.join(lines)
            if len(cleaned_text) > 1000:
                end_pos = cleaned_text.rfind('. ', 0, 1000)
                if end_pos > 0:
                    cleaned_text = cleaned_text[:end_pos + 1]
                else:
                    cleaned_text = cleaned_text[:1000]
            # Metadata header; callers split on '---\n\n' to recover the body.
            metadata = f"URL: {url}\n"
            metadata += f"Content Length: {len(response.text)} characters\n"
            metadata += f"Content Type: {response.headers.get('content-type', 'Unknown')}\n"
            metadata += "---\n\n"
            return f"{metadata}{cleaned_text}"
        except Exception as e:
            logger.error(f"Error extracting text from HTML: {str(e)}")
            return f"Error extracting text: {str(e)}"
    except Exception as e:
        logger.error(f"Error fetching URL content directly: {str(e)}")
        return f"Error getting content: {str(e)}"
def _do_news_search(query: str, country: str = "all", search_lang: str = None) -> str:
    """Query the Brave News API and return formatted news results.

    Args:
        query (str): News search keywords.
        country (str): Country filter; "all" applies no restriction.
        search_lang (str): Language code; auto-detected from the query
            when None.

    Returns:
        str: A human-readable list of news items, or an error message.
    """
    try:
        # Normalize the query encoding via a UTF-8 round-trip.
        query = query.encode('utf-8').decode('utf-8')

        # Auto-detect the language when the caller did not specify one.
        if search_lang is None:
            search_lang = _detect_language(query)
            logger.debug(f"Detected language: {search_lang} for query: {query}")

        logger.debug(f"Searching news for query: {query}")
        response = requests.get(
            "https://api.search.brave.com/res/v1/news/search",
            headers={
                "Accept": "application/json",
                "Accept-Encoding": "gzip",
                "X-Subscription-Token": API_KEY
            },
            params={
                "q": query,
                "count": 10,
                "country": country,
                "search_lang": search_lang,
                "spellcheck": 1
            },
        )
        response.raise_for_status()
        data = response.json()

        # Render each news hit as an indented multi-line bullet.
        formatted = [
            "\n".join([
                f"- {item.get('title', 'No title').encode('utf-8').decode('utf-8')}",
                f" Source: {item.get('source', {}).get('name', 'Unknown source')}",
                f" Date: {item.get('published_time', 'Unknown date')}",
                f" URL: {item.get('url', 'No URL')}",
                f" Description: {item.get('description', 'No description').encode('utf-8').decode('utf-8')}\n",
            ])
            for item in data.get('results', [])
        ]

        if not formatted:
            return "No news found for your query."
        return "News Results:\n\n" + "\n".join(formatted)
    except requests.exceptions.RequestException as e:
        logger.error(f"News API request error: {str(e)}")
        return f"Error searching news: {str(e)}"
    except Exception as e:
        logger.error(f"News search error: {str(e)}")
        logger.exception("Detailed error trace:")
        return f"Error searching news: {str(e)}"
# MCP tool (English description). Thin wrapper: all search and
# summarization logic lives in _do_search_with_summary.
@mcp.tool()
def search_brave_with_summary(query: str) -> str:
    """Search the web using Brave Search API """
    return _do_search_with_summary(query)
# Chinese-description twin of search_brave_with_summary. FastMCP exposes the
# docstring to clients as the tool description, so the Chinese text is kept
# intact on purpose. (Docstring: "Search the web using the Brave engine.")
@mcp.tool()
def brave_search_summary(query: str) -> str:
    """使用Brave搜索引擎搜索网络信息"""
    return _do_search_with_summary(query)
# MCP tool (English description). Thin wrapper over _get_url_content_direct,
# which fetches the page and returns extracted text plus a metadata header.
@mcp.tool()
def get_url_content_direct(url: str) -> str:
    """Get webpage content directly using HTTP request

    Args:
        url (str): The URL to fetch content from

    Returns:
        str: The webpage content and metadata
    """
    return _get_url_content_direct(url)
# Chinese-description twin of get_url_content_direct; the Chinese docstring
# is the runtime tool description and is kept intact on purpose.
# (Docstring: "Fetch webpage content directly; returns content + metadata.")
@mcp.tool()
def url_content(url: str) -> str:
    """直接获取网页内容

    参数:
        url (str): 目标网页地址

    返回:
        str: 网页内容和元数据
    """
    return _get_url_content_direct(url)
# MCP tool (English description). Thin wrapper over _do_news_search with
# default country ("all") and auto-detected query language.
@mcp.tool()
def search_news(query: str) -> str:
    """Search news using Brave News API

    Args:
        query (str): The search query for news

    Returns:
        str: News search results including titles, sources, dates and descriptions
    """
    return _do_news_search(query)
# Chinese-description twin of search_news; the Chinese docstring is the
# runtime tool description and is kept intact on purpose.
# (Docstring: "Search news via the Brave News API; results include title,
# source, date and description.")
@mcp.tool()
def search_news_info(query: str) -> str:
    """使用Brave新闻API搜索新闻

    参数:
        query (str): 新闻搜索关键词

    返回:
        str: 新闻搜索结果,包含标题、来源、日期和描述
    """
    return _do_news_search(query)