Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sidhdirenge committed Feb 21, 2025
1 parent 78ca001 commit a47f30b
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import io.cdap.cdap.spi.data.table.field.FieldType;
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.storage.spanner.compression.CompressionConfig;
import io.cdap.cdap.storage.spanner.compression.CompressionConfig.CompressorFactory;
import io.cdap.cdap.storage.spanner.compression.CompressionConfig.CompressorType;
import io.cdap.cdap.storage.spanner.compression.CompressorFactory;
import io.cdap.cdap.storage.spanner.compression.CompressorType;
import io.cdap.cdap.storage.spanner.compression.Compressor;

Check warning on line 29 in cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredRow.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'io.cdap.cdap.storage.spanner.compression.Compressor' import. Should be before 'io.cdap.cdap.storage.spanner.compression.CompressorType'.
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -68,18 +68,21 @@ public Boolean getBoolean(String fieldName) throws InvalidFieldException {
@Nullable
@Override
public String getString(String fieldName) throws InvalidFieldException {
String value = null;
if (!isNull(fieldName)) {
value = struct.getString(fieldName);
CompressorType compressorType = getCompressionStatus(fieldName);
if (value != null && compressorType != null) {
Compressor compressor = CompressorFactory.createCompressor(compressorType);
try {
value = compressor.decompress(value);
} catch (IOException e) {
Throwables.propagate(e);
}
}
if (isNull(fieldName)) {
return null;
}

String value = struct.getString(fieldName);
CompressorType compressorType = getCompressorType(fieldName);
if (value == null || compressorType == null) {
return value;
}

Compressor compressor = CompressorFactory.getCompressor(compressorType);
try {
value = compressor.decompress(value);
} catch (IOException e) {
Throwables.propagate(e);
}
return value;
}
Expand All @@ -99,18 +102,21 @@ public Double getDouble(String fieldName) throws InvalidFieldException {
@Nullable
@Override
public byte[] getBytes(String fieldName) throws InvalidFieldException {
byte[] value = null;
if (!isNull(fieldName)) {
value = struct.getBytes(fieldName).toByteArray();
CompressorType compressorType = getCompressionStatus(fieldName);
if (value != null && compressorType != null) {
Compressor compressor = CompressorFactory.createCompressor(compressorType);
try {
value = compressor.decompress(value);
} catch (IOException e) {
Throwables.propagate(e);
}
}
if (isNull(fieldName)) {
return null;
}

byte[] value = struct.getBytes(fieldName).toByteArray();
CompressorType compressorType = getCompressorType(fieldName);
if (value == null || compressorType == null) {
return value;
}

try {
Compressor compressor = CompressorFactory.getCompressor(compressorType);
value = compressor.decompress(value);
} catch (IOException e) {
Throwables.propagate(e);
}
return value;
}
Expand Down Expand Up @@ -159,9 +165,12 @@ public Collection<Field<?>> getPrimaryKeys() {
}
}

private CompressorType getCompressionStatus(String fieldName) {
// Check if the field is compression enabled based on cConf.
CompressionConfig.CompressorType compressor = schema.getCompressorFromConfig(fieldName);
/**
* Returns compression status of the field stored in db, if the field is compression enabled.
*/
private CompressorType getCompressorType(String fieldName) {
// First check if the field is compression enabled based on cConf.
CompressorType compressor = schema.getCompressorType(fieldName);
if (compressor == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.spi.data.table.field.Range;
import io.cdap.cdap.storage.spanner.compression.CompressionConfig;
import io.cdap.cdap.storage.spanner.compression.CompressionConfig.CompressorFactory;
import io.cdap.cdap.storage.spanner.compression.CompressionConfig.CompressorType;
import io.cdap.cdap.storage.spanner.compression.CompressorFactory;
import io.cdap.cdap.storage.spanner.compression.CompressorType;
import io.cdap.cdap.storage.spanner.compression.Compressor;

Check warning on line 43 in cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'io.cdap.cdap.storage.spanner.compression.Compressor' import. Should be before 'io.cdap.cdap.storage.spanner.compression.CompressorType'.
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -114,7 +114,7 @@ public void update(Collection<Field<?>> fields) throws InvalidFieldException {
primaryKeyFields.add(field);
} else {
updateFields.add(field);
CompressionConfig.CompressorType compressorType = determineCompressorForField(field);
CompressorType compressorType = determineCompressorForField(field);
if (compressorType != null) {
Field<String> compressedField = Fields.stringField(
field.getName() + CompressionConfig.COMPRESSED_COLUMN_SUFFIX,
Expand Down Expand Up @@ -619,7 +619,7 @@ private void insert(Collection<Field<?>> fields) throws InvalidFieldException {
for (Field<?> field : fields) {
fieldValidator.validateField(field);
insertFields.add(field);
CompressionConfig.CompressorType compressorType = determineCompressorForField(field);
CompressorType compressorType = determineCompressorForField(field);
if (compressorType != null) {
Field<String> compressedField = Fields.stringField(
field.getName() + CompressionConfig.COMPRESSED_COLUMN_SUFFIX,
Expand Down Expand Up @@ -651,7 +651,7 @@ private CompressorType determineCompressorForField(Field field) {
return null;
}

CompressionConfig.CompressorType compressor = schema.getCompressorFromConfig(field.getName());
CompressorType compressor = schema.getCompressorType(field.getName());
if (compressor == null) {
return null;
}
Expand Down Expand Up @@ -700,9 +700,9 @@ private Value getValue(Field<?> field) {

private String getStringValue(Field field) {
String val = (String) field.getValue();
CompressionConfig.CompressorType compressorType = determineCompressorForField(field);
CompressorType compressorType = determineCompressorForField(field);
if (compressorType != null) {
Compressor compressor = CompressorFactory.createCompressor(compressorType);
Compressor compressor = CompressorFactory.getCompressor(compressorType);
try {
val = compressor.compress(val);
} catch (IOException e) {
Expand All @@ -714,9 +714,9 @@ private String getStringValue(Field field) {

private ByteArray getBytesValue(Field field) {
byte[] val = (byte[]) field.getValue();
CompressionConfig.CompressorType compressorType = determineCompressorForField(field);
CompressorType compressorType = determineCompressorForField(field);
if (compressorType != null) {
Compressor compressor = CompressorFactory.createCompressor(compressorType);
Compressor compressor = CompressorFactory.getCompressor(compressorType);
try {
val = compressor.compress(val);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import io.cdap.cdap.spi.data.table.field.FieldType;
import io.cdap.cdap.spi.data.table.field.FieldType.Type;
import io.cdap.cdap.storage.spanner.compression.CompressionConfig;
import io.cdap.cdap.storage.spanner.compression.CompressionConfig.CompressorType;
import io.cdap.cdap.storage.spanner.compression.CompressorType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,39 @@
import io.cdap.cdap.spi.data.table.StructuredTableSchema;
import io.cdap.cdap.spi.data.table.field.FieldType;
import io.cdap.cdap.storage.spanner.compression.CompressionConfig;
import io.cdap.cdap.storage.spanner.compression.CompressionConfig.CompressorType;
import io.cdap.cdap.storage.spanner.compression.CompressorType;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Wrapper over {@link StructuredTableSchema} that stores spanner specific config related to
* compression along with the table schema.
*/
public class SpannerStructuredTableSchema extends StructuredTableSchema {

/**
* Map of column name and compression config.
*/
private final Map<String, CompressionConfig> compressionConfigMap;
private final Map<String, CompressionConfig> compressionConfigs;

SpannerStructuredTableSchema(StructuredTableId tableId,
List<FieldType> fields,
List<String> primaryKeys,
Collection<String> indexes,
Map<String, CompressionConfig> compressionConfigMap) {
Map<String, CompressionConfig> compressionConfigs) {
super(tableId, fields, primaryKeys, indexes);
this.compressionConfigMap = compressionConfigMap;
this.compressionConfigs = compressionConfigs == null ?

Check warning on line 46 in cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableSchema.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.whitespace.OperatorWrapCheck

'?' should be on a new line.
Collections.emptyMap() : Collections.unmodifiableMap(compressionConfigs);
}

/**

Check warning on line 50 in cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableSchema.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.SummaryJavadocCheck

Summary javadoc is missing.
* @param field column name for which compressor needs to be determined.
* @return compressor type set in cConf
*/
CompressorType getCompressorFromConfig(String field) {
if (compressionConfigMap == null) {
return null;
}
CompressionConfig config = compressionConfigMap.get(field);
CompressorType getCompressorType(String field) {
CompressionConfig config = compressionConfigs.get(field);
return config == null ? null : config.getCompressorType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,34 @@
*/
public class CompressionConfig {

/**
* Suffix for compressed columns.
*/
public static final String COMPRESSED_COLUMN_SUFFIX = "_compression_status";
/**
* Spanner limit for string field in bytes.
*/
public static final int STRING_LIMIT = 2621440;
/**
* Spanner limit for byte[] field in bytes.
*/
public static final int BYTES_LIMIT = 10 * 1024 * 1024;

private final CompressorType compressorType;

/**
* Constructor for {@link CompressionConfig}.
*
* @param compressorType the {@link CompressorType} to be used.
*/
public CompressionConfig(CompressorType compressorType) {
this.compressorType = compressorType;
}

/**
* Returns the {@link CompressorType}.
*/
public CompressorType getCompressorType() {
return compressorType;
}

public enum CompressorType {
SNAPPY;

public static CompressorType fromString(String value) {
if (value == null || value.isEmpty()) {
return null;
}
return valueOf(value.toUpperCase());
}
}

private final CompressorType compressorType;

public static class CompressorFactory {

public static Compressor createCompressor(CompressorType type) {
switch (type) {
case SNAPPY:
return new SnappyCompressor();
default:
throw new IllegalArgumentException("Unsupported compression type: " + type);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,43 @@
import java.io.IOException;

/**
* Interface for various implementations of compressors used for spanner.
* An interface for compressing and decompressing data.
*/
public interface Compressor {

/**
* Compresses the given string data.
*
* @param data the string data to compress
* @return the compressed string data
* @throws IOException if there is an error during compression
*/
String compress(String data) throws IOException;

/**
* Decompresses the given compressed string data.
*
* @param compressedData the compressed string data
* @return the decompressed string data
* @throws IOException if there is an error during decompression
*/
String decompress(String compressedData) throws IOException;

/**
* Compresses the given byte array data.
*
* @param data the byte array data to compress
* @return the compressed byte array data
* @throws IOException if there is an error during compression
*/
byte[] compress(byte[] data) throws IOException;

Check warning on line 51 in cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/compression/Compressor.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.OverloadMethodsDeclarationOrderCheck

All overloaded methods should be placed next to each other. Placing non-overloaded methods in between overloaded methods with the same type is a violation. Previous overloaded method located at line '33'.

/**
* Decompresses the given compressed byte array data.
*
* @param compressedData the compressed byte array data
* @return the decompressed byte array data
* @throws IOException if there is an error during decompression
*/
byte[] decompress(byte[] compressedData) throws IOException;

Check warning on line 60 in cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/compression/Compressor.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.OverloadMethodsDeclarationOrderCheck

All overloaded methods should be placed next to each other. Placing non-overloaded methods in between overloaded methods with the same type is a violation. Previous overloaded method located at line '42'.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*

Check warning on line 1 in cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/compression/CompressorFactory.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.NewlineAtEndOfFileCheck

File does not end with a newline.
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.storage.spanner.compression;

import java.util.EnumMap;

/**
* Factory class for creating and retrieving {@link Compressor} instances based on
* {@link CompressorType}.
*/
public class CompressorFactory {

private static final EnumMap<CompressorType, Compressor> compressorMap = new EnumMap<>(
CompressorType.class);

static {
compressorMap.put(CompressorType.SNAPPY, new SnappyCompressor());
}

/**
* Method to retrieve the Compressor instance for a given CompressorType.
*/
public static Compressor getCompressor(CompressorType type) {
Compressor compressor = compressorMap.get(type);
if (compressor == null) {
throw new IllegalArgumentException("Unsupported compression type: " + type);
}
return compressor;
}
}
Loading

0 comments on commit a47f30b

Please sign in to comment.