---
title: "Data too big for memory"
subtitle: "Working efficiently with Data"
---
We've had a look at approaches for working with in-memory data more efficiently. But what if the data we want to work with simply cannot fit into memory?
There are a number of approaches for dealing with this situation, and the right one depends on what exactly we need from the data to perform our operation.
For example:
- We might need to process the whole dataset but can somehow split our computation to work on smaller chunks of the data, i.e. batch processing. Perhaps our data is split across many individual CSV files, in which case we could write a function that works on a single file at a time and use an apply function to process all files and aggregate the results (a minimal sketch follows this list). This sort of processing is highly amenable to parallelisation.
- We might need a subset of our data which we define through filtering, selecting and other aggregating functions. In this situation, storing our data in a database and using SQL to query it for the subset of data of interest is our best option.
- A harder problem is when we genuinely require all the data in memory. Here we might need to distribute memory across many machines (e.g. on an HPC platform using MPI), as well as consider options like single-precision floats and mathematical optimisations of our algorithms.
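As a minimal sketch of the first (batch-processing) approach, suppose our data were split into several CSV files in a hypothetical `data/batches/` directory, each holding a chunk of the synthetic population. We could summarise each file separately and combine the per-file results:

```{r}
#| eval: false
# Hypothetical directory of per-chunk CSV files (not part of this workshop's data)
files <- list.files("data/batches", pattern = "\\.csv$", full.names = TRUE)

# Summarise one file at a time so only a single chunk is ever in memory
summarise_file <- function(path) {
  chunk <- readr::read_csv(path, show_col_types = FALSE)
  data.frame(
    file = basename(path),
    n_rows = nrow(chunk),
    mean_income = mean(chunk$income, na.rm = TRUE)
  )
}

# Apply the function over all files and aggregate the per-file results
results <- do.call(rbind, lapply(files, summarise_file))
```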
# Databases
Databases are an appropriate choice if you have large amounts of data that can't fit into memory but only need subsets of it that you can extract using queries.
There are many types of databases, most of which are beyond the scope of this workshop. What we will focus on here are simple relational databases that store tabular data in single flat files (a.k.a. embedded databases), as opposed to databases that run through a server, like MySQL, Microsoft SQL Server or PostgreSQL, or that do not store tabular data, for example MongoDB, which stores data as documents.
We also focus on databases that can be queried using **SQL**. SQL (which stands for Structured Query Language) is a standardized programming language that is used to manage [relational databases](https://www.techtarget.com/searchdatamanagement/definition/relational-database) and perform various operations on the data in them.
It's good to have an idea of SQL basics when interacting with databases, but in R many of the `dplyr` verbs are inspired by SQL commands, and the `dbplyr` package can translate dplyr operations into SQL so that we can query databases much as we would data.frames or tibbles.
As such we can build up our queries using a connection to a database and only collect our data explicitly when we are ready for R to execute the query.
## SQLite
> [SQLite](https://sqlite.org/index.html) is a C-language library that implements a [small](https://sqlite.org/footprint.html), [fast](https://sqlite.org/fasterthanfs.html), [self-contained](https://sqlite.org/selfcontained.html), [high-reliability](https://sqlite.org/hirely.html), [full-featured](https://sqlite.org/fullsql.html), SQL database engine.
>
> The SQLite [file format](https://sqlite.org/fileformat2.html) is stable, cross-platform, and backwards compatible.
>
> SQLite [source code](https://sqlite.org/src) is in the [public-domain](https://sqlite.org/copyright.html) and is free to everyone to use for any purpose.
Let's start our experimentation by creating a simple SQLite database with a single table.
The data we will use is contained in `data/synthpop_10000000.parquet` and represents characteristics of 10,000,000 individuals from a synthetic population.
I know this section is about data too big for memory, and this is not an approach you can use for data truly larger than memory. But because I want to benchmark against in-memory data, and it fits into memory on my machine, I will load it into memory and write it directly to the database. However, I also show a way to populate the database in batches if you find the file is overloading your memory. You can also choose to use one of the smaller synthpop parquet files (e.g. `data/synthpop_1000000.parquet`, which contains 1,000,000 rows).
So let's load our data and have a look at it:
```{r}
data <- arrow::read_parquet("data/synthpop_10000000.parquet")
head(data)
```
This is quite a large dataframe.
```{r}
pryr::object_size(data)
```
Next let's load some useful libraries, create our database and store the connection to said database in a variable.
```{r}
library(DBI)
library(dplyr)
```
```{r}
con <- dbConnect(drv = RSQLite::SQLite(), "data/db.sqlite")
```
The above command creates an SQLite database at path `data/db.sqlite`.
```{r}
con
```
We can also see that the `con` object is tiny, only `r pryr::object_size(con)`. That's because it's just a connection to the database and does not contain any data itself.
```{r}
pryr::object_size(con)
```
Now let's go ahead and write our data to a table in our database. For this we can use `DBI`'s function `dbWriteTable`. This will both create the table in our database and also write the data to it. The arguments we need to provide are:
- `conn` (the first argument) where we provide the connection to the database we want to write to.
- `name` the name of the table we want to create.
- `value` the object containing the data we want to write to the table. This must be a `data.frame` or an object coercible to a `data.frame`.
```{r}
#| eval: false
dbWriteTable(con, name = "synthpop", data)
```
Once writing the table is complete (this might take a little while), we can do some initial checks on our data using some more `DBI` functions:
- `dbListTables` lists the names of all tables in the database
- `dbListFields` lists all fields in a given table (`"synthpop"`)
```{r}
dbListTables(con)
dbListFields(con, "synthpop")
```
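We can also send raw SQL to the database with `DBI`'s `dbGetQuery()`. As an illustrative sketch, here's a quick peek at the first few rows of the table:

```{r}
#| eval: false
# Peek at the first 5 rows without pulling the whole table into memory
dbGetQuery(con, "SELECT * FROM synthpop LIMIT 5")
```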
### Chunked method to populate the database
If the data is too big to load into memory and then write to a database, an option is to populate the database in chunks. This involves using `readr::read_csv_chunked` to write the data in batches, as detailed in the [following blogpost](https://www.michaelc-m.com/manual_posts/2022-01-27-big-CSV-SQL.html) by Michael Culshaw-Maurer.
Here's some example code of how we could populate our database from the 1,000,000 row csv file we created in the Data Input/Output section:
```{r}
#| eval: false
readr::read_csv_chunked("data/write/synthpop_1000000.csv",
  callback = function(chunk, dummy) {
    # append each chunk to the synthpop table as it is read
    dbWriteTable(con, "synthpop", chunk, append = TRUE)
  },
  chunk_size = 10000
)
```
## Interacting with database through `dplyr` & `dbplyr`
`dbplyr` is the database backend for [dplyr](https://dplyr.tidyverse.org/). It allows you to use remote database tables as if they are in-memory data frames by automatically converting dplyr code into SQL.
All dplyr calls are evaluated lazily, generating SQL that is only sent to the database when you request the data.
So let's start using our connection to access some data. For that we can use function `tbl()`. Just as `dbConnect()` opens a connection to a database, we can think of `tbl()` as opening a connection to a single table in the database, in this case `"synthpop"`.
```{r}
tbl <- tbl(con, "synthpop")
```
If we print `tbl` we see all the columns in the table and the first 10 rows, which looks a bit like printing a tibble. But if we look at the header above the data, we can see the database source as well as `[?? x 12]` in the dimensions summary. That's because `tbl` does not contain the full table data, just a connection to it, and therefore does not know the number of rows of the complete table.
```{r}
tbl
```
Let's have a look at the `tbl` class. The important class identifiers are `"tbl_dbi"` and `"tbl_sql"` which indicate any data manipulation on the tbl will be translated to SQL, will be lazy and will return another `"tbl_dbi"`, not the actual result of the query.
```{r}
class(tbl)
```
### Getting the number of rows of a database table
So what if we did want to know the number of rows of the `"synthpop"` table, to double-check we have written it fully?
We might try a familiar R function, `nrow()`:
```{r}
nrow(tbl)
```
But this doesn't work! That's because there is no translation of R function `nrow()` to SQL.
We'll need to frame our request as something that can be translated into an SQL query by `dbplyr`.
```{r}
tbl %>%
summarize(n = n())
```
`summarise(n())` gets translated to the SQL `COUNT` function, an [SQL aggregate function](https://www.sqlite.org/lang_aggfunc.html) that returns a single value, hence what is returned to us is another `tbl_dbi` with 1 x 1 dimensions.
::: callout-note
To learn more about which R functions are translated by dbplyr to SQL have a look at the package's vignettes on [Verb](https://dbplyr.tidyverse.org/articles/translation-verb.html) and [Function](https://dbplyr.tidyverse.org/articles/translation-function.html) translation.
:::
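You can also experiment with how individual R expressions are translated using `dbplyr::translate_sql()`, for example (a quick sketch):

```{r}
#| eval: false
# See the SQL dbplyr would generate for a single R expression
dbplyr::translate_sql(mean(income, na.rm = TRUE), con = con)
```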
We can inspect the SQL statement generated by `dbplyr` by piping the query to `dplyr`'s `show_query()` function.
```{r}
tbl %>%
summarize(n = n()) %>%
show_query()
```
Remember that just running the query returns another `tbl_dbi`. To be able to compute on the result in R we need to collect it.
```{r}
db_nrows <- tbl %>%
summarize(n = n()) %>%
collect()
db_nrows
pull(db_nrows) == nrow(data)
```
We have now checked that our data was fully written to our database table.
### Filtering, selecting and summarising
As mentioned, many `dplyr` verbs, as well as a number of aggregating and arithmetic functions, can be translated to SQL by `dbplyr`. For greatest efficiency it's good to perform as many operations as possible in SQL before collecting the data. These are carried out by the database's SQL engine, which is generally more efficient when working with large data.
Let's try a few examples.
Let's put together a query that filters values in a few columns and then selects a few columns to return:
```{r}
filter(tbl,
age > 30,
sex == "MALE",
sport == TRUE
) %>%
select(income, age, marital)
```
Again, running the query without collecting does not return the full query result but can help check what your query is doing.
```{r}
filter(tbl,
age > 30,
sex == "MALE",
sport == TRUE) %>%
select(income, age, marital) %>%
show_query()
```
And adding `show_query()` to the end of the pipe shows the SQL translation of the query.
#### Query 1
Let's try another one:
```{r}
filter(tbl,
age > 50L & age < 60L,
income < 300) %>%
arrange(bmi, age, income, nociga, sex)
```
```{r}
filter(tbl,
age > 50L & age < 60L,
income < 300) %>%
arrange(bmi, age, income, nociga, sex) %>%
show_query()
```
Let's wrap this query in a function we can use to benchmark how long it takes to execute.
```{r}
query_1 <- function(tbl) {
filter(tbl,
age > 50L & age < 60L,
income < 300) %>%
arrange(bmi, age, income, nociga, sex)
}
```
#### Query 2
Let's put together one more example to use for benchmarking which includes some aggregating and arithmetic functions.
```{r}
filter(tbl,
age > 65L,
sex == "MALE",
sport == TRUE,
!is.na(income),
!is.na(marital)) %>%
group_by(marital) %>%
summarise(min_income = min(income),
max_income = max(income),
mean_income = mean(income))
```
Let's look at the SQL translation:
```{r}
filter(tbl,
age > 65L,
sex == "MALE",
sport == TRUE,
!is.na(income),
!is.na(marital)) %>%
group_by(marital) %>%
summarise(min_income = min(income),
max_income = max(income),
mean_income = mean(income)) %>%
show_query()
```
And again, wrap it in a function:
```{r}
query_2 <- function(tbl) {
filter(tbl,
age > 65L,
sex == "MALE",
sport == TRUE,
!is.na(income),
!is.na(marital)) %>%
group_by(marital) %>%
summarise(min_income = min(income),
max_income = max(income),
mean_income = mean(income)) %>%
arrange(marital)
}
```
OK, let's now run some initial benchmarks, comparing against running the same queries on the data in memory:
##### Query 1
```{r}
bench::mark(
df = query_1(data),
sqlite = query_1(tbl) %>%
collect(),
check = FALSE
)
```
*Note: I've turned off checking for this benchmark because of the difference in how `dplyr` arranges `NA`s in data.frames (`NA`s at the end) vs how SQLite's engine does (`NA`s at the top).*
##### Query 2
```{r}
bench::mark(
df = query_2(data),
sqlite = query_2(tbl) %>%
collect()
)
```
In this first test of performance, the databases come out slower. That shouldn't surprise us though. Working with in-memory data (provided there is still enough memory left for the computation) will always be faster because there is no **I/O** cost to the query once the data has been loaded into memory, whereas executing and collecting the query from the database involves reading data from disk. We can see, though, that working with a database is much more memory efficient, which, given that this chapter is about data that does not fit into memory, shows it is a good approach for this use case.
### Indexing
Indexes are a way to improve the performance of your read queries, particularly ones with filters (`WHERE`) on them. They're data structures that exist in your database engine, outside of whatever table they work with, that point to data you're trying to query.
They work similarly to indexes in the back of a book. They contain the ordered values of the column you create them on, along with information about the location of the rows containing each value in the original table.
So just like you might use an index to find a recipe instead of flicking through an entire recipe book, database indexes allow you to look up the values in columns and the location of the rows containing them in your original table without scanning the full table. A well crafted index can produce impressive query speed ups!
This does come at a cost. They take up space within your database, increasing its overall size, and they also slow down updates to any tables containing indexes, as the indexes must also be updated. Crafting indexes is also a bit of an art, as creating an index that speeds up a given query might actually slow another one down!
The details of a good indexing strategy are a big topic, well beyond the scope of this workshop. To experiment without modifying our original database, let's first make a copy of it to index.
```{r}
#| eval: false
file.copy(
from = "data/db.sqlite",
to = "data/db_idx.sqlite"
)
```
```{r}
#| echo: false
#| eval: true
file.copy(
from = "data/db.sqlite",
to = "data/db_idx.sqlite",
overwrite = TRUE
)
```
Let's connect to the database we're going to index as well as create a connection to the `"synthpop"` table.
```{r}
con_idx <- dbConnect(RSQLite::SQLite(), "data/db_idx.sqlite")
tbl_idx <- tbl(con_idx, "synthpop")
```
Now **let's create our first index to try and improve the performance of the filter (`WHERE`) operation in query 1.**
#### Query 1
Let's remind ourselves what the query is actually doing. This time we'll use another `dplyr` function, `explain()`.
```{r}
query_1(tbl_idx) %>%
explain()
```
`explain()` translates to the `EXPLAIN QUERY PLAN` command in SQLite databases. It includes the `SQL` translation of our query but is used primarily to obtain a high-level description of the strategy or plan that SQLite uses to implement a specific SQL query. Most significantly, **`EXPLAIN QUERY PLAN` reports on the way in which the query uses database indices**. The relevant information is found in the `detail` column of the bottom table of the output.
The output of piping query 1 into `explain()` indicates that the SQLite engine is using a full scan of the `"synthpop"` table to locate the rows matching our filter (`WHERE`) condition. It then uses a temporary sorting B-Tree for ordering the data. When you use `ORDER BY` without an index on the columns to be sorted, SQLite builds a temporary data structure containing the sorted results each time the query is executed. That data structure uses up space in memory (or on disk, depending on the circumstances) and is torn down after the query is executed.
::: callout-tip
To find out more about the SQLite `EXPLAIN QUERY PLAN` command, head to [SQLite doumentation](https://www.sqlite.org/eqp.html).
:::
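If you prefer, you can also send an `EXPLAIN QUERY PLAN` statement to SQLite yourself via `DBI`. A minimal sketch using a hand-written, simplified version of query 1:

```{r}
#| eval: false
# Ask SQLite for the query plan of a simplified version of query 1
dbGetQuery(
  con_idx,
  "EXPLAIN QUERY PLAN
   SELECT * FROM synthpop
   WHERE age > 50 AND age < 60 AND income < 300
   ORDER BY bmi, age, income, nociga, sex;"
)
```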
Time to create an index. To do so we use function `dbExecute()` on our database connection and pass it a character string of the SQL statement to create an index.
```{r}
dbExecute(con_idx,
"CREATE INDEX synthpop_age_inc_idx ON synthpop (age, income);")
```
Let's break down the statement:
- `CREATE INDEX` is the command to create an index.
- `synthpop_age_inc_idx` is the name of the index we want to create. It's good practice to include an indication of the table as well as the columns used to create the index in the name of the index.
- `ON synthpop` indicates that the index is being created on table `synthpop`.
- `(age, income)` the parentheses enclose the columns we want to include in our index. Indexes can be created on one or multiple columns. Here, because our filter statement involves searching for values on both `age` and `income`, we include both columns for better performance. Note however that this inevitably takes more disk space and more time to create (and, in future, update) the index.
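As an aside, you can check which indexes exist on a table, and which columns they cover, with SQLite's `PRAGMA` statements. A quick sketch:

```{r}
#| eval: false
# List the indexes currently defined on the synthpop table
dbGetQuery(con_idx, "PRAGMA index_list('synthpop');")

# Show which columns our new index covers
dbGetQuery(con_idx, "PRAGMA index_info('synthpop_age_inc_idx');")
```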
OK, let's check the query plan to see if SQLite plans to use our index:
```{r}
query_1(tbl_idx) %>%
explain()
```
Indeed! The query is not `SCAN`ning the full table anymore but is instead using our index to `SEARCH` for values in the index matching our filter statement.
Let's see whether we've improved our query's performance:
```{r}
bench::mark(
no_index = query_1(tbl) %>%
collect(),
index = query_1(tbl_idx) %>%
collect()
)
```
Indeed we have, roughly a 5x speed up. Not bad! But we could do better!
Because indexes are sorted data structures and their benefit comes from how binary search works, it's important to ensure that our indexed columns have what is called "high cardinality". All this means is that the indexed data has a lot of uniqueness.
While our `age` column has `r length(unique(data$age))` unique values, our income column has `r length(unique(data$income))`, i.e. `income` has higher cardinality than `age`.
A multi-column index will initially use the first column to search, then the second and so on. So instead of putting `age` at the front of our index, let's drop our first index using the `DROP INDEX` command and create a new index with `income` first:
```{r}
dbExecute(con_idx,
"DROP INDEX synthpop_age_inc_idx;")
dbExecute(con_idx,
"CREATE INDEX synthpop_inc_age_idx ON synthpop (income, age);")
```
Let's inspect our query plan which reveals that, indeed, our index now searches through income first:
```{r}
query_1(tbl_idx) %>%
explain()
```
Let's run our benchmarks again:
```{r}
bench::mark(
no_index = query_1(tbl) %>%
collect(),
index = query_1(tbl_idx) %>%
collect()
)
```
Much better! We're now approaching a 10x speed up!
So, do you think we can speed up the query even more? What about the arrange part of the query?
You may have noticed that the `ORDER BY` part of the query is still using a temporary B-TREE.
```{r}
query_1(tbl_idx) %>%
explain()
```
An index can be used to speed up sorting only if the query can return the rows in the order in which they are stored in the index. **Because our index does not include many of the columns we are using in the sort operation, most importantly the first one (`bmi`), the index is ignored by `ORDER BY`.**
We might consider creating another index to take care of the `ORDER BY` operation and include all the columns involved, in the order in which we want them sorted.
```{r}
dbExecute(con_idx,
"CREATE INDEX synthpop_arrange1_idx ON synthpop (bmi, age, income, nociga, sex);")
```
Let's see if that improves performance:
```{r}
bench::mark(
no_index = query_1(tbl) %>%
collect(),
index = query_1(tbl_idx) %>%
collect()
)
```
Oh dear! The query is now much slower, and barely an improvement over our non-indexed database! What's going on?
Let's inspect our query plan:
```{r}
query_1(tbl_idx) %>%
explain()
```
Now what we see is that the engine is indeed using our `synthpop_arrange1_idx` index but is only using that one. Not only that, it is now performing a full `SCAN` of the arrange index table.
An important thing to note is that, in SQLite, **each table in the FROM clause of a query can use at most one index and SQLite strives to use at least one index on each table**. So it cannot use one index for the `WHERE` part of the query and another for the `ORDER BY` part.
In this case, the engine determines that the least costly query plan is to just use the `synthpop_arrange1_idx` index because all the information it needs is stored within and therefore does not require a lookup in the original `synthpop` table to retrieve further data. It knows the data is stored in the correct order but to perform the `WHERE` operation, it does need to scan the full index table.
But why does this end up slower in practice? Because the `WHERE` operation actually returns a much smaller subset of the data, optimising that part of the query and then using a B-TREE for sorting ends up much faster in practice. However, the query optimiser has no way of knowing this upfront (and it may not hold if the `WHERE` operation returns a much bigger subset), so it concludes (wrongly, in our case) that using the `synthpop_arrange1_idx` index is most efficient.
So, at least for this query, the `synthpop_arrange1_idx` index is counterproductive, so let's drop it.
```{r}
dbExecute(con_idx,
"DROP INDEX synthpop_arrange1_idx;")
query_1(tbl_idx) %>%
explain()
```
Now the optimiser goes back to using the `synthpop_inc_age_idx` index.
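As an aside, SQLite's `ANALYZE` command gathers statistics about tables and indexes that the query planner can use when estimating the cost of candidate plans. It may or may not change the chosen plan for a given query, but it's worth knowing about when the optimiser's choices seem off:

```{r}
#| eval: false
# Gather table/index statistics that SQLite's planner can use for cost estimates
dbExecute(con_idx, "ANALYZE;")
```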
#### Query 2
So we've made Query 1 faster but what about query 2?
Let's check whether it also helps with query 2:
```{r}
bench::mark(
no_index = query_2(tbl) %>%
collect(),
index = query_2(tbl_idx) %>%
collect()
)
```
Well that's not good! The index seems to have made query 2 slower?! If we use `explain()` to dig into it, we see it's still doing a full scan, but now the optimiser also has to evaluate a potential query plan that might involve our `synthpop_inc_age_idx` index.
```{r}
query_2(tbl_idx) %>%
explain()
```
Let's create an index to improve the performance of query 2. Let's focus again on the `WHERE` part of the query.
We might start by creating an index using all the columns involved, in order of decreasing cardinality:
```{r}
dbExecute(con_idx,
"CREATE INDEX synthpop_inc_age_mar_sex_sp_idx ON synthpop (income, age, marital, sex, sport);")
```
Let's check our query plan and benchmark:
```{r}
query_2(tbl_idx) %>%
explain()
bench::mark(
no_index = query_2(tbl) %>%
collect(),
index = query_2(tbl_idx) %>%
collect()
)
```
We see a small improvement. At least the query is not slower now!
::: callout-important
#### Indexing Take Aways:
Indexes can be a useful strategy for improving specific query performance. HOWEVER:
- We have only scraped the surface of the types of indexes available as well as how to determine when and how to deploy them.
- They are fiddly to create and can have unexpected effects on different queries.
- They take time to create and update and take up space on disk (our indexed database is now `r fs::file_size("data/db_idx.sqlite")` compared to `r fs::file_size("data/db.sqlite")` for our original database!).
- Trying to create new indexes to optimise each new query quickly gets out of hand and requires a lot of knowledge/experimentation.
:::
But! The next section provides some useful perspective!
### DuckDB
While SQLite is ubiquitous in the world of embedded databases, and it supports complex analytical queries, **SQLite is primarily designed for fast online transaction processing (OLTP)**, employing row-oriented execution.
There is however a rather recent type of embedded (flat) database called [**DuckDB**](https://duckdb.org/).
DuckDB can be far more efficient for the complex analytical queries on large amounts of data that are common in analytics workflows.
From the [DuckDB website](https://duckdb.org/why_duckdb):
> DuckDB is designed to support **analytical query workloads**, also known as **Online analytical processing (OLAP).**
>
> These workloads are characterized by complex, relatively long-running queries that process significant portions of the stored dataset, for example aggregations over entire tables or joins between several large tables.
>
> Changes to the data are expected to be rather large-scale as well, with several rows being appended, or large portions of tables being changed or added at the same time.
>
> DuckDB contains a [**columnar-vectorized query execution engine**]{.underline}, where queries are still interpreted, but a large batch of values (a "vector") are processed in one operation. This greatly reduces overhead present in traditional systems such as PostgreSQL, MySQL or SQLite which process each row sequentially
It also has a nice R API provided through the `duckdb` package. I highly recommend checking the DuckDB documentation to learn more about its features, but in general you can interact with DuckDB databases in R as you would with any other database.
So let's create a DuckDB database with the same data and benchmark our queries against it.
Again we can use `dbConnect()` to both create a database using a `duckdb::duckdb()` driver and open a connection to it.
```{r}
con_duckdb <- dbConnect(duckdb::duckdb(), "data/db.duckdb")
con_duckdb
```
```{r}
#| eval: false
dbWriteTable(con_duckdb, "synthpop", data)
```
```{r}
dbListTables(con_duckdb)
dbListFields(con_duckdb, "synthpop")
```
```{r}
tbl_duckdb <- tbl(con_duckdb, "synthpop")
tbl_duckdb
```
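As an aside, DuckDB can also query parquet files directly, without first copying the data into a database table. A minimal sketch using raw SQL:

```{r}
#| eval: false
# Query the parquet file directly; no need to copy the data into the database first
dbGetQuery(
  con_duckdb,
  "SELECT COUNT(*) AS n FROM 'data/synthpop_10000000.parquet';"
)
```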
## Benchmark Queries
Now let's go ahead and run our queries again, this time including running them on the `duckdb` database we just created.
```{r}
bench::mark(
df = query_1(data),
sqlite = query_1(tbl) %>%
collect(),
sqlite_idx = query_1(tbl_idx) %>%
collect(),
duckdb = query_1(tbl_duckdb) %>%
collect(),
check = FALSE
)
```
```{r}
bench::mark(
df = query_2(data),
sqlite = query_2(tbl) %>%
collect(),
sqlite_idx = query_2(tbl_idx) %>%
collect(),
duckdb = query_2(tbl_duckdb) %>%
collect()
)
```
Wow! DuckDB is much faster than SQLite, can compete with and beat an indexed SQLite database, and can even be faster than running the queries on in-memory data!! And still very memory efficient. And all this without even having to think about indexes!! 🤩 🎉
This is definitely a database type you should know about!
## Accessing data through the `arrow` package
**Arrow** is a software development platform for building high-performance applications. As we've seen already, the `arrow` R package provides functionality for fast reading and writing of flat files as well as more efficient binary file formats.
It **also provides functions for opening connections to files as well as directories of files**, much like we did with databases, and because it has deep integration with `dplyr`, it allows us to perform queries on out of memory data as we've been doing with our databases.
### Accessing single files as arrow tables
We can read in a single large CSV, arrow or parquet file using the appropriate `arrow` function, but instead of reading it in as a data.frame we can use `as_data_frame = FALSE` to open it as an arrow table. Because of how Arrow allocates memory, arrow tables are much more memory-efficient representations of tabular data, which means data that won't fit into memory as a data.frame may still fit as an arrow table.
```{r}
arrow_tbl <- arrow::read_parquet("data/synthpop_10000000.parquet",
as_data_frame = FALSE)
arrow_tbl
```
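We can inspect the arrow table much as we would a data.frame, for example checking its dimensions and schema (a quick sketch):

```{r}
#| eval: false
# Basic inspection: dimensions and the Arrow schema (column names and types)
dim(arrow_tbl)
arrow_tbl$schema
```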
Many dplyr verbs can be used to interrogate this arrow table. To demonstrate, let's execute query 1 on our data.
```{r}
query_1(arrow_tbl)
```
Just like with databases, the query does not return a tibble. We again need to `collect()` the results of our query for them to be converted to a tibble:
```{r}
query_1(arrow_tbl) %>%
collect()
```
Given that `arrow_tbl` is actually in memory, we can compare query execution time to the in-memory `data`:
```{r}
bench::mark(in_mem_csv = query_1(data),
arrow_tbl = query_1(arrow_tbl) %>%
collect(),
check = FALSE)
```
Wow! That's much faster than performing the query even on an in-memory data.frame. Impressive!
### Accessing data as arrow datasets
Another way to access files through R is by opening them as a dataset with function `arrow::open_dataset()`.
We can open a single file or a whole directory of files, formatted in any of the formats arrow can handle.
This does not load the data into memory. Instead `open_dataset()` scans the content of the file(s) to identify the name of the columns and their data types.
#### Accessing single files as arrow datasets
Let's open a single file as a dataset first. To do so we supply the path to the files as well as the format it's stored in.
```{r}
arrow_file_dataset <- arrow::open_dataset("data/synthpop_10000000.parquet", format = "parquet")
bench::mark(
df = query_1(data),
sqlite = query_1(tbl) %>%
collect(),
duckdb = query_1(tbl_duckdb) %>%
collect(),
arrow_file_dataset = query_1(arrow_file_dataset) %>%
collect(),
check = FALSE
)
```
#### Accessing directories as arrow datasets
We can also use the same function to open a directory of files stored in the same format. This might be appropriate when your data generation involves creating data in batches that end up in separate files and for some reason you don't want to be writing them to a database.
The directory structure can help improve performance of queries too depending on how data is partitioned across directories. In some ways you can think of the physical partitioning as a physical index that can be used in a query to completely skip certain files.
Let's have a look at what this means by actually creating such a directory structure from our dataset.
We can use the function `arrow::write_dataset()` to write our data out to a directory, partitioned according to any variables we specify in the `partitioning` argument. Here we choose to partition by `age`. Let's also write the data out as efficient parquet files.
```{r}
#| eval: false
arrow::write_dataset(data,
path = "data/arrow_dataset",
format = "parquet",
partitioning = "age")
```
Let's use `fs::dir_tree()` to see the structure of the `data/arrow_dataset` directory we just created:
```{r}
fs::dir_tree("data/arrow_dataset/")
```
As we can see, a folder has been created for each value of age and the rows where the original data matched that condition are contained in parquet files within.
The dataset directory nonetheless still takes up less space than the original CSV:
```{r}
# parquet arrow data set
fs::dir_info("data/arrow_dataset", recurse = TRUE)$size %>% sum()
# original csv
fs::file_size("data/synthpop_10000000.csv")
```
Now that we've got a partitioned directory of our data, let's go ahead and open it as an arrow dataset.
```{r}
arrow_dir_dataset <- arrow::open_dataset("data/arrow_dataset", format = "parquet")
```
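Because the dataset is partitioned by `age`, a query that filters on `age` only needs to touch the matching subdirectories, which is the "physical index" idea mentioned above. A minimal sketch:

```{r}
#| eval: false
# Only the partitions (age folders) matching the filter need to be read from disk
arrow_dir_dataset %>%
  filter(age > 50L & age < 60L) %>%
  summarise(mean_income = mean(income, na.rm = TRUE)) %>%
  collect()
```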
## Summary Benchmarks
```{r}
bench::mark(
df = query_1(data),
sqlite = query_1(tbl) %>%
collect(),
sqlite_idx = query_1(tbl_idx) %>%
collect(),
duckdb = query_1(tbl_duckdb) %>%
collect(),
arrow_tbl = query_1(arrow_tbl) %>%
collect(),
arrow_file_dataset = query_1(arrow_file_dataset) %>%
collect(),
arrow_dir_dataset = query_1(arrow_dir_dataset) %>%
collect(),
check = FALSE
)
```
```{r}
bench::mark(
df = query_2(data),
sqlite = query_2(tbl) %>%
collect(),
sqlite_idx = query_2(tbl_idx) %>%
collect(),
duckdb = query_2(tbl_duckdb) %>%
collect(),
arrow_tbl = query_2(arrow_tbl) %>%
collect(),
arrow_file_dataset = query_2(arrow_file_dataset) %>%
collect(),
arrow_dir_dataset = query_2(arrow_dir_dataset) %>%
collect(),
check = FALSE
)
```
::: callout-important
### Overall Take Aways
- DuckDB can be a very efficient database format for complex queries involving large amounts of data due to its OLAP design and columnar-vectorised execution engine.
- Indexing can improve queries in SQLite and other OLTP-type databases. However, indexes are not flexible, require a lot of knowledge and experimentation, increase disk space, and can also reduce performance on other queries or if misapplied.
- The arrow package provides another option for loading or opening connections to files or directories of data and has deep integration with dplyr for performing queries.
- Partitioning can improve querying directories of data as arrow datasets. Partitions are however inflexible and represent a single physical index applied to the whole dataset.
- Arrow tables allow loading large datasets in a more memory-efficient way and support very fast querying.
:::
# Batch processing
In the previous sections we were focusing on a narrow set of operations, in particular the efficiency of accessing, filtering, selecting, ordering and aggregating subsets of data from data too large to fit into memory. But often we need to perform some processing on the whole dataset, as we saw in the example of populating our database in batches.
Other times our analyses, for example fitting a model, might require the full dataset to produce a result, which can be problematic even if we can just about load our data into memory, as that may leave us with little RAM for computation.
An option in this case would be to use algorithms that can compute on chunks or batches of the data. These algorithms are known as *external memory algorithms* (EMA), or *batch processing*.
Here's a simple example of how we might write a batch-processing algorithm to calculate the mean income across multiple files, specifically the parquet files we just created in `data/arrow_dataset`.
```{r}
batch_mean <- function(file_name) {
dat <- arrow::read_parquet(file_name, col_select = "income")
income_vct <- na.omit(dat[["income"]])
c(mean = mean(income_vct),
n = length(income_vct))
}
```
The function takes a file name. For each file, we load only the column we are interested in (`income`), remove `NA`s and calculate the mean. To be able to aggregate the means across all files, we also record `n`, the number of values used to calculate each mean.
We can then apply the function to a list of file names and aggregate the results in a tibble using `purrr::map_dfr()`.
```{r}
file_names <- fs::dir_ls("data/arrow_dataset",
recurse = TRUE,
type = "file")
means <- purrr::map_dfr(file_names, ~batch_mean(.x))
```
```{r}
means
```
Now that we've got our batched means we can calculate a weighted mean, using the `n` column as the weight, which indeed gives us the same result as calculating the mean on the whole dataset.
```{r}
weighted.mean(x = means$mean, w = means$n)
mean(data$income, na.rm = TRUE)
```
## Specialised R packages
There are a number of R packages that provide EMA solutions for analysing bigger-than-memory data.
For example, the function `biglm()` from the `biglm` package allows fitting a linear model in batches.
In the following example from the package documentation, a linear model is fitted initially to a small subset of the data with the function `biglm()`. The model is subsequently updated with additional chunks of data using `update()`.
```{r}
# example data and model formula
data(trees)
ff <- log(Volume) ~ log(Girth) + log(Height)

# split the data into three chunks
chunk1 <- trees[1:10, ]
chunk2 <- trees[11:20, ]
chunk3 <- trees[21:31, ]

library(biglm)

# fit on the first chunk, then update the fit with the remaining chunks
a <- biglm(ff, chunk1)
a <- update(a, chunk2)
a <- update(a, chunk3)
coef(a)
```
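The same idea can be combined with chunked reading so that the full dataset never needs to be in memory at once. Here's a minimal sketch, assuming a hypothetical `data/big_trees.csv` file with `Volume`, `Girth` and `Height` columns:

```{r}
#| eval: false
# Fit a linear model in chunks straight from a (hypothetical) large CSV file
library(biglm)

model <- NULL
readr::read_csv_chunked(
  "data/big_trees.csv",   # hypothetical file with Volume, Girth and Height columns
  callback = function(chunk, pos) {
    if (is.null(model)) {
      # first chunk: initialise the model
      model <<- biglm(log(Volume) ~ log(Girth) + log(Height), data = chunk)
    } else {
      # subsequent chunks: update the existing fit
      model <<- update(model, chunk)
    }
    NULL  # return nothing; we only update `model` as a side effect
  },
  chunk_size = 10000
)
coef(model)
```

Because the callback only updates the model as a side effect, each chunk can be discarded as soon as it has been processed.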
The R packages available are numerous, and their suitability varies according to the data and analysis you need to perform.
- [**bigmemory**](https://cran.r-project.org/web/packages/bigmemory/bigmemory.pdf): Manage Massive Matrices with Shared Memory and Memory-Mapped Files. The package uses memory mapping, where RAM addresses are mapped to a file on disk. While inevitably reducing performance, this extends the memory available for computation to disk.
- A number of analytics packages build on `bigmemory` including:
- [**`bigtabulate`**](https://cran.r-project.org/web/packages/bigtabulate/bigtabulate.pdf): Extend the bigmemory package with 'table', 'tapply', and 'split' support for 'big.matrix' objects.
- [`bigalgebra`](https://cran.r-project.org/web/packages/bigalgebra/index.html): For matrix operation.
- [**`biganalytics`**](https://cran.r-project.org/web/packages/biganalytics/biganalytics.pdf): Extend the 'bigmemory' package with various analytics, eg bigkmeans.
- [**`bigFastlm`**](https://github.com/jaredhuling/bigFastlm): for (fast) linear models.
- [**`biglasso`**](https://yaohuizeng.github.io/biglasso/index.html): extends lasso and elastic nets.
- [**`GHap`**](https://cran.r-project.org/web/packages/GHap/index.html): Haplotype calling from phased SNP data.
- [**`oem`**](https://jaredhuling.org/oem/): Penalized Regression for Big Tall Data.
- [**`bigstep`**](https://cran.r-project.org/web/packages/bigstep/vignettes/bigstep.html): Uses the **bigmemory** framework to perform stepwise model selection, when the data cannot fit into RAM.
- [**`ff`**](https://cran.r-project.org/web/packages/ff/index.html): The ff package provides data structures that are stored on disk in a binary format but behave (almost) as if they were in RAM by transparently mapping only a section (pagesize) into main memory. These data structures lend themselves to efficient chunking. Unlike bigmemory, which only supports numeric data types, `ff` supports all of R's vector types, including factors (which any character data is converted to for memory efficiency).
- [`ffbase`](https://cran.microsoft.com/snapshot/2020-04-20/web/packages/ffbase/ffbase.pdf): extends the `ff` package with a number of methods for working with `ff` objects.
- Package `biglm` also has methods for `ff` type objects so is not limited to fitting on numeric data.
A good place to find up to date information on available packages is the [CRAN Task View on High-Performance and Parallel Computing with R](https://cran.r-project.org/web/views/HighPerformanceComputing.html), especially the section on **Large memory and out-of-memory data**.
::: callout-note
I should acknowledge that this brief section has been a summary of the [chapter on Efficient Memory](http://www.john-ros.com/Rcourse/memory.html) from BGU's Department of Industrial Engineering and Management "R" course by Jonathan D. Rosenblatt. For more information I highly recommend reviewing it, as well as the chapter on [Sparse Representations](http://www.john-ros.com/Rcourse/sparse.html).
:::