Skip to content

Commit

Permalink
Merge pull request #661 from YoungHypo/issue-anchor_peer
Browse files Browse the repository at this point in the history
Implemented anchor peer configuration
  • Loading branch information
yeasy authored Dec 13, 2024
2 parents a177906 + 3d88da4 commit 8eef62e
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 33 deletions.
53 changes: 32 additions & 21 deletions src/api-engine/api/lib/configtxlator/configtxlator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from subprocess import call, run
from api.config import FABRIC_TOOL, FABRIC_VERSION

import logging
LOG = logging.getLogger(__name__)

class ConfigTxLator:
"""
Expand All @@ -24,17 +26,21 @@ def proto_encode(self, input, type, output):
output: A file to write the output to.
"""
try:
call([self.configtxlator,
"proto_encode",
"--input={}".format(input),
"--type={}".format(type),
"--output={}".format(output),
])
command = [self.configtxlator,
"proto_encode",
"--input={}".format(input),
"--type={}".format(type),
"--output={}".format(output),
]

LOG.info(" ".join(command))

call(command)
except Exception as e:
err_msg = "configtxlator proto decode fail! "
raise Exception(err_msg + str(e))

def proto_decode(self, input, type):
def proto_decode(self, input, type, output):
"""
Converts a proto message to JSON.
Expand All @@ -45,16 +51,17 @@ def proto_decode(self, input, type):
config
"""
try:
res = run([self.configtxlator,
command = [self.configtxlator,
"proto_decode",
"--type={}".format(type),
"--input={}".format(input),
],
capture_output=True)
if res.returncode == 0 :
return res.stdout
else:
return res.stderr
"--output={}".format(output),
]

LOG.info(" ".join(command))

call(command)

except Exception as e:
err_msg = "configtxlator proto decode fail! "
raise Exception(err_msg + str(e))
Expand All @@ -71,13 +78,17 @@ def compute_update(self, original, updated, channel_id, output):
output: A file to write the JSON document to.
"""
try:
call([self.configtxlator,
"compute_update",
"--original={}".format(original),
"--updated={}".format(updated),
"--channel_id={}".format(channel_id),
"--output={}".format(output),
])
command = [self.configtxlator,
"compute_update",
"--original={}".format(original),
"--updated={}".format(updated),
"--channel_id={}".format(channel_id),
"--output={}".format(output),
]

LOG.info(" ".join(command))

call(command)
except Exception as e:
err_msg = "configtxlator compute update fail! "
raise Exception(err_msg + str(e))
19 changes: 16 additions & 3 deletions src/api-engine/api/lib/peer/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,25 @@ def update(self, channel, channel_tx, orderer_url):
orderer_url: Ordering service endpoint.
"""
try:
res = os.system("{} channel update -c {} -f {} -o {}"
.format(self.peer, channel, channel_tx, orderer_url))
ORDERER_CA = os.getenv("ORDERER_CA")

command = [
self.peer,
"channel", "update",
"-f", channel_tx,
"-c", channel,
"-o", orderer_url,
"--ordererTLSHostnameOverride", orderer_url.split(":")[0],
"--tls",
"--cafile", ORDERER_CA
]
LOG.info(" ".join(command))

res = subprocess.run(command, check=True)

except Exception as e:
err_msg = "update channel failed for {}!".format(e)
raise Exception(err_msg)
res = res >> 8
return res

def fetch(self, block_path, channel, orderer_general_url, max_retries=5, retry_interval=1):
Expand Down
118 changes: 109 additions & 9 deletions src/api-engine/api/routes/channel/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from api.config import CELLO_HOME
from api.common.serializers import PageQuerySerializer
from api.utils.common import with_common_response, parse_block_file, to_dict
from api.utils.common import with_common_response, parse_block_file, to_dict, json_filter, json_add_anchor_peer, json_create_envelope
from api.lib.configtxgen import ConfigTX, ConfigTxGen
from api.lib.peer.channel import Channel as PeerChannel
from api.lib.configtxlator.configtxlator import ConfigTxLator
Expand Down Expand Up @@ -147,7 +147,8 @@ def create(self, request):
peer_channel_join(name, peers, org)

