-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathwxclass.py
110 lines (90 loc) · 2.72 KB
/
wxclass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import lxml.etree as ET
from hashlib import sha1
#from copy import deepcopy
from hashlib import sha1
from time import time
class WxError(Exception):
    """Exception type that carries a description string.

    Attributes:
        errstr: the description passed to the constructor.
    """

    def __init__(self, errstr):
        """Create the error.

        Args:
            errstr: description of what went wrong.
        """
        # Forward the message to Exception so that str(err), err.args and
        # tracebacks display it instead of an empty string.
        super(WxError, self).__init__(errstr)
        self.errstr = errstr
class WxRequest(object):
    """Dictionary-like view of an incoming XML message.

    The children of the XML root element are converted into a (possibly
    nested) dict stored in ``argdict``: keys are element tags, leaf values
    are the element text.  Fields can be read as ``req['Tag']``, through a
    dotted path ``req['Outer.Inner']``, or as attributes ``req.Tag``.
    """

    def __init__(self, xmlinf, fromstr=True, debug=False):
        """Build the request.

        Args:
            xmlinf: XML text when ``fromstr`` is true; otherwise a source
                accepted by ``ET.parse``.  When ``debug`` is true it is
                stored as ``argdict`` unchanged and no XML is parsed.
            fromstr: parse ``xmlinf`` as a string instead of via ``ET.parse``.
            debug: skip parsing and use ``xmlinf`` directly as the field dict.
        """
        if debug:
            self.argdict = xmlinf
            return
        # NOTE(review): if xmlinf comes from untrusted clients, confirm the
        # parser configuration (e.g. entity resolution) is acceptable.
        if fromstr:
            root = ET.fromstring(xmlinf)
        else:
            # ET.parse returns an ElementTree wrapper, which cannot be
            # iterated directly; the child elements live on its root.
            root = ET.parse(xmlinf).getroot()
        self.argdict = WxRequest._parse(root)

    @staticmethod
    def _parse(xmlobj):
        """Convert the children of ``xmlobj`` into a nested dict.

        An element that has children becomes a nested dict; any other
        element maps its tag to its text.  When sibling elements share a
        tag, the last one wins.
        """
        parsed = {}
        for elem in xmlobj:
            if len(elem):
                # Recurse through _parse (there is no ``parse`` attribute).
                parsed[elem.tag] = WxRequest._parse(elem)
            else:
                parsed[elem.tag] = elem.text
        return parsed

    def __getitem__(self, keystr):
        """Return the value at ``keystr``, a key or a dot-separated key path.

        Raises:
            KeyError: if any component of the path is missing.
        """
        node = self.argdict
        for key in keystr.split('.'):
            node = node[key]
        return node

    def __setitem__(self, key, value):
        """Set the top-level field ``key`` to ``value``."""
        self.argdict[key] = value

    def __getattr__(self, key):
        """Expose top-level fields as attributes.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: if there is no such field (or ``argdict`` has not
                been set yet), so that ``hasattr``/``getattr`` defaults work.
        """
        try:
            # Go through __dict__ so a missing ``argdict`` cannot re-enter
            # __getattr__ and recurse without end.
            return self.__dict__['argdict'][key]
        except KeyError:
            raise AttributeError(key)

    def reply(self, msgtype, msgarg):
        """Build a ``WxResponse`` addressed back to the sender of this request.

        Args:
            msgtype: ``'raw'`` or a key of ``WxResponse.api``.
            msgarg: for ``'raw'`` a dict of response fields (it must include
                ``'MsgType'``); otherwise the argument for the builder
                selected by ``msgtype``.
        """
        common = WxResponse.api['common'](self)
        if msgtype == 'raw':
            # In raw mode the common fields override same-named keys in msgarg.
            return WxResponse(dict(msgarg, **common))
        # Otherwise the type-specific fields override the common ones.
        return WxResponse(dict(common, **WxResponse.api[msgtype](msgarg)))
