返回
"""
@description:微信公众号消息推送
@config_file:配置文件/config.ini
"""
import datetime
import json
import requests
from bs4 import BeautifulSoup
from configparser import ConfigParser
class SendMessage(): # 定义发送消息的类
def __init__(self):
config = ConfigParser()
config.read('./配置文件/config.ini', encoding='utf-8')
date = self.get_date()
weather = self.get_weather()
rsdate = self.get_rsday()
body = rsdate
self.dataJson = {"first": "早上好!",
"date": date + '',
"body": body + " ",
"weather": weather + ' 城市:重庆',
"last": "今天也是开心的一天!!!"
}
self.appID = config.get('message', 'appID') # appid 注册时有
self.appsecret = config.get('message', 'appsecret') # appsecret 同上
self.template_id = config.get('message', 'template_id') # 模板id
self.access_token = self.get_access_token() # 获取 access token
openid_data = config.get('message', 'openid')
self.openid_list = [i.strip(" ") for i in openid_data.split(',')]
self.opend_ids = self.get_openid() # 获取关注用户的openid
# 爬取天气数据
def get_weather(self):
url = 'http://www.weather.com.cn/weather/101040100.shtml'
sysdate = datetime.date.today()
r = requests.get(url, timeout=30)
r.raise_for_status()
r.encoding = r.apparent_encoding
html = r.text
final_list = []
soup = BeautifulSoup(html, 'html.parser')
body = soup.body
data = body.find('div', {'id': '7d'})
ul = data.find('ul')
lis = ul.find_all('li')
for day in lis:
temp_list = []
date = day.find('h1').string
if date.string.split('日')[0] == str(sysdate.day):
temp_list = []
date = day.find('h1').string
temp_list.append(date)
info = day.find_all('p')
temp_list.append(info[0].string)
if info[1].find('span') is None:
temperature_highest = ' '
else:
temperature_highest = info[1].find('span').string
temperature_highest = temperature_highest.replace('℃', ' ')
if info[1].find('i') is None:
temperature_lowest = ' '
else:
temperature_lowest = info[1].find('i').string
temperature_lowest = temperature_lowest.replace('℃', ' ')
temp_list.append(temperature_highest)
temp_list.append(temperature_lowest)
final_list.append(temp_list)
return '天气情况:' + final_list[0][1] + ' ' + '温度:' + final_list[0][3].strip() + '~' + \
final_list[0][2].strip() + '℃'
# 计算日期和时间
def get_date(self):
sysdate = datetime.date.today() # 只获取日期
now_time = datetime.datetime.now() # 获取日期加时间
week_day = sysdate.isoweekday() # 获取周几
week = ['星期一', '星期二', '星期三', '星期四', '星期五', '星期六', '星期天']
return '现在是' + str(now_time)[0:16] + ' ' + week[week_day - 1]
# 计算认识时间
def get_rsday(self):
today = datetime.datetime.now()
data_str = today.strftime('%Y-%m-%d')
oneDay = datetime.date(2023, 9, 29)
d = today.toordinal() - oneDay.toordinal()
return "我们已经认识 %d 天" % (d)
def get_access_token(self):
"""
获取access_token
"""
url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={}&secret={}'. \
format(self.appID, self.appsecret)
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.67 Safari/537.36'
}
response = requests.get(url, headers=headers).json()
access_token = response.get('access_token')
return access_token
def get_openid(self):
"""
获取所有用户的openid
"""
next_openid = ''
url_openid = 'https://api.weixin.qq.com/cgi-bin/user/get?access_token=%s&next_openid=%s' % (
self.access_token, next_openid)
ans = requests.get(url_openid)
open_ids = json.loads(ans.content)['data']['openid']
return open_ids
def sendmsg(self, openid_list=None):
"""
给指定列表中的用户发送消息。如果列表为空则给所有用户发送消息
"""
url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={}".format(self.access_token)
# 如果没有传入 openid_list 或者 openid_list 为空,则发送给所有用户
target_openids = openid_list if openid_list else self.opend_ids
if target_openids:
for open_id in target_openids:
body = {
"touser": open_id,
"template_id": self.template_id,
"url": "https://xn--39q850b0xyr2hdkx3d.top/",
"topcolor": "#FF0000",
# 对应模板中的数据模板
"data": {
"first": {
"value": self.dataJson.get("first"),
"color": "#EA0000" # 文字颜色
},
"body": {
"value": self.dataJson.get("body"),
"color": "#EA0000"
},
"weather": {
"value": self.dataJson.get("weather"),
"color": "#00EC00"
},
"date": {
"value": self.dataJson.get("date"),
"color": "#6F00D2"
},
"last": {
"value": self.dataJson.get("last"),
"color": "#66CCFF"
}
}
}
data = bytes(json.dumps(body, ensure_ascii=False).encode('utf-8'))
response = requests.post(url, data=data)
result = response.json()
if result["errcode"] == 0:
print(f"消息发送成功给用户 {open_id}")
else:
print(f"消息发送失败给用户 {open_id}, 错误码: {result['errcode']}")
else:
print("当前没有用户关注该公众号!")
def main(self):
sends = SendMessage()
sends.sendmsg(openid_list=self.openid_list)
if __name__ == "__main__":
sends = SendMessage()
sends.main()