# set anchor peer
set_anchor_peer(name, org, peers, ordering_node)
anchor_peer = Node.objects.get(id=peers[0])
set_anchor_peer(name, org, anchor_peer, ordering_node)

# save channel to db
channel = Channel(
Expand Down Expand Up @@ -425,31 +426,130 @@ def peer_channel_join(name, peers, org):
CELLO_HOME, org.network.name, name)
)

def set_anchor_peer(name, org, peers, ordering_node):
def set_anchor_peer(name, org, anchor_peer, ordering_node):
"""
Set anchor peer for the channel.
:param org: Organization object.
:param peers: list of Node objects
:param anchor_peer: Anchor peer node
:param ordering_node: Orderer node
:return: none
"""
peer_channel_fetch(name, org, peers, ordering_node)
org_msp = '{}'.format(org.name.split(".", 1)[0].capitalize())
channel_artifacts_path = "{}/{}".format(CELLO_HOME, org.network.name)

# Fetch the channel block from the orderer
peer_channel_fetch(name, org, anchor_peer, ordering_node)

# Decode block to JSON
ConfigTxLator().proto_decode(
input="{}/config_block.pb".format(channel_artifacts_path),
type="common.Block",
output="{}/config_block.json".format(channel_artifacts_path),
)

# Get the config data from the block
json_filter(
input="{}/config_block.json".format(channel_artifacts_path),
output="{}/config.json".format(channel_artifacts_path),
expression=".data.data[0].payload.data.config"
)

# add anchor peer config
anchor_peer_config = {
"AnchorPeers": {
"mod_policy": "Admins",
"value": {
"anchor_peers": [
{
"host": anchor_peer.name + "." + org.name,
"port": 7051
}
]
},
"version": 0
}
}

json_add_anchor_peer(
input="{}/config.json".format(channel_artifacts_path),
output="{}/modified_config.json".format(channel_artifacts_path),
anchor_peer_config=anchor_peer_config,
org_msp=org_msp
)

ConfigTxLator().proto_encode(
input="{}/config.json".format(channel_artifacts_path),
type="common.Config",
output="{}/config.pb".format(channel_artifacts_path),
)

ConfigTxLator().proto_encode(
input="{}/modified_config.json".format(channel_artifacts_path),
type="common.Config",
output="{}/modified_config.pb".format(channel_artifacts_path),
)

ConfigTxLator().compute_update(
original="{}/config.pb".format(channel_artifacts_path),
updated="{}/modified_config.pb".format(channel_artifacts_path),
channel_id=name,
output="{}/config_update.pb".format(channel_artifacts_path),
)

ConfigTxLator().proto_decode(
input="{}/config_update.pb".format(channel_artifacts_path),
type="common.ConfigUpdate",
output="{}/config_update.json".format(channel_artifacts_path),
)

def peer_channel_fetch(name, org, peers, ordering_node):
# Create config update envelope
json_create_envelope(
input="{}/config_update.json".format(channel_artifacts_path),
output="{}/config_update_in_envelope.json".format(channel_artifacts_path),
channel=name
)

ConfigTxLator().proto_encode(
input="{}/config_update_in_envelope.json".format(channel_artifacts_path),
type="common.Envelope",
output="{}/config_update_in_envelope.pb".format(channel_artifacts_path),
)

# Update the channel of anchor peer
peer_channel_update(name, org, anchor_peer, ordering_node, channel_artifacts_path)


def peer_channel_fetch(name, org, anchor_peer, ordering_node):
"""
Fetch the channel block from the orderer.
:param peers: list of Node objects
:param anchor_peer: Anchor peer node
:param org: Organization object.
:param channel_name: Name of the channel.
:return: none
"""
peer_node = Node.objects.get(id=peers[0])
envs = init_env_vars(peer_node, org)
envs = init_env_vars(anchor_peer, org)
peer_channel_cli = PeerChannel(**envs)
peer_channel_cli.fetch(block_path="{}/{}/config_block.pb".format(CELLO_HOME, org.network.name),
channel=name, orderer_general_url="{}.{}:{}".format(
ordering_node.name, org.name.split(".", 1)[1], str(7050)))

