-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtwitter_streamer_utils.py
executable file
·97 lines (87 loc) · 4.04 KB
/
twitter_streamer_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from datetime import datetime
from elasticsearch.exceptions import TransportError
import re
# Keyword phrases, listed one central bank per sub-list and concatenated into
# a single flat list (presumably the track terms handed to the Twitter
# stream -- confirm against the caller).
terms = (
    # reserve bank and its governors
    ["reserve bank", "glenn stevens", "graeme wheeler", "philip lowe", "phillip lowe", "adrian orr"]
    # Bank of Canada
    + ["bank of canada", "poloz", "boc rate", "boc inflation", "boc monetary", "boc financial"]
    # European Central Bank
    + ["ecb", "draghi", "european central bank"]
    # Bank of England
    + ["bank of england", "mark carney", "boe rate", "boe inflation", "boe monetary", "boe financial"]
    # Federal Reserve
    + ["fed", "federal reserve", "FOMC", "yellen", "powell"]
    # Bank of Japan
    + ["bank of japan", "kuroda", "boj rate", "boj inflation", "boj monetary", "boj financial"]
)
# Keyword phrases grouped by central bank. Each list holds the same phrases,
# in the same order, as that bank's entries in ``terms``. The 'fed' group
# previously omitted 'federal reserve', the only phrase in ``terms`` that was
# missing from its group.
groups = {
    'aus': ['reserve bank', 'glenn stevens', 'graeme wheeler', 'philip lowe', 'phillip lowe', 'adrian orr'],
    'boc': ['bank of canada', 'poloz', 'boc rate', 'boc inflation', 'boc monetary', 'boc financial'],
    'ecb': ['ecb', 'draghi', 'european central bank'],
    'boe': ['bank of england', 'mark carney', 'boe rate', 'boe inflation', 'boe monetary', 'boe financial'],
    'fed': ['fed', 'federal reserve', 'FOMC', 'yellen', 'powell'],
    'boj': ['bank of japan', 'kuroda', 'boj rate', 'boj inflation', 'boj monetary', 'boj financial'],
}
# Matchers used to assign tweets to a central bank (see categorize_tweet,
# which runs re.search with each entry against lower-cased tweet text).
#
# An entry is either a plain string, matched as a substring/regex, or a
# compiled pattern of the form ^(?=.*a)(?=.*b).*$, which matches when every
# listed word occurs somewhere in the text, in any order.
#
# The patterns are compiled with re.DOTALL: without it "." stops at a newline,
# so the "^...$" patterns could never match a tweet containing a line break.
banks = {
    'aus': [
        re.compile(r'^(?=.*reserve)(?=.*bank).*$', re.DOTALL),
        re.compile(r'^(?=.*glenn)(?=.*stevens).*$', re.DOTALL),
        re.compile(r'^(?=.*graeme)(?=.*wheeler).*$', re.DOTALL),
        re.compile(r'^(?=.*phill?ip)(?=.*lowe).*$', re.DOTALL),
        # 'adrian orr' is in terms and groups['aus'] but had no matcher here,
        # so tweets about him were left uncategorised.
        re.compile(r'^(?=.*adrian)(?=.*orr).*$', re.DOTALL),
    ],
    'boc': [
        re.compile(r'^(?=.*bank)(?=.*of)(?=.*canada).*$', re.DOTALL),
        'poloz',
        re.compile(r'^(?=.*boc)(?=.*(inflation|rate|monetary|financial)).*$', re.DOTALL),
    ],
    'ecb': [
        'ecb',
        'draghi',
        re.compile(r'^(?=.*european)(?=.*central)(?=.*bank).*$', re.DOTALL),
    ],
    'boe': [
        re.compile(r'^(?=.*bank)(?=.*of)(?=.*england).*$', re.DOTALL),
        re.compile(r'^(?=.*mark)(?=.*carney).*$', re.DOTALL),
        re.compile(r'^(?=.*boe)(?=.*(inflation|rate|monetary|financial)).*$', re.DOTALL),
    ],
    # 'fed' is a substring of 'federal reserve', so that phrase needs no entry.
    'fed': ['fed', 'fomc', 'yellen', 'powell'],
    'boj': [
        re.compile(r'^(?=.*bank)(?=.*of)(?=.*japan).*$', re.DOTALL),
        'kuroda',
        re.compile(r'^(?=.*boj)(?=.*(inflation|rate|monetary|financial)).*$', re.DOTALL),
    ],
}
def create_twitter_index(client, index='cb_twitter'):
    """Create the Elasticsearch index that stores tweets, if it is missing.

    The index is created with 5 primary shards and 1 replica. A dynamic
    template on the ``tweet`` mapping type maps every field whose name ends
    in ``created_at`` to a ``date`` parsed with the pattern
    ``EEE MMM dd HH:mm:ss Z yyyy`` (this looks like the layout of Twitter's
    ``created_at`` strings -- confirm against real payloads).

    Args:
        client: Object exposing ``indices.create(index=..., body=...)``;
            presumably an ``elasticsearch.Elasticsearch`` instance.
        index: Name of the index to create.

    Raises:
        TransportError: Re-raised for any failure other than the index
            already existing.
    """
    create_index_body = {
        'settings': {
            'number_of_shards': 5,
            'number_of_replicas': 1,
        },
        # ``mappings`` is a sibling of ``settings`` in the create-index body.
        # It used to be nested inside ``settings``, where it is not treated
        # as a mapping, so the date template was never registered.
        'mappings': {
            'tweet': {
                'dynamic_templates': [{
                    'created_at_as_datetime': {
                        'match_mapping_type': '*',
                        'match': '*created_at',
                        'mapping': {
                            'type': 'date',
                            'format': 'EEE MMM dd HH:mm:ss Z yyyy',
                        },
                    },
                }],
            },
        },
    }

    # Newer Elasticsearch releases report an existing index as
    # 'resource_already_exists_exception'; older ones use the legacy name.
    already_exists_errors = (
        'index_already_exists_exception',
        'resource_already_exists_exception',
    )

    # create empty index
    try:
        client.indices.create(
            index=index,
            body=create_index_body,
        )
    except TransportError as e:
        # An index that already exists is not an error for this helper;
        # anything else is propagated to the caller.
        if e.error not in already_exists_errors:
            raise
