Skip to content

Commit

Permalink
[Tour of Beam] Learning content for "Introduction" module (apache#23085)
Browse files Browse the repository at this point in the history
* learning content for introduction module

* removing white spaces from md files

* delete whitespaces

* delete whitespaces in python

* delete whitespace #2

* divide pipeline concepts

* add pipeline example concepts

* adding category tag to python examples

* adding category to java examples

* adding category to go examples

* fixed go example

* fixed go example compilation

* fixing python duplicate example names

* add runner concepts

* fixing java examples

* add licence for runner unit

* some minor fixes for unit names

* fixed unit name

* resolving CR comments

* adding complexity to examples

* adding tags

* fixed go example compilation

* fixed python example with duplicate transform

* change indent python

* fixing missing pipeline options

* change arrow symbol

* delete example prefix

* minor formatting and readability fixes

* add example description

* minor fix

* minor code review comment

Co-authored-by: Abzal Tuganbay <abzal.tugan@gmail.com>
  • Loading branch information
Oleh Borysevych and sirenbyte authored Nov 7, 2022
1 parent 98d2ffd commit 23676a9
Show file tree
Hide file tree
Showing 84 changed files with 3,785 additions and 0 deletions.
22 changes: 22 additions & 0 deletions learning/tour-of-beam/learning-content/go/content-info.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

sdk: Go
content:
- introduction
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
### Creating PCollection

Now that you know how to create a Beam pipeline and pass parameters into it, it is time to learn how to create an initial `PCollection` and fill it with data.

There are several options:

→ You can create a PCollection of data stored in an in-memory collection class in your driver program.

→ You can also read the data from a variety of external sources such as local or cloud-based files, databases, or other sources using Beam-provided I/O adapters

Through the tour, most of the examples use either a `PCollection` created from in-memory data or data read from one of the cloud buckets "beam-examples" or "dataflow-samples". These buckets contain sample data sets specifically created for educational purposes.

We encourage you to take a look, explore these data sets and use them while learning Apache Beam.

### Creating a PCollection from in-memory data

You can use the Beam-provided Create transform to create a `PCollection` from an in-memory Go Collection. You can apply Create transform directly to your Pipeline object itself.

The following example code shows how to do this:

```
func main() {
ctx := context.Background()
// First create pipeline
p, s := beam.NewPipelineWithRoot()
//Now create the PCollection using list of strings
strings := beam.Create(s, "To", "be", "or", "not", "to", "be","that", "is", "the", "question")
//Create a numerical PCollection
numbers := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
}
```

### Playground exercise

You can find the complete code of this example in the playground window you can run and experiment with.

One of the differences you will notice is that it also contains the part to output `PCollection` elements to the console. Don’t worry if you don’t quite understand it, as the concept of `ParDo` transform will be explained later in the course. Feel free, however, to use it in exercises and challenges to explore results.

Do you also notice in what order elements of PCollection appear in the console? Why is that? You can also run the example several times to see if the output stays the same or changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// beam-playground:
// name: ParDo
// description: ParDo example.
// multifile: false
// context_line: 32
// categories:
// - Quickstart
// complexity: BASIC
// tags:
// - hellobeam

package main

import (
"context"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"fmt"
)

func main() {
p, s := beam.NewPipelineWithRoot()

words := beam.Create(s, "Hello", "world", "it`s", "Beam")

output(s, words)

err := beamx.Run(context.Background(), p)
if err != nil {
log.Exitf(context.Background(), "Failed to execute job: %v", err)
}
}

func output(s beam.Scope, input beam.PCollection) {
beam.ParDo0(s, func(element interface{}) {
fmt.Println(element)
}, input)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

id: from-memory
name: Creating in-memory PCollections
taskName: ParDo
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

id: creating-collections
name: Creating Collections
content:
- from-memory
- reading-from-text
- reading-from-csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

### Read from csv file

Data processing pipelines often work with tabular data. In many examples and challenges throughout the course, you’ll be working with one of the datasets stored as csv files in either beam-examples, dataflow-samples buckets.

Loading data from csv file requires some processing and consists of two main part:
* Loading text lines using `TextIO.Read` transform
* Parsing lines of text into tabular format

### Playground exercise

Try to experiment with an example in the playground window and modify the code to process other fields from New York taxi rides dataset.

Here is a small list of fields and an example record from this dataset:

| cost | passenger_count | ... |
|------|-----------------|-----|
| 5.8 | 1 | ... |
| 4.6 | 2 | ... |
| 24 | 1 | ... |

Overview [file](https://storage.googleapis.com/apache-beam-samples/nyc_taxi/misc/sample1000.csv)
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// beam-playground:
// name: CSV
// description: CSV example.
// multifile: false
// context_line: 44
// categories:
// - Quickstart
// complexity: BASIC
// tags:
// - hellobeam

package main

import (
"context"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"strconv"
"strings"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"

)

func less(a, b float64) bool{
return a>b
}

func main() {
p, s := beam.NewPipelineWithRoot()

file := Read(s, "gs://apache-beam-samples/nyc_taxi/misc/sample1000.csv")

cost := applyTransform(s, file)

fixedSizeElements := top.Largest(s,cost,10,less)

output(s, "Total cost: ", fixedSizeElements)

err := beamx.Run(context.Background(), p)
if err != nil {
log.Exitf(context.Background(), "Failed to execute job: %v", err)
}
}

// Read reads from fiename(s) specified by a glob string and a returns a PCollection<string>.
func Read(s beam.Scope, glob string) beam.PCollection {
return textio.Read(s, glob)
}

// ApplyTransform converts to uppercase all elements in a PCollection<string>.
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
return beam.ParDo(s, func(line string) float64 {
taxi := strings.Split(strings.TrimSpace(line), ",")
if len(taxi) > 16 {
cost, _ := strconv.ParseFloat(taxi[16],64)
return cost
}
return 0.0
}, input)
}

func output(s beam.Scope, prefix string, input beam.PCollection) {
beam.ParDo0(s, func(elements []float64) {
for _, element := range elements {
fmt.Println(prefix,element)
}
}, input)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

id: from-csv
name: Creating PCollections from csv files
taskName: CSV
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
### Reading from text file

You use one of the Beam-provided I/O adapters to read from an external source. The adapters vary in their exact usage, but all of them read from some external data source and return a `PCollection` whose elements represent the data records in that source.

Each data source adapter has a Read transform; to read, you must apply that transform to the Pipeline object itself.

`TextIO.Read` , for example, reads from an external text file and returns a `PCollection` whose elements are of type String. Each String represents one line from the text file. Here’s how you would apply `TextIO.Read` to your Pipeline to create a `PCollection`:

```
func main() {
ctx := context.Background()
// First create pipline
p, s := beam.NewPipelineWithRoot()
// Now create the PCollection by reading text files. Separate elements will be added for each line in the input file
lines := textio.Read(scope, 'gs://some/inputData.txt')
}
```

### Playground exercise

In the playground window, you can find an example that reads a king lear poem from the text file stored in the Google Storage bucket and fills PCollection with individual lines and then with individual words. Try it out and see what the output is.

One of the differences you will see is that the output is much shorter than the input file itself. This is because the number of elements in the output `PCollection` is limited with the `top.Largest(s,lines,10,less)` transform. Use Sample.fixedSizeGlobally transform of is another technique you can use to troubleshoot and limit the output sent to the console for debugging purposes in case of large input datasets.

Overview [file](https://storage.googleapis.com/apache-beam-samples/shakespeare/kinglear.txt)
Loading

0 comments on commit 23676a9

Please sign in to comment.