-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMarkovBot.py
292 lines (264 loc) · 12.9 KB
/
MarkovBot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import json
import os
import re
import smtplib
import subprocess
import sys
import urllib2 # for querying data to scrape
from datetime import datetime
from threading import Thread
from time import sleep, strftime
import colors
import tweepy
from MarkovChains import Chain
from Tokenizer import Tokenizer
from bs4 import BeautifulSoup
from keys import email_key
from tweepy import TweepError
from twitter_scraping.scrape import scrape
"""
This is a bot that uses Markov chains to generate tweets in the style of a Twitter user, given that user's handle.
It is particularly efficient in its analysis of the corpus.
It uses http://trumptwitterarchive.com/ as the original source of the president's tweets (including deleted ones).
Partially inspired by https://boingboing.net/2017/11/30/correlation-between-trump-twee.html
"""
# constants
TWEET_MAX_LENGTH = 280  # upper bound (in characters) used when generating and splitting tweets
MIN_TWEET_LENGTH = 15  # arbitrary lower bound accepted by MarkovBot.chain


class MarkovBot:
    """
    A Twitter bot that mimics another user by generating tweets with Markov chains.

    Attributes set by the constructor:
        keys            the credential mapping handed to Tweepy and to the scraper
        active          the hours of the day (0-23) during which the bot counts as active
        api             the authorized Tweepy API object
        me              the bot account's own screen name
        handle          the lower-cased handle of the mimicked user (also used for file names)
        fancy_handle    the mimicked user's screen name as reported by Twitter
        folder, log, corpus, replied_tweets
                        paths under "bot_files/<handle>/" used by the bot
        tokenizer       the Tokenizer built for the corpus (None for the "test" handle)
        chain_maker     the Chain object that generates text
    """

    def __init__(self, api_key, other_handle, active_hours=range(24), max_chains=5,
                 min_word_freq=3, seed=None, scrape_from=None):
        """
        Verifies the credentials, prepares the corpus and builds the chain generator
        :param api_key: mapping with "consumer_key", "consumer_secret", "access_token" and "access_token_secret"
        :param other_handle: the handle of the twitter user that the bot should mimic
        :param active_hours: the hours of the day (0-23) in which the bot is considered active
        :param max_chains: forwarded to Chain as max_chains
        :param min_word_freq: the minimum number of times a word must appear in the corpus to be in the vocab
        :param seed: forwarded to Chain as seed
        :param scrape_from: optional date to start scraping from; the join date is used for a first scrape if omitted
        """
        self.keys = api_key
        self.active = list(active_hours)  # copy so the shared default is never handed out  NOTE: limited use rn
        (self.api, self.me, self.handle, self.fancy_handle) = self.verify(other_handle)
        self.folder = "bot_files/%s/" % self.handle
        self.log = self.folder + "%s_log.txt" % self.handle
        self.corpus = self.folder + "%s.json" % self.handle
        self.replied_tweets = self.folder + "%s_replied_tweets.txt" % self.handle  # custom reply file
        self.tokenizer = self.check_corpus(scrape_from, min_word_freq)
        self.chain_maker = Chain(self.handle, max_chains=max_chains, seed=seed)

    @staticmethod
    def _error_message(error):
        """
        Helper method - pulls a readable message out of an exception raised by the Twitter API
        :param error: the exception that was caught
        :return: the "message" entry of the first error in the payload if there is one, else str(error)
        """
        try:
            return error.args[0][0]["message"]
        except (IndexError, KeyError, TypeError):
            # the payload was not a list of error dicts (e.g. a plain string reason)
            return str(error)

    def verify(self, handle):
        """
        Verifies that the user has valid credentials for accessing Tweepy API
        :param handle: the handle of the twitter user that the bot operator wishes to mimic
        :return: a 4-tuple of an API object, the bot's handle, the standardized handle of the other user, and the
        actual handle of the other user (can have uppercase letters)
        :raises ValueError: if Twitter rejects the credentials or the handle
        """
        sys.stdout.write(colors.yellow("verifying credentials"))
        # Pass the function itself: calling it here would run the animation synchronously
        # and hand Thread a target of None.
        thread = Thread(target=self.loading)  # lol
        thread.daemon = True  # kill this thread if program exits
        thread.start()
        api = self.authorize()
        handle = handle.strip().lower()  # standardize name formatting for folder name
        try:
            # exact comparison: a substring test would also treat "", "t", "es", ... as the test handle
            who_am_i = handle if handle == "test" else api.get_user(handle).screen_name  # test that API works
            me = api.me().screen_name
        except TweepError as e:
            raise ValueError("Awh dang dude, you gave me something bad: %s" % self._error_message(e))
        thread.join()  # lol
        print(colors.white(" verified\n") + colors.cyan("starting up bot ") + colors.white("@" + me)
              + colors.cyan(" as ") + colors.white("@" + who_am_i) + colors.cyan("!\n"))
        return api, me, handle, who_am_i  # api, the bot's name, the other user's name, full version of user's name

    def check_corpus(self, scrape_from_when, min_word_freq):
        """
        Checks if there are pre-existing files or if they will have to be regenerated. If data needs to be scraped
        the bot will go ahead and do that and immediately generate a corpus for the collected data.
        :param scrape_from_when: When the bot will start grabbing tweets from
        :param min_word_freq: the minimum number of times a word must appear in the corpus to be in the user's vocab
        :return: the Tokenizer that was generated, or None for the "test" handle
        :raises ValueError: if min_word_freq is smaller than 1
        """
        if min_word_freq < 1:
            raise ValueError(colors.red("Word frequency threshold must be greater than 0"))
        if self.handle == "test":
            return None  # nothing to do here
        scraped = False
        if not os.path.exists(self.corpus):  # check for corpus file
            print(colors.red("no corpus.json file found - generating..."))
            if not os.path.exists(self.folder):  # check if they even have a folder yet
                os.makedirs(self.folder)  # also creates "bot_files/" on a first run
            scrape(self.handle, self.keys, start=scrape_from_when if scrape_from_when else self.get_join_date())
            scraped = True
        if scrape_from_when and not scraped:  # they already had a corpus and need a special scrape
            scrape(self.handle, self.keys, start=scrape_from_when)
        tokenizer = Tokenizer(min_word_freq)
        tokenizer.generate(self.handle)
        return tokenizer  # always return the Tokenizer object

    def update(self, starting=None):
        """
        Scrapes the mimicked user's tweets again, beginning at the given date. If no date is given, scraping
        begins at the user's join date.
        :param starting: The date to start scraping from (FORMAT: YYYY-MM-DD)
        """
        if not starting:  # if not given, look from beginning
            starting = self.get_join_date()
        print(colors.cyan("Updating corpus.json"))
        scrape(self.handle, start=starting, api=self.keys)

    def chain(self, max_length=TWEET_MAX_LENGTH):
        """
        Generates a sentence using Markov chains. User can specify the maximum length of the tweet lest it
        defaults to the maximum tweet length
        :param max_length: the maximum number of characters allowed in the tweet - by default, max tweet length
        :return: the markov chain text that was generated, prefixed with "@<handle> says: "
        :raises ValueError: if max_length is smaller than MIN_TWEET_LENGTH
        """
        if max_length < MIN_TWEET_LENGTH:
            raise ValueError(colors.red("Tweets must be larger than %s chars" % MIN_TWEET_LENGTH))
        chain_text = self.chain_maker.generate_chain(max_length)
        return colors.white("@" + self.fancy_handle) + colors.yellow(" says: ") + chain_text

    def tweet_chain(self, max_length=TWEET_MAX_LENGTH, safe=True):
        """
        Bot issues a tweet made by markov chaining
        :param max_length: the maximum number of characters allowed in the tweet - by default, max tweet length
        :param safe: if True, bot will replace all "@" symbols with "#" so that twitter doesn't get mad :(
        :return: the text of the tweet that was tweeted
        """
        tweet_text = self.chain_maker.generate_chain(max_length=max_length)
        if safe:
            tweet_text = tweet_text.replace("@", "#")  # :(
        self.tweet(tweet_text)
        return tweet_text

    @staticmethod  # hehe
    def loading():
        """
        Writes three dots to stdout, half a second apart, as a tiny loading animation
        """
        for x in [".", ".", "."]:
            sys.stdout.write(colors.yellow(x))
            sys.stdout.flush()
            sleep(0.5)

    def get_join_date(self):
        """
        Helper method - checks a user's twitter page for the date they joined
        :return: the date the user joined, formatted as YYYY-MM-DD
        :raises ValueError: if the profile page does not contain the join-date element
        """
        page = urllib2.urlopen("https://twitter.com/" + self.handle)
        try:
            soup = BeautifulSoup(page, "html.parser")
        finally:
            page.close()  # don't leak the connection
        join_span = soup.find("span", {"class": "ProfileHeaderCard-joinDateText"})
        if join_span is None:
            raise ValueError("Could not find a join date on the profile page of @%s" % self.handle)
        date_string = str(join_span["title"]).split(" - ")[1]
        if date_string[1] == " ":  # single-digit day: zero-pad it
            date_string = "0" + date_string
        return datetime.strptime(date_string, "%d %b %Y").strftime("%Y-%m-%d")

    def regenerate(self, new_min_frequency):  # change threshold - convenience method ig
        """
        Regenerates the corpus with a non-default minimum word frequency
        :param new_min_frequency: the minimum number of times a word must appear in the corpus to be in the vocab
        :raises ValueError: if new_min_frequency is smaller than 1
        """
        if new_min_frequency < 1:
            raise ValueError(colors.red("Word frequency threshold must be greater than 0"))
        print(colors.yellow("regenerating vocab with required min frequency at %s...\n" % new_min_frequency))
        self.tokenizer.generate(self.handle, new_min_frequency)

    def authorize(self):
        """
        Uses keys to create an API accessor and returns it
        :return: an object used to access the Tweepy API
        """
        auth = tweepy.OAuthHandler(self.keys["consumer_key"], self.keys["consumer_secret"])
        auth.set_access_token(self.keys["access_token"], self.keys["access_token_secret"])
        return tweepy.API(auth)

    def tweet(self, tweet=None, at=None):
        """
        General tweeting method. It will divide up long bits of text into multiple messages, and return the first tweet
        that it makes. Multi-tweets (including to other people) will have second and third messages made in response
        to the first one.
        :param at: who the user is tweeting at
        :param tweet: the text to tweet
        :return: the first tweet if successful, else None
        """
        if not tweet:
            return None
        num_tweets, tweets = self.divide_tweet(tweet, at)
        if num_tweets <= 0:  # text was too long to split into three tweets
            return None
        first = self.api.update_status(tweets[0])
        for follow_up in tweets[1:]:
            # actually thread the remaining parts under the first tweet
            self.api.update_status(follow_up, in_reply_to_status_id=first.id)
        return first  # return first tweet - multi-tweets will be responding to it

    def clear_tweets(self):
        """
        DANGER: removes all tweets from current bot account
        """
        for status in tweepy.Cursor(self.api.user_timeline).items():
            try:
                self.api.destroy_status(status.id)
                print(colors.white("deleted successfully"))
            except tweepy.TweepError:
                print("%s %s" % (colors.red("Failed to delete:"), status.id))

    def is_replied(self, tweet):  # check if replied. if not, add to list and reply
        """
        This bot tries to reply to everyone who @'s it, so it will use a list of tweet IDs to keep track
        It is assumed that if a tweet is un-replied, the bot will reply to it (add to list)
        :param tweet: the tweet in question
        :return: if the tweet had a reply or not
        """
        marker = str(tweet.id) + "\n"
        replies = []
        if os.path.exists(self.replied_tweets):  # no file yet simply means nothing was replied to
            with open(self.replied_tweets, "r") as replied_tweets:
                replies = replied_tweets.readlines()
        replied = marker in replies
        if not replied:
            parent = os.path.dirname(self.replied_tweets)
            if parent and not os.path.isdir(parent):
                os.makedirs(parent)
            with open(self.replied_tweets, "a") as replied_tweets:
                replied_tweets.write(marker)
        return replied

    def is_active(self):
        """
        The bot tries not to tweet at times when no one will see
        :return: whether the current hour is one of the bot's active hours
        """
        return datetime.now().hour in self.active

    def respond(self, tweet):  # provide translation of custom message or username
        """
        Given a tweet, formulate a response. Responding itself is not implemented yet; for now the tweet is only
        recorded as replied (via is_replied) unless the bot wrote it.
        :param tweet: tweet to respond to
        :return: the tweet to make in response (currently always None)
        """
        username = str(tweet.user.screen_name)
        if username == self.me:  # don't respond to self
            return None
        if not self.is_replied(tweet):
            # TODO automatic response
            pass
        return None

    def divide_tweet(self, long_tweet, at=None):
        """
        A method for exceptionally long tweets. Follow-up tweets are addressed to the bot itself and every part of a
        multi-tweet is prefixed with an "(x/y) " counter.
        :param at: the person you're responding to/at
        :param long_tweet: the long-ass tweet you're trying to make
        :return: the number of tweets followed by a list of up to 3 tweets, or (0, None) if the text doesn't fit
        """
        handle = "@" + at + " " if at else ""
        my_handle = "@" + self.me + " "  # trailing space keeps the handle apart from the counter
        numbered = len("(x/y) ")
        single_tweet_length = (TWEET_MAX_LENGTH - len(handle))
        first_tweet_length = (TWEET_MAX_LENGTH - len(handle) - numbered)
        self_tweet_length = (TWEET_MAX_LENGTH - len(my_handle) - numbered)
        two_tweets_length = first_tweet_length + self_tweet_length
        three_tweets_length = two_tweets_length + self_tweet_length
        # 1 tweet
        if len(long_tweet) <= single_tweet_length:
            return 1, [handle + long_tweet]
        # too many characters (edge case) - text of exactly three_tweets_length still fits
        elif len(long_tweet) > three_tweets_length:
            return 0, None
        # 3 tweets
        elif len(long_tweet) > two_tweets_length:
            return 3, [handle + "(1/3) "
                       + long_tweet[:first_tweet_length],
                       my_handle + "(2/3) "
                       + long_tweet[first_tweet_length: two_tweets_length],
                       my_handle + "(3/3) "
                       + long_tweet[two_tweets_length:]]
        # 2 tweets
        else:
            return 2, [handle + "(1/2) "
                       + long_tweet[: first_tweet_length],
                       my_handle + "(2/2) "
                       + long_tweet[first_tweet_length:]]

    def check_tweets(self):
        """
        tweet upkeep multi-processing method, can be run continuously to check for anyone tweeting @ the bot.
        Loops forever, searching for mentions and handing each one to respond, with a 30 second pause per round.
        """
        print(colors.cyan("Beginning polling...\n"))
        while True:
            try:
                for tweet in tweepy.Cursor(self.api.search, q="@%s -filter:retweets" % self.me,
                                           tweet_mode="extended").items():
                    self.respond(tweet)
            except tweepy.TweepError as e:
                # api_code may be None, so fall back to the error's own text
                print(colors.red(str(e.api_code if e.api_code is not None else e)))
            sleep(30)