def peer_channel_update(name, org, anchor_peer, ordering_node, channel_artifacts_path):
"""
Update the channel.
:param anchor_peer: Anchor peer node
:param org: Organization object.
:param channel_name: Name of the channel.
:return: none
"""
envs = init_env_vars(anchor_peer, org)
peer_channel_cli = PeerChannel(**envs)
peer_channel_cli.update(
channel=name,
channel_tx="{}/config_update_in_envelope.pb".format(channel_artifacts_path),
orderer_url="{}.{}:{}".format(
ordering_node.name, org.name.split(".", 1)[1], str(7050)),
)


def init_env_vars(node, org):
"""
Expand Down
109 changes: 109 additions & 0 deletions src/api-engine/api/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
import uuid
from zipfile import ZipFile
from json import loads
import json
import logging

LOG = logging.getLogger(__name__)

def make_uuid():
return str(uuid.uuid4())
Expand Down Expand Up @@ -153,3 +156,109 @@ def parse_block_file(data):

def to_dict(data):
return loads(data)


def json_filter(input, output, expression):
"""
Process JSON data using path expression similar to jq
Args:
input (str): JSON data or file path to JSON
output (str): Path expression like ".data.data[0].payload.data.config"
Returns:
dict: Processed JSON data
"""
# if json_data is a file path, read the file
if isinstance(input, str):
with open(input, 'r', encoding='utf-8') as f:
data = json.load(f)
else:
data = input

# parse the path expression
path_parts = expression.strip('.').split('.')
result = data

for part in path_parts:
# handle array index, like data[0]
if '[' in part and ']' in part:
array_name = part.split('[')[0]
index = int(part.split('[')[1].split(']')[0])
result = result[array_name][index]
else:
result = result[part]

with open(output, 'w', encoding='utf-8') as f:
json.dump(result, f, sort_keys=False, indent=4)

LOG.info("jq {} {} -> {}".format(expression, input, output))

def json_add_anchor_peer(input, output, anchor_peer_config, org_msp):
"""
Add anchor peer to the organization
Args:
input (str): JSON data or file path to JSON
output (str): Path expression like ".data.data[0].payload.data.config"
expression (str): Anchor peer data
"""
# if json_data is a file path, read the file
if isinstance(input, str):
with open(input, 'r', encoding='utf-8') as f:
data = json.load(f)
else:
data = input

if "groups" not in data["channel_group"]:
data["channel_group"]["groups"] = {}
if "Application" not in data["channel_group"]["groups"]:
data["channel_group"]["groups"]["Application"] = {"groups": {}}
if org_msp not in data["channel_group"]["groups"]["Application"]["groups"]:
data["channel_group"]["groups"]["Application"]["groups"][org_msp] = {"values": {}}

data["channel_group"]["groups"]["Application"]["groups"][org_msp]["values"].update(anchor_peer_config)

with open(output, 'w', encoding='utf-8') as f:
json.dump(data, f, sort_keys=False, indent=4)

LOG.info("jq '.channel_group.groups.Application.groups.Org1MSP.values += ... ' {} -> {}".format(input, output))

def json_create_envelope(input, output, channel):
"""
Create a config update envelope structure
Args:
input (str): Path to the config update JSON file
output (str): Path to save the envelope JSON
channel (str): Name of the channel
"""
try:
# Read the config update file
with open(input, 'r', encoding='utf-8') as f:
config_update = json.load(f)

# Create the envelope structure
envelope = {
"payload": {
"header": {
"channel_header": {
"channel_id": channel,
"type": 2
}
},
"data": {
"config_update": config_update
}
}
}

# Write the envelope to output file
with open(output, 'w', encoding='utf-8') as f:
json.dump(envelope, f, sort_keys=False, indent=4)

LOG.info("echo 'payload ... ' | jq . > {}".format(output))

except Exception as e:
LOG.error("Failed to create config update envelope: {}".format(str(e)))
raise

0 comments on commit 8eef62e

Please sign in to comment.