class WxResponse(object):
    """Outgoing reply: a dict of message fields plus a few helper attributes.

    Attributes:
        argd: the dict of response fields.
        type: the value of ``argd['MsgType']`` at construction time.
        caller: optional object supplied by the creator (``None`` by default).
    """

    # Field builders, keyed by message type.  Each one takes a single
    # argument and returns a dict of response fields:
    #   'text'   - x is the content value.
    #   'news'   - x is a sequence of (Title, Description, PicUrl, Url) tuples.
    #   'music'  - x is a (Title, Description, MusicUrl, HQMusicUrl) sequence.
    #   'common' - x is the request (anything indexable by 'FromUserName' and
    #              'ToUserName'); sender and recipient are swapped.
    api = {
        'text': lambda x: {'Content': x, 'MsgType': 'text'},
        'news': lambda x: {
            'Articles': [
                dict(zip(('Title', 'Description', 'PicUrl', 'Url'), t))
                for t in x
            ],
            'ArticleCount': len(x),
            'MsgType': 'news',
        },
        'music': lambda x: dict(
            zip(('Title', 'Description', 'MusicUrl', 'HQMusicUrl'), x),
            MsgType='music',
        ),
        'common': lambda x: {
            'ToUserName': x['FromUserName'],
            'FromUserName': x['ToUserName'],
            'CreateTime': int(time()),
            'FuncFlag': 0,
        },
    }

    def __init__(self, argd, caller=None):
        """Wrap a dict of response fields.

        Args:
            argd: response fields; must contain ``'MsgType'``.
            caller: optional object to remember on the response.

        Raises:
            KeyError: if ``argd`` has no ``'MsgType'`` entry.
        """
        self.argd = argd
        self.type = argd['MsgType']
        # Keep the value that was passed in (it used to be discarded).
        self.caller = caller

    def __getitem__(self, key):
        """Return the response field ``key``."""
        return self.argd[key]

    def __setitem__(self, key, value):
        """Set the response field ``key`` to ``value``."""
        self.argd[key] = value

    def star(self):
        """Set ``FuncFlag`` to 1 and return ``self`` for chaining."""
        self.argd['FuncFlag'] = 1
        return self

    def setCaller(self, caller):
        """Store ``caller`` on the response and return ``self`` for chaining."""
        self.caller = caller
        return self
#
#
# def _autofill(self):
# self.argdict[]
class WxAuth(object):
    """Check the signature of a verification request and produce the reply.

    The expected signature is the SHA-1 hex digest of the ``token``,
    ``timestamp`` and ``nonce`` strings, sorted and concatenated.

    Attributes:
        arg: private copy of the request fields merged with the extra
            keyword arguments (including ``token``).
        ok: ``True`` when the supplied signature matched.
    """

    @staticmethod
    def _check(argd):
        """Return True when ``argd['signature']`` matches the expected digest.

        A request that lacks ``timestamp``, ``nonce`` or ``signature`` fails
        the check instead of raising.  A missing ``token`` still raises
        ``KeyError`` because the token is supplied by the caller's own code,
        not by the remote side.
        """
        import hmac  # function-scope import: only this check needs it

        token = argd['token']
        try:
            timestamp = argd['timestamp']
            nonce = argd['nonce']
            supplied = argd['signature']
        except KeyError:
            return False
        if not isinstance(supplied, str):
            return False
        expected = sha1(
            ''.join(sorted([token, timestamp, nonce])).encode()
        ).hexdigest()
        # Constant-time comparison so response timing does not reveal how
        # many leading characters of a forged signature were correct.
        return hmac.compare_digest(
            expected.encode('ascii'), supplied.encode('utf-8')
        )

    @staticmethod
    def _sendback(argd):
        """Return the ``echostr`` field that is echoed back on success."""
        return argd['echostr']

    def __init__(self, arg, **additional):
        """Verify a request.

        Args:
            arg: mapping of request fields (``signature``, ``timestamp``,
                ``nonce``, ``echostr``).  It is not modified.
            **additional: extra fields merged over ``arg``; callers should
                always pass ``token='...'`` here.

        Raises:
            KeyError: if no ``token`` is available.
        """
        # Copy key by key so the caller's mapping is never mutated and the
        # secret token does not leak into it.
        merged = {key: arg[key] for key in arg}
        merged.update(additional)
        self.arg = merged
        self.ok = bool(WxAuth._check(self.arg))

    def __bool__(self):
        """Truthiness of the object is the verification result."""
        return self.ok

    def reply(self):
        """Return ``echostr`` on success, otherwise a fixed rejection string."""
        if self.ok:
            return WxAuth._sendback(self.arg)
        return 'FUCK YOU'