2024-08-09T03:13:59.png

介绍

基于 searx 搜索引擎和 Python 实现的简易 BT 磁力链接搜索工具。

搭建

先搭建 searx 搜索引擎,搭建方法之前讲过:https://blog.hgtrojan.com/index.php/archives/10/ 。搭建完成后,请把下面源代码中的 yoursearx.com 和 Cookie 里的 yourtokens 替换成你自己实例的实际值。

python源代码

import tkinter as tk
from tkinter import messagebox
import requests
from bs4 import BeautifulSoup
import threading
import queue
import re
import urllib.parse  # 用于 URL 解码


class MagnetLinkSearcher:
    """Tkinter front end that collects magnet links from a searx instance.

    The window asks for a keyword, a search depth (number of result pages)
    and a worker-thread count. Result pages are downloaded by the worker
    threads, every ``magnet:`` anchor found is pushed onto ``links_queue``
    as a ``(title, link)`` tuple, and the collected entries are written to
    the result text box.
    """

    # Base URL of the searx instance to query; replace with your own deployment.
    SEARX_BASE_URL = 'https://yoursearx.com'
    # Seconds to wait on each HTTP request so a stalled connection cannot
    # hang a worker thread (and the GUI thread joining it) forever.
    REQUEST_TIMEOUT = 15

    def __init__(self, root):
        """Build the widgets on *root* (the Tk root window)."""
        self.root = root
        self.root.title("磁力链接搜索")

        self.links_queue = queue.Queue()
        # No longer used internally (queue.Queue is already thread-safe);
        # the attribute is kept so external code that references it still works.
        self.lock = threading.Lock()

        # Create and lay out the input widgets.
        tk.Label(root, text="搜索关键词:").pack(pady=5)
        self.entry_search = tk.Entry(root, width=50)
        self.entry_search.pack(pady=5)

        tk.Label(root, text="搜索深度(页数):").pack(pady=5)
        self.entry_depth = tk.Entry(root, width=10)
        self.entry_depth.pack(pady=5)

        tk.Label(root, text="线程数:").pack(pady=5)
        self.entry_threads = tk.Entry(root, width=10)
        self.entry_threads.pack(pady=5)

        tk.Button(root, text="搜索", command=self.search_magnet).pack(pady=10)

        tk.Label(root, text="结果:").pack(pady=5)
        self.text_result = tk.Text(root, height=20, width=80)
        # expand/fill let the result box grow with the window.
        self.text_result.pack(expand=True, fill=tk.BOTH, padx=5, pady=5)

    @classmethod
    def _build_search_url(cls, page, query):
        """Return the searx "files" search URL for *query* at page *page*.

        The parameters are URL-encoded so keywords containing spaces, ``&``,
        ``#`` or non-ASCII characters cannot corrupt the query string.
        """
        params = urllib.parse.urlencode({
            'q': query,
            'categories': 'files',
            'pageno': page,
            'language': 'zh-CN',
        })
        return f'{cls.SEARX_BASE_URL}/search?{params}'

    @staticmethod
    def _title_from_magnet(link):
        """Return a cleaned display title taken from the ``dn`` parameter of *link*.

        Falls back to '无标题' when the link carries no ``dn`` parameter.
        """
        # Everything after the first '?' is the magnet query string.
        params = urllib.parse.parse_qs(link.partition('?')[2])
        # parse_qs already percent-decodes values; decoding a second time
        # would mangle titles that contain a literal '%'.
        title = params.get('dn', ['无标题'])[0]

        title = re.sub(r'^\d+', '', title)  # drop a leading run of digits
        title = re.sub(r'[^\w\s\.\-]', '', title)  # drop special characters
        return title.strip()

    def fetch_page(self, page, query, headers):
        """Download one result page and queue every magnet link found on it.

        Each hit is put on ``self.links_queue`` as ``(title, link)``. Any
        failure is reported on stdout and otherwise ignored, so one bad page
        does not abort the whole search.
        """
        url = self._build_search_url(page, query)
        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            for anchor in soup.find_all('a', href=True):
                link = anchor.get('href')
                if link and link.startswith('magnet:'):
                    self.links_queue.put((self._title_from_magnet(link), link))

        except Exception as e:
            # Deliberately broad: fetching is best-effort per page.
            print(f"页面 {page} 抓取失败: {e}")

    def start_threads(self, depth, query, headers, thread_count):
        """Split pages 1..depth across worker threads and wait for all of them.

        Pages are divided into contiguous ranges of ceil(depth / thread_count)
        pages; threads whose range would start past *depth* are not started.
        """
        thread_count = max(1, thread_count)  # guard the division below
        pages_per_thread = (depth + thread_count - 1) // thread_count

        threads = []
        for i in range(thread_count):
            start_page = i * pages_per_thread + 1
            if start_page > depth:
                break  # remaining threads would have nothing to fetch
            end_page = min(start_page + pages_per_thread - 1, depth)
            thread = threading.Thread(
                target=self.fetch_pages_range,
                args=(start_page, end_page, query, headers),
            )
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    def fetch_pages_range(self, start_page, end_page, query, headers):
        """Fetch pages start_page..end_page (inclusive) one after another."""
        for page in range(start_page, end_page + 1):
            self.fetch_page(page, query, headers)

    def search_magnet(self):
        """Validate the form, run the search and show the results.

        Shows a warning dialog and returns early when the depth or thread
        count is not an integer, is out of range, or the keyword is empty.
        """
        query = self.entry_search.get().strip()
        try:
            depth = int(self.entry_depth.get())
            thread_count = int(self.entry_threads.get())
        except ValueError:
            # Show a readable message instead of Python's raw int() error text.
            messagebox.showwarning("输入错误", "搜索深度和线程数必须是整数。")
            return

        # Range checks.
        if not (2 <= thread_count <= 10):
            messagebox.showwarning("输入错误", "线程数必须在2到10之间。")
            return
        if not (6 <= depth <= 1000):
            messagebox.showwarning("输入错误", "深度必须在6到1000之间。")
            return

        if not query:
            messagebox.showwarning("输入错误", "请输入搜索关键词。")
            return

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Cookie': 'tokens=yourtokens'  # replace with your real cookie value
        }

        # Fresh queue so results from a previous search are not mixed in.
        self.links_queue = queue.Queue()
        # NOTE(review): start_threads() joins its workers, so this call runs on
        # the GUI thread and the window does not repaint until it returns.
        self.start_threads(depth, query, headers, thread_count)

        all_links = []
        while not self.links_queue.empty():
            all_links.append(self.links_queue.get())

        # Always clear the box so stale results never outlive a new search.
        self.text_result.delete("1.0", tk.END)
        if all_links:
            for title, link in all_links:
                self.text_result.insert(tk.END, f"{title}:\n{link}\n\n")
        else:
            messagebox.showinfo("无结果", "未找到磁力链接。")


# Create the Tk root window, build the search UI on it, and start the GUI.
root = tk.Tk()
app = MagnetLinkSearcher(root)
# Hand control to the Tk event loop.
root.mainloop()

一键下载地址:main.rar

文章目录