REC
浪漫庄园公告自动发布
潼语喧声

浪漫庄园公告自动发布

潼语
1年前发布 /正在检测是否收录...
温馨提示:
本文最后更新于2024年08月21日,已超过238天没有更新,若内容或图片失效,请留言反馈。

前言

当前脚本使用的开发语言是Python,功能是通过脚本的运行获取浪漫庄园每次游戏更新后的更新内容,并且通过钉钉api将内容实时发送到钉钉群内。

教程

第一步

在代码开头将编码设置为“gbk”

# -*- coding: gbk -*-

设置这一步是防止后续出现乱码的问题

第二步

需要添加相关的扩展

from sqlite3 import Timestamp
import requests  
import chardet  
from bs4 import BeautifulSoup  
from datetime import datetime  
import mysql.connector
import re

其中需要安装的有:requests、bs4、mysql

第三步

数据库配置

# MySQL数据库配置  
db_config = {  
    'user': 'user',  //数据库用户名
    'password': 'password', //数据库密码 
    'host': 'ip',  //数据库IP地址
    'database': 'database',  //数据库名
    'raise_on_warnings': True  
    # 要保存的表名  
table_name = 'huancun'  //将“huancun”换成自己数据库中定义的表名
}  

第四步

获取网页中需要的内容

def format_current_date():  
    # 获取当前日期  
    now = datetime.now()  
    # 提取年份的后两位  
    year_last_two = str(now.year)[-2:]  
  
    # 提取月份和日期的两位数字  
    month_two = str(now.month).zfill(2)  # zfill确保总是两位数字  
    day_two = str(now.day).zfill(2)      # 同上  
  
    # 组合成字符串并加上01  
    formatted_date = year_last_two + month_two + day_two + '01'  
    
    return formatted_date  
  
def get_webpage_text_content(url_template):  
    try:  
        # 假设URL模板是 http://rc.leeuu.com/data/news/{date}.htm  
        formatted_date = format_current_date()  
        url = url_template.format(date=formatted_date)
        #url = url_template.format(date='24062601')    
        
  
        response = requests.get(url)  
  
        # 尝试检测编码(如果响应内容不是UTF-8)  
        raw_data = response.content  
        if not response.encoding.lower() == 'utf-8':  
            result = chardet.detect(raw_data)  
            encoding = result['encoding']  
            content = raw_data.decode(encoding, errors='ignore')  # 使用检测到的编码解码  
        else:  
            content = response.text  # 如果已经是UTF-8,则直接使用text  
  
        # 确保请求成功  
        if response.status_code == 200:  
            # 使用BeautifulSoup解析HTML  
            soup = BeautifulSoup(content, 'html.parser')  
            # 提取所有文本内容(包括段落、标题等)  
            text_content1 = soup.get_text(strip=True, separator='\n')  # strip=True 去除多余空白,separator='\n' 以换行符分隔文本块  
            text_content = re.sub(r'(搜索.*\n复制.*)', '', text_content1, flags=re.DOTALL | re.MULTILINE)
            return text_content  
        else:  
           return f"Failed to retrieve the webpage. Status code: {response.status_code}" #报告网页错误 
           
           

    except requests.RequestException as e:  
        return f"Oops: Something Error {e}"  
    except UnicodeDecodeError as e:  
        return f"Error decoding the webpage: {e}"  

第五步

# 发送钉钉文本消息的函数  
def send_dingtalk_text_message(content, webhook_url):  
    headers = {  
        'Content-Type': 'application/json',  
        'Charset': 'UTF-8',  
    }  
    message = {  
        "msgtype": "text",  
        "text": {  
            "content": content  
        }  
    }  
    try:  
        response = requests.post(webhook_url, json=message, headers=headers)  
        response.raise_for_status()  # 如果请求失败,抛出HTTPError异常  
        print("DingTalk text message sent successfully.")  
    except requests.RequestException as e:  
        print(f"Error sending DingTalk text message: {e}") 
        #send_dingtalk_text_message("今日暂无更新", DINGTALK_WEBHOOK_URL)
 # 钉钉Webhook的URL  
DINGTALK_WEBHOOK_URL = 'https://oapi.dingtalk.com/robot/send?access_token=XXXX' //将XXXX替换为自己钉钉群内自定义机器人中的数据
# 使用URL模板
url_template = "http://rc.leeuu.com/data/news/{date}.htm"
content = get_webpage_text_content(url_template)

第六步

连接数据库并且添加获取到的内容进数据库,同时向钉钉群内发送消息

# 假设这是你从某个数据源获取的新内容及其相关信息
if "404" in content:
    new_content = "今日无更新内容。"
else:
    new_content = content

new_key_value = format_current_date()
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')  # 获取当前时间戳

# 连接到MySQL数据库
cnx = mysql.connector.connect(**db_config)
cursor = cnx.cursor()

# 定义表名和字段
table_name = "huancun"
key_field = 'key'  # 替换为你的唯一键字段名
content_field = 'content'  # 内容字段名
timestamp_field = 'timestamp'  # 时间戳字段名

# 执行查询
query = f"SELECT {content_field} FROM {table_name} WHERE `{key_field}` = %s"
cursor.execute(query, (new_key_value,))  # 使用参数化查询来防止SQL注入

# 获取查询结果
result = cursor.fetchone()  # 因为是根据主键查询,所以预期只返回一个结果

# 检查 result 是否存在
if result:
    content1 = result[0]
    if content1 == new_content:
        print("当前数据未更新!")
    else:
        # 使用 f-string 构建 SQL 查询
        insert_query = f"""
        INSERT INTO `{table_name}` (`{key_field}`, `{content_field}`, `{timestamp_field}`)
        VALUES (%s, %s, %s)
        ON DUPLICATE KEY UPDATE `{content_field}` = VALUES({content_field}), `{timestamp_field}` = VALUES({timestamp_field})
        """

        try:
            cursor.execute(insert_query, (new_key_value, new_content, timestamp))
            cnx.commit()
            print("新内容已保存到数据库,或已存在的记录已更新。")
            
            if "404" not in new_content:
                send_dingtalk_text_message(new_content, DINGTALK_WEBHOOK_URL)
        except Exception as e:
            print(f"发生错误:{e}")
else:
    # 如果没有找到记录,直接插入新数据
    insert_query = f"""
    INSERT INTO `{table_name}` (`{key_field}`, `{content_field}`, `{timestamp_field}`)
    VALUES (%s, %s, %s)
    ON DUPLICATE KEY UPDATE `{content_field}` = VALUES({content_field}), `{timestamp_field}` = VALUES({timestamp_field})
    """

    try:
        cursor.execute(insert_query, (new_key_value, new_content, timestamp))
        cnx.commit()
        print("新内容已保存到数据库,或已存在的记录已更新。")
        
        if "404" not in new_content:
            send_dingtalk_text_message(new_content, DINGTALK_WEBHOOK_URL)
    except Exception as e:
        print(f"发生错误:{e}")

# 关闭游标和连接
cursor.close()
cnx.close()

代码到此完毕,后续根据实际情况会进行优化删减,使代码运行更加方便

第七步

最后一步将文件上传到服务器上,添加定时任务中的shell脚本填写以下代码:

sudo -u root bash -c '#!/bin/bash
. /etc/profile
. ~/.bash_profile
/usr/bin/XX /XXX/XXX/浪漫庄园公告.py'
喜欢就支持一下吧
点赞 0 分享 收藏
评论
当前页面的评论已关闭