From f8d69444fc096cf7a961fdb90958176a0fd60e80 Mon Sep 17 00:00:00 2001 From: YoungHypo Date: Mon, 9 Dec 2024 22:05:15 -0800 Subject: [PATCH 1/3] add anchor funcs in channel routes --- .../api/lib/configtxlator/configtxlator.py | 50 +++++++++----- src/api-engine/api/lib/jq/__init__.py | 0 src/api-engine/api/lib/jq/jq.py | 29 ++++++++ src/api-engine/api/lib/peer/channel.py | 19 +++++- src/api-engine/api/routes/channel/views.py | 67 +++++++++++++++++++ 5 files changed, 145 insertions(+), 20 deletions(-) create mode 100644 src/api-engine/api/lib/jq/__init__.py create mode 100644 src/api-engine/api/lib/jq/jq.py diff --git a/src/api-engine/api/lib/configtxlator/configtxlator.py b/src/api-engine/api/lib/configtxlator/configtxlator.py index 879954032..241a2931a 100644 --- a/src/api-engine/api/lib/configtxlator/configtxlator.py +++ b/src/api-engine/api/lib/configtxlator/configtxlator.py @@ -4,6 +4,8 @@ from subprocess import call, run from api.config import FABRIC_TOOL, FABRIC_VERSION +import logging +LOG = logging.getLogger(__name__) class ConfigTxLator: """ @@ -24,17 +26,21 @@ def proto_encode(self, input, type, output): output: A file to write the output to. """ try: - call([self.configtxlator, - "proto_encode", - "--input={}".format(input), - "--type={}".format(type), - "--output={}".format(output), - ]) + command = [self.configtxlator, + "proto_encode", + "--input={}".format(input), + "--type={}".format(type), + "--output={}".format(output), + ] + + LOG.info(" ".join(command)) + + call(command) except Exception as e: err_msg = "configtxlator proto decode fail! " raise Exception(err_msg + str(e)) - def proto_decode(self, input, type): + def proto_decode(self, input, type, output): """ Converts a proto message to JSON. @@ -45,12 +51,18 @@ def proto_decode(self, input, type): config """ try: - res = run([self.configtxlator, + command = [self.configtxlator, "proto_decode", "--type={}".format(type), "--input={}".format(input), - ], - capture_output=True) + "--output={}".format(output), + ] + + LOG.info(" ".join(command)) + + res = run(command, + capture_output=True) + if res.returncode == 0 : return res.stdout else: @@ -71,13 +83,17 @@ def compute_update(self, original, updated, channel_id, output): output: A file to write the JSON document to. """ try: - call([self.configtxlator, - "compute_update", - "--original={}".format(original), - "--updated={}".format(updated), - "--channel_id={}".format(channel_id), - "--output={}".format(output), - ]) + command = [self.configtxlator, + "compute_update", + "--original={}".format(original), + "--updated={}".format(updated), + "--channel_id={}".format(channel_id), + "--output={}".format(output), + ] + + LOG.info(" ".join(command)) + + call(command) except Exception as e: err_msg = "configtxlator compute update fail! " raise Exception(err_msg + str(e)) diff --git a/src/api-engine/api/lib/jq/__init__.py b/src/api-engine/api/lib/jq/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/api-engine/api/lib/jq/jq.py b/src/api-engine/api/lib/jq/jq.py new file mode 100644 index 000000000..696f116df --- /dev/null +++ b/src/api-engine/api/lib/jq/jq.py @@ -0,0 +1,29 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# + +from subprocess import call +import logging + +LOG = logging.getLogger(__name__) + +class JQ: + def __init__(self): + self.jq = "jq" + + def filter(self, input, output, expression): + """ + Filter the input file with the given expression and write the output to the given file. + """ + try: + command = [self.jq, + expression, + input, + ">", output] + + LOG.info(" ".join(command)) + + call(command) + except Exception as e: + err_msg = "jq filter fail! " + raise Exception(err_msg + str(e)) diff --git a/src/api-engine/api/lib/peer/channel.py b/src/api-engine/api/lib/peer/channel.py index e3b000a3b..6bf9a257f 100644 --- a/src/api-engine/api/lib/peer/channel.py +++ b/src/api-engine/api/lib/peer/channel.py @@ -87,12 +87,25 @@ def update(self, channel, channel_tx, orderer_url): orderer_url: Ordering service endpoint. """ try: - res = os.system("{} channel update -c {} -f {} -o {}" - .format(self.peer, channel, channel_tx, orderer_url)) + ORDERER_CA = os.getenv("ORDERER_CA") + + command = [ + self.peer, + "channel", "update", + "-f", channel_tx, + "-c", channel, + "-o", orderer_url, + "--ordererTLSHostnameOverride", orderer_url.split(":")[0], + "--tls", + "--cafile", ORDERER_CA + ] + LOG.info(" ".join(command)) + + res = subprocess.run(command, check=True) + except Exception as e: err_msg = "update channel failed for {}!".format(e) raise Exception(err_msg) - res = res >> 8 return res def fetch(self, block_path, channel, orderer_general_url, max_retries=5, retry_interval=1): diff --git a/src/api-engine/api/routes/channel/views.py b/src/api-engine/api/routes/channel/views.py index 65d4ed811..d272efe3d 100644 --- a/src/api-engine/api/routes/channel/views.py +++ b/src/api-engine/api/routes/channel/views.py @@ -20,6 +20,7 @@ from api.common.serializers import PageQuerySerializer from api.utils.common import with_common_response, parse_block_file, to_dict from api.lib.configtxgen import ConfigTX, ConfigTxGen +from api.lib.jq.jq import JQ from api.lib.peer.channel import Channel as PeerChannel from api.lib.configtxlator.configtxlator import ConfigTxLator from api.exceptions import ( @@ -432,8 +433,74 @@ def set_anchor_peer(name, org, peers, ordering_node): :param peers: list of Node objects :return: none """ + org_msp = '{}MSP'.format(org.name.split(".", 1)[0].capitalize()) + channel_artifacts_path = "{}/{}".format(CELLO_HOME, org.network.name) peer_channel_fetch(name, org, peers, ordering_node) + ConfigTxLator().proto_encode( + input="{}/config_block.pb".format(channel_artifacts_path), + type="common.Block", + output="{}/config_block.json".format(channel_artifacts_path), + ) + + JQ().filter( + input="{}/config_block.json".format(channel_artifacts_path), + output="{}/config.json".format(channel_artifacts_path), + expression=".data.data[0].payload.data.config" + ) + + JQ().filter( + input="{}/config.json".format(channel_artifacts_path), + output="{}/modified_config.json".format(channel_artifacts_path), + expression=".channel_group.groups.Application.groups.{}.values += {{AnchorPeers:{{mod_policy: Admins,value:{{anchor_peers:[{{host: {},port: {}}}]}},version: 0}}}}".format(org_msp, peers[0].name, str(7051)) + ) + + ConfigTxLator().proto_encode( + input="{}/config.json".format(channel_artifacts_path), + type="common.Config", + output="{}/config.pb".format(channel_artifacts_path), + ) + + ConfigTxLator().proto_encode( + input="{}/modified_config.json".format(channel_artifacts_path), + type="common.Config", + output="{}/modified_config.pb".format(channel_artifacts_path), + ) + + ConfigTxLator().compute_update( + original="{}/config.pb".format(channel_artifacts_path), + updated="{}/modified_config.pb".format(channel_artifacts_path), + channel_id=name, + output="{}/config_update.pb".format(channel_artifacts_path), + ) + + ConfigTxLator().proto_decode( + input="{}/config_update.pb".format(channel_artifacts_path), + type="common.ConfigUpdate", + output="{}/config_update.json".format(channel_artifacts_path), + ) + + JQ().filter( + input=".", + output="{}/config_update_in_envelope.json".format(channel_artifacts_path), + expression="" + ) + + ConfigTxLator().proto_encode( + input="{}/config_update_in_envelope.json".format(channel_artifacts_path), + type="common.Envelope", + output="{}/config_update_in_envelope.pb".format(channel_artifacts_path), + ) + + envs = init_env_vars(ordering_node, org) + peer_channel_cli = PeerChannel(**envs) + peer_channel_cli.update( + channel=name, + channel_tx="{}/config_update_in_envelope.pb".format(channel_artifacts_path), + orderer_url="{}.{}:{}".format( + ordering_node.name, org.name.split(".", 1)[1], str(7050)), + ) + def peer_channel_fetch(name, org, peers, ordering_node): """ From f02e00a8f5beb09d2ba0c14ce0e5175d43738adf Mon Sep 17 00:00:00 2001 From: YoungHypo Date: Tue, 10 Dec 2024 00:35:09 -0800 Subject: [PATCH 2/3] finish anchor peer of the last step of channel creation --- .../api/lib/configtxlator/configtxlator.py | 7 +- src/api-engine/api/lib/jq/__init__.py | 0 src/api-engine/api/lib/jq/jq.py | 29 ----- src/api-engine/api/routes/channel/views.py | 86 +++++++++----- src/api-engine/api/utils/common.py | 109 ++++++++++++++++++ 5 files changed, 170 insertions(+), 61 deletions(-) delete mode 100644 src/api-engine/api/lib/jq/__init__.py delete mode 100644 src/api-engine/api/lib/jq/jq.py diff --git a/src/api-engine/api/lib/configtxlator/configtxlator.py b/src/api-engine/api/lib/configtxlator/configtxlator.py index 241a2931a..d25fb42c5 100644 --- a/src/api-engine/api/lib/configtxlator/configtxlator.py +++ b/src/api-engine/api/lib/configtxlator/configtxlator.py @@ -60,13 +60,8 @@ def proto_decode(self, input, type, output): LOG.info(" ".join(command)) - res = run(command, - capture_output=True) + call(command) - if res.returncode == 0 : - return res.stdout - else: - return res.stderr except Exception as e: err_msg = "configtxlator proto decode fail! " raise Exception(err_msg + str(e)) diff --git a/src/api-engine/api/lib/jq/__init__.py b/src/api-engine/api/lib/jq/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/api-engine/api/lib/jq/jq.py b/src/api-engine/api/lib/jq/jq.py deleted file mode 100644 index 696f116df..000000000 --- a/src/api-engine/api/lib/jq/jq.py +++ /dev/null @@ -1,29 +0,0 @@ -# -# SPDX-License-Identifier: Apache-2.0 -# - -from subprocess import call -import logging - -LOG = logging.getLogger(__name__) - -class JQ: - def __init__(self): - self.jq = "jq" - - def filter(self, input, output, expression): - """ - Filter the input file with the given expression and write the output to the given file. - """ - try: - command = [self.jq, - expression, - input, - ">", output] - - LOG.info(" ".join(command)) - - call(command) - except Exception as e: - err_msg = "jq filter fail! " - raise Exception(err_msg + str(e)) diff --git a/src/api-engine/api/routes/channel/views.py b/src/api-engine/api/routes/channel/views.py index d272efe3d..c08d35b5e 100644 --- a/src/api-engine/api/routes/channel/views.py +++ b/src/api-engine/api/routes/channel/views.py @@ -18,7 +18,7 @@ from api.config import CELLO_HOME from api.common.serializers import PageQuerySerializer -from api.utils.common import with_common_response, parse_block_file, to_dict +from api.utils.common import with_common_response, parse_block_file, to_dict, json_filter, json_add_anchor_peer, json_create_envelope from api.lib.configtxgen import ConfigTX, ConfigTxGen from api.lib.jq.jq import JQ from api.lib.peer.channel import Channel as PeerChannel @@ -148,7 +148,8 @@ def create(self, request): peer_channel_join(name, peers, org) # set anchor peer - set_anchor_peer(name, org, peers, ordering_node) + anchor_peer = Node.objects.get(id=peers[0]) + set_anchor_peer(name, org, anchor_peer, ordering_node) # save channel to db channel = Channel( @@ -426,33 +427,55 @@ def peer_channel_join(name, peers, org): CELLO_HOME, org.network.name, name) ) -def set_anchor_peer(name, org, peers, ordering_node): +def set_anchor_peer(name, org, anchor_peer, ordering_node): """ Set anchor peer for the channel. :param org: Organization object. - :param peers: list of Node objects + :param anchor_peer: Anchor peer node + :param ordering_node: Orderer node :return: none """ - org_msp = '{}MSP'.format(org.name.split(".", 1)[0].capitalize()) + org_msp = '{}'.format(org.name.split(".", 1)[0].capitalize()) channel_artifacts_path = "{}/{}".format(CELLO_HOME, org.network.name) - peer_channel_fetch(name, org, peers, ordering_node) + + # Fetch the channel block from the orderer + peer_channel_fetch(name, org, anchor_peer, ordering_node) - ConfigTxLator().proto_encode( + # Decode block to JSON + ConfigTxLator().proto_decode( input="{}/config_block.pb".format(channel_artifacts_path), type="common.Block", output="{}/config_block.json".format(channel_artifacts_path), ) - - JQ().filter( + + # Get the config data from the block + json_filter( input="{}/config_block.json".format(channel_artifacts_path), output="{}/config.json".format(channel_artifacts_path), expression=".data.data[0].payload.data.config" ) - JQ().filter( + # add anchor peer config + anchor_peer_config = { + "AnchorPeers": { + "mod_policy": "Admins", + "value": { + "anchor_peers": [ + { + "host": anchor_peer.name + "." + org.name, + "port": 7051 + } + ] + }, + "version": 0 + } + } + + json_add_anchor_peer( input="{}/config.json".format(channel_artifacts_path), output="{}/modified_config.json".format(channel_artifacts_path), - expression=".channel_group.groups.Application.groups.{}.values += {{AnchorPeers:{{mod_policy: Admins,value:{{anchor_peers:[{{host: {},port: {}}}]}},version: 0}}}}".format(org_msp, peers[0].name, str(7051)) + anchor_peer_config=anchor_peer_config, + org_msp=org_msp ) ConfigTxLator().proto_encode( @@ -480,10 +503,11 @@ def set_anchor_peer(name, org, peers, ordering_node): output="{}/config_update.json".format(channel_artifacts_path), ) - JQ().filter( - input=".", + # Create config update envelope + json_create_envelope( + input="{}/config_update.json".format(channel_artifacts_path), output="{}/config_update_in_envelope.json".format(channel_artifacts_path), - expression="" + channel=name ) ConfigTxLator().proto_encode( @@ -492,31 +516,41 @@ def set_anchor_peer(name, org, peers, ordering_node): output="{}/config_update_in_envelope.pb".format(channel_artifacts_path), ) - envs = init_env_vars(ordering_node, org) - peer_channel_cli = PeerChannel(**envs) - peer_channel_cli.update( - channel=name, - channel_tx="{}/config_update_in_envelope.pb".format(channel_artifacts_path), - orderer_url="{}.{}:{}".format( - ordering_node.name, org.name.split(".", 1)[1], str(7050)), - ) + # Update the channel of anchor peer + peer_channel_update(name, org, anchor_peer, ordering_node, channel_artifacts_path) -def peer_channel_fetch(name, org, peers, ordering_node): +def peer_channel_fetch(name, org, anchor_peer, ordering_node): """ Fetch the channel block from the orderer. - :param peers: list of Node objects + :param anchor_peer: Anchor peer node :param org: Organization object. :param channel_name: Name of the channel. :return: none """ - peer_node = Node.objects.get(id=peers[0]) - envs = init_env_vars(peer_node, org) + envs = init_env_vars(anchor_peer, org) peer_channel_cli = PeerChannel(**envs) peer_channel_cli.fetch(block_path="{}/{}/config_block.pb".format(CELLO_HOME, org.network.name), channel=name, orderer_general_url="{}.{}:{}".format( ordering_node.name, org.name.split(".", 1)[1], str(7050))) +def peer_channel_update(name, org, anchor_peer, ordering_node, channel_artifacts_path): + """ + Update the channel. + :param anchor_peer: Anchor peer node + :param org: Organization object. + :param channel_name: Name of the channel. + :return: none + """ + envs = init_env_vars(anchor_peer, org) + peer_channel_cli = PeerChannel(**envs) + peer_channel_cli.update( + channel=name, + channel_tx="{}/config_update_in_envelope.pb".format(channel_artifacts_path), + orderer_url="{}.{}:{}".format( + ordering_node.name, org.name.split(".", 1)[1], str(7050)), + ) + def init_env_vars(node, org): """ diff --git a/src/api-engine/api/utils/common.py b/src/api-engine/api/utils/common.py index f16ed85ab..8829d3ea8 100644 --- a/src/api-engine/api/utils/common.py +++ b/src/api-engine/api/utils/common.py @@ -13,7 +13,10 @@ import uuid from zipfile import ZipFile from json import loads +import json +import logging +LOG = logging.getLogger(__name__) def make_uuid(): return str(uuid.uuid4()) @@ -153,3 +156,109 @@ def parse_block_file(data): def to_dict(data): return loads(data) + + +def json_filter(input, output, expression): + """ + Process JSON data using path expression similar to jq + + Args: + input (str): JSON data or file path to JSON + output (str): Path expression like ".data.data[0].payload.data.config" + + Returns: + dict: Processed JSON data + """ + # if json_data is a file path, read the file + if isinstance(input, str): + with open(input, 'r', encoding='utf-8') as f: + data = json.load(f) + else: + data = input + + # parse the path expression + path_parts = expression.strip('.').split('.') + result = data + + for part in path_parts: + # handle array index, like data[0] + if '[' in part and ']' in part: + array_name = part.split('[')[0] + index = int(part.split('[')[1].split(']')[0]) + result = result[array_name][index] + else: + result = result[part] + + with open(output, 'w', encoding='utf-8') as f: + json.dump(result, f, sort_keys=False, indent=4) + + LOG.info("jq {} {} -> {}".format(expression, input, output)) + +def json_add_anchor_peer(input, output, anchor_peer_config, org_msp): + """ + Add anchor peer to the organization + + Args: + input (str): JSON data or file path to JSON + output (str): Path expression like ".data.data[0].payload.data.config" + expression (str): Anchor peer data + """ + # if json_data is a file path, read the file + if isinstance(input, str): + with open(input, 'r', encoding='utf-8') as f: + data = json.load(f) + else: + data = input + + if "groups" not in data["channel_group"]: + data["channel_group"]["groups"] = {} + if "Application" not in data["channel_group"]["groups"]: + data["channel_group"]["groups"]["Application"] = {"groups": {}} + if org_msp not in data["channel_group"]["groups"]["Application"]["groups"]: + data["channel_group"]["groups"]["Application"]["groups"][org_msp] = {"values": {}} + + data["channel_group"]["groups"]["Application"]["groups"][org_msp]["values"].update(anchor_peer_config) + + with open(output, 'w', encoding='utf-8') as f: + json.dump(data, f, sort_keys=False, indent=4) + + LOG.info("jq '.channel_group.groups.Application.groups.Org1MSP.values += ... ' {} -> {}".format(input, output)) + +def json_create_envelope(input, output, channel): + """ + Create a config update envelope structure + + Args: + input (str): Path to the config update JSON file + output (str): Path to save the envelope JSON + channel (str): Name of the channel + """ + try: + # Read the config update file + with open(input, 'r', encoding='utf-8') as f: + config_update = json.load(f) + + # Create the envelope structure + envelope = { + "payload": { + "header": { + "channel_header": { + "channel_id": channel, + "type": 2 + } + }, + "data": { + "config_update": config_update + } + } + } + + # Write the envelope to output file + with open(output, 'w', encoding='utf-8') as f: + json.dump(envelope, f, sort_keys=False, indent=4) + + LOG.info("echo 'payload ... ' | jq . > {}".format(output)) + + except Exception as e: + LOG.error("Failed to create config update envelope: {}".format(str(e))) + raise \ No newline at end of file From 5c844e13266c788a14ea360f1f9e4115b4718aa4 Mon Sep 17 00:00:00 2001 From: YoungHypo Date: Wed, 11 Dec 2024 11:13:48 -0800 Subject: [PATCH 3/3] delete jq import in channel views --- src/api-engine/api/routes/channel/views.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/api-engine/api/routes/channel/views.py b/src/api-engine/api/routes/channel/views.py index c08d35b5e..e7db44a9b 100644 --- a/src/api-engine/api/routes/channel/views.py +++ b/src/api-engine/api/routes/channel/views.py @@ -20,7 +20,6 @@ from api.common.serializers import PageQuerySerializer from api.utils.common import with_common_response, parse_block_file, to_dict, json_filter, json_add_anchor_peer, json_create_envelope from api.lib.configtxgen import ConfigTX, ConfigTxGen -from api.lib.jq.jq import JQ from api.lib.peer.channel import Channel as PeerChannel from api.lib.configtxlator.configtxlator import ConfigTxLator from api.exceptions import (