功能与应用场景
配置与使用 添加机器人:在企业微信群聊中,可以通过点击群聊右上角的“...”选择添加群机器人,创建新的机器人并获取webhook地址 点击新创建一个机器人 输入机器人名字后,点击添加机器人 点击确定后,出现如下界面。会反馈webhook地址以及这个机器人单独的key值。将webhook地址以及key保存下来 python脚本: import requests import json import hashlib import base64 import os from urllib import parse from requests_toolbelt import MultipartEncoder
global _msg_robot msg_robot = '这里填上创建机器人返回的key值'
#说明: ''' 支持纯文本、markdown、图片、文件、图文 a = wx.WxRobot('1111-222-333-444-55555') #webhook a.sendMessage('文本内容', '@对象') a.sendMarkdown('markdown内容') a.sendImage('d:/图片.jpg') #图片文件的绝对路径 a.sendMedia('d:/文件.jpg') #文件的绝对路径 a.sendNews('[{ 'title' : '***', 'description' : '****', 'url' : 'www.qq.com', 'picurl' : '****' }]') '''
class WxRobot(): headers = {'Content-Type': 'application/json'} req_message = {'errcode': 1, 'errmessage': '请求微信企业失败,请检查网络'}
#这里post_url由创建机器人的webhook以及key共同组成 def __init__(self, webhook): self.post_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={0}'.format(webhook) #发送消息 def _req(self, data): try: return requests.post(self.post_url, data=json.dumps(data), headers=self.headers, timeout=10).json() except Exception as e: return self.req_message
def sendMessage(self, message): data = {'msgtype': 'text', 'text': {'content': str(message)}} return self._req(data)
def sendMarkdown(self, message): data = {'msgtype': 'markdown', 'markdown': {'content': str(message)}} return self._req(data)
def sendNews(self, message): data = {'msgtype': 'news', 'news': {'articles': list(message)}} print(data) return self._req(data)
def sendImage(self, image_path): if os.path.exists(image_path): with open(image_path, 'br') as f: fcont = f.read() ls_f = base64.b64encode(fcont) fmd5 = hashlib.md5(fcont) data = {'msgtype': 'image', 'image': {'base64': ls_f.decode('utf8'), 'md5': fmd5.hexdigest()}} return self._req(data) else: self.req_message['errmessage'] = '图片文件不存在' return self.req_message
def sendMedia(self, file_path): if os.path.exists(file_path): upload_url = self.post_url.replace('send', 'upload_media') + '&type=file' try: media_id = requests.post(upload_url, files=[('media', open(file_path, 'rb'))]).json()['media_id'] print(media_id) except Exception as e: self.req_message['errmessage'] = '上传文件失败,请检查网络' return self.req_message data = {'msgtype': 'file', 'file': {'media_id': media_id}} return self._req(data) else: self.req_message['errmessage'] = '文件不存在' return self.req_message
def sendFile(self,filepath): ''' 发送微信群组机器人文件 :param type_t: 测试类型 :param filepath: 文件路径 :return: ''' # url为群组机器人WebHook,配置项 url = self.post_url headers = { 'content-type': 'application/json' } # 发送文件需要先上传文件获取media_id media_id = self.UploadFile(filepath, url) # 发送请求 try: msg = {'msgtype': 'file', 'file': {'media_id': media_id}} result = requests.post(url, headers=headers, json=msg) if result.status_code == 200: print('文件上传成功') else: print('文件上传失败')
return True except Exception as e: print('企业微信机器人发送文件失败, 详细信息:' + str(e)) return False
def UploadFile(self,filepath, webHookUrl): ''' 企业微信机器人上传文件,发送文件前需要先上传--要求文件大小在5B~20M之间 :param filepath: 文件路径 :param webHookUrl: 群组机器人WebHook :return: media_id ''' # url为群组机器人WebHook,配置项 params = parse.parse_qs(parse.urlparse(webHookUrl).query) webHookKey = params['key'][0] upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={webHookKey}&type=file' headers = { 'Accept': 'application/json, text/plain, */*', 'Accept-Encoding': 'gzip, deflate', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36' } try: multipart = MultipartEncoder( fields={ 'filename': os.path.basename(filepath), 'filelength': '', 'name': 'media', 'media': (os.path.basename(filepath), open(filepath, 'rb'), 'application/octet-stream') }, boundary='-------------------------acebdf13572468') headers['Content-Type'] = multipart.content_type resp = requests.post(upload_url, headers=headers, data=multipart) json_res = resp.json() if json_res.get('media_id'): print(f'企业微信机器人上传文件成功,file:{filepath}') return json_res.get('media_id') except Exception as e: print('企业微信机器人上传文件失败,详细信息:' + str(e)) return ''
if __name__ == '__main__': robot=WxRobot(msg_robot) #发送消息 robot.sendMessage('企业微信测试机器人') #发送图片 robot.sendImage('d:\test.img') #发送视频 robot.sendMedia('d:\test.mp4') #发送文件 robot.sendFile('d:\行业报告')
上面的代码实现了往企业微信群中发送消息,文件,视频,图片等功能。有了这些功能后,我们可以定制很多自动化任务。 1 周期发送通知消息 2 和AI对接后,发送回答 3 和爬虫对接后,发送从web获取到的内容 |
|