[Quant Tool] Prevent int32 quantized bias from clipping by adjusting the weight's scale (microsoft#22020)

### Description
Fixes a scenario in which a bias input quantized to int32 has a scale that
is too small. A bias whose scale falls below a certain threshold overflows
the range of an `int32` when quantized, which significantly decreases accuracy.

Credit to @yihonglyu for finding this issue and for the fix.

### Motivation and Context
Consider the following Convolution with very small weights and a
constant bias input of `[5, -4.5]`.

![image](https://github.com/user-attachments/assets/4bde2bd9-892f-4ae9-887b-61a6668779a1)

The QDQ quantizer first computes the following quantization scale for
`input_0` and `weight`:
- `input_0`: scale=0.5
- `weight`: scale=7.843e-11 **[really small]**

The QDQ quantizer then computes the bias input's scale as follows:
```
bias_scale = input_0_scale * weight_0_scale = 0.5 * 7.843e-10 = 3.9215686274509805e-11
```

This `bias_scale` is too small. Before this PR, the QDQ quantizer would
quantize the f32 bias with this `bias_scale`:
```
bias_quant = round(bias_f32 / bias_scale) = round([5.0/bias_scale, -4.5/bias_scale]) = [127500000000, -114750000000]
```
These quantized bias values exceed the range of int32, and so are
clipped to [int32.min(), int32.max()], which is very inaccurate.
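
The clipping can be reproduced with a minimal NumPy sketch (illustrative only, not the quantizer's actual code; the scale values are taken from the example above):
```python
import numpy as np

# Scales from the example above (illustrative values).
input_0_scale = 0.5
weight_0_scale = 7.843137254901961e-11
bias_f32 = np.array([5.0, -4.5], dtype=np.float32)

bias_scale = input_0_scale * weight_0_scale    # ~3.92e-11, far too small
bias_quant = np.round(bias_f32 / bias_scale)   # ~[1.275e11, -1.1475e11]

# Saturating to int32 loses almost all of the original bias information.
info = np.iinfo(np.int32)
bias_int32 = np.clip(bias_quant, info.min, info.max).astype(np.int64)
print(bias_int32)  # [ 2147483647 -2147483648]
```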

#### New approach
This PR increases the `weight_0_scale` by the necessary amount to ensure
that `bias_scale` (which equals `weight_0_scale * input_0_scale`) is
appropriate for the int32 quantization type.

The smallest valid bias scale is given by the normal scale formula:
`bias_smallest_valid_scale = (bias_f32_max - bias_f32_min) / (int32_max - int32_min)`
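
With the example bias of `[5, -4.5]`, this works out to (a worked check of the formula above):
```
bias_smallest_valid_scale = (5.0 - (-4.5)) / (2147483647 - (-2147483648))
                          = 9.5 / 4294967295
                          ≈ 2.212e-9
```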

Then, we compute the candidate bias scale:
`bias_scale_candidate = input_0_scale * weight_0_scale`

If the candidate scale is smaller than the smallest valid scale, we
increase the `weight_0_scale` by the necessary ratio:
```python
if bias_scale_candidate < bias_smallest_valid_scale:
    ratio = bias_smallest_valid_scale / bias_scale_candidate
    weight_0_scale = ratio * weight_0_scale
```

Then, we recompute the final bias scale:
```python
bias_scale = input_0_scale * weight_0_scale
```
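
Putting the steps together, a rough end-to-end sketch (the helper name and signature here are hypothetical, not the exact function added by this PR):
```python
import numpy as np

def adjust_weight_scale_for_int32_bias(input_scale, weight_scale, bias_f32):
    """Grow weight_scale, if needed, so that input_scale * weight_scale can
    represent the bias range in int32 without clipping."""
    int32_info = np.iinfo(np.int32)
    bias_smallest_valid_scale = (float(bias_f32.max()) - float(bias_f32.min())) / (
        float(int32_info.max) - float(int32_info.min)
    )

    bias_scale_candidate = input_scale * weight_scale
    if bias_scale_candidate < bias_smallest_valid_scale:
        ratio = bias_smallest_valid_scale / bias_scale_candidate
        weight_scale = ratio * weight_scale

    return weight_scale, input_scale * weight_scale

# With the example values, the weight scale grows by a factor of ~56 and the
# final bias scale becomes the smallest valid scale for this bias range.
weight_scale, bias_scale = adjust_weight_scale_for_int32_bias(
    0.5, 7.843137254901961e-11, np.array([5.0, -4.5], dtype=np.float32)
)
```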

#### Impact on accuracy
Here's the above model's quantized output compared to the f32 (ground-truth)
output; a sketch of how the SNR metric can be computed follows the list.
- Before PR: 
  - f32 model output[0]: **5.0f**
  - qdq model output[0]: **0.075**
  - SNR: 0.1369 (higher is better)
- After PR:
  - f32 model output[0]: **5.0f**
  - qdq model output[0]: **4.992**
  - SNR: 55.656 (higher is better)
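
For reference, SNR between the f32 and QDQ outputs can be computed along these lines (a sketch; the exact metric used to produce the values above is not shown in this commit):
```python
import numpy as np

def snr_db(expected: np.ndarray, actual: np.ndarray) -> float:
    """Signal-to-noise ratio in dB between a reference output and a quantized
    output; higher means the quantized model is more accurate."""
    expected = expected.astype(np.float64)
    noise = expected - actual.astype(np.float64)
    return 10.0 * np.log10(np.mean(expected**2) / np.mean(noise**2))
```
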
adrianlizarraga authored and ankitm3k committed Dec 11, 2024
1 parent 01ecbb0 commit 1bfb963
Showing 2 changed files with 10 additions and 313 deletions.
48 changes: 10 additions & 38 deletions onnxruntime/python/tools/quantization/qdq_quantizer.py
@@ -1168,30 +1168,6 @@ def is_tensor_per_channel(

return True, axis

def _get_tensor_quantization_scale(self, tensor_name: str, consumer_node_name: str) -> np.ndarray | None:
"""
Returns the quantization scale of a tensor that is consumed by the given node.
:parameter tensor_name: The name of the tensor.
:parameter consumer_node_name: The name of the node that consumes the tensor as input. Necessary in case
the quantization type of the tensor was converted.
Refer: QDQQuantizer::_add_qdq_ops_for_converted_activation.
:returns: The quantization scale or None.
"""
initializers = self.model.initializer()
scale_initializer: onnx.TensorProto | None = None

if tensor_name in self.quantized_value_map:
# Tensor was quantized by this tool, so get scale from initializer created by this tool run.
scale_name = self.quantized_value_map[tensor_name].get_for_consumer(consumer_node_name).scale_name
scale_initializer = find_by_name(scale_name, initializers)
else:
# Tensor was already quantized in original model, so get scale from DQ node that outputs the tensor.
dq_node = self.tensor_to_producing_dq.get(tensor_name, None)
if dq_node:
scale_initializer = find_by_name(dq_node.input[1], initializers)

return tensor_proto_to_array(scale_initializer) if scale_initializer is not None else None

def quantize_bias_static(self, bias_name: str, bias_info: QDQBiasQuantInfo) -> str:
"""
Quantized the bias. Zero Point == 0 and Scale == Input_Scale * Weight_Scale
@@ -1201,21 +1177,17 @@ def quantize_bias_static(self, bias_name: str, bias_info: QDQBiasQuantInfo) -> str:
if bias_name in self.quantized_value_map:
return self.quantized_value_map[bias_name].original.q_name

# get scale for weight.
weight_scale = self._get_tensor_quantization_scale(bias_info.weight_name, bias_info.node_name)
if weight_scale is None:
raise ValueError(
f"Unable to get valid quantization scale for weight input '{bias_info.weight_name}' "
f"when quantizing bias '{bias_name}' to int32."
)
# get scale for weight
weight_scale_name = self.quantized_value_map[bias_info.weight_name].original.scale_name
weight_scale_initializer = find_by_name(weight_scale_name, self.model.initializer())
weight_scale = tensor_proto_to_array(weight_scale_initializer)

# get scale for input.
input_scale = self._get_tensor_quantization_scale(bias_info.input_name, bias_info.node_name)
if input_scale is None:
raise ValueError(
f"Unable to get valid quantization scale for input '{bias_info.input_name}' "
f"when quantizing bias '{bias_name}' to int32."
)
# get scale for input
input_scale_name = (
self.quantized_value_map[bias_info.input_name].get_for_consumer(bias_info.node_name).scale_name
)
input_scale_initializer = find_by_name(input_scale_name, self.model.initializer())
input_scale = tensor_proto_to_array(input_scale_initializer)

(
quantized_bias_name,
275 changes: 0 additions & 275 deletions onnxruntime/test/python/quantization/test_qdq.py
@@ -1927,280 +1927,5 @@ def test_dup_shared_bias(self):
self.assertEqual(len(bias_names), 2)


class TestQDQPrequantWeights(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._tmp_model_dir = tempfile.TemporaryDirectory(prefix="ort.qdq.prequant_weight")

# Note: swap with the commented line if you want to see the models in local test dir.
cls._tmp_dir_path = cls._tmp_model_dir.name
# cls._tmp_dir_path = "."

@classmethod
def tearDownClass(cls):
cls._tmp_model_dir.cleanup()

def build_conv_model(
self,
inp_shape: list[int],
weight_quant_data: np.ndarray,
weight_scale_data: np.ndarray,
weight_zp_data: np.ndarray,
bias_data: np.ndarray,
float_type: onnx.TensorProto.DataType = onnx.TensorProto.FLOAT,
):
"""
Builds a model with a Conv that has a pre-quantized constant weight input.
"""
input_0 = onnx.helper.make_tensor_value_info("input_0", float_type, inp_shape)
output_0 = onnx.helper.make_tensor_value_info("output_0", float_type, None)
weight_quant = onnx.numpy_helper.from_array(weight_quant_data, "weight_quant")
weight_scale = onnx.numpy_helper.from_array(weight_scale_data, "weight_scale")
weight_zp = onnx.numpy_helper.from_array(weight_zp_data, "weight_zp")
bias = onnx.numpy_helper.from_array(bias_data, "bias")

dq_node = onnx.helper.make_node(
"DequantizeLinear", ["weight_quant", "weight_scale", "weight_zp"], ["weight_dequant"], name="DQ0"
)
conv_node = onnx.helper.make_node("Conv", ["input_0", "weight_dequant", "bias"], ["output_0"], name="Conv0")
graph = onnx.helper.make_graph(
[dq_node, conv_node],
"ConvPreQuantWeight",
[input_0],
[output_0],
initializer=[weight_quant, weight_scale, weight_zp, bias],
)
opset_imports = [onnx.helper.make_opsetid("", 21)]
model = onnx.helper.make_model(graph, opset_imports=opset_imports)

return onnx.shape_inference.infer_shapes(model)

def build_conv_dynamic_weight_model(
self,
input_quant_data: np.ndarray,
input_scale_data: np.ndarray,
input_zp_data: np.ndarray,
weight_shape: list[int],
bias_data: np.ndarray,
float_type: onnx.TensorProto.DataType = onnx.TensorProto.FLOAT,
):
"""
Builds a model with a Conv that has a dynamic float weight input, but a constant
pre-quantized input[0].
"""
dyn_weight = onnx.helper.make_tensor_value_info("dyn_weight", float_type, weight_shape)
output_0 = onnx.helper.make_tensor_value_info("output_0", float_type, None)
input_quant = onnx.numpy_helper.from_array(input_quant_data, "input_quant")
input_scale = onnx.numpy_helper.from_array(input_scale_data, "input_scale")
input_zp = onnx.numpy_helper.from_array(input_zp_data, "input_zp")
bias = onnx.numpy_helper.from_array(bias_data, "bias")

dq_node = onnx.helper.make_node(
"DequantizeLinear", ["input_quant", "input_scale", "input_zp"], ["input_dequant"], name="DQ0"
)
conv_node = onnx.helper.make_node("Conv", ["input_dequant", "dyn_weight", "bias"], ["output_0"], name="Conv0")
graph = onnx.helper.make_graph(
[dq_node, conv_node],
"ConvPreQuantInput_DynamicWeight",
[dyn_weight],
[output_0],
initializer=[input_quant, input_scale, input_zp, bias],
)
opset_imports = [onnx.helper.make_opsetid("", 21)]
model = onnx.helper.make_model(graph, opset_imports=opset_imports)

return onnx.shape_inference.infer_shapes(model)

def test_quantize_with_prequantized_weights(self):
"""
Test quantization of Conv with pre-quantized weights.
"""
rng = np.random.default_rng(123)
test_configs = [onnx.TensorProto.FLOAT, onnx.TensorProto.FLOAT16]

for float_type in test_configs:
with self.subTest(float_type=float_type):
label = f"_{onnx.TensorProto.DataType.Name(float_type)}"
float_model_path = os.path.join(self._tmp_dir_path, f"conv.f32.prequant_weight{label}.onnx")
qdq_model_path = os.path.join(self._tmp_dir_path, f"conv.prequant_weight{label}.qdq.onnx")

inp_shape = [1, 2, 100, 100]
weight_shape = [2, 2, 20, 20]
np_dtype = onnx.helper.tensor_dtype_to_np_dtype(float_type)

# range = 2.0, scale = 2/254, zp = 0
weight_scale_data = np.array(2 / 254, dtype=np_dtype)
weight_zp_data = np.array(0, dtype=np.int8)
weight_data = np.linspace(-1.0, 1.0, num=1600, dtype=np_dtype).reshape(weight_shape)
weight_quant_data = quantize_nparray(
onnx.TensorProto.INT8, weight_data, weight_scale_data, weight_zp_data
)

bias_data = np.array([-10.0, 10.0], dtype=np_dtype)
float_model = self.build_conv_model(
inp_shape, weight_quant_data, weight_scale_data, weight_zp_data, bias_data, float_type
)

onnx.checker.check_model(float_model, True)
onnx.save_model(float_model, float_model_path)

# Check that the input model only has a pre-quantized weight and save its scale/zero-point
# to check that it doesn't change after quantization.
float_node_counts = {"QuantizeLinear": 0, "DequantizeLinear": 1}
check_op_type_count(self, float_model_path, **float_node_counts)
conv_node_original = next((node for node in float_model.graph.node if node.op_type == "Conv"), None)
self.assertNotEqual(conv_node_original, None)

_, producers_original = get_tensor_consumers_and_producers(float_model)
weight_dq_node_original = producers_original.get(conv_node_original.input[1], None)
initializers_original = {initializer.name: initializer for initializer in float_model.graph.initializer}
scale_name_original = weight_dq_node_original.input[1]
scale_val_original = onnx.numpy_helper.to_array(initializers_original[scale_name_original])
zp_name_original = weight_dq_node_original.input[2]
zp_val_original = onnx.numpy_helper.to_array(initializers_original[zp_name_original])

input_data_list = [
{"input_0": rng.uniform(-10.0, 10.0, inp_shape).astype(np_dtype)},
]
data_reader = TestDataFeeds(input_data_list)

quantize_static(
float_model_path,
qdq_model_path,
data_reader,
quant_format=QuantFormat.QDQ,
activation_type=QuantType.QUInt8,
weight_type=QuantType.QInt8,
op_types_to_quantize=["Conv"],
)

# The final model should have everything quantized
qdq_node_counts = {"QuantizeLinear": 2, "DequantizeLinear": 4}
check_op_type_count(self, qdq_model_path, **qdq_node_counts)

# Check that the pre-quantized weight still has the same scale/zp after quantization
qdq_model = onnx.load_model(qdq_model_path)
conv_node = next((node for node in qdq_model.graph.node if node.op_type == "Conv"), None)
self.assertNotEqual(conv_node, None)

_, producers = get_tensor_consumers_and_producers(qdq_model)
weight_dq_node = producers.get(conv_node.input[1], None)
initializers = {initializer.name: initializer for initializer in qdq_model.graph.initializer}

scale_name = weight_dq_node.input[1]
self.assertEqual(scale_name, scale_name_original)
scale_val = onnx.numpy_helper.to_array(initializers[scale_name])
self.assertEqual(scale_val, scale_val_original)

zp_name = weight_dq_node.input[2]
self.assertEqual(zp_name, zp_name_original)
zp_val = onnx.numpy_helper.to_array(initializers[zp_name])
self.assertEqual(zp_val, zp_val_original)

def test_quantize_with_prequantized_input(self):
"""
Test quantization of Conv with pre-quantized input and dynamic weight.
"""
rng = np.random.default_rng(123)
test_configs = [
(onnx.TensorProto.FLOAT, False),
(onnx.TensorProto.FLOAT16, False),
(onnx.TensorProto.FLOAT, True),
(onnx.TensorProto.FLOAT16, True),
]

for float_type, convert_weight_qtype in test_configs:
with self.subTest(float_type=float_type):
convert_label = "_convert_qtype" if convert_weight_qtype else ""
label = f"_{onnx.TensorProto.DataType.Name(float_type)}{convert_label}"
float_model_path = os.path.join(self._tmp_dir_path, f"conv.f32.prequant_input{label}.onnx")
qdq_model_path = os.path.join(self._tmp_dir_path, f"conv.prequant_input{label}.qdq.onnx")

inp_shape = [1, 2, 40, 40]
weight_shape = [2, 2, 20, 20]
np_dtype = onnx.helper.tensor_dtype_to_np_dtype(float_type)

# range = 3.0, scale = 3/255, zp = 127
input_scale_data = np.array(3 / 255, dtype=np_dtype)
input_zp_data = np.array(127, dtype=np.uint8)
input_data = np.linspace(-1.5, 1.5, num=3200, dtype=np_dtype).reshape(inp_shape)
input_quant_data = quantize_nparray(onnx.TensorProto.UINT8, input_data, input_scale_data, input_zp_data)

bias_data = np.array([-10.0, 10.0], dtype=np_dtype)
float_model = self.build_conv_dynamic_weight_model(
input_quant_data, input_scale_data, input_zp_data, weight_shape, bias_data, float_type
)

onnx.checker.check_model(float_model, True)
onnx.save_model(float_model, float_model_path)

# Check that the input model only has a pre-quantized input and save its scale/zero-point
# to check that it doesn't change after quantization.
float_node_counts = {"QuantizeLinear": 0, "DequantizeLinear": 1}
check_op_type_count(self, float_model_path, **float_node_counts)
conv_node_original = next((node for node in float_model.graph.node if node.op_type == "Conv"), None)
self.assertNotEqual(conv_node_original, None)

_, producers_original = get_tensor_consumers_and_producers(float_model)
input_dq_node_original = producers_original.get(conv_node_original.input[0], None)
initializers_original = {initializer.name: initializer for initializer in float_model.graph.initializer}
scale_name_original = input_dq_node_original.input[1]
scale_val_original = onnx.numpy_helper.to_array(initializers_original[scale_name_original])
zp_name_original = input_dq_node_original.input[2]
zp_val_original = onnx.numpy_helper.to_array(initializers_original[zp_name_original])

# Create data reader with random input calibration data.
dyn_weight_data_list = [
{"dyn_weight": rng.uniform(-10.0, 10.0, weight_shape).astype(np_dtype)},
]
data_reader = TestDataFeeds(dyn_weight_data_list)

extra_options = {}
if convert_weight_qtype:
# Test converting the dynamic weight's quantization type, which results in
# dyn_weight -> Q(u16) -> DQ(f32) -> Q(u8) -> DQ(f32) -> Conv
extra_options["TensorQuantOverrides"] = {
"dyn_weight": [{"quant_type": QuantType.QUInt16, "convert": {"quant_type": QuantType.QUInt8}}],
}

quantize_static(
float_model_path,
qdq_model_path,
data_reader,
quant_format=QuantFormat.QDQ,
activation_type=QuantType.QUInt8,
weight_type=QuantType.QInt8,
op_types_to_quantize=["Conv"],
extra_options=extra_options,
)

# The final model should have everything quantized
qdq_node_counts = {"QuantizeLinear": 2, "DequantizeLinear": 4}
if convert_weight_qtype:
qdq_node_counts["QuantizeLinear"] += 1
qdq_node_counts["DequantizeLinear"] += 1

check_op_type_count(self, qdq_model_path, **qdq_node_counts)

# Check that the pre-quantized input still has the same scale/zp after quantization
qdq_model = onnx.load_model(qdq_model_path)
conv_node = next((node for node in qdq_model.graph.node if node.op_type == "Conv"), None)
self.assertNotEqual(conv_node, None)

_, producers = get_tensor_consumers_and_producers(qdq_model)
input_dq_node = producers.get(conv_node.input[0], None)
initializers = {initializer.name: initializer for initializer in qdq_model.graph.initializer}

scale_name = input_dq_node.input[1]
self.assertEqual(scale_name, scale_name_original)
scale_val = onnx.numpy_helper.to_array(initializers[scale_name])
self.assertEqual(scale_val, scale_val_original)

zp_name = input_dq_node.input[2]
self.assertEqual(zp_name, zp_name_original)
zp_val = onnx.numpy_helper.to_array(initializers[zp_name])
self.assertEqual(zp_val, zp_val_original)


if __name__ == "__main__":
unittest.main()
