Merge pull request apache#123 from seznam/pete/explain-examples

[euphoria-examples] Provide some more explanatory comments

vanekjar authored May 10, 2017
2 parents c2a5046 + 83c3bfc commit 8f3fa87

Showing 2 changed files with 189 additions and 24 deletions.
@@ -40,8 +40,12 @@
import java.util.regex.Pattern;

/**
* Simple aggregation of logs in Apache log format.
* Counts number of daily hits per client.
* Simple aggregation of logs in Apache log format. Counts the number of daily
* hits per client. This is a word-count-like program utilizing time windowing
* based on event time.
* <p>
* If you are new to the euphoria API, you are advised to first study the
* {@link SimpleWordCount} program.
* <p>
*
* Example usage on flink:
@@ -77,18 +81,89 @@ public static void main(String[] args) throws Exception {
final String executorName = args[0];
final String inputPath = args[1];

// As with {@code SimpleWordCount}, we define a source to read data from ...
DataSource<String> dataSource = new SimpleHadoopTextFileSource(inputPath);
// ... and a sink to write the business logic's output to. In this particular
// case we use a sink that eventually writes out the data to the executor's
// standard output. This is rarely useful in production environments but
// is handy during local execution.
DataSink<String> dataSink = new StdoutSink<>();

// We start by allocating a new flow, a container that encapsulates the
// chain of transformations.
Flow flow = Flow.create("Access log processor");

// From the data source describing the actual input data location and
// physical form, we create an abstract data set to be processed in the
// context of the created flow.
//
// As in other examples, reading the actual input source is deferred
// until the flow's execution. The data itself is _not_ touched at this
// point yet.
Dataset<String> input = flow.createInput(dataSource);

// We assume the actual input data to have a particular format; in this
// case, each element is expected to be a log line from the Apache access
// log. We "map" the "parseLine" function over each such line to transform
// the raw log entry into a more structured object.
//
// Note: Using `MapElements` implies that for each input we generate an
// output. In the context of this program it means that we are not able
// to naturally "skip" invalid log lines.
//
// Note: Generally, user defined functions must be thread-safe. If you
// inspect the `parseLine` function, you'll see that it allocates a new
// `SimpleDateFormat` instance for every input element since sharing such
// an instance between threads without explicit synchronization is not
// safe. (In this example we have intentionally used the
// `SimpleDateFormat` to make this point. In a real-world program you
// would probably reach for a `DateTimeFormatter`, which can safely be
// re-used across threads; an illustrative sketch follows this operator.)
Dataset<LogLine> parsed = MapElements.named("LOG-PARSER")
.of(input)
.using(LogParser::parseLine)
.output();
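// For illustration only (a hypothetical sketch, not part of this commit):
// a `DateTimeFormatter` is immutable and therefore safe to share across
// threads, e.g. as a constant reused by every invocation of the user
// defined function:
//
//   private static final DateTimeFormatter LOG_TIME =
//       DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
//   ...
//   OffsetDateTime time = OffsetDateTime.parse(rawTimestamp, LOG_TIME);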

// In the previous step we derived a data set specifying points in time
// at which particular IPs accessed our web-server. Our goal is now to
// count how often a particular IP accessed the web-server, per day. That
// is, instead of deriving the count of a particular IP from the whole
// input, we want to know the number of hits per IP for every day
// distinctly (we're not interested in zero hit counts, of course.)
//
// Actually, this computation is merely a word-count problem explained
// in the already mentioned {@link SimpleWordCount}. We just count the
// number of occurrences of a particular IP. However, we also specify
// how the input is to be "windowed."
//
// Windowing splits the input into fixed chunks. Just as we can divide
// a data set into chunks of a certain size, we can split a data set
// into chunks defined by time, e.g. a chunk for day one, another chunk
// for day two, etc., provided that elements of the data set have a
// notion of time. Once the input data set is logically divided into
// these "time windows", the computation takes place separately on each
// of them and produces a result for each window separately.
//
// Here, we specify time-based windowing using the `Time.of(..)` method,
// specifying the size of the windows, in particular "one day" in this
// example. Then, we also specify how to determine the time of the
// elements, such that these are placed into the right windows.
//
// Note: There are a few different windowing strategies and you can
// investigate each by looking for classes implementing {@link Windowing}.
//
// Note: You might wonder why we didn't just write a
// "select ip, count(*) from input group by (ip, day)". First, windowing
// as such is a separate concern from the actual computation; there is no
// need to mix them up and further complicate the actual computation.
// Being a separate concern, it allows for easier exchange and
// experimentation. Second, by specifying windowing as a separate concern,
// we can make the computation work even on unbounded, i.e. endless, input
// streams. Windowing strategies generally work together with the
// executor and can define the point at which a window is determined to be
// "filled", at which point the window's data can be processed and the
// corresponding results emitted. This makes endless stream processing
// work. (An illustrative windowing variation follows this operator.)
Dataset<Pair<String, Long>> aggregated = ReduceByKey.named("AGGREGATE")
.of(parsed)
.keyBy(LogLine::getIp)
@@ -97,6 +172,14 @@ public static void main(String[] args) throws Exception {
.windowBy(Time.of(Duration.ofDays(1)), line -> line.getDate().getTime())
.output();
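// For illustration only (not part of this commit): the window size is just
// a parameter, so counting hits per hour instead of per day would merely
// require swapping the windowing definition above, e.g.:
//
//   .windowBy(Time.of(Duration.ofHours(1)), line -> line.getDate().getTime())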

// At the final stage of our flow, we nicely format the previously emitted
// results before persisting them to a given data sink, e.g. external storage.
//
// The elements emitted from the previous operator specify the windowed
// results of the "IP-count". That is, for each IP we get a count of the
// number of its occurrences (within a window.) The window information
// itself - if desired - can be accessed from the `FlatMap`'s context
// parameter as demonstrated below.
FlatMap.named("FORMAT-OUTPUT")
.of(aggregated)
.using(((Pair<String, Long> elem, Context<String> context) -> {
@@ -108,6 +191,7 @@ public static void main(String[] args) throws Exception {
.output()
.persist(dataSink);

// Finally, we allocate an executor and submit our flow for execution on it.
Executor executor = Executors.createExecutor(executorName);
executor.submit(flow).get();
}
@@ -120,7 +204,11 @@ public static LogLine parseLine(String line) {
Matcher matcher = pattern.matcher(line);
if (matcher.matches()) {
try {
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
// `SimpleDateFormat` is not thread-safe, so we need to allocate one here.
// Ideally, we'd use a `DateTimeFormatter` and re-use it across input
// elements; see the corresponding note at the operator utilizing `parseLine`.
SimpleDateFormat sdf =
new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

String ip = matcher.group(1);
Date date = sdf.parse(matcher.group(4));
@@ -18,23 +18,23 @@
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.io.StdoutSink;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.operator.FlatMap;
import cz.seznam.euphoria.core.client.operator.MapElements;
import cz.seznam.euphoria.core.client.operator.ReduceByKey;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.client.util.Sums;
import cz.seznam.euphoria.core.executor.Executor;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.examples.Executors;
import cz.seznam.euphoria.hadoop.input.SimpleHadoopTextFileSource;
import cz.seznam.euphoria.hadoop.output.SimpleHadoopTextFileSink;

import java.net.URI;
import java.util.regex.Pattern;

/**
* Demostrates a very simple word-count supported batched input.
* Demonstrates a very simple word-count supporting batched input
* without windowing.
*
* Example usage on flink:
* <pre>{@code
@@ -69,43 +69,113 @@ public class SimpleWordCount {
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println("Usage: " + SimpleWordCount.class
+ " <executor-name> <input-uri> <output-uri> [num-output-partitions]");
+ " <executor-name> <input-path> <output-path> [num-output-partitions]");
System.exit(1);
}

Settings settings = new Settings();
settings.setClass("euphoria.io.datasource.factory.file", SimpleHadoopTextFileSource.Factory.class);
settings.setClass("euphoria.io.datasource.factory.hdfs", SimpleHadoopTextFileSource.Factory.class);
settings.setClass("euphoria.io.datasink.factory.file", SimpleHadoopTextFileSink.Factory.class);
settings.setClass("euphoria.io.datasink.factory.hdfs", SimpleHadoopTextFileSink.Factory.class);
settings.setClass("euphoria.io.datasink.factory.stdout", StdoutSink.Factory.class);

final String executorName = args[0];
final String input = args[1];
final String output = args[2];
final int partitions = args.length > 3 ? Integer.parseInt(args[3]) : -1;

Flow flow = buildFlow(settings, URI.create(input), URI.create(output), partitions);
// Define a source of data to read text lines from. We utilize an
// already predefined DataSource implementation hiding some of the
// implementation details. Note that at this point in time the data
// is not read. The source files will be opened and read in a distributed
// manner only when the "WordCount" flow is submitted for execution.
DataSource<String> inSource = new SimpleHadoopTextFileSource(input);

// Define a sink to write the program's final results to. As with
// the above defined source, no resources are opened for writing yet.
// Only when the program is submitted for execution will the sink be
// instructed to open writers to the final, physical destination. Here,
// we utilize an already predefined DataSink which simply writes each
// string on its own line.
DataSink<String> outSink = new SimpleHadoopTextFileSink<>(output);

// Construct a flow which we'll later submit for execution. For the sake
// of readability we've moved the definition into its own method.
Flow flow = buildFlow(inSource, outSink, partitions);

// Allocate an executor by the specified name.
Executor executor = Executors.createExecutor(executorName);

// Only now do we submit the flow and have the executor execute
// the business logic defined by it. Only now will the data sources
// and sinks be opened.
//
// As you can see, the submission of the flow happens in the background,
// and we could submit other flows to execute concurrently with the
// one just submitted. To await the termination of a flow, we just
// ask for the result of the `Future` object returned by `submit()`
// (see the illustrative sketch after this call).
executor.submit(flow).get();
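// For illustration only (a hypothetical sketch; the exact return type of
// `submit()` is an assumption beyond it being a `Future`):
//
//   Future<?> pending = executor.submit(flow);
//   // ... submit further flows or do other unrelated work here ...
//   pending.get();  // block until the first flow has terminated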
}

private static Flow buildFlow(Settings settings, URI input, URI output, int partitions)
throws Exception
{
Flow flow = Flow.create(SimpleWordCount.class.getSimpleName(), settings);
/**
* This method defines the executor-independent business logic of the program.
*
* @param input the source to read lines of text from
* @param output the sink to write the output of the business logic to
* @param partitions the parallelism of the computation's execution and
* the number of output partitions to generate
*
* @return a flow, a unit to be executed on a specific executor
*/
private static Flow buildFlow(DataSource<String> input, DataSink<String> output, int partitions) {
// The first step in building a euphoria flow is creating a ...
// well, a `Flow` object. It is a container encapsulating a chain
// of transformations. Within a program we can have many flows, though
// these will all be independent of each other. Dependencies between
// operators can be expressed only within a single flow.
//
// It is usually good practice to give each flow within a program a
// unique name to make it easier to distinguish its statistics or other
// displayed information from those of other flows which may be part
// of the program.
Flow flow = Flow.create(SimpleWordCount.class.getSimpleName());

// Given a data source we lift this source up into an abstract data
// set. A data set is the input and output of operators. While a source
// describes a particular physical input, a data set abstracts away from
// this notion. It can literally be thought of as a "set of data"
// (without the notion of uniqueness of elements.)
//
// Note: we ask the flow to do this lifting. The new data set will
// automatically be associated with the flow. All operators processing
// this data set will also automatically become associated with the
// flow. Using a data set (or an operator) associated with one flow
// in a different flow is considered an error and will lead to
// exceptions before the flow is even executed.
Dataset<String> lines = flow.createInput(input);

// split lines to words
// In the next step we want to chop up the data set of strings into a
// data set of words. Using the `FlatMap` operator we can process each
// element/string from the original data set to transform it into words.
//
// Note: Generally we never modify the original input data set but
// merely produce a new one. Further, the processing order of the input
// elements is generally undefined and processing typically happens in
// parallel.
//
// The `FlatMap` operator in particular is a handy choice at this point.
// It processes one input element at a time and allows user code to emit
// zero, one, or more output elements. We use it here to chop up a long
// string into individual words and emit each of them separately (see
// the example after this operator).
Dataset<String> words = FlatMap.named("TOKENIZER")
.of(lines)
.using((String line, Context<String> c) ->
SPLIT_RE.splitAsStream(line).forEachOrdered(c::collect))
.output();
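// For illustration (assuming `SPLIT_RE` splits on word boundaries): the
// input line "to be or not" would be collected as the four separate
// elements "to", "be", "or" and "not".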

// count per word
// Given the "words" data set, we want to reduce it to a collection
// of word-counts, i.e. a collection which counts the number of
// occurrences of every distinct word.
//
// From each input element we extract a key, which is the word itself
// here, and a value, which is the constant `1` in this example. Then, we
// reduce by the key - the operator ensures that all values for the same
// key end up being processed together. It applies our `combineBy` user
// defined function to these values. The result of this user defined
// function is then emitted to the output along with its corresponding
// key (a small worked example follows this operator).
Dataset<Pair<String, Long>> counted = ReduceByKey.named("REDUCE")
.of(words)
.keyBy(e -> e)
@@ -114,7 +184,14 @@ private static Flow buildFlow(Settings settings, URI input, URI output, int part
.applyIf(partitions > 0, op -> op.setNumPartitions(partitions))
.output();
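// For illustration: given the input words "to", "be", "or", "not", "to",
// "be", this operator emits the pairs ("to", 2), ("be", 2), ("or", 1)
// and ("not", 1), in no particular order.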

// format output
// Lastly we merely format the output of the preceding operator and
// call `.persist()` with a data sink specifying the "persistent"
// destination of the data. The data sink itself describes where to
// write the data to and how to physically lay it out.
//
// Note: a flow without any call to `.persist()` is meaningless as
// such a flow would never produce anything. Executors are free to
// reject such flows.
MapElements.named("FORMAT")
.of(counted)
.using(p -> p.getFirst() + "\t" + p.getSecond())
