Commit

hw2: matrixgenerator

vbugaevskii committed Nov 19, 2017
1 parent 5d446f6 commit e0ed218
Showing 12 changed files with 656 additions and 0 deletions.
26 changes: 26 additions & 0 deletions hw2matrixgenerator/config.xml
@@ -0,0 +1,26 @@
<configuration>
<property>
<name>mgen.row-count</name>
<value>20000</value>
</property>
<property>
<name>mgen.column-count</name>
<value>20000</value>
</property>
<property>
<name>mgen.min</name>
<value>2</value>
</property>
<property>
<name>mgen.max</name>
<value>8</value>
</property>
<property>
<name>mgen.sparsity</name>
<value>0</value>
</property>
<property>
<name>mgen.num-mappers</name>
<value>40</value>
</property>
</configuration>
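
As these keys are read by the generator code further below, mgen.row-count and mgen.column-count set the matrix dimensions, mgen.min and mgen.max bound the generated values, mgen.sparsity is the fraction of cells left unfilled (0 means a fully dense matrix), and mgen.num-mappers controls how many input splits, and therefore map tasks, are created.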
18 changes: 18 additions & 0 deletions hw2matrixgenerator/package.sh
@@ -0,0 +1,18 @@
#!/bin/bash

set -e

USERNAME="Бугаевский"

ARCHIVE="Task2-$USERNAME.zip"
WORK_DIR="tmp"

mkdir -p "$WORK_DIR/prog"
[[ -z "$_DONT_REMOVE_WORK_DIR" ]] && trap "rm -rf ${WORK_DIR}" EXIT

cp -r src pom.xml config.xml "$WORK_DIR/prog"
cp readme.txt "$WORK_DIR"
if [ -f "$ARCHIVE" ]; then
rm "$ARCHIVE"
fi
cd "$WORK_DIR" ; zip -r "../$ARCHIVE" * ; cd ..
59 changes: 59 additions & 0 deletions hw2matrixgenerator/pom.xml
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>ru.msu.cs.bigdata</groupId>
<artifactId>sparse-matrix-generator</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.7.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<compilerArgument>-Werror</compilerArgument>
</configuration>
</plugin>
</plugins>
</build>
</project>
15 changes: 15 additions & 0 deletions hw2matrixgenerator/readme.txt
@@ -0,0 +1,15 @@
1. The program was built in the IntelliJ IDEA IDE by running the Maven goal
"package", which is equivalent to running "mvn package" from the console.
The build produces the executable jar sparse-matrix-generator-1.0-SNAPSHOT.jar
in the target directory.

2. Running the job on the server:
hadoop jar sparse-matrix-generator-1.0-SNAPSHOT.jar \
mgen -conf config.xml output

hadoop jar sparse-matrix-generator-1.0-SNAPSHOT.jar mgen \
-D mgen.row-count=20000 -D mgen.column-count=20000 -D mgen.seed=8888 \
-D mgen.num-mappers=30 -D mgen.sparsity=0.0 output

The output directory contains matrix.data (a directory with the generated data)
and matrix.size (a file with the matrix dimensions).
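
The driver class invoked as "mgen" above is one of the files not shown in this excerpt. As a rough illustration of how -conf config.xml and the -D mgen.* overrides reach the job, a minimal ToolRunner-based driver wired to this input format might look like the sketch below; the class name is a placeholder, and the real driver additionally produces the matrix.data directory and matrix.size file, which this sketch omits.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // Hypothetical driver sketch, not the actual mgen class from this commit.
    public class MatrixGeneratorDriver extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            // getConf() already holds the values from -conf config.xml and -D overrides,
            // because ToolRunner/GenericOptionsParser parsed them in main().
            Job job = Job.getInstance(getConf(), "sparse-matrix-generator");
            job.setJarByClass(MatrixGeneratorDriver.class);

            job.setInputFormatClass(GeneratorInputFormat.class);
            job.setMapperClass(GeneratorMapper.class);
            job.setNumReduceTasks(0);  // map-only: mappers write the matrix directly

            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path(args[0]));

            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new Configuration(), new MatrixGeneratorDriver(), args));
        }
    }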
59 changes: 59 additions & 0 deletions hw2matrixgenerator/src/main/java/GeneratorInputFormat.java
@@ -0,0 +1,59 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.*;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

public class GeneratorInputFormat extends InputFormat<MatrixCoords, FloatWritable> {
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();

int numMappers = conf.getInt(mgen.PARAM_NUM_MAPPERS, mgen.PARAM_NUM_MAPPERS_DEFAULT);
int matrixRows = conf.getInt(mgen.PARAM_ROW_COUNT, mgen.PARAM_ROW_COUNT_DEFAULT);
int matrixCols = conf.getInt(mgen.PARAM_COL_COUNT, mgen.PARAM_COL_COUNT_DEFAULT);

List<InputSplit> splits = new LinkedList<>();

if (numMappers > matrixRows && numMappers > matrixCols) {
numMappers = Math.max(matrixCols, matrixRows);
}

int strideWidth, colMin, colMax, rowMin, rowMax;
if (numMappers <= matrixRows) {
// split by rows
strideWidth = (int) Math.ceil(((float) matrixRows) / numMappers);
colMin = 0;
colMax = matrixCols;

for (rowMin = 0; rowMin < matrixRows; rowMin += strideWidth) {
rowMax = Math.min(rowMin + strideWidth, matrixRows);
splits.add(new GeneratorInputSplit(rowMin, rowMax, colMin, colMax));
}
} else if (numMappers <= matrixCols) {
// split by columns
strideWidth = (int) Math.ceil(((float) matrixCols) / numMappers);
rowMin = 0;
rowMax = matrixRows;

for (colMin = 0; colMin < matrixCols; colMin += strideWidth) {
colMax = Math.min(colMin + strideWidth, matrixCols);
splits.add(new GeneratorInputSplit(rowMin, rowMax, colMin, colMax));
}
}

return splits;
}

@Override
public RecordReader<MatrixCoords, FloatWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
double sparsity = context.getConfiguration().getDouble(mgen.PARAM_SPARSITY, mgen.PARAM_SPARSITY_DEFAULT);
RecordReader<MatrixCoords, FloatWritable> recordReader = (sparsity < 0.5) ?
new GeneratorRecordReaderInverse() : new GeneratorRecordReaderForward();
recordReader.initialize(split, context);
return recordReader;
}
}
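
For the values in config.xml above (mgen.row-count = 20000, mgen.column-count = 20000, mgen.num-mappers = 40), numMappers does not exceed the row count, so the row-wise branch applies: strideWidth = ceil(20000 / 40) = 500, giving 40 splits that each cover 500 full-width rows. A small standalone check of that arithmetic (a sketch, not part of the commit):

    // Standalone check of the split arithmetic used in getSplits()
    // for the config.xml defaults; not part of the commit.
    public class SplitMathCheck {
        public static void main(String[] args) {
            int numMappers = 40, matrixRows = 20000, matrixCols = 20000;
            int strideWidth = (int) Math.ceil(((float) matrixRows) / numMappers); // 500
            int splits = 0;
            for (int rowMin = 0; rowMin < matrixRows; rowMin += strideWidth) {
                int rowMax = Math.min(rowMin + strideWidth, matrixRows);
                System.out.printf("split %d: rows [%d, %d), cols [0, %d)%n",
                        splits++, rowMin, rowMax, matrixCols);
            }
            System.out.println("total splits: " + splits); // 40
        }
    }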
63 changes: 63 additions & 0 deletions hw2matrixgenerator/src/main/java/GeneratorInputSplit.java
@@ -0,0 +1,63 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class GeneratorInputSplit extends InputSplit implements Writable {
private int rowMin, rowMax, colMin, colMax;

public GeneratorInputSplit() {
rowMin = rowMax = colMin = colMax = -1;
}

public GeneratorInputSplit(int rowMin, int rowMax, int colMin, int colMax) {
this.rowMin = rowMin;
this.rowMax = rowMax;
this.colMin = colMin;
this.colMax = colMax;
}

public int getRowMin() {
return rowMin;
}

public int getRowMax() {
return rowMax;
}

public int getColMin() {
return colMin;
}

public int getColMax() {
return colMax;
}

@Override
public long getLength() throws IOException, InterruptedException {
// widen to long before multiplying so large splits do not overflow int
return (long) (rowMax - rowMin) * (colMax - colMin) * (Float.SIZE / Byte.SIZE);
}

@Override
public String[] getLocations() throws IOException, InterruptedException {
return new String[0];
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(rowMin);
out.writeInt(rowMax);
out.writeInt(colMin);
out.writeInt(colMax);
}

@Override
public void readFields(DataInput in) throws IOException {
rowMin = in.readInt();
rowMax = in.readInt();
colMin = in.readInt();
colMax = in.readInt();
}
}
38 changes: 38 additions & 0 deletions hw2matrixgenerator/src/main/java/GeneratorMapper.java
@@ -0,0 +1,38 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class GeneratorMapper extends Mapper<MatrixCoords, FloatWritable, NullWritable, Text> {
private String outputFormat;
private final Text outputValue = new Text();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
StringBuilder builder = new StringBuilder();
Configuration conf = context.getConfiguration();

String tag = conf.get(mgen.PARAM_TAG);
if (tag != null) {
builder.append(tag);
builder.append("\t");
}
builder.append("%s\t"); // for key

String floatFormat = conf.get(mgen.PARAM_FLOAT_FORMAT, mgen.PARAM_FLOAT_FROMAT_DEFAULT);
builder.append(floatFormat);

outputFormat = builder.toString();
}

@Override
protected void map(MatrixCoords key, FloatWritable value, Context context)
throws IOException, InterruptedException {
outputValue.set(String.format(outputFormat, key.toString(), value.get()));
context.write(NullWritable.get(), outputValue);
}
}
77 changes: 77 additions & 0 deletions hw2matrixgenerator/src/main/java/GeneratorRecordReader.java
@@ -0,0 +1,77 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.Random;

abstract public class GeneratorRecordReader extends RecordReader<MatrixCoords, FloatWritable> {
protected int valuesMapped, valuesMappedMax;

protected Random random;
protected float minValue, maxValue;
protected int rowMin, rowMax, colMin, colMax;

protected MatrixCoords currentKey = null;
protected Float currentValue = null;

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();

minValue = conf.getFloat(mgen.PARAM_MIN_VALUE, mgen.PARAM_MIN_VALUE_DEFAULT);
maxValue = conf.getFloat(mgen.PARAM_MAX_VALUE, mgen.PARAM_MAX_VALUE_DEFAULT);

random = new Random(conf.getLong(mgen.PARAM_SEED, mgen.PARAM_SEED_DEFAULT));

GeneratorInputSplit inputSplit = (GeneratorInputSplit) split;
rowMin = inputSplit.getRowMin();
rowMax = inputSplit.getRowMax();
colMin = inputSplit.getColMin();
colMax = inputSplit.getColMax();

double sparsity = conf.getDouble(mgen.PARAM_SPARSITY, mgen.PARAM_SPARSITY_DEFAULT);
valuesMappedMax = (int) ( (rowMax - rowMin) * (colMax - colMin) * (1.0 - sparsity) );
valuesMapped = -1;
}

protected int generateRandomRow() {
return rowMin + random.nextInt(rowMax - rowMin);
}

protected int generateRandomCol() {
return colMin + random.nextInt(colMax - colMin);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
currentKey = null;
currentValue = null;
return ++valuesMapped < valuesMappedMax;
}

abstract public MatrixCoords getCurrentKey() throws IOException, InterruptedException;

@Override
public FloatWritable getCurrentValue() throws IOException, InterruptedException {
if (currentValue == null) {
currentValue = 0.0f;
while (currentValue == 0.0f) {
currentValue = minValue + (maxValue - minValue) * random.nextFloat();
}
}
return new FloatWritable(currentValue);
}

@Override
public float getProgress() throws IOException, InterruptedException {
return ( (float) valuesMapped ) / valuesMappedMax;
}

@Override
public void close() throws IOException {
valuesMapped = -1;
}
}
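
Following the same example, a 500 x 20000 row split with mgen.sparsity = 0 gives valuesMappedMax = 500 * 20000 * (1.0 - 0.0) = 10,000,000 generated records per mapper; raising the sparsity to 0.9 would cut that to 1,000,000, and getProgress() simply reports the fraction of that budget consumed so far.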