#!/bin/bash
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##################
### Camus2Hive ###
##################
# Dependency validation
command -v hive >/dev/null 2>&1 || {
    echo "The hive command must be defined. Aborting."
    exit 1
}
command -v hdfs >/dev/null 2>&1 || {
    echo "The hdfs command must be defined. Aborting."
    exit 1
}
function print_usage() {
    echo "Usage: $(basename $0) <camus_destination_dir> [-d database] [-r repository_uri]"
    echo ""
    echo "camus_destination_dir"
    echo "    HDFS path where Camus stores its destination directory."
    echo ""
    echo "-h,--help"
    echo "    prints this message"
    echo ""
    echo "-d,--database <database>"
    echo "    name of the database to use, default is 'default'"
    echo ""
    echo "-r,--repository <repository_uri>"
    echo "    http URI of the schema repository"
    echo ""
}
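
# Example invocation (the paths and host below are hypothetical, for illustration only):
#   ./camus2hive.sh /user/camus/topics -d analytics -r http://avro-repo.example.com/schema-repo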

# Process the arguments

# Remove trailing slashes. (If the supplied path is just "/", $CAMUS_DESTINATION_DIR
# ends up empty, but that's fine since the commands below always append a slash anyway.)
CAMUS_DESTINATION_DIR=$(echo "$1" | sed -e 's%/*$%%')
if [[ -z "$CAMUS_DESTINATION_DIR" ]]; then
    print_usage
    exit 1
fi
shift

while [[ $# -gt 0 ]]; do
    opt="$1"
    shift
    current_arg="$1"
    case "$opt" in
        "-d"|"--database")
            DATABASE=$current_arg
            shift
            ;;
        "-r"|"--repository")
            AVRO_SCHEMA_REPOSITORY=$current_arg
            shift
            ;;
        "-h"|"--help")
            print_usage
            exit 0
            ;;
        *)
            echo "Invalid argument $opt"
            print_usage
            exit 1
            ;;
    esac
done
if [[ -z "$DATABASE" ]]; then
    DATABASE="default"
fi

HIVE="hive --database $DATABASE -S"
# Figure out which namenode Hive communicates with for this database
# (the character class also admits dotted/hyphenated hostnames)
NAME_NODE_URI=$(${HIVE} -e "describe database $DATABASE;" | sed -re 's%.*\t(hdfs://[a-zA-Z0-9._-]+)(:[0-9]+)?.*%\1\2%')
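# For illustration, "describe database" output is tab-separated and ends with the
# database's location URI, e.g. (hypothetical values):
#   default    default database    hdfs://namenode01.example.com:8020/user/hive/warehouse
# from which the sed above extracts "hdfs://namenode01.example.com:8020".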

# Behavior config
REQUERY_HADOOP_DIRS=true    # re-list the Camus destination dir on every run to refresh the topic list
EXIT_ON_ERROR=false         # abort the whole job on the first Hive error
PRINT_HIVE_STDERR=false     # dump Hive's captured stderr whenever a Hive command fails
# This directory and file hold state for the whole job
WORK_DIR='temp_camus2hive'
TOPIC_NAMES="$WORK_DIR/topic_names"
# These files hold state per table/topic (and are zeroed out between topics)
EXISTING_HIVE_PARTITIONS_WITH_SLASHES="$WORK_DIR/hive_partitions_with_slashes"
EXISTING_HIVE_PARTITIONS="$WORK_DIR/hive_partitions"
EXISTING_CAMUS_PARTITIONS="$WORK_DIR/camus_partitions"
HIVE_PARTITIONS_TO_ADD="$WORK_DIR/hive_partitions_to_add"
HIVE_ADD_PARTITION_STATEMENTS="$WORK_DIR/hive_add_partitions_statements"
HIVE_STDERR="$WORK_DIR/hive_stderr"

# Returns 0 if the previous command succeeded, otherwise prints an error.
# Must be called immediately after the Hive command so that $? is still meaningful.
function hive_success_check {
    local status=$?
    local message=$1
    if [[ $status -ne 0 ]]; then
        if [[ -z $message ]]; then
            echo "HIVE ERROR :'((( ..."
        else
            echo "HIVE ERROR: $message"
        fi
        if $PRINT_HIVE_STDERR ; then cat $HIVE_STDERR; fi
        if $EXIT_ON_ERROR ; then exit 1; fi
        return 1
    else
        return 0
    fi
}

# Fetches the latest schema for topic $1 from the repository and assigns the
# schema id and schema text to the variables named by $2 and $3 respectively.
function latest_schema_for_topic {
    local uri="$AVRO_SCHEMA_REPOSITORY/$1/latest"
    # The repository returns the latest registration in the format "ID\tSCHEMA"
    local latest=$(curl -fs ${uri})
    if [[ -z $latest ]]; then
        # Bail out hard here: proceeding without the schema could lose data
        echo "Could not access avro repository at $uri"
        exit 1
    fi
    local latest_id=$(echo $latest | awk '{print $1}')
    local latest_schema=$(echo $latest | awk '{$1=""; print substr($0,2)}')
    eval "$2='$latest_id'"
    eval "$3='$latest_schema'"
}

# Let's get to work
mkdir -p $WORK_DIR

if $REQUERY_HADOOP_DIRS ; then
    hdfs dfs -ls $CAMUS_DESTINATION_DIR/ | grep -v 'Found .* items' | sed "s%.*$CAMUS_DESTINATION_DIR/%%" > $TOPIC_NAMES
fi
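
# For example, if the (hypothetical) directories /user/camus/topics/page_views
# and /user/camus/topics/clicks exist under $CAMUS_DESTINATION_DIR, then
# $TOPIC_NAMES ends up with the two lines "page_views" and "clicks".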
while read topic; do
    # Zero out the per-topic state files (probably not strictly necessary, but cheap)
    > $EXISTING_HIVE_PARTITIONS_WITH_SLASHES
    > $EXISTING_HIVE_PARTITIONS
    > $EXISTING_CAMUS_PARTITIONS
    > $HIVE_PARTITIONS_TO_ADD
    > $HIVE_ADD_PARTITION_STATEMENTS
    > $HIVE_STDERR
    if [[ ! -z "$AVRO_SCHEMA_REPOSITORY" ]]; then
        latest_schema_for_topic $topic SCHEMA_ID SCHEMA_TEXT
        echo $SCHEMA_TEXT > $WORK_DIR/$SCHEMA_ID
        SCHEMA_DIR=$CAMUS_DESTINATION_DIR/$topic/schemas
        LATEST_SCHEMA=$SCHEMA_DIR/$SCHEMA_ID
        hdfs dfs -mkdir -p $SCHEMA_DIR
        # Upload the schema to HDFS unless it's already there
        if ! hdfs dfs -test -e $LATEST_SCHEMA; then
            hdfs dfs -put $WORK_DIR/$SCHEMA_ID $LATEST_SCHEMA
        else
            echo "Schema '$SCHEMA_ID' already exists for topic '${topic}'"
        fi
    fi
    # Check if the table already exists in Hive
    ${HIVE} -e "SHOW PARTITIONS $topic" 1> $EXISTING_HIVE_PARTITIONS_WITH_SLASHES 2> $HIVE_STDERR
    if ! hive_success_check "Table '$topic' does not currently exist in Hive (or Hive returned some other error on SHOW PARTITIONS $topic)."; then
        if [[ ! -z "$AVRO_SCHEMA_REPOSITORY" ]]; then
            # Create the table from the latest schema; this assumes a Validator already made sure it is safe to do so
            echo "Creating table ${topic} from ${LATEST_SCHEMA}"
            ${HIVE} -e "\
                CREATE EXTERNAL TABLE ${topic} \
                PARTITIONED BY (year int, month int, day int, hour int) \
                ROW FORMAT SERDE \
                    'org.apache.hadoop.hive.serde2.avro.AvroSerDe' \
                STORED AS INPUTFORMAT \
                    'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' \
                OUTPUTFORMAT \
                    'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' \
                TBLPROPERTIES ( \
                    'avro.schema.url'='${NAME_NODE_URI}${LATEST_SCHEMA}' \
                );" > /dev/null 2> $HIVE_STDERR
            if hive_success_check "Some errors occurred while creating the table '$topic'"; then
                echo "Successfully created Hive table '$topic' from schema $SCHEMA_ID :D !"
            fi
        fi
    else
        if [[ ! -z "$AVRO_SCHEMA_REPOSITORY" ]]; then
            # Point the existing table at the latest schema (a no-op if the schema hasn't changed)
            ${HIVE} -e "\
                ALTER TABLE ${topic} \
                SET TBLPROPERTIES ( \
                    'avro.schema.url'='${NAME_NODE_URI}${LATEST_SCHEMA}' \
                );" > /dev/null 2> $HIVE_STDERR
            if hive_success_check "Some errors occurred while updating schema for the table '$topic'"; then
                echo "Successfully updated Hive table '$topic' to schema $SCHEMA_ID :D !"
            fi
        fi
    fi
    # Hive prints partition specs as "year=2014/month=01/..."; convert them to the comma-separated form
    sed 's%/%, %g' $EXISTING_HIVE_PARTITIONS_WITH_SLASHES > $EXISTING_HIVE_PARTITIONS
    # Extract all partitions currently ingested by Camus
    hdfs dfs -ls -R $CAMUS_DESTINATION_DIR/$topic | sed "s%.*$CAMUS_DESTINATION_DIR/$topic/hourly/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/.*%year=\1, month=\2, day=\3, hour=\4%" | grep "year.*" | sort | uniq > $EXISTING_CAMUS_PARTITIONS
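    # For example, a (hypothetical) Camus file at
    #   $CAMUS_DESTINATION_DIR/page_views/hourly/2014/01/31/23/part-0.avro
    # is mapped to the partition spec "year=2014, month=01, day=31, hour=23".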
    # Keep only the partitions present in Camus but not yet in Hive
    grep -v -f $EXISTING_HIVE_PARTITIONS $EXISTING_CAMUS_PARTITIONS > $HIVE_PARTITIONS_TO_ADD
    echo "$topic currently has $(wc -l < $EXISTING_CAMUS_PARTITIONS) partitions in Camus directories, $(wc -l < $EXISTING_HIVE_PARTITIONS) in Hive and thus $(wc -l < $HIVE_PARTITIONS_TO_ADD) left to add to Hive"
    # Turn each missing partition spec into an ALTER TABLE ... ADD PARTITION statement
    sed "s%\(year=\([0-9]*\), month=\([0-9]*\), day=\([0-9]*\), hour=\([0-9]*\)\)%ALTER TABLE $topic ADD IF NOT EXISTS PARTITION (\1) LOCATION '$CAMUS_DESTINATION_DIR/$topic/hourly/\2/\3/\4/\5';%" < $HIVE_PARTITIONS_TO_ADD > $HIVE_ADD_PARTITION_STATEMENTS
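    # For example, the (hypothetical) missing partition "year=2014, month=01, day=31, hour=23"
    # of topic page_views would generate:
    #   ALTER TABLE page_views ADD IF NOT EXISTS PARTITION (year=2014, month=01, day=31, hour=23)
    #   LOCATION '$CAMUS_DESTINATION_DIR/page_views/hourly/2014/01/31/23';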
    ${HIVE} -f $HIVE_ADD_PARTITION_STATEMENTS > /dev/null 2> $HIVE_STDERR
    if hive_success_check "Some errors occurred while adding partitions to table '$topic'" && [[ -s $HIVE_PARTITIONS_TO_ADD ]]; then
        echo "$(wc -l < $HIVE_PARTITIONS_TO_ADD) partitions successfully added to Hive table '$topic' :D !"
    fi
    echo ""
done < $TOPIC_NAMES
echo "Finished processing $(cat $TOPIC_NAMES | wc -l) topic(s) :)"