From e0ed218ff2fa7dfa2dafaeb4058386140337d634 Mon Sep 17 00:00:00 2001 From: vbugaevsky Date: Sun, 19 Nov 2017 13:54:52 +0300 Subject: [PATCH] hw2: matrixgenerator --- hw2matrixgenerator/config.xml | 26 ++++ hw2matrixgenerator/package.sh | 18 +++ hw2matrixgenerator/pom.xml | 59 +++++++++ hw2matrixgenerator/readme.txt | 15 +++ .../src/main/java/GeneratorInputFormat.java | 59 +++++++++ .../src/main/java/GeneratorInputSplit.java | 63 ++++++++++ .../src/main/java/GeneratorMapper.java | 38 ++++++ .../src/main/java/GeneratorRecordReader.java | 77 ++++++++++++ .../java/GeneratorRecordReaderForward.java | 45 +++++++ .../java/GeneratorRecordReaderInverse.java | 62 ++++++++++ .../src/main/java/MatrixCoords.java | 81 +++++++++++++ hw2matrixgenerator/src/main/java/mgen.java | 113 ++++++++++++++++++ 12 files changed, 656 insertions(+) create mode 100644 hw2matrixgenerator/config.xml create mode 100755 hw2matrixgenerator/package.sh create mode 100644 hw2matrixgenerator/pom.xml create mode 100644 hw2matrixgenerator/readme.txt create mode 100644 hw2matrixgenerator/src/main/java/GeneratorInputFormat.java create mode 100644 hw2matrixgenerator/src/main/java/GeneratorInputSplit.java create mode 100644 hw2matrixgenerator/src/main/java/GeneratorMapper.java create mode 100644 hw2matrixgenerator/src/main/java/GeneratorRecordReader.java create mode 100644 hw2matrixgenerator/src/main/java/GeneratorRecordReaderForward.java create mode 100644 hw2matrixgenerator/src/main/java/GeneratorRecordReaderInverse.java create mode 100644 hw2matrixgenerator/src/main/java/MatrixCoords.java create mode 100644 hw2matrixgenerator/src/main/java/mgen.java diff --git a/hw2matrixgenerator/config.xml b/hw2matrixgenerator/config.xml new file mode 100644 index 0000000..dbf5abc --- /dev/null +++ b/hw2matrixgenerator/config.xml @@ -0,0 +1,26 @@ + + + mgen.row-count + 20000 + + + mgen.column-count + 20000 + + + mgen.min + 2 + + + mgen.max + 8 + + + mgen.sparsity + 0 + + + mgen.num-mappers + 40 + + diff --git a/hw2matrixgenerator/package.sh b/hw2matrixgenerator/package.sh new file mode 100755 index 0000000..5786bab --- /dev/null +++ b/hw2matrixgenerator/package.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +set -e + +USERNAME="Бугаевский" + +ARCHIVE="Task2-$USERNAME.zip" +WORK_DIR="tmp" + +mkdir -p "$WORK_DIR/prog" +[[ -z "$_DONT_REMOVE_WORK_DIR" ]] && trap "rm -rf ${WORK_DIR}" EXIT + +cp -r src pom.xml config.xml "$WORK_DIR/prog" +cp readme.txt "$WORK_DIR" +if [ -f "$ARCHIVE" ]; then + rm "$ARCHIVE" +fi +cd "$WORK_DIR" ; zip -r "../$ARCHIVE" * ; cd .. diff --git a/hw2matrixgenerator/pom.xml b/hw2matrixgenerator/pom.xml new file mode 100644 index 0000000..c1651b7 --- /dev/null +++ b/hw2matrixgenerator/pom.xml @@ -0,0 +1,59 @@ + + + 4.0.0 + + ru.msu.cs.bigdata + sparse-matrix-generator + 1.0-SNAPSHOT + + + + org.apache.hadoop + hadoop-mapreduce-client-common + 2.7.3 + provided + + + org.apache.hadoop + hadoop-common + 2.7.3 + provided + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.3 + + + + + + + + + package + + shade + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + 1.7 + 1.7 + -Werror + + + + + \ No newline at end of file diff --git a/hw2matrixgenerator/readme.txt b/hw2matrixgenerator/readme.txt new file mode 100644 index 0000000..8745bbe --- /dev/null +++ b/hw2matrixgenerator/readme.txt @@ -0,0 +1,15 @@ +1. Компиляция программы проходила в среде разработки IntelijIdea с помощью +указания maven цели с параметром package, что соответвует запуску команды +"mvn package" в консоли. В результате работы в директории target появляется +исполняемый файл sparse-matrix-generator-1.0-SNAPSHOT.jar. + +2. Запуск задачи на сервере +hadoop jar sparse-matrix-generator-1.0-SNAPSHOT.jar \ + mgen -conf config.xml output + +hadoop jar sparse-matrix-generator-1.0-SNAPSHOT.jar mgen \ + -D mgen.row-count=20000 -D mgen.column-count=20000 -D mgen.seed=8888 \ + -D mgen.num-mappers=30 -D mgen.sparsity=0.0 output + +На выходе в директории output оказывается директория matrix.data (данные) и +файл matrix.size (размеры матрицы) \ No newline at end of file diff --git a/hw2matrixgenerator/src/main/java/GeneratorInputFormat.java b/hw2matrixgenerator/src/main/java/GeneratorInputFormat.java new file mode 100644 index 0000000..cfa9b6f --- /dev/null +++ b/hw2matrixgenerator/src/main/java/GeneratorInputFormat.java @@ -0,0 +1,59 @@ +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.mapreduce.*; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +public class GeneratorInputFormat extends InputFormat { + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + + int numMappers = conf.getInt(mgen.PARAM_NUM_MAPPERS, mgen.PARAM_NUM_MAPPERS_DEFAULT); + int matrixRows = conf.getInt(mgen.PARAM_ROW_COUNT, mgen.PARAM_ROW_COUNT_DEFAULT); + int matrixCols = conf.getInt(mgen.PARAM_COL_COUNT, mgen.PARAM_COL_COUNT_DEFAULT); + + List splits = new LinkedList<>(); + + if (numMappers > matrixRows && numMappers > matrixCols) { + numMappers = Math.max(matrixCols, matrixRows); + } + + int strideWidth, colMin, colMax, rowMin, rowMax; + if (numMappers <= matrixRows) { + // split by rows + strideWidth = (int) Math.ceil(((float) matrixRows) / numMappers); + colMin = 0; + colMax = matrixCols; + + for (rowMin = 0; rowMin < matrixRows; rowMin += strideWidth) { + rowMax = Math.min(rowMin + strideWidth, matrixRows); + splits.add(new GeneratorInputSplit(rowMin, rowMax, colMin, colMax)); + } + } else if (numMappers <= matrixCols) { + // split by columns + strideWidth = (int) Math.ceil(((float) matrixCols) / numMappers); + rowMin = 0; + rowMax = matrixRows; + + for (colMin = 0; colMin < matrixCols; colMin += strideWidth) { + colMax = Math.min(colMin + strideWidth, matrixCols); + splits.add(new GeneratorInputSplit(rowMin, rowMax, colMin, colMax)); + } + } + + return splits; + } + + @Override + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + double sparsity = context.getConfiguration().getDouble(mgen.PARAM_SPARSITY, mgen.PARAM_SPARSITY_DEFAULT); + RecordReader recordReader = (sparsity < 0.5) ? + new GeneratorRecordReaderInverse() : new GeneratorRecordReaderForward(); + recordReader.initialize(split, context); + return recordReader; + } +} diff --git a/hw2matrixgenerator/src/main/java/GeneratorInputSplit.java b/hw2matrixgenerator/src/main/java/GeneratorInputSplit.java new file mode 100644 index 0000000..c80d129 --- /dev/null +++ b/hw2matrixgenerator/src/main/java/GeneratorInputSplit.java @@ -0,0 +1,63 @@ +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class GeneratorInputSplit extends InputSplit implements Writable { + private int rowMin, rowMax, colMin, colMax; + + public GeneratorInputSplit() { + rowMin = rowMax = colMin = colMax = -1; + } + + public GeneratorInputSplit(int rowMin, int rowMax, int colMin, int colMax) { + this.rowMin = rowMin; + this.rowMax = rowMax; + this.colMin = colMin; + this.colMax = colMax; + } + + public int getRowMin() { + return rowMin; + } + + public int getRowMax() { + return rowMax; + } + + public int getColMin() { + return colMin; + } + + public int getColMax() { + return colMax; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return (rowMax - rowMin) * (colMax - colMin) * (Float.SIZE / Byte.SIZE); + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[0]; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(rowMin); + out.writeInt(rowMax); + out.writeInt(colMin); + out.writeInt(colMax); + } + + @Override + public void readFields(DataInput in) throws IOException { + rowMin = in.readInt(); + rowMax = in.readInt(); + colMin = in.readInt(); + colMax = in.readInt(); + } +} \ No newline at end of file diff --git a/hw2matrixgenerator/src/main/java/GeneratorMapper.java b/hw2matrixgenerator/src/main/java/GeneratorMapper.java new file mode 100644 index 0000000..f87d884 --- /dev/null +++ b/hw2matrixgenerator/src/main/java/GeneratorMapper.java @@ -0,0 +1,38 @@ +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + +import java.io.IOException; + +public class GeneratorMapper extends Mapper { + private static String outputFormat; + private static Text outputValue = new Text(); + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + StringBuilder builder = new StringBuilder(); + Configuration conf = context.getConfiguration(); + + String tag = conf.get(mgen.PARAM_TAG); + if (tag != null) { + builder.append(tag); + builder.append("\t"); + } + builder.append("%s\t"); // for key + + String floatFormat = conf.get(mgen.PARAM_FLOAT_FORMAT, mgen.PARAM_FLOAT_FROMAT_DEFAULT); + builder.append(floatFormat); + + outputFormat = builder.toString(); + } + + @Override + protected void map(MatrixCoords key, FloatWritable value, Context context) + throws IOException, InterruptedException { + outputValue.set(String.format(outputFormat, key.toString(), value.get())); + context.write(NullWritable.get(), outputValue); + } +} diff --git a/hw2matrixgenerator/src/main/java/GeneratorRecordReader.java b/hw2matrixgenerator/src/main/java/GeneratorRecordReader.java new file mode 100644 index 0000000..4560c6d --- /dev/null +++ b/hw2matrixgenerator/src/main/java/GeneratorRecordReader.java @@ -0,0 +1,77 @@ +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.Random; + +abstract public class GeneratorRecordReader extends RecordReader { + protected int valuesMapped, valuesMappedMax; + + protected Random random; + protected float minValue, maxValue; + protected int rowMin, rowMax, colMin, colMax; + + protected MatrixCoords currentKey = null; + protected Float currentValue = null; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + + minValue = conf.getFloat(mgen.PARAM_MIN_VALUE, mgen.PARAM_MIN_VALUE_DEFAULT); + maxValue = conf.getFloat(mgen.PARAM_MAX_VALUE, mgen.PARAM_MAX_VALUE_DEFAULT); + + random = new Random(conf.getLong(mgen.PARAM_SEED, mgen.PARAM_SEED_DEFAULT)); + + GeneratorInputSplit inputSplit = (GeneratorInputSplit) split; + rowMin = inputSplit.getRowMin(); + rowMax = inputSplit.getRowMax(); + colMin = inputSplit.getColMin(); + colMax = inputSplit.getColMax(); + + double sparsity = conf.getDouble(mgen.PARAM_SPARSITY, mgen.PARAM_SPARSITY_DEFAULT); + valuesMappedMax = (int) ( (rowMax - rowMin) * (colMax - colMin) * (1.0 - sparsity) ); + valuesMapped = -1; + } + + protected int generateRandomRow() { + return rowMin + random.nextInt(rowMax - rowMin); + } + + protected int generateRandomCol() { + return colMin + random.nextInt(colMax - colMin); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + currentKey = null; + currentValue = null; + return ++valuesMapped < valuesMappedMax; + } + + abstract public MatrixCoords getCurrentKey() throws IOException, InterruptedException; + + @Override + public FloatWritable getCurrentValue() throws IOException, InterruptedException { + if (currentValue == null) { + currentValue = 0.0f; + while (currentValue == 0.0f) { + currentValue = minValue + (maxValue - minValue) * random.nextFloat(); + } + } + return new FloatWritable(currentValue); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return ( (float) valuesMapped ) / valuesMappedMax; + } + + @Override + public void close() throws IOException { + valuesMapped = -1; + } +} diff --git a/hw2matrixgenerator/src/main/java/GeneratorRecordReaderForward.java b/hw2matrixgenerator/src/main/java/GeneratorRecordReaderForward.java new file mode 100644 index 0000000..ad63792 --- /dev/null +++ b/hw2matrixgenerator/src/main/java/GeneratorRecordReaderForward.java @@ -0,0 +1,45 @@ +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class GeneratorRecordReaderForward extends GeneratorRecordReader { + private List matrixCoordsChosen; + private int currentElem = 0; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + super.initialize(split, context); + + Set matrixCoordsSet = new HashSet<>(); + for (int i = 0; i < valuesMappedMax + 1; i++) { + MatrixCoords key = new MatrixCoords(); + while (matrixCoordsSet.contains(key)) { + key.setRow(generateRandomRow()); + key.setCol(generateRandomCol()); + } + matrixCoordsSet.add(key); + } + matrixCoordsSet.remove(new MatrixCoords()); + + matrixCoordsChosen = new ArrayList<>(matrixCoordsSet); + } + + @Override + public MatrixCoords getCurrentKey() throws IOException, InterruptedException { + if (currentKey == null) { + currentKey = matrixCoordsChosen.get(currentElem++); + } + return currentKey; + } + + @Override + public void close() throws IOException { + super.close(); + matrixCoordsChosen.clear(); + } +} diff --git a/hw2matrixgenerator/src/main/java/GeneratorRecordReaderInverse.java b/hw2matrixgenerator/src/main/java/GeneratorRecordReaderInverse.java new file mode 100644 index 0000000..85dd933 --- /dev/null +++ b/hw2matrixgenerator/src/main/java/GeneratorRecordReaderInverse.java @@ -0,0 +1,62 @@ +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +public class GeneratorRecordReaderInverse extends GeneratorRecordReader { + private Set matrixCoordsZeros; + + private int currentRow, currentCol; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + super.initialize(split, context); + + currentRow = rowMin; + currentCol = colMin; + + matrixCoordsZeros = new HashSet<>(); + + int valuesMappedZeros = (rowMax - rowMin) * (colMax - colMin) - valuesMappedMax; + for (int i = 0; i < valuesMappedZeros + 1; i++) { + MatrixCoords key = new MatrixCoords(); + while (matrixCoordsZeros.contains(key)) { + key.setRow(generateRandomRow()); + key.setCol(generateRandomCol()); + } + matrixCoordsZeros.add(key); + } + + for (MatrixCoords s : matrixCoordsZeros) { + System.out.println(s.toString()); + } + } + + @Override + public MatrixCoords getCurrentKey() throws IOException, InterruptedException { + if (currentKey == null) { + currentKey = new MatrixCoords(); + + outer: + for (; currentRow < rowMax; currentRow++) { + for (; currentCol < colMax; currentCol++) { + if (!matrixCoordsZeros.contains(currentKey)) { + break outer; + } + currentKey.setRow(currentRow); + currentKey.setCol(currentCol); + } + currentCol = colMin; + } + } + return currentKey; + } + + @Override + public void close() throws IOException { + super.close(); + matrixCoordsZeros.clear(); + } +} diff --git a/hw2matrixgenerator/src/main/java/MatrixCoords.java b/hw2matrixgenerator/src/main/java/MatrixCoords.java new file mode 100644 index 0000000..7e7ddfa --- /dev/null +++ b/hw2matrixgenerator/src/main/java/MatrixCoords.java @@ -0,0 +1,81 @@ +import org.apache.hadoop.io.WritableComparable; + +import javax.annotation.Nonnull; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Objects; + +public class MatrixCoords implements WritableComparable { + private Integer row, col; + + public MatrixCoords() { + row = -1; + col = -1; + } + + public MatrixCoords(int row, int col) { + this.row = row; + this.col = col; + } + + public int getRow() { + return row; + } + + public int getCol() { + return col; + } + + public void setRow(Integer row) { + this.row = row; + } + + public void setCol(Integer col) { + this.col = col; + } + + @Override + public int compareTo(@Nonnull MatrixCoords that) { + int compareResult = this.row.compareTo(that.row); + return (compareResult == 0) ? this.col.compareTo(that.col) : compareResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || !(o instanceof MatrixCoords)) { + return false; + } + + MatrixCoords that = (MatrixCoords) o; + return Objects.equals(this.row, that.row) && Objects.equals(this.col, that.col); + } + + @Override + public int hashCode() { + int result = (row != null) ? row.hashCode() : 0; + result = 31 * result + ((col != null) ? col.hashCode() : 0); + return result; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(row); + out.writeInt(col); + } + + @Override + public void readFields(DataInput in) throws IOException { + row = in.readInt(); + col = in.readInt(); + } + + @Override + public String toString() { + return String.format("%d\t%d", row, col); + } +} diff --git a/hw2matrixgenerator/src/main/java/mgen.java b/hw2matrixgenerator/src/main/java/mgen.java new file mode 100644 index 0000000..0ae8564 --- /dev/null +++ b/hw2matrixgenerator/src/main/java/mgen.java @@ -0,0 +1,113 @@ +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; + +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.FileSystemCounter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import java.io.*; +import java.net.URI; + +public class mgen extends Configured implements Tool { + public static final String PARAM_ROW_COUNT = "mgen.row-count"; + public static final String PARAM_COL_COUNT = "mgen.column-count"; + public static final int PARAM_ROW_COUNT_DEFAULT = 2; + public static final int PARAM_COL_COUNT_DEFAULT = 2; + + public static final String PARAM_MIN_VALUE = "mgen.min"; + public static final String PARAM_MAX_VALUE = "mgen.max"; + public static final float PARAM_MIN_VALUE_DEFAULT = 0.0f; + public static final float PARAM_MAX_VALUE_DEFAULT = 1.0f; + + public static final String PARAM_SPARSITY = "mgen.sparsity"; + public static final double PARAM_SPARSITY_DEFAULT = 0.0; + + public static final String PARAM_FLOAT_FORMAT = "mgen.float-format"; + public static final String PARAM_FLOAT_FROMAT_DEFAULT = "%.3f"; + + public static final String PARAM_TAG = "mgen.tag"; + public static final String PARAM_SEED = "mgen.seed"; + public static final long PARAM_SEED_DEFAULT = System.currentTimeMillis(); + + public static final String PARAM_NUM_MAPPERS = "mgen.num-mappers"; + public static final int PARAM_NUM_MAPPERS_DEFAULT = 1; + + private Job getJobConf(String output) throws IOException { + Job job = Job.getInstance(getConf()); + + job.setJarByClass(mgen.class); + job.setJobName("[HW2] Sparse Matrix Generator"); + + FileOutputFormat.setOutputPath(job, new Path(output)); + + job.setInputFormatClass(GeneratorInputFormat.class); + + job.setMapperClass(GeneratorMapper.class); + + job.setMapOutputKeyClass(MatrixCoords.class); + job.setMapOutputValueClass(FloatWritable.class); + + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(Text.class); + job.setOutputFormatClass(TextOutputFormat.class); + + job.setNumReduceTasks(0); + + return job; + } + + public int run(String[] args) throws Exception { + String matrixPath = args[0].replaceAll("/$", ""); + Job job = getJobConf(matrixPath + "/data"); + int ret = job.waitForCompletion(true) ? 0 : 1; + if (ret == 0) { + Configuration conf = getConf(); + FileSystem fs = FileSystem.get(URI.create(matrixPath), conf); + + try (BufferedWriter outputWriter = new BufferedWriter( + new OutputStreamWriter(fs.create(new Path(matrixPath + "/size"))) + )) { + outputWriter.write(String.format( + "%d\t%d\n", + conf.getInt(PARAM_ROW_COUNT, PARAM_ROW_COUNT_DEFAULT), + conf.getInt(PARAM_COL_COUNT, PARAM_COL_COUNT_DEFAULT) + )); + } + + try (BufferedWriter outputWriter = new BufferedWriter( + new FileWriter("stats.logs", true))) { + Counters counters = job.getCounters(); + long runtime = job.getFinishTime() - job.getStartTime(); + long bytesRead = counters.findCounter( + FileSystemCounter.class.getName(), "FILE_BYTES_READ" + ).getValue(); + long bytesWritten = counters.findCounter( + FileSystemCounter.class.getName(), "FILE_BYTES_WRITTEN" + ).getValue(); + long numMappers = getConf().getInt(PARAM_NUM_MAPPERS, PARAM_NUM_MAPPERS_DEFAULT); + outputWriter.write(String.format( + "%d\t%d\t%d\t%d\n", numMappers, runtime, bytesRead, bytesWritten + )); + } + } + return ret; + } + + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new mgen(), args); + System.exit(ret); + } +} +