博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python利用企业微信api来进行发送自定义报警的类实现
阅读量:4030 次
发布时间:2019-05-24

本文共 3540 字,大约阅读时间需要 11 分钟。

http://blog.51cto.com/7706173/1955976

python利用企业微信api来进行发送自定义报警的类实现

  1. 企业微信注册

    打开http://work.weixin.qq.com/企业微信主页;

    wKioL1mQWsHgzC_4AAUT2UUM0Nw651.png-wh_50a

    点击企业注册;

wKioL1mQWyHz_PniAAB76ksmaao280.png-wh_50

填写相关信息,营业执照和注册号可以不用填,直接下一步,按照提示操作即可;

注册完成后,登陆,就显示如下界面:

wKiom1mQXEbTXR84AAD2Y4eqMnc163.png-wh_50

点击我的企业标签:

wKioL1mQXMuzIM8CAACK0TjYlnM839.png-wh_50

看到如上界面,复制CorpID对应的值;

点击企业应用

wKiom1mQXWfilkKhAAB28XmBUZI997.png-wh_50

点击 创建应用

wKioL1mQXciBgtotAABo__DcRVg329.png-wh_50

填写对应内容,点击创建应用即可;

然后再点击企业应用,就可以在自建应用里看到自己创建的应用;

点击应用图标,看到如下图

wKioL1mQXjKDZtrIAACidJ1quzE226.png-wh_50

复制AgentId、Secret

至此我们需要的三个值就都有(AgentId、Secret、CorpID

2.程序实现

接下来我们用python的类来实现

git clone https://github.com/yxxhero/weixinalarm.git

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env python
# encoding: utf-8
# -*- coding: utf8 -*-
import 
urllib
import 
urllib2
import 
json
import 
sys
import 
time
import 
os
import 
logging
reload
(sys)
sys.setdefaultencoding(
'utf8'
)
#日志模式初始化
logging.basicConfig(level
=
"DEBUG"
,
                
format
=
'%(asctime)s  %(levelname)s %(message)s'
,
                
datefmt
=
'%Y-%m-%d %H:%M:%S'
,
                
filename
=
'./log/dark_status.log'
,
                
filemode
=
'a'
)
class 
weixinalarm(
object
):
    
def 
__init__(
self
,corpid,secrect,agentid): 
#构造函数,获取关键变量
        
self
.corpid
=
corpid
        
self
.secrect
=
secrect
        
self
.agentid
=
agentid
    
def 
get_access_token(
self
): 
#获取token
        
access_token_url
=
"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid="
+
self
.corpid
+
"&corpsecret="
+
self
.secrect
        
try
:
            
res_data 
= 
urllib2.urlopen(access_token_url,timeout
=
3
)
            
access_token
=
json.loads(res_data.read())[
"access_token"
]
        
except 
Exception,e:
            
logging.info(
str
(e))
            
logging.info(
"access_token获取超时"
)
            
return 
None 
        
else
:
            
return 
access_token
    
def 
check_token(
self
): 
#检查token的缓存文件是否存在或者超时,如果不存在或者超时重新获取。
        
if 
os.path.exists(
"/tmp/weixinalarm"
):
            
with 
open
(
"/tmp/weixinalarm"
,
"r+"
) as fd:
            
result_info
=
fd.read().split(
"^"
)
                
timestamp
=
result_info[
1
]
                
if 
time.time()
-
int
(timestamp) <
7200
:
                    
access_token
=
result_info[
0
]
            
return 
access_token
                
else
:
                    
access_token
=
self
.get_access_token()
                    
timestamp
=
time.time()
                    
tokentime
=
access_token
+
"^"
+
str
(timestamp).split(
"."
)[
0
]
                    
with 
open
(
"/tmp/weixinalarm"
,
"w"
) as fd:
                        
fd.write(tokentime)
            
return 
access_token
        
else
:
            
access_token
=
self
.get_access_token()
            
timestamp
=
time.time()
            
tokentime
=
access_token
+
"^"
+
str
(timestamp).split(
"."
)[
0
]
            
with 
open
(
"/tmp/weixinalarm"
,
"w"
) as fd:
                
fd.write(tokentime)
        
return 
access_token
    
def 
sendmsg(
self
,title,description): 
#利用api发送消息,具体的api格式可以参考官方文档
        
try
:
        
access_token
=
self
.check_token()
            
if 
access_token:
                
send_url
=
"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token="
+
access_token
                
send_info
=
{
                
"touser" 
"@all"
,
                
"msgtype" 
"text"
,
                
"agentid" 
self
.agentid,
                
"text"
:{
                    
"content"
:
str
(title)
+
":"
+
str
(description)
                    
}
                
}
                
logging.info(send_info)
                
send_info_urlencode 
= 
json.dumps(send_info)
                
#send_info_urlencode = json.dumps(send_info,ensure_ascii=False)
                
req
=
urllib2.Request(url 
= 
send_url,data 
=
send_info_urlencode)
                
response
=
urllib2.urlopen(req,timeout
=
3
)
                
res_info
=
response.read()
            
else
:
                
logging.error(
"no access_token"
)
        
except 
Exception,e:
            
logging.error(
str
(e))
        
else
:
            
alarm_result
=
json.loads(res_info)
            
if 
int
(alarm_result[
"errcode"
])
=
=
0
:
                
logging.info(
"报警正常"
)
            
else
:
                
logging.info(alarm_result[
"errmsg"
])
        
finally
:
            
if 
response:
                
response.close()

代码很简单,有想和我交流的可以加我微信18333610114

基本使用:

1
2
3
4
5
6
7
from 
weixinalarm 
import 
weixinalarm
sender
=
weixinalarm(corpid
=
Secret,secrect
=
Secret,agentid
=
AgentId)
发送消息:
sender.sendmsg(title
=
"自定义"
,description
=
"自定义"
)
即可
日志放在log下,log目录需自行创建,和weixinalarm.py同

转载地址:http://adlbi.baihongyu.com/

你可能感兴趣的文章
AsyncTask、View.post(Runnable)、ViewTreeObserver三种方式总结frame animation自动启动
查看>>
Android中AsyncTask的简单用法
查看>>
S3C6410启动模式介绍
查看>>
Jlink + ADS调试 S3C2440
查看>>
2440初始化存储器原理(接上一篇)
查看>>
S3C2440 USB 设备控制器(转)
查看>>
Linux usb 设备驱动 (1)
查看>>
解决跨网场景下,CAS重定向无法登录的问题(无需修改现有代码)
查看>>
java反编译命令
查看>>
activemq依赖包获取
查看>>
概念区别
查看>>
关于静态块、静态属性、构造块、构造方法的执行顺序
查看>>
final 的作用
查看>>
在Idea中使用Eclipse编译器
查看>>
idea讲web项目部署到tomcat,热部署
查看>>
优化IDEA启动速度,快了好多。后面有什么优化点,会继续往里面添加
查看>>
JMeter 保持sessionId
查看>>
IDEA Properties中文unicode转码问题
查看>>
Idea下安装Lombok插件
查看>>
zookeeper
查看>>