diff --git a/kol/onshape/client.py b/kol/onshape/client.py index a637d66..0e520b7 100644 --- a/kol/onshape/client.py +++ b/kol/onshape/client.py @@ -10,6 +10,7 @@ import re import string import urllib.parse +import webbrowser from contextlib import asynccontextmanager from dataclasses import dataclass from typing import Any, AsyncIterator, Literal, Mapping, cast @@ -44,6 +45,26 @@ def get_url(self, base_url: str = DEFAULT_BASE_URL) -> str: return f"{base_url}/documents/{self.document_id}/{self.item_kind}/{self.item_id}/e/{self.element_id}" +def get_access_and_secret_keys_interactively() -> tuple[str, str]: + access_key = os.environ.get("ONSHAPE_ACCESS_KEY") + secret_key = os.environ.get("ONSHAPE_SECRET_KEY") + if access_key is not None and secret_key is not None: + return access_key, secret_key + print(f"In order to use this tool, you must first set an Onshape API key here: {ONSHAPE_API_KEY_URL}") + open_web_browser = input("Would you like to open the API key portal? (y/n) ") + if open_web_browser.lower() in {"y", "yes"}: + webbrowser.open(ONSHAPE_API_KEY_URL) + while (access_key_input := input("Enter your access key: ")) == "": + continue + while (secret_key_input := input("Enter your secret key: ")) == "": + continue + print("For future use, you can set the ONSHAPE_ACCESS_KEY and ONSHAPE_SECRET_KEY environment variables.") + print("For example:") + print(f"export ONSHAPE_ACCESS_KEY={access_key_input}") + print(f"export ONSHAPE_SECRET_KEY={secret_key_input}") + return access_key_input, secret_key_input + + class OnshapeClient: def __init__( self, @@ -54,24 +75,8 @@ def __init__( ) -> None: super().__init__() - if access_key is None: - if "ONSHAPE_ACCESS_KEY" not in os.environ: - raise ValueError( - "The ONSHAPE_ACCESS_KEY environment variable is not set. You must first generate an API key " - f"by navigating to the following URL: {ONSHAPE_API_KEY_URL}, then set the ONSHAPE_ACCESS_KEY " - "and ONSHAPE_SECRET_KEY environment variables." - ) - access_key = os.environ["ONSHAPE_ACCESS_KEY"] - - if secret_key is None: - if "ONSHAPE_SECRET_KEY" not in os.environ: - raise ValueError( - "The ONSHAPE_SECRET_KEY environment variable is not set. You must first generate an API key " - f"by navigating to the following URL: {ONSHAPE_API_KEY_URL}, then set the ONSHAPE_ACCESS_KEY " - "and ONSHAPE_SECRET_KEY environment variables." - ) - secret_key = os.environ["ONSHAPE_SECRET_KEY"] - + if access_key is None or secret_key is None: + access_key, secret_key = get_access_and_secret_keys_interactively() self.access_key = access_key.encode("utf-8") self.secret_key = secret_key.encode("utf-8") self.base_url = base_url @@ -224,7 +229,10 @@ async def request( logger.error("Got response %d for %s, details: %s", response.status_code, path, response.text) if response.status_code == 403: - logger.warning("Check that your authentication information is correct") + logger.warning( + "Check that your access key and secret key are correct, " + "and that the document exists and is not private." + ) else: logger.debug("Got response %d for %s", response.status_code, path) diff --git a/kol/onshape/config.py b/kol/onshape/config.py index d325051..05423e7 100644 --- a/kol/onshape/config.py +++ b/kol/onshape/config.py @@ -107,11 +107,11 @@ def from_cli_args(cls, args: Sequence[str]) -> Self: # First, parse the document URL and output directory. parser = argparse.ArgumentParser() parser.add_argument("document_url", help="The URL of the OnShape document.") - parser.add_argument("output_dir", nargs="?", default=None, help="The output directory.") + parser.add_argument("-o", "--output-dir", type=str, default="robot", help="The output directory.") parser.add_argument("-c", "--config-path", type=Path, default=None, help="The path to the config file.") parsed_args, remaining_args = parser.parse_known_args(args) document_url: str = parsed_args.document_url - output_dir: str = "robot" if parsed_args.output_dir is None else parsed_args.output_dir + output_dir: str = parsed_args.output_dir config_path: Path | None = parsed_args.config_path # Next, parses additional config arguments. @@ -191,11 +191,11 @@ def from_cli_args(cls, args: Sequence[str]) -> Self: # First, parse the document URL and output directory. parser = argparse.ArgumentParser() parser.add_argument("document_url", help="The URL of the OnShape document.") - parser.add_argument("output_dir", nargs="?", default=None, help="The output directory.") + parser.add_argument("-o", "--output-dir", type=str, default="robot", help="The output directory.") parser.add_argument("-c", "--config-path", type=Path, default=None, help="The path to the config file.") parsed_args, remaining_args = parser.parse_known_args(args) document_url: str = parsed_args.document_url - output_dir: str = "robot" if parsed_args.output_dir is None else parsed_args.output_dir + output_dir: str = parsed_args.output_dir config_path: Path | None = parsed_args.config_path # Next, parses additional config arguments. diff --git a/kol/onshape/download.py b/kol/onshape/download.py index d53bb66..58c84fe 100644 --- a/kol/onshape/download.py +++ b/kol/onshape/download.py @@ -86,6 +86,9 @@ async def gather_dict(d: dict[Tk, Coroutine[Any, Any, Tv]]) -> dict[Tk, Tv]: class FailedCheckError(ValueError): def __init__(self, msg: str, *suggestions: str, end_msg: str | None = None) -> None: + self.original_msg = msg + self.suggestions = suggestions + self.end_msg = end_msg full_msg = f"{msg}" + "".join(f"\n * {s}" for s in suggestions) if end_msg is not None: full_msg += f"\n\n{end_msg}" @@ -260,10 +263,17 @@ def get_graph( key_to_mate_feature = get_key_to_mate_feature(key_to_feature) # Adds all the graph nodes. - for key, _ in key_to_part_instance.items(): + for key, instance in key_to_part_instance.items(): + if instance.suppressed: + continue graph.add_node(key) def add_edge_safe(node_a: Key, node_b: Key, joint: Key) -> None: + part_instance_a = key_to_part_instance[node_a] + part_instance_b = key_to_part_instance[node_b] + if part_instance_a.suppressed or part_instance_b.suppressed: + return + # Here, we are considering a graph with no ignored joints # to make sure the original graph is connected. for node_lhs, node_rhs in ((node_a, node_b), (node_b, node_a)): @@ -351,6 +361,7 @@ def get_digraph(graph: nx.Graph, override_central_node: Key | None = None) -> tu def get_joint_list( digraph: nx.DiGraph, key_to_mate_feature: dict[Key, MateFeature], + key_namer: KeyNamer, ) -> list[Joint]: # Creates a BFS ordering of the graph. bfs_node_ordering = list(nx.topological_sort(digraph)) @@ -362,9 +373,24 @@ def get_joint_list( if mate_feature.suppressed: continue + if len(mate_feature.featureData.matedEntities) == 1: + logger.warning("Mate feature %s has only one entity", mate_feature.featureData.name) + continue + lhs_entity, rhs_entity = mate_feature.featureData.matedEntities lhs_key, rhs_key = joint_key[:-1] + lhs_entity.key, joint_key[:-1] + rhs_entity.key + if lhs_key not in node_level or rhs_key not in node_level: + lhs_name = key_namer(lhs_key, None, " : ", False) + rhs_name = key_namer(rhs_key, None, " : ", False) + logger.warning( + 'Mate feature "%s" has entities "%s" and "%s" that are not connected', + mate_feature.featureData.name, + lhs_name, + rhs_name, + ) + continue + lhs_is_first = node_level[lhs_key] < node_level[rhs_key] parent_key, child_key = (lhs_key, rhs_key) if lhs_is_first else (rhs_key, lhs_key) parent_entity, child_entity = (lhs_entity, rhs_entity) if lhs_is_first else (rhs_entity, lhs_entity) @@ -490,9 +516,9 @@ async def check_part( if mass <= 0 and config.default_part_mass is None: raise FailedCheckError( - f'Part "{part_name}" has a mass of {mass}. All parts should have a positive mass. To fix this, ' - "either assign a material to the part or manually set the part mass.", - "Note that the Onshape API returns a mass of 0 for standard parts.", + f'Part "{part_name}" has a mass of {mass}', + "All parts should have a positive mass.", + "To fix this, either assign a material to the part or manually set the part mass", ) return part_metadata, part_mass_properties @@ -660,22 +686,40 @@ async def check_document( ) from e # Checks all the parts in the assembly. - check_part_results = await asyncio.gather(*(catch_error(check_part(part, api)) for part in assembly.parts)) + check_part_results = await asyncio.gather( + *(catch_error(check_part(part, api, config=config)) for part in assembly.parts) + ) checked_part_properties, errs = zip(*check_part_results) if any(err is not None for err in errs): + extra_msgs = {suggestion for err in errs if isinstance(err, FailedCheckError) for suggestion in err.suggestions} raise FailedCheckError( f"Invalid parts for document {document_info.get_url()}", - *(f"{type(err).__name__}: {err}" for err in errs if err is not None), + *( + f"{err.original_msg}" if isinstance(err, FailedCheckError) else f"{type(err).__name__}: {err}" + for err in errs + if err is not None + ), + *extra_msgs, + "Note that the Onshape API returns a mass of 0 for standard parts.", + "You can manually set the part mass for these parts using the `default_part_mass` option.", ) part_metadata = {part.key: md for part, (md, _) in zip(assembly.parts, checked_part_properties)} part_dynamics = {part.key: dyn for part, (_, dyn) in zip(assembly.parts, checked_part_properties)} # Checks all the joints in the assembly. - joints = get_joint_list(digraph, key_to_mate_feature) + joints = get_joint_list(digraph, key_to_mate_feature, key_namer) joint_limits = await get_joint_limits(assembly, api) + # Checks that all the document keys have joint limits. + missing_joint_limits = [key for key, euid in key_to_euid.items() if euid not in joint_limits] + if missing_joint_limits: + raise FailedCheckError( + f"Missing joint limits for document {document_info.get_url()}", + *(f"Missing joint limits for part {key_namer(key, None)}" for key in missing_joint_limits), + ) + # Checks all the mate relations in the assembly. try: mate_relations = await get_mate_relations(key_to_mate_relation_feature, config=config) @@ -1127,7 +1171,7 @@ async def main(args: Sequence[str] | None = None) -> DownloadedDocument: if args is None: args = sys.argv[1:] config = DownloadConfig.from_cli_args(args) - configure_logging(level=logging.DEBUG if config.debug else logging.INFO) + configure_logging(level=logging.DEBUG if config.debug else logging.WARN) return await download( document_url=config.document_url, output_dir=config.output_dir, diff --git a/kol/onshape/schema/assembly.py b/kol/onshape/schema/assembly.py index 08f4ef6..5bca4e4 100644 --- a/kol/onshape/schema/assembly.py +++ b/kol/onshape/schema/assembly.py @@ -23,6 +23,11 @@ class MimicRelation: multiplier: float +class EmptyInstance(BaseModel): + id: str + name: str + + class BaseInstance(BaseModel): name: str suppressed: bool @@ -64,7 +69,7 @@ def euid(self) -> ElementUid: ) -Instance = AssemblyInstance | PartInstance +Instance = AssemblyInstance | PartInstance | EmptyInstance class Occurrence(BaseModel): diff --git a/kol/onshape/schema/features.py b/kol/onshape/schema/features.py index dba8b10..89775fe 100644 --- a/kol/onshape/schema/features.py +++ b/kol/onshape/schema/features.py @@ -11,6 +11,7 @@ class FeatureType(str, Enum): mate = "mate" mateRelation = "mateRelation" + relation = "replicate" class InferenceType(str, Enum):