-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathuniversal_util.py
1446 lines (1068 loc) · 61.3 KB
/
universal_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from subprocess import Popen, PIPE, STDOUT, check_call
import glob
import boto3
import botocore
import constants_and_names as cn
import datetime
import rasterio
import logging
import csv
import psutil
from shutil import copyfile, move
import os
import multiprocessing
from multiprocessing.pool import Pool
from shutil import copy
import re
import pandas as pd
from osgeo import gdal
import time
from random import seed
from random import random
# Prints the date as YYYYmmdd_hhmmss
d = datetime.datetime.today()
date_today = d.strftime('%Y_%m_%d')
date_time_today = d.strftime('%Y%m%d_%h%m%s') # for Linux
# date_time_today = d.strftime('%Y%m%d_%H%M%S') # for Windows
# Uploads the output log to the designated s3 folder
def upload_log():
# Builds a slightly variable delay into the log uploading so that a ton of log uploads at once don't overwhelm s3
seed()
lag = random()*2
time.sleep(lag)
cmd = ['aws', 's3', 'cp', os.path.join(cn.docker_app, cn.model_log), cn.model_log_dir, '--only-show-errors']
check_call(cmd)
# Creates the log with a starting line
def initiate_log(tile_id_list):
# For some reason, logging gets turned off when AWS credentials aren't provided.
# This restores logging without AWS credentials.
if not check_aws_creds():
# https://stackoverflow.com/a/49202811
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
logging.basicConfig(filename=os.path.join(cn.docker_app, cn.model_log),
format='%(levelname)s @ %(asctime)s: %(message)s',
datefmt='%Y/%m/%d %I:%M:%S %p',
level=logging.INFO)
if cn.SENSIT_TYPE == 'std':
sensit_type = 'standard model'
else:
sensit_type = cn.SENSIT_TYPE
logging.info(f'Log notes: {cn.LOG_NOTE}')
logging.info(f'Model version: {cn.version}')
logging.info(f'This is the start of the log for this model run. Below are the command line arguments for this run.')
logging.info(f'Sensitivity analysis type: {sensit_type}')
logging.info(f'Model stage argument: {cn.STAGE_INPUT}')
logging.info(f'Run model stages after the initial selected stage: {cn.RUN_THROUGH}')
logging.info(f'Run date: {cn.RUN_DATE}')
logging.info(f'Tile ID list: {tile_id_list}')
logging.info(f'Carbon emitted_pools to generate (optional): {cn.CARBON_POOL_EXTENT}')
logging.info(f'Emissions emitted_pools (optional): {cn.EMITTED_POOLS}')
logging.info(f'Standard net flux for comparison with sensitivity analysis net flux (optional): {cn.STD_NET_FLUX}')
logging.info(f'Include mangrove removal scripts in model run (optional): {cn.INCLUDE_MANGROVES}')
logging.info(f'Include US removal scripts in model run (optional): {cn.INCLUDE_US}')
logging.info(f'Do not upload anything to s3: {cn.NO_UPLOAD}')
logging.info(f'AWS credentials supplied: {check_aws_creds()}')
logging.info(f'Save intermediate outputs: {cn.SAVE_INTERMEDIATES}')
logging.info(f'Use single processor: {cn.SINGLE_PROCESSOR}')
logging.info(f'AWS ec2 instance type and AMI ID:')
# https://stackoverflow.com/questions/13735051/how-to-capture-curl-output-to-a-file
# https://stackoverflow.com/questions/625644/how-to-get-the-instance-id-from-within-an-ec2-instance
try:
cmd = ['curl', 'http://169.254.169.254/latest/meta-data/instance-type', '-o', 'instance_type.txt', '--silent']
process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
with process.stdout:
log_subprocess_output(process.stdout)
cmd = ['curl', 'http://169.254.169.254/latest/meta-data/ami-id', '-o', 'ami_id.txt', '--silent']
process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
with process.stdout:
log_subprocess_output(process.stdout)
type_file = open("instance_type.txt", "r")
type_lines = type_file.readlines()
for line in type_lines:
logging.info(f' Instance type: {line.strip()}')
ami_file = open("ami_id.txt", "r")
ami_lines = ami_file.readlines()
for line in ami_lines:
logging.info(f' AMI ID: {line.strip()}')
os.remove("ami_id.txt")
os.remove("instance_type.txt")
except:
logging.info(' Not running on AWS ec2 instance')
logging.info(f"Available processors: {cn.count}")
# Suppresses logging from rasterio and botocore below ERROR level for the entire model
logging.getLogger("rasterio").setLevel(logging.ERROR) # https://www.tutorialspoint.com/How-to-disable-logging-from-imported-modules-in-Python
logging.getLogger("botocore").setLevel(logging.ERROR) # "Found credentials in environment variables." is logged by botocore: https://github.com/boto/botocore/issues/1841
# If no_upload flag is not activated, log is uploaded
if not cn.NO_UPLOAD:
upload_log()
# Prints the output statement in the console and adds it to the log. It can handle an indefinite number of string to print
def print_log(*args):
# Empty string
full_statement = str(object='')
# Concatenates all individuals strings to the complete line to print
for arg in args:
full_statement = full_statement + str(arg) + " "
logging.info(full_statement)
# Prints to console
print("LOG: " + full_statement)
# # Every time a line is added to the log, it is copied to s3.
# # NOTE: During model 1.2.1 runs, I started getting repeated errors about uploading the log to s3.
# # I don't know why it happened, but my guess is that it's because I had too many things trying to copy
# # to that s3 location at once. So I'm reducing the occasions for uploading the log by removing uploads
# # whenever anything is printed. Instead, I'll upload at the end of each tile and each model stage.
# upload_log()
# Logs fatal errors to the log txt, uploads to s3, and then terminates the program with an exception in the console
def exception_log(*args):
# Empty string
full_statement = str(object='')
# Concatenates all individuals strings to the complete line to print
for arg in args:
full_statement = full_statement + str(arg) + " "
# Adds the exception to the log txt
logging.info(full_statement, stack_info=True)
# If no_upload flag is not activated (by choice or by lack of AWS credentials), output is uploaded
if not cn.NO_UPLOAD:
# Need to upload log before the exception stops the script
upload_log()
# Prints to console, ending the program
raise Exception(full_statement)
# Adds the subprocess output to the log and the console
# Solution is from second answer (jfs' answer) at this page: https://stackoverflow.com/questions/21953835/run-subprocess-and-print-output-to-logging
def log_subprocess_output(pipe):
# Reads all the output into a string
for full_out in iter(pipe.readline, b''): # b"\n"-separated lines
# Separates the string into an array, where each entry is one line of output
line_array = full_out.splitlines()
# For reasons I don't know, the array is backwards, so this prints it out in reverse (i.e. correct) order
for line in reversed(line_array):
logging.info(line.decode("utf-8")) #https://stackoverflow.com/questions/37016946/remove-b-character-do-in-front-of-a-string-literal-in-python-3, answer by krock
print(line.decode("utf-8"))
# # After the subprocess finishes, the log is uploaded to s3.
# # Having too many tiles finish running subprocesses at once can cause the upload to get overwhelmed and cause
# # an error. So, I've commented out the log upload because it's not really necessary here.
# upload_log()
def log_subprocess_output_simple(cmd):
# Solution for adding subprocess output to log is from https://stackoverflow.com/questions/21953835/run-subprocess-and-print-output-to-logging
process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
with process.stdout:
log_subprocess_output(process.stdout)
def log_subprocess_output_full(cmd):
# Solution for adding subprocess output to log is from https://stackoverflow.com/questions/21953835/run-subprocess-and-print-output-to-logging
process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
pipe = process.stdout
with pipe:
# Reads all the output into a string
for full_out in iter(pipe.readline, b''): # b"\n"-separated lines
# Separates the string into an array, where each entry is one line of output
line_array = full_out.splitlines()
# For reasons I don't know, the array is backwards, so this prints it out in reverse (i.e. correct) order
for line in reversed(line_array):
logging.info(line.decode(
"utf-8")) # https://stackoverflow.com/questions/37016946/remove-b-character-do-in-front-of-a-string-literal-in-python-3, answer by krock
print(line.decode(
"utf-8")) # https://stackoverflow.com/questions/37016946/remove-b-character-do-in-front-of-a-string-literal-in-python-3, answer by krock
# # After the subprocess finishes, the log is uploaded to s3
# upload_log()
# Checks if Amazon Web Services credentials are in the environment. Both the access key and secret key are needed.
def check_aws_creds():
if ('AWS_ACCESS_KEY_ID' in os.environ) and ('AWS_SECRET_ACCESS_KEY' in os.environ):
# print_log("s3 credentials found. Uploading and downloading enabled.")
return True
else:
# print_log("s3 credentials not found. Uploading to s3 disabled but downloading enabled.")
return False
# Checks the OS for how much storage is available in the system, what's being used, and what percent is being used
# https://stackoverflow.com/questions/12027237/selecting-specific-columns-from-df-h-output-in-python
def check_storage():
df_output_lines = [s.split() for s in os.popen("df -h").read().splitlines()]
used_storage = df_output_lines[5][2]
available_storage = df_output_lines[5][3]
percent_storage_used = df_output_lines[5][4]
print_log(f'Storage used: {used_storage}; Available storage: {available_storage}; Percent storage used: {percent_storage_used}')
# Obtains the absolute number of RAM gigabytes currently in use by the entire system (all processors).
# https://www.pragmaticlinux.com/2020/12/monitor-cpu-and-ram-usage-in-python-with-psutil/
# The outputs from this don't exactly match the memory shown in htop but I think it's close enough to be useful.
# It seems to slightly over-estimate memory usage (by ~1-2 GB).
def check_memory():
used_memory = (psutil.virtual_memory().total - psutil.virtual_memory().available)/1024/1024/1000
total_memory = psutil.virtual_memory().total/1024/1024/1000
percent_memory = used_memory/total_memory*100
print_log(f"Memory usage is: {round(used_memory,2)} GB out of {round(total_memory,2)} = {round(percent_memory,1)}% usage")
if percent_memory > 99:
print_log('WARNING: MEMORY USAGE DANGEROUSLY HIGH! TERMINATING PROGRAM.') # Not sure if this is necessary
exception_log('EXCEPTION: MEMORY USAGE DANGEROUSLY HIGH! TERMINATING PROGRAM.')
# Not currently using because it shows 1 when using with multiprocessing
# (although it seems to work fine when not using multiprocessing)
def counter(func):
"""
A decorator that counts and prints the number of times a function has been executed
https://stackoverflow.com/a/1594484 way down at the bottom of the post in the examples section
"""
@functools.wraps(func)
def wrapper_count(*args, **kwargs):
wrapper_count.count = wrapper_count.count + 1
print("Number of times {0} has been used: {1}".format(func.__name__, wrapper_count.count))
res = func(*args, **kwargs)
return res
wrapper_count.count = 0
return wrapper_count
# Gets the tile id from the full tile name using a regular expression
def get_tile_id(tile_name):
# based on https://stackoverflow.com/questions/20003025/find-1-letter-and-2-numbers-using-regex and https://docs.python.org/3.4/howto/regex.html
tile_id = re.search("[0-9]{2}[A-Z][_][0-9]{3}[A-Z]", tile_name).group()
return tile_id
# Gets the tile id from the full tile name using a regular expression
def get_tile_type(tile_name):
tile_type = tile_name[9:-4]
return tile_type
# Gets the tile name from the full tile name using a regular expression
def get_tile_name(tile):
tile_name = os.path.split(tile)[1]
return tile_name
# Gets the directory of the tile
def get_tile_dir(tile):
tile_dir = os.path.split(tile)[0]
return tile_dir
# Makes a complete tile name out of component tile id and pattern
def make_tile_name(tile_id, pattern):
return f'{tile_id}_{pattern}.tif'
# Lists the tiles in a folder in s3
def tile_list_s3(source, sensit_type='std'):
# Changes the directory to list tiles in if the model run is the biomass_swap or US_removals sensitivity analyses
# (JPL AGB extent and US extent, respectively)
if sensit_type == 'std':
new_source = source
elif sensit_type == 'US_removals':
new_source = cn.annual_gain_AGC_BGC_natrl_forest_US_dir
else:
new_source = source.replace('standard', sensit_type)
print_log("\n" + f'Creating list of tiles in {new_source}')
## For an s3 folder in a bucket using AWSCLI
# Captures the list of the files in the folder
# out = Popen(['aws', 's3', 'ls', new_source, '--no-sign-request'], stdout=PIPE, stderr=STDOUT)
out = Popen(['aws', 's3', 'ls', new_source], stdout=PIPE, stderr=STDOUT)
stdout, stderr = out.communicate()
# Writes the output string to a text file for easier interpretation
biomass_tiles = open(os.path.join(cn.docker_tmp, 'tiles.txt'), "wb")
biomass_tiles.write(stdout)
biomass_tiles.close()
file_list = []
# Iterates through the text file to get the names of the tiles and appends them to list
with open(os.path.join(cn.docker_tmp, 'tiles.txt'), 'r') as tile:
for line in tile:
num = len(line.strip("\n").split(" "))
tile_name = line.strip("\n").split(" ")[num - 1]
# Only tifs will be in the tile list
if '.tif' in tile_name:
tile_id = get_tile_id(tile_name)
file_list.append(tile_id)
if len(file_list) > 0:
return file_list
# In case the change of directories to look for sensitivity versions yields an empty folder.
# This could be done better by using boto3 to check the potential s3 folders for files upfront but I couldn't figure
# out how to do that.
print_log("\n" + f'Creating list of tiles in {source}')
## For an s3 folder in a bucket using AWSCLI
# Captures the list of the files in the folder
# out = Popen(['aws', 's3', 'ls', source, '--no-sign-request'], stdout=PIPE, stderr=STDOUT)
out = Popen(['aws', 's3', 'ls', source], stdout=PIPE, stderr=STDOUT)
stdout, stderr = out.communicate()
# Writes the output string to a text file for easier interpretation
biomass_tiles = open(os.path.join(cn.docker_tmp, 'tiles.txt'), "wb")
biomass_tiles.write(stdout)
biomass_tiles.close()
file_list = []
# Iterates through the text file to get the names of the tiles and appends them to list
with open(os.path.join(cn.docker_tmp, 'tiles.txt'), 'r') as tile:
for line in tile:
num = len(line.strip("\n").split(" "))
tile_name = line.strip("\n").split(" ")[num - 1]
# Only tifs will be in the tile list
if '.tif' in tile_name:
tile_id = get_tile_id(tile_name)
file_list.append(tile_id)
return file_list
# Lists the tiles on the spot machine
def tile_list_spot_machine(source, pattern):
## For an s3 folder in a bucket using AWSCLI
# Captures the list of the files in the folder
out = Popen(['ls', source], stdout=PIPE, stderr=STDOUT)
stdout, stderr = out.communicate()
# Writes the output string to a text file for easier interpretation
tiles = open(os.path.join(cn.docker_tmp, 'tiles.txt'), "wb")
tiles.write(stdout)
tiles.close()
file_list = []
# Iterates through the text file to get the names of the tiles and appends them to list
with open(os.path.join(cn.docker_tmp, 'tiles.txt'), 'r') as tile:
for line in tile:
num = len(line.strip("\n").split(" "))
tile_name = line.strip("\n").split(" ")[num - 1]
# Only files with the specified pattern will be in the tile list
if pattern in tile_name:
file_list.append(tile_name)
return file_list
# Creates a list of all tile ids found in input s3 folders, removes duplicate tile ids from the list, and orders them
def create_combined_tile_list(list_of_tile_dirs, sensit_type='std'):
print_log('Making a combined tile id list...')
# Changes the directory to list tiles according to the model run.
# If the model run is the biomass_swap or US_removals sensitivity analyses
# (JPL AGB extent and US extent, respectively), particular sets of tiles are designated.
# If the model run is standard, the names don't change.
# WARNING: Other sensitivity analyses aren't included in this and may result in unintended behaviors.
# WARNING: No sensitivity analyses have been tested with this function.
if sensit_type == 'biomass_swap':
source = cn.JPL_processed_dir
tile_list = tile_list_s3(source, sensit_type='std')
return tile_list
if sensit_type == 'US_removals':
source = cn.annual_gain_AGC_BGC_natrl_forest_US_dir
tile_list = tile_list_s3(source, sensit_type='std')
return tile_list
# Iterates through the s3 locations and makes a txt file of tiles for each one
for i, tile_set in enumerate(list_of_tile_dirs):
# out = Popen(['aws', 's3', 'ls', set1, '--no-sign-request'], stdout=PIPE, stderr=STDOUT)
out = Popen(['aws', 's3', 'ls', tile_set], stdout=PIPE, stderr=STDOUT)
stdout, stderr = out.communicate()
# Writes the output string to a text file for easier interpretation
set1_tiles = open(f'tile_set_{i}.txt', "wb")
set1_tiles.write(stdout)
set1_tiles.close()
# Empty lists for filling with tile ids
file_list_set = []
# The list of text files with tile info from s3
tile_set_txt_list = glob.glob('tile_set_*txt')
# Combines all tile text files into a single tile text file
# https://stackoverflow.com/a/13613375
with open('tile_set_consolidated.txt', 'w') as outfile:
for fname in tile_set_txt_list:
with open(fname) as infile:
outfile.write(infile.read())
# Iterates through the rows of the consolidated text file to get the tile ids and appends them to the list
with open('tile_set_consolidated.txt', 'r') as tile:
for line in tile:
num = len(line.strip("\n").split(" "))
tile_name = line.strip("\n").split(" ")[num - 1]
# Only tifs will be in the tile list
if '.tif' in tile_name:
tile_id = get_tile_id(tile_name)
file_list_set.append(tile_id)
# Tile list with tiles found in multiple lists removed, so now duplicates are gone
unique_tiles = list(set(file_list_set))
# Converts the set to a pandas dataframe to put the tiles in the correct order
df = pd.DataFrame(unique_tiles, columns=['tile_id'])
df = df.sort_values(by=['tile_id'])
# Converts the pandas dataframe back to a Python list
unique_tiles_ordered_list = df.tile_id.tolist()
# Removes the text files with the lists of tiles
tile_set_txt_list = glob.glob('tile_set_*txt') # Adds the consolidated tile txt to the list
for i in tile_set_txt_list:
os.remove(i)
print_log(f'There are {len(unique_tiles_ordered_list)} unique tiles in {len(list_of_tile_dirs)} s3 folders ({len(file_list_set)} tiles overall)')
return unique_tiles_ordered_list
# Counts the number of tiles in a folder in s3
def count_tiles_s3(source, pattern=None):
## For an s3 folder in a bucket using AWSCLI
# Captures the list of the files in the folder
# out = Popen(['aws', 's3', 'ls', source, '--no-sign-request'], stdout=PIPE, stderr=STDOUT)
out = Popen(['aws', 's3', 'ls', source], stdout=PIPE, stderr=STDOUT)
stdout, stderr = out.communicate()
# Writes the output string to a text file for easier interpretation
tile_list_name = "tiles.txt"
tile_file = open(os.path.join(cn.docker_tmp, tile_list_name), "wb")
tile_file.write(stdout)
tile_file.close()
file_list = []
if 'gfw-data-lake' in source:
#TODO: Change this function to count tiles in gfw-data-lake
print_log("Not counting gfw-data-lake tiles... No good mechanism for it, sadly.")
return
# Iterates through the text file to get the names of the tiles and appends them to list
with open(os.path.join(cn.docker_tmp, tile_list_name), 'r') as tile:
for line in tile:
num = len(line.strip("\n").split(" "))
tile_name = line.strip("\n").split(" ")[num - 1]
# For tcd, pixel area, and loss tiles (and their rewindowed versions),
# which have the tile_id after the the pattern
if pattern in [cn.pattern_tcd, cn.pattern_pixel_area, cn.pattern_loss]:
if tile_name.endswith('.tif'):
tile_id = get_tile_id(tile_name)
file_list.append(tile_id)
# If the counted tiles have to have a specific pattern
elif pattern != None:
if tile_name.endswith('{}.tif'.format(pattern)):
tile_id = get_tile_id(tile_name)
file_list.append(tile_id)
# If the counted tiles just have to be tifs
else:
if tile_name.endswith('.tif'):
tile_id = get_tile_id(tile_name)
file_list.append(tile_id)
# Count of tiles (ends in *tif)
return len(file_list)
# Gets the bounding coordinates of a tile
def coords(tile_id):
NS = tile_id.split("_")[0][-1:]
EW = tile_id.split("_")[1][-1:]
if NS == 'S':
ymax =-1*int(tile_id.split("_")[0][:2])
else:
ymax = int(str(tile_id.split("_")[0][:2]))
if EW == 'W':
xmin = -1*int(str(tile_id.split("_")[1][:3]))
else:
xmin = int(str(tile_id.split("_")[1][:3]))
ymin = str(int(ymax) - 10)
xmax = str(int(xmin) + 10)
return xmin, ymin, xmax, ymax
# General download utility. Can download individual tiles or entire folders depending on how many are in the input list
def s3_flexible_download(source_dir, pattern, dest, sensit_type, tile_id_list):
# For downloading all tiles in a folder when the list of tiles can't be specified
if tile_id_list == 'all':
s3_folder_download(source_dir, dest, sensit_type, pattern)
# For downloading test tiles (twenty or fewer).
elif len(tile_id_list) <= 20:
# Creates a full download name (path and file)
for tile_id in tile_id_list:
if pattern in [cn.pattern_tcd, cn.pattern_pixel_area, cn.pattern_loss]: # For tiles that do not have the tile_id first
source = f'{source_dir}{pattern}_{tile_id}.tif'
elif pattern in [cn.pattern_data_lake]:
source = f'{source_dir}{tile_id}.tif'
else: # For every other type of tile
source = f'{source_dir}{tile_id}_{pattern}.tif'
s3_file_download(source, dest, sensit_type)
# For downloading full sets of tiles
else:
s3_folder_download(source_dir, dest, sensit_type, pattern)
# Downloads all tiles in an s3 folder, adapating to sensitivity analysis type
# Source=source file on s3
# dest=where to download onto spot machine
# sensit_type = whether the model is standard or a sensitivity analysis model run
def s3_folder_download(source, dest, sensit_type, pattern = None):
# The number of tiles with the given pattern on the spot machine.
# Special cases are below.
local_tile_count = len(glob.glob(f'*{pattern}*.tif'))
# For data-lake tiles, which have a different pattern on the ec2 instance from s3
if pattern == cn.pattern_data_lake:
if source == cn.gain_dir:
ec2_pattern = cn.pattern_gain_ec2
elif source == cn.datalake_pf_agc_rf_dir:
ec2_pattern = cn.pattern_pf_rf_agc_ec2
elif source == cn.datalake_pf_agcbgc_rf_dir:
ec2_pattern = cn.pattern_pf_rf_agcbgc_ec2
elif source == cn.datalake_pf_agc_sd_dir:
ec2_pattern = cn.pattern_pf_sd_agc_ec2
elif source == cn.datalake_pf_agcbgc_sd_dir:
ec2_pattern = cn.pattern_pf_sd_agcbgc_ec2
elif source == cn.datalake_pf_simplename_dir:
ec2_pattern = cn.pattern_planted_forest_type
elif source == cn.datalake_pf_estab_year_dir:
ec2_pattern = cn.pattern_planted_forest_estab_year
local_tile_count = len(glob.glob(f'*{ec2_pattern}*.tif'))
print_log(f'There are {local_tile_count} tiles on the spot machine with the pattern {ec2_pattern}')
# For tile types that have the tile_id after the pattern
if pattern in [cn.pattern_tcd, cn.pattern_pixel_area, cn.pattern_loss]:
local_tile_count = len(glob.glob(f'{pattern}*.tif'))
print_log(f'There are {local_tile_count} tiles on the spot machine with the pattern {pattern}')
# Changes the path to download from based on the sensitivity analysis being run and whether that particular input
# has a sensitivity analysis path on s3
if sensit_type != 'std':
# Creates the appropriate path for getting sensitivity analysis tiles
source_sens = source.replace('standard', sensit_type)
print_log(f'Attempting to change source directory {source} to {source_sens} to reflect sensitivity analysis')
# Counts how many tiles are in the sensitivity analysis source s3 folder
s3_count_sens = count_tiles_s3(source_sens)
print_log(f'There are {s3_count_sens} tiles in sensitivity analysis folder {source_sens} with the pattern {pattern}')
# Counts how many tiles are in the standard model source s3 folder
s3_count_std = count_tiles_s3(source)
print_log(f'There are {s3_count_std} tiles in standard model folder {source} with the pattern {pattern}')
# Decides which source folder to use the count from: standard model or sensitivity analysis.
# If there are sensitivity analysis tiles, that source folder should be used.
# Otherwise, the standard folder should be used.
if s3_count_sens != 0:
s3_count = s3_count_sens
source_final = source_sens
else:
s3_count = s3_count_std
source_final = source
# If there are as many tiles on the spot machine with the relevant pattern as there are on s3, no tiles are downloaded
if local_tile_count == s3_count:
print_log(f'Tiles with pattern {pattern} are already on spot machine. Not downloading.', "\n")
return
# If there appears to be a full set of tiles in the sensitivity analysis folder (7 is semi arbitrary),
# the sensitivity folder is downloaded
if s3_count > 7:
print_log(f'Source directory used: {source_final}')
cmd = ['aws', 's3', 'cp', source_final, dest, '--no-sign-request', '--exclude', '*tiled/*',
'--exclude', '*geojason', '--exclude', '*vrt', '--exclude', '*csv', '--no-progress', '--recursive']
# cmd = ['aws', 's3', 'cp', source_final, dest, '--no-sign-request', '--exclude', '*tiled/*',
# '--exclude', '*geojason', '--exclude', '*vrt', '--exclude', '*csv', '--recursive']
log_subprocess_output_full(cmd)
print_log("\n")
# If there are fewer than 7 files in the sensitivity folder (i.e., either folder doesn't exist or it just has
# a few test tiles), the standard folder is downloaded.
# This can happen despite it being a sensitivity run because this input file type doesn't have a sensitivity version
# for this date.
else:
print_log(f'Source directory used: {source}')
cmd = ['aws', 's3', 'cp', source, dest, '--no-sign-request', '--exclude', '*tiled/*',
'--exclude', '*geojason', '--exclude', '*vrt', '--exclude', '*csv', '--no-progress', '--recursive']
# cmd = ['aws', 's3', 'cp', source, dest, '--no-sign-request', '--exclude', '*tiled/*',
# '--exclude', '*geojason', '--exclude', '*vrt', '--exclude', '*csv', '--recursive']
log_subprocess_output_full(cmd)
print_log("\n")
# For the standard model, the standard folder is downloaded.
else:
# Counts how many tiles are in the source s3 folder
if pattern == cn.pattern_data_lake:
s3_count = count_tiles_s3(source, pattern=ec2_pattern)
#print_log(f'There are {s3_count} tiles at {source} with the pattern {ec2_pattern}')
else:
s3_count = count_tiles_s3(source, pattern=pattern)
print_log(f'There are {s3_count} tiles at {source} with the pattern {pattern}')
# If there are as many tiles on the spot machine with the relevant pattern as there are on s3, no tiles are downloaded
if local_tile_count == s3_count:
print_log(f'Tiles with pattern {pattern} are already on spot machine. Not downloading.', "\n")
return
# Downloads tile sets from the gfw-data-lake.
# They need a special process because they don't have a tile pattern on the data-lake,
# so I have to download them into their own folder and then give them a pattern while moving them to the main folder
if 'gfw-data-lake' in source:
print_log(f'Downloading tiles with pattern {ec2_pattern}...')
# Deletes special folder for downloads from data-lake (if it already exists)
if os.path.exists(os.path.join(dest, 'data-lake-downloads')):
os.rmdir(os.path.join(dest, 'data-lake-downloads'))
# Special folder for the tile set that doesn't have a pattern when downloaded
os.mkdir(os.path.join(dest, 'data-lake-downloads'))
cmd = ['aws', 's3', 'cp', source, os.path.join(dest, 'data-lake-downloads'),
'--request-payer', 'requester', '--exclude', '*xml',
'--exclude', '*geojson', '--exclude', '*vrt', '--exclude', '*csv', '--no-progress', '--recursive']
log_subprocess_output_full(cmd)
# Copies pattern-less tiles from their special folder to main tile folder and renames them with
# pattern along the way
print_log("Copying tiles to main tile folder...")
for filename in os.listdir(os.path.join(dest, 'data-lake-downloads')):
move(os.path.join(dest, f'data-lake-downloads/{filename}'),
os.path.join(cn.docker_tile_dir, f'{filename[:-4]}_{ec2_pattern}.tif'))
# Deletes special folder for downloads from data-lake
os.rmdir(os.path.join(dest, 'data-lake-downloads'))
print_log(f'data-lake tiles with pattern {ec2_pattern} copied to main tile folder...')
# The --no-sign-request in the else statement below was causing the following error when trying to download the 1km drivers:
# "An error occurred (AccessDenied) when calling the GetObject operation: Access Denied"
#TODO update this when we move 1km drivers source to gfw-data-lake after API ingestion
elif 'drivers_of_loss' in source:
print_log(f'Tiles with pattern {pattern} are not on spot machine. Downloading...')
cmd = ['aws', 's3', 'cp', source, dest, '--exclude', '*tiled/*',
'--exclude', '*geojson', '--exclude', '*vrt', '--exclude', '*csv', '--no-progress', '--recursive']
log_subprocess_output_full(cmd)
# Downloads non-data-lake inputs
else:
print_log(f'Tiles with pattern {pattern} are not on spot machine. Downloading...')
cmd = ['aws', 's3', 'cp', source, dest, '--no-sign-request', '--exclude', '*tiled/*',
'--exclude', '*geojson', '--exclude', '*vrt', '--exclude', '*csv', '--no-progress', '--recursive']
# cmd = ['aws', 's3', 'cp', source, dest, '--no-sign-request', '--exclude', '*tiled/*',
# '--exclude', '*geojson', '--exclude', '*vrt', '--exclude', '*csv', '--recursive']
log_subprocess_output_full(cmd)
print_log("\n")
# Downloads individual tiles from s3
# Source=source file on s3
# dest=where to download onto spot machine
# sensit_type = whether the model is standard or a sensitivity analysis model run
def s3_file_download(source, dest, sensit_type):
# Retrieves the s3 directory and name of the tile from the full path name
dir = get_tile_dir(source)
file_name = get_tile_name(source)
try:
tile_id = get_tile_id(file_name)
except:
pass
# Changes the file to download based on the sensitivity analysis being run and whether that particular input
# has a sensitivity analysis path on s3.
# Files that have standard and sensitivity analysis variants are handled differently from ones without variants
# Hierarchy for getting tiles (start with #1, end with #4):
# 1. Use sensitivity tile if already downloaded
# 2. Download sensitivity if it exists
# 3. Use standard tile if already downloaded
# 4. Download standard tile if it exists
if sensit_type != 'std' and 'standard' in dir:
# Creates directory and file names according to sensitivity analysis type
dir_sens = dir.replace('standard', sensit_type)
file_name_sens = file_name[:-4] + '_' + sensit_type + '.tif'
# Doesn't download the tile if sensitivity version is already on the spot machine
print_log(f'Option 1: Checking if {file_name_sens} is already on spot machine...')
if os.path.exists(file_name_sens):
print_log(f' Option 1 success: {file_name_sens} already downloaded', "\n")
return
else:
print_log(f' Option 1 failure: {file_name_sens} is not already on spot machine.')
print_log(f'Option 2: Checking for sensitivity analysis tile {dir_sens[15:]}/{file_name_sens} on s3...')
# If not already downloaded, first tries to download the sensitivity analysis version
# cmd = ['aws', 's3', 'cp', '{0}/{1}'.format(dir_sens, file_name_sens), dest, '--no-sign-request', '--only-show-errors']
cmd = ['aws', 's3', 'cp', '{0}/{1}'.format(dir_sens, file_name_sens), dest, '--only-show-errors']
log_subprocess_output_full(cmd)
if os.path.exists(file_name_sens):
print_log(f' Option 2 success: Sensitivity analysis tile {dir_sens}/{file_name_sens} found on s3 and downloaded', "\n")
return
else:
print_log(f' Option 2 failure: Tile {dir_sens}/{file_name_sens} not found on s3. Looking for standard model source...')
# Next option is to use standard version of tile if on spot machine.
# This can happen despite it being a sensitivity run because this input file doesn't have a sensitivity version
# for this date.
print_log(f'Option 3: Checking if standard version {file_name} is already on spot machine...')
if os.path.exists(file_name):
print_log(f' Option 3 success: {file_name} already downloaded', "\n")
return
else:
print_log(f' Option 3 failure: {file_name} is not already on spot machine. ')
print_log(f'Option 4: Looking for standard version of {file_name} to download...')
# If not already downloaded, final option is to try to download the standard version of the tile.
# If this doesn't work, the script throws a fatal error because no variant of this tile was found.
# cmd = ['aws', 's3', 'cp', source, dest, '--no-sign-request', '--only-show-errors']
cmd = ['aws', 's3', 'cp', source, dest, '--only-show-errors']
log_subprocess_output_full(cmd)
if os.path.exists(file_name):
print_log(f' Option 4 success: Standard tile {source} found on s3 and downloaded', "\n")
return
else:
print_log(f' Option 4 failure: Tile {source} not found on s3. Tile not found but it seems it should be. Check file paths and names.', "\n")
# If not a sensitivity run or a tile type without sensitivity analysis variants, the standard file is downloaded
# Special download procedures for gfw-data-lake datasets (tree cover gain, planted forests) because the tiles have no pattern, just an ID.
# These tiles are renamed as they are downloaded to get a pattern added to them.
else:
if 'gfw-data-lake' in source:
if dir == cn.gain_dir[:-1]: # Delete last character of gain_dir because it has the terminal / while dir does not have terminal /
ec2_file_name = f'{tile_id}_{cn.pattern_gain_ec2}.tif'
elif dir == cn.datalake_pf_agc_rf_dir[:-1]:
ec2_file_name = f'{tile_id}_{cn.pattern_pf_rf_agc_ec2}.tif'
elif dir == cn.datalake_pf_agcbgc_rf_dir[:-1]:
ec2_file_name = f'{tile_id}_{cn.pattern_pf_rf_agcbgc_ec2}.tif'
elif dir == cn.datalake_pf_agc_sd_dir[:-1]:
ec2_file_name = f'{tile_id}_{cn.pattern_pf_sd_agc_ec2}.tif'
elif dir == cn.datalake_pf_agcbgc_sd_dir[:-1]:
ec2_file_name = f'{tile_id}_{cn.pattern_pf_sd_agcbgc_ec2}.tif'
elif dir == cn.datalake_pf_simplename_dir[:-1]:
ec2_file_name = f'{tile_id}_{cn.pattern_planted_forest_type}.tif'
elif dir == cn.datalake_pf_estab_year_dir[:-1]:
ec2_file_name = f'{tile_id}_{cn.pattern_planted_forest_estab_year}.tif'
else:
print_log(f' Warning: {source} is located in the gfw-data-lake bucket but has not been assigned a file name pattern for download. Please update the constants_and_names.py file and the s3_file_download function in the universal_util.py file to include this dataset for download.')
return
gfw_data_lake_download(source, dest, dir, file_name, ec2_file_name)
return
# All other tiles besides gfw-data-lake datasets
else:
print_log(f'Option 1: Checking if {file_name} is already on spot machine...')
if os.path.exists(os.path.join(dest, file_name)):
print_log(f' Option 1 success: {os.path.join(dest, file_name)} already downloaded', "\n")
return
else:
print_log(f' Option 1 failure: {file_name} is not already on spot machine.')
print_log(f'Option 2: Checking for tile {source} on s3...')
# If the tile isn't already downloaded, download is attempted
source = os.path.join(dir, file_name)
# cmd = ['aws', 's3', 'cp', source, dest, '--no-sign-request', '--only-show-errors']
cmd = ['aws', 's3', 'cp', source, dest, '--only-show-errors']
log_subprocess_output_full(cmd)
if os.path.exists(os.path.join(dest, file_name)):
print_log(f' Option 2 success: Tile {source} found on s3 and downloaded', "\n")
return
else:
print_log(f' Option 2 failure: Tile {source} not found on s3. Tile not found but it seems it should be. Check file paths and names.', "\n")
def gfw_data_lake_download(source, dest, dir, file_name, ec2_file_name):
print_log(f'Option 1: Checking if {ec2_file_name} is already on spot machine...')
if os.path.exists(os.path.join(dest, ec2_file_name)):
print_log(f' Option 1 success: {os.path.join(dest, ec2_file_name)} already downloaded', "\n")
return
else:
print_log(f' Option 1 failure: {ec2_file_name} is not already on spot machine.')
print_log(f'Option 2: Checking for tile {source} on s3...')
# If the tile isn't already downloaded, download is attempted
source = os.path.join(dir, file_name)
# cmd = ['aws', 's3', 'cp', source, dest, '--no-sign-request', '--only-show-errors']
cmd = ['aws', 's3', 'cp', source, f'{dest}{ec2_file_name}',
'--request-payer', 'requester', '--only-show-errors']
log_subprocess_output_full(cmd)
if os.path.exists(os.path.join(dest, ec2_file_name)):
print_log(f' Option 2 success: Tile {source} found on s3 and downloaded', "\n")
return
else:
print_log(
f' Option 2 failure: Tile {source} not found on s3. Tile not found but it seems it should be. Check file paths and names.',
"\n")
# Uploads all tiles of a pattern to specified location
def upload_final_set(upload_dir, pattern):
print_log(f'Uploading tiles with pattern {pattern} to {upload_dir}')
cmd = ['aws', 's3', 'cp', cn.docker_tile_dir, upload_dir, '--exclude', '*', '--include', '*{}*tif'.format(pattern),
'--recursive', '--no-progress']
try:
log_subprocess_output_full(cmd)
print_log(f' Upload of tiles with {pattern} pattern complete!')
except:
print_log('Error uploading output tile(s)')
# Uploads the log as each model output tile set is finished
upload_log()
# Uploads tile to specified location
def upload_final(upload_dir, tile_id, pattern):
file = '{}_{}.tif'.format(tile_id, pattern)
print_log("Uploading {}".format(file))
# cmd = ['aws', 's3', 'cp', file, upload_dir, '--no-sign-request', '--no-progress']
cmd = ['aws', 's3', 'cp', file, upload_dir, '--no-progress']
try:
log_subprocess_output_full(cmd)
except:
print_log('Error uploading output tile')
# This version of checking for data is bad because it can miss tiles that have very little data in them.
# But it takes less memory than using rasterio, so it's good for local tests.
# This method creates a tif.aux.xml file that I tried to add a line to delete but couldn't get to work.
def check_and_delete_if_empty_light(tile_id, output_pattern):
tile_name = f'{tile_id}_{output_pattern}.tif'
# Source: http://gis.stackexchange.com/questions/90726
# Opens raster and chooses band to find min, max
gtif = gdal.Open(tile_name)
srcband = gtif.GetRasterBand(1)
stats = srcband.GetStatistics(True, True)
print_log(" Tile stats = Minimum=%.3f, Maximum=%.3f, Mean=%.3f, StdDev=%.3f" % (stats[0], stats[1], stats[2], stats[3]))
if stats[0] != 0:
print_log(f' Data found in {tile_name}. Keeping file...')
else:
print_log(f' Data not found in {tile_name}. Deleting...')
os.remove(tile_name)
# Using this gdal data check method creates a tif.aux.xml file that is unnecessary.
# This does not work, however; it returns an error that there is no such file or directory.
# os.remove("{0}{1}.aux.xml".format(cn.docker_base_dir, tile_name))
# This version of checking for data in a tile is more robust
def check_for_data(tile):
with rasterio.open(tile) as img:
msk = img.read_masks(1).astype(bool)
if msk[msk].size == 0:
# print_log(f"Tile {tile} is empty")
return True
else:
# print_log(f"Tile {tile} is not empty")
return False
def check_and_delete_if_empty(tile_id, output_pattern):
tile_name = f'{tile_id}_{output_pattern}.tif'
# Only checks for data if the tile exists
if not os.path.exists(tile_name):
print_log(f'{tile_name} does not exist. Skipping check of whether there is data.')
return
print_log(f'Checking if {tile_name} contains any data...')
no_data = check_for_data(tile_name)
if no_data:
print_log(f' Data not found in {tile_name}. Deleting...')
os.remove(tile_name)
else:
print_log(f' Data found in {tile_name}. Keeping tile to copy to s3...')