TG RSS推送机器人
简介
为了实现将博客内容自动化推送到 Telegram 频道的功能,我基于 Python 开发了一个 RSS 推送机器人。
其原理是:首先拉取并解析指定的 RSS 源内容,然后利用 Telegram Bot 将解析后的消息推送到指定的频道 ID。如果你想将内容推送到自己的频道,除了需要一个 Bot,还需要获取该频道的 CHAT_ID。
CHAT_ID 可以通过 Web 版 Telegram 获取。例如,我的频道的 CHAT_ID 是 -1002321694691
。
代码如下,大家有需要的可以自行获取。记得将 Bot 添加到频道中,这样才能正常推送消息。完成部署后(需要安装 Python 和相关依赖环境),请务必添加 cron 任务,以确保定时推送。
编辑 cron 任务:
crontab -e
0 * * * * /usr/bin/python3 /程序所在路径/程序名.py
代码
import requests
import xml.etree.ElementTree as ET
import os
# Telegram Bot Token 和 Channel ID
TELEGRAM_TOKEN = 'YOUR_TELEGRAM_TOKEN';
CHAT_ID = 'YOUR_CHAT_ID';
SENT_LINKS_FILE = '../bot/sent_links.txt' # 存储已推送过的链接
# 获取 RSS 内容
def fetch_rss():
url = 'https://blog.hgtrojan.com/index.php/feed' #换成你的RSS地址
response = requests.get(url)
if response.status_code == 200:
return response.text
else:
print(f"获取 RSS 失败,状态码:{response.status_code}")
return None
# 解析 RSS 内容
def parse_rss(rss_content):
root = ET.fromstring(rss_content)
items = []
for item in root.findall('.//item'):
title = item.find('title').text
link = item.find('link').text
description = item.find('description').text
items.append({
'title': title,
'link': link,
'description': description
})
return items
# 检查链接是否已经推送过
def is_link_sent(link):
if not os.path.exists(SENT_LINKS_FILE):
return False
with open(SENT_LINKS_FILE, 'r') as file:
sent_links = file.readlines()
if link + '\n' in sent_links:
return True
return False
# 记录已推送的链接
def record_sent_link(link):
with open(SENT_LINKS_FILE, 'a') as file:
file.write(link + '\n')
# 发送消息到 Telegram
def send_to_telegram(news_message):
url = f'https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage'
message = f"<b>{news_message['title']}</b>\n\n{news_message['description']}\n\n<a href='{news_message['link']}'>阅读全文</a>"
payload = {
'chat_id': CHAT_ID,
'text': message,
'parse_mode': 'HTML'
}
response = requests.post(url, data=payload)
if response.status_code == 200:
print(f"消息已发送到 Telegram: {news_message['title']}")
record_sent_link(news_message['link']) # 记录已发送的链接
else:
print(f"发送消息到 Telegram 失败: {response.status_code}")
# 主函数,执行以上步骤
def main():
print("正在获取 RSS 内容...")
rss_content = fetch_rss()
if rss_content:
print("正在解析 RSS 内容...")
news_items = parse_rss(rss_content)
print(f"正在发送 {len(news_items)} 条消息到 Telegram...")
for news_item in news_items:
if not is_link_sent(news_item['link']): # 检查是否已经推送过
send_to_telegram(news_item)
else:
print(f"跳过已推送的文章: {news_item['title']}")
else:
print("没有获取到 RSS 内容")
if __name__ == '__main__':
main()
打赏: 支付宝
本人所有文章均为技术分享,均用于防御为目的的记录,所有操作均在实验环境下进行,请勿用于其他用途,否则后果自负。 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!