diff --git a/onnxruntime/python/tools/quantization/qdq_quantizer.py b/onnxruntime/python/tools/quantization/qdq_quantizer.py
index 5552a4451c542..05a487db0a0ae 100644
--- a/onnxruntime/python/tools/quantization/qdq_quantizer.py
+++ b/onnxruntime/python/tools/quantization/qdq_quantizer.py
@@ -1168,30 +1168,6 @@ def is_tensor_per_channel(
 
         return True, axis
 
-    def _get_tensor_quantization_scale(self, tensor_name: str, consumer_node_name: str) -> np.ndarray | None:
-        """
-        Returns the quantization scale of a tensor that is consumed by the given node.
-        :parameter tensor_name: The name of the tensor.
-        :parameter consumer_node_name: The name of the node that consumes the tensor as input. Necessary in case
-                                       the quantization type of the tensor was converted.
-                                       Refer: QDQQuantizer::_add_qdq_ops_for_converted_activation.
-        :returns: The quantization scale or None.
-        """
-        initializers = self.model.initializer()
-        scale_initializer: onnx.TensorProto | None = None
-
-        if tensor_name in self.quantized_value_map:
-            # Tensor was quantized by this tool, so get scale from initializer created by this tool run.
-            scale_name = self.quantized_value_map[tensor_name].get_for_consumer(consumer_node_name).scale_name
-            scale_initializer = find_by_name(scale_name, initializers)
-        else:
-            # Tensor was already quantized in original model, so get scale from DQ node that outputs the tensor.
-            dq_node = self.tensor_to_producing_dq.get(tensor_name, None)
-            if dq_node:
-                scale_initializer = find_by_name(dq_node.input[1], initializers)
-
-        return tensor_proto_to_array(scale_initializer) if scale_initializer is not None else None
-
     def quantize_bias_static(self, bias_name: str, bias_info: QDQBiasQuantInfo) -> str:
         """
         Quantized the bias. Zero Point == 0 and Scale == Input_Scale * Weight_Scale
@@ -1201,21 +1177,17 @@ def quantize_bias_static(self, bias_name: str, bias_info: QDQBiasQuantInfo) -> s
         if bias_name in self.quantized_value_map:
             return self.quantized_value_map[bias_name].original.q_name
 
-        # get scale for weight.
-        weight_scale = self._get_tensor_quantization_scale(bias_info.weight_name, bias_info.node_name)
-        if weight_scale is None:
-            raise ValueError(
-                f"Unable to get valid quantization scale for weight input '{bias_info.weight_name}' "
-                f"when quantizing bias '{bias_name}' to int32."
-            )
+        # get scale for weight
+        weight_scale_name = self.quantized_value_map[bias_info.weight_name].original.scale_name
+        weight_scale_initializer = find_by_name(weight_scale_name, self.model.initializer())
+        weight_scale = tensor_proto_to_array(weight_scale_initializer)
 
-        # get scale for input.
-        input_scale = self._get_tensor_quantization_scale(bias_info.input_name, bias_info.node_name)
-        if input_scale is None:
-            raise ValueError(
-                f"Unable to get valid quantization scale for input '{bias_info.input_name}' "
-                f"when quantizing bias '{bias_name}' to int32."
-            )
+        # get scale for input
+        input_scale_name = (
+            self.quantized_value_map[bias_info.input_name].get_for_consumer(bias_info.node_name).scale_name
+        )
+        input_scale_initializer = find_by_name(input_scale_name, self.model.initializer())
+        input_scale = tensor_proto_to_array(input_scale_initializer)
 
         (
             quantized_bias_name,
diff --git a/onnxruntime/test/python/quantization/test_qdq.py b/onnxruntime/test/python/quantization/test_qdq.py
index 23b397ffd80e1..f38cd738f1142 100644
--- a/onnxruntime/test/python/quantization/test_qdq.py
+++ b/onnxruntime/test/python/quantization/test_qdq.py
@@ -1927,280 +1927,5 @@ def test_dup_shared_bias(self):
         self.assertEqual(len(bias_names), 2)
 
 
-class TestQDQPrequantWeights(unittest.TestCase):
-    @classmethod
-    def setUpClass(cls):
-        cls._tmp_model_dir = tempfile.TemporaryDirectory(prefix="ort.qdq.prequant_weight")
-
-        # Note: swap with the commented line if you want to see the models in local test dir.
-        cls._tmp_dir_path = cls._tmp_model_dir.name
-        # cls._tmp_dir_path = "."
-
-    @classmethod
-    def tearDownClass(cls):
-        cls._tmp_model_dir.cleanup()
-
-    def build_conv_model(
-        self,
-        inp_shape: list[int],
-        weight_quant_data: np.ndarray,
-        weight_scale_data: np.ndarray,
-        weight_zp_data: np.ndarray,
-        bias_data: np.ndarray,
-        float_type: onnx.TensorProto.DataType = onnx.TensorProto.FLOAT,
-    ):
-        """
-        Builds a model with a Conv that has a pre-quantized constant weight input.
-        """
-        input_0 = onnx.helper.make_tensor_value_info("input_0", float_type, inp_shape)
-        output_0 = onnx.helper.make_tensor_value_info("output_0", float_type, None)
-        weight_quant = onnx.numpy_helper.from_array(weight_quant_data, "weight_quant")
-        weight_scale = onnx.numpy_helper.from_array(weight_scale_data, "weight_scale")
-        weight_zp = onnx.numpy_helper.from_array(weight_zp_data, "weight_zp")
-        bias = onnx.numpy_helper.from_array(bias_data, "bias")
-
-        dq_node = onnx.helper.make_node(
-            "DequantizeLinear", ["weight_quant", "weight_scale", "weight_zp"], ["weight_dequant"], name="DQ0"
-        )
-        conv_node = onnx.helper.make_node("Conv", ["input_0", "weight_dequant", "bias"], ["output_0"], name="Conv0")
-        graph = onnx.helper.make_graph(
-            [dq_node, conv_node],
-            "ConvPreQuantWeight",
-            [input_0],
-            [output_0],
-            initializer=[weight_quant, weight_scale, weight_zp, bias],
-        )
-        opset_imports = [onnx.helper.make_opsetid("", 21)]
-        model = onnx.helper.make_model(graph, opset_imports=opset_imports)
-
-        return onnx.shape_inference.infer_shapes(model)
-
-    def build_conv_dynamic_weight_model(
-        self,
-        input_quant_data: np.ndarray,
-        input_scale_data: np.ndarray,
-        input_zp_data: np.ndarray,
-        weight_shape: list[int],
-        bias_data: np.ndarray,
-        float_type: onnx.TensorProto.DataType = onnx.TensorProto.FLOAT,
-    ):
-        """
-        Builds a model with a Conv that has a dynamic float weight input, but a constant
-        pre-quantized input[0].
- """ - dyn_weight = onnx.helper.make_tensor_value_info("dyn_weight", float_type, weight_shape) - output_0 = onnx.helper.make_tensor_value_info("output_0", float_type, None) - input_quant = onnx.numpy_helper.from_array(input_quant_data, "input_quant") - input_scale = onnx.numpy_helper.from_array(input_scale_data, "input_scale") - input_zp = onnx.numpy_helper.from_array(input_zp_data, "input_zp") - bias = onnx.numpy_helper.from_array(bias_data, "bias") - - dq_node = onnx.helper.make_node( - "DequantizeLinear", ["input_quant", "input_scale", "input_zp"], ["input_dequant"], name="DQ0" - ) - conv_node = onnx.helper.make_node("Conv", ["input_dequant", "dyn_weight", "bias"], ["output_0"], name="Conv0") - graph = onnx.helper.make_graph( - [dq_node, conv_node], - "ConvPreQuantInput_DynamicWeight", - [dyn_weight], - [output_0], - initializer=[input_quant, input_scale, input_zp, bias], - ) - opset_imports = [onnx.helper.make_opsetid("", 21)] - model = onnx.helper.make_model(graph, opset_imports=opset_imports) - - return onnx.shape_inference.infer_shapes(model) - - def test_quantize_with_prequantized_weights(self): - """ - Test quantization of Conv with pre-quantized weights. - """ - rng = np.random.default_rng(123) - test_configs = [onnx.TensorProto.FLOAT, onnx.TensorProto.FLOAT16] - - for float_type in test_configs: - with self.subTest(float_type=float_type): - label = f"_{onnx.TensorProto.DataType.Name(float_type)}" - float_model_path = os.path.join(self._tmp_dir_path, f"conv.f32.prequant_weight{label}.onnx") - qdq_model_path = os.path.join(self._tmp_dir_path, f"conv.prequant_weight{label}.qdq.onnx") - - inp_shape = [1, 2, 100, 100] - weight_shape = [2, 2, 20, 20] - np_dtype = onnx.helper.tensor_dtype_to_np_dtype(float_type) - - # range = 2.0, scale = 2/254, zp = 0 - weight_scale_data = np.array(2 / 254, dtype=np_dtype) - weight_zp_data = np.array(0, dtype=np.int8) - weight_data = np.linspace(-1.0, 1.0, num=1600, dtype=np_dtype).reshape(weight_shape) - weight_quant_data = quantize_nparray( - onnx.TensorProto.INT8, weight_data, weight_scale_data, weight_zp_data - ) - - bias_data = np.array([-10.0, 10.0], dtype=np_dtype) - float_model = self.build_conv_model( - inp_shape, weight_quant_data, weight_scale_data, weight_zp_data, bias_data, float_type - ) - - onnx.checker.check_model(float_model, True) - onnx.save_model(float_model, float_model_path) - - # Check that the input model only has a pre-quantized weight and save its scale/zero-point - # to check that it doesn't change after quantization. 
-                float_node_counts = {"QuantizeLinear": 0, "DequantizeLinear": 1}
-                check_op_type_count(self, float_model_path, **float_node_counts)
-                conv_node_original = next((node for node in float_model.graph.node if node.op_type == "Conv"), None)
-                self.assertNotEqual(conv_node_original, None)
-
-                _, producers_original = get_tensor_consumers_and_producers(float_model)
-                weight_dq_node_original = producers_original.get(conv_node_original.input[1], None)
-                initializers_original = {initializer.name: initializer for initializer in float_model.graph.initializer}
-                scale_name_original = weight_dq_node_original.input[1]
-                scale_val_original = onnx.numpy_helper.to_array(initializers_original[scale_name_original])
-                zp_name_original = weight_dq_node_original.input[2]
-                zp_val_original = onnx.numpy_helper.to_array(initializers_original[zp_name_original])
-
-                input_data_list = [
-                    {"input_0": rng.uniform(-10.0, 10.0, inp_shape).astype(np_dtype)},
-                ]
-                data_reader = TestDataFeeds(input_data_list)
-
-                quantize_static(
-                    float_model_path,
-                    qdq_model_path,
-                    data_reader,
-                    quant_format=QuantFormat.QDQ,
-                    activation_type=QuantType.QUInt8,
-                    weight_type=QuantType.QInt8,
-                    op_types_to_quantize=["Conv"],
-                )
-
-                # The final model should have everything quantized
-                qdq_node_counts = {"QuantizeLinear": 2, "DequantizeLinear": 4}
-                check_op_type_count(self, qdq_model_path, **qdq_node_counts)
-
-                # Check that the pre-quantized weight still has the same scale/zp after quantization
-                qdq_model = onnx.load_model(qdq_model_path)
-                conv_node = next((node for node in qdq_model.graph.node if node.op_type == "Conv"), None)
-                self.assertNotEqual(conv_node, None)
-
-                _, producers = get_tensor_consumers_and_producers(qdq_model)
-                weight_dq_node = producers.get(conv_node.input[1], None)
-                initializers = {initializer.name: initializer for initializer in qdq_model.graph.initializer}
-
-                scale_name = weight_dq_node.input[1]
-                self.assertEqual(scale_name, scale_name_original)
-                scale_val = onnx.numpy_helper.to_array(initializers[scale_name])
-                self.assertEqual(scale_val, scale_val_original)
-
-                zp_name = weight_dq_node.input[2]
-                self.assertEqual(zp_name, zp_name_original)
-                zp_val = onnx.numpy_helper.to_array(initializers[zp_name])
-                self.assertEqual(zp_val, zp_val_original)
-
-    def test_quantize_with_prequantized_input(self):
-        """
-        Test quantization of Conv with pre-quantized input and dynamic weight.
- """ - rng = np.random.default_rng(123) - test_configs = [ - (onnx.TensorProto.FLOAT, False), - (onnx.TensorProto.FLOAT16, False), - (onnx.TensorProto.FLOAT, True), - (onnx.TensorProto.FLOAT16, True), - ] - - for float_type, convert_weight_qtype in test_configs: - with self.subTest(float_type=float_type): - convert_label = "_convert_qtype" if convert_weight_qtype else "" - label = f"_{onnx.TensorProto.DataType.Name(float_type)}{convert_label}" - float_model_path = os.path.join(self._tmp_dir_path, f"conv.f32.prequant_input{label}.onnx") - qdq_model_path = os.path.join(self._tmp_dir_path, f"conv.prequant_input{label}.qdq.onnx") - - inp_shape = [1, 2, 40, 40] - weight_shape = [2, 2, 20, 20] - np_dtype = onnx.helper.tensor_dtype_to_np_dtype(float_type) - - # range = 3.0, scale = 3/255, zp = 127 - input_scale_data = np.array(3 / 255, dtype=np_dtype) - input_zp_data = np.array(127, dtype=np.uint8) - input_data = np.linspace(-1.5, 1.5, num=3200, dtype=np_dtype).reshape(inp_shape) - input_quant_data = quantize_nparray(onnx.TensorProto.UINT8, input_data, input_scale_data, input_zp_data) - - bias_data = np.array([-10.0, 10.0], dtype=np_dtype) - float_model = self.build_conv_dynamic_weight_model( - input_quant_data, input_scale_data, input_zp_data, weight_shape, bias_data, float_type - ) - - onnx.checker.check_model(float_model, True) - onnx.save_model(float_model, float_model_path) - - # Check that the input model only has a pre-quantized input and save its scale/zero-point - # to check that it doesn't change after quantization. - float_node_counts = {"QuantizeLinear": 0, "DequantizeLinear": 1} - check_op_type_count(self, float_model_path, **float_node_counts) - conv_node_original = next((node for node in float_model.graph.node if node.op_type == "Conv"), None) - self.assertNotEqual(conv_node_original, None) - - _, producers_original = get_tensor_consumers_and_producers(float_model) - input_dq_node_original = producers_original.get(conv_node_original.input[0], None) - initializers_original = {initializer.name: initializer for initializer in float_model.graph.initializer} - scale_name_original = input_dq_node_original.input[1] - scale_val_original = onnx.numpy_helper.to_array(initializers_original[scale_name_original]) - zp_name_original = input_dq_node_original.input[2] - zp_val_original = onnx.numpy_helper.to_array(initializers_original[zp_name_original]) - - # Create data reader with random input calibration data. 
-                dyn_weight_data_list = [
-                    {"dyn_weight": rng.uniform(-10.0, 10.0, weight_shape).astype(np_dtype)},
-                ]
-                data_reader = TestDataFeeds(dyn_weight_data_list)
-
-                extra_options = {}
-                if convert_weight_qtype:
-                    # Test converting the dynamic weight's quantization type, which results in
-                    # dyn_weight -> Q(u16) -> DQ(f32) -> Q(u8) -> DQ(f32) -> Conv
-                    extra_options["TensorQuantOverrides"] = {
-                        "dyn_weight": [{"quant_type": QuantType.QUInt16, "convert": {"quant_type": QuantType.QUInt8}}],
-                    }
-
-                quantize_static(
-                    float_model_path,
-                    qdq_model_path,
-                    data_reader,
-                    quant_format=QuantFormat.QDQ,
-                    activation_type=QuantType.QUInt8,
-                    weight_type=QuantType.QInt8,
-                    op_types_to_quantize=["Conv"],
-                    extra_options=extra_options,
-                )
-
-                # The final model should have everything quantized
-                qdq_node_counts = {"QuantizeLinear": 2, "DequantizeLinear": 4}
-                if convert_weight_qtype:
-                    qdq_node_counts["QuantizeLinear"] += 1
-                    qdq_node_counts["DequantizeLinear"] += 1
-
-                check_op_type_count(self, qdq_model_path, **qdq_node_counts)
-
-                # Check that the pre-quantized input still has the same scale/zp after quantization
-                qdq_model = onnx.load_model(qdq_model_path)
-                conv_node = next((node for node in qdq_model.graph.node if node.op_type == "Conv"), None)
-                self.assertNotEqual(conv_node, None)
-
-                _, producers = get_tensor_consumers_and_producers(qdq_model)
-                input_dq_node = producers.get(conv_node.input[0], None)
-                initializers = {initializer.name: initializer for initializer in qdq_model.graph.initializer}
-
-                scale_name = input_dq_node.input[1]
-                self.assertEqual(scale_name, scale_name_original)
-                scale_val = onnx.numpy_helper.to_array(initializers[scale_name])
-                self.assertEqual(scale_val, scale_val_original)
-
-                zp_name = input_dq_node.input[2]
-                self.assertEqual(zp_name, zp_name_original)
-                zp_val = onnx.numpy_helper.to_array(initializers[zp_name])
-                self.assertEqual(zp_val, zp_val_original)
-
-
 if __name__ == "__main__":
     unittest.main()
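
Note: for reference, the rule stated in quantize_bias_static's docstring above (Zero Point == 0 and Scale == Input_Scale * Weight_Scale) can be illustrated with a minimal numpy sketch. The values are borrowed from the tests purely for illustration (activation scale 3/255, weight scale 2/254, bias [-10, 10]); this is a sketch of the formula, not ONNX Runtime's actual code path:

import numpy as np

# Illustrative values: a scalar activation scale and one weight scale per output channel.
input_scale = np.array(3 / 255, dtype=np.float32)
weight_scale = np.array([2 / 254, 2 / 254], dtype=np.float32)
bias = np.array([-10.0, 10.0], dtype=np.float32)

# Zero Point == 0 and Scale == Input_Scale * Weight_Scale, so the int32 bias is
# just the float bias divided by the combined scale, rounded to nearest.
bias_scale = input_scale * weight_scale
quantized_bias = np.round(bias / bias_scale).astype(np.int32)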