def _status_text(status):
    """Return the fullest text carried by a tweet-like dict, or '' if none.

    Prefers ``extended_tweet.full_text`` and falls back to ``text``.
    """
    try:
        return status['extended_tweet']['full_text']
    except (KeyError, TypeError):
        return status.get('text') or ''


def categorize_tweet(tweet, banks):
    """
    Return the keys of ``banks`` whose keywords occur in a tweet.

    Twitter searches for keywords in the tweet text as well as the body of the
    tweet that is being retweeted or quoted. Tweets may or may not have those
    fields depending on their type. This function gathers the text from every
    available place (``retweeted_status``, ``quoted_status`` and the tweet
    itself), lower-cases it, and searches it.

    Args:
        tweet: Tweet payload as a dict.
        banks: Dict mapping a category name to a list of keywords; each
            keyword is a string or compiled pattern accepted by ``re.search``.
            Keywords are matched against lower-cased text, so string keywords
            should be lower case.

    Returns:
        A list of category names in ``banks`` iteration order. Each category
        appears at most once, however many of its keywords match.
    """
    fragments = []
    for nested_key in ('retweeted_status', 'quoted_status'):
        nested = tweet.get(nested_key)
        if nested:
            fragments.append(_status_text(nested))
    fragments.append(_status_text(tweet))

    # Join with a space so words from different fragments cannot fuse into a
    # false keyword, and collapse all whitespace so "^...$" patterns compiled
    # without re.DOTALL still see the whole text on one line.
    text = ' '.join(' '.join(fragments).lower().split())

    categories = []
    # .items() rather than the Python 2-only .iteritems().
    for bank, keywords in banks.items():
        # any() stops at the first matching keyword, so a bank is recorded
        # once instead of once per matching keyword.
        if any(re.search(pattern, text) for pattern in keywords):
            categories.append(bank)
    return categories