forked from liuwons/wxBot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
109 lines (93 loc) · 4.23 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python
# coding: utf-8
from wxbot import *
import ConfigParser
import json
class TulingWXBot(WXBot):
    """WXBot subclass that answers text messages via the Tuling123 HTTP API.

    The API key is read from option ``key`` in section ``[main]`` of
    ``conf.ini`` in the current working directory.  When no key is
    available a fixed canned reply is used instead of calling the API.

    Attributes:
        tuling_key: API key string; empty when it could not be loaded.
        robot_switch: True while automatic replies are enabled.
    """

    def __init__(self):
        """Initialise the base bot and load the Tuling key from ``conf.ini``."""
        WXBot.__init__(self)
        self.tuling_key = ""
        self.robot_switch = True

        parser = ConfigParser.ConfigParser()
        try:
            # read() skips files it cannot open; a missing file therefore
            # surfaces as NoSectionError from get(), which is a
            # ConfigParser.Error like every parsing/lookup failure.
            parser.read('conf.ini')
            self.tuling_key = parser.get('main', 'key')
        except ConfigParser.Error as err:
            # Best effort: keep running with the canned reply, but say why.
            print('[WARN] tuling key not loaded from conf.ini: %s' % err)
        print('tuling_key: %s' % self.tuling_key)

    def tuling_auto_reply(self, uid, msg):
        """Return the reply text for ``msg`` sent by user ``uid``.

        Args:
            uid: sender id; '@' characters are stripped and the result is
                truncated to 30 characters before being sent as ``userid``.
            msg: message text; it is UTF-8 encoded for the request.

        Returns:
            The canned reply when no key is configured.  Otherwise the
            reply built from the JSON response: the ``url`` field for code
            200000, one line per entry of ``list`` for code 302000, and the
            cleaned-up ``text`` field for every other code.
        """
        if not self.tuling_key:
            return u"知道啦"

        url = "http://www.tuling123.com/openapi/api"
        user_id = uid.replace('@', '')[:30]
        body = {'key': self.tuling_key, 'info': msg.encode('utf8'), 'userid': user_id}
        # NOTE(review): no timeout is passed, so this call can block for as
        # long as the server keeps the connection open; network and JSON
        # errors propagate to the caller unchanged.
        r = requests.post(url, data=body)
        respond = json.loads(r.text)

        code = respond['code']
        if code == 200000:
            result = respond['url']
        elif code == 302000:
            result = ''.join(
                u"【" + k['source'] + u"】 " + k['article'] + "\t" + k['detailurl'] + "\n"
                for k in respond['list'])
        else:
            # Code 100000 and all remaining codes carry the reply in 'text';
            # turn HTML line breaks and non-breaking spaces into spaces.
            result = respond['text'].replace('<br>', ' ').replace(u'\xa0', u' ')

        print(' ROBOT: %s' % result)
        return result

    def auto_switch(self, msg):
        """Turn automatic replies off or on when ``msg`` is a command word.

        While the bot is enabled only the stop words are honoured; while it
        is disabled only the start words are.  A confirmation is sent to
        ``msg['to_user_id']`` whenever the state changes.
        """
        msg_data = msg['content']['data']
        stop_cmd = [u'退下', u'走开', u'关闭', u'关掉', u'休息', u'滚开']
        start_cmd = [u'出来', u'启动', u'工作']

        if self.robot_switch:
            if msg_data in stop_cmd:
                self.robot_switch = False
                self.send_msg_by_uid(u'[Robot]' + u'机器人已关闭!', msg['to_user_id'])
        elif msg_data in start_cmd:
            self.robot_switch = True
            self.send_msg_by_uid(u'[Robot]' + u'机器人已开启!', msg['to_user_id'])

    def handle_msg_all(self, msg):
        """Dispatch one incoming message.

        Presumably the per-message hook called by WXBot (confirm against
        the base class).  Only messages whose content type is 0 are acted
        on:

        * ``msg_type_id`` 1 (own message): checked for switch commands.
        * ``msg_type_id`` 4 (text from a contact): answered directly.
        * ``msg_type_id`` 3 (group text): answered only when the bot is
          @-mentioned.

        While the bot is switched off everything except ``msg_type_id`` 1
        is ignored, so it can still be switched back on.
        """
        msg_type = msg['msg_type_id']
        if not self.robot_switch and msg_type != 1:
            return
        # 'content' is only inspected for the three handled message types.
        if msg_type not in (1, 3, 4) or msg['content']['type'] != 0:
            return

        if msg_type == 1:
            self.auto_switch(msg)
        elif msg_type == 4:
            sender_id = msg['user']['id']
            self.send_msg_by_uid(
                self.tuling_auto_reply(sender_id, msg['content']['data']), sender_id)
        else:
            self._reply_group_mention(msg)

    def _my_names_in_group(self, gid):
        """Collect the names under which this account may be @-mentioned in group ``gid``."""
        my_names = self.get_group_member_name(gid, self.my_account['UserName'])
        if my_names is None:
            my_names = {}
        if 'NickName' in self.my_account and self.my_account['NickName']:
            my_names['nickname2'] = self.my_account['NickName']
        if 'RemarkName' in self.my_account and self.my_account['RemarkName']:
            my_names['remark_name2'] = self.my_account['RemarkName']
        return my_names

    def _reply_group_mention(self, msg):
        """Answer a group text message if one of its @-mentions targets this account."""
        content = msg['content']
        if 'detail' not in content:
            return

        my_names = self._my_names_in_group(msg['user']['id'])
        known = [my_names[k] for k in my_names if my_names[k]]
        is_at_me = any(
            detail['type'] == 'at' and any(name == detail['value'] for name in known)
            for detail in content['detail'])
        if not is_at_me:
            return

        # This path is only reached for content type 0 (text), so the reply
        # always comes from the chat API; no non-text fallback is needed.
        src_name = content['user']['name']
        reply = 'to ' + src_name + ': '
        reply += self.tuling_auto_reply(content['user']['id'], content['desc'])
        self.send_msg_by_uid(reply, msg['user']['id'])
def main():
    """Build a TulingWXBot, turn on its DEBUG flag and run it.

    ``conf['qr']`` is set to ``'png'`` before starting (presumably the QR
    code output mode used at login; confirm against WXBot).
    """
    robot = TulingWXBot()
    robot.DEBUG = True
    robot.conf['qr'] = 'png'
    robot.run()
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()