Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api fixes #81

Merged
merged 3 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 27 additions & 19 deletions kol/onshape/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import re
import string
import urllib.parse
import webbrowser
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, AsyncIterator, Literal, Mapping, cast
Expand Down Expand Up @@ -44,6 +45,26 @@ def get_url(self, base_url: str = DEFAULT_BASE_URL) -> str:
return f"{base_url}/documents/{self.document_id}/{self.item_kind}/{self.item_id}/e/{self.element_id}"


def get_access_and_secret_keys_interactively() -> tuple[str, str]:
access_key = os.environ.get("ONSHAPE_ACCESS_KEY")
secret_key = os.environ.get("ONSHAPE_SECRET_KEY")
if access_key is not None and secret_key is not None:
return access_key, secret_key
print(f"In order to use this tool, you must first set an Onshape API key here: {ONSHAPE_API_KEY_URL}")
open_web_browser = input("Would you like to open the API key portal? (y/n) ")
if open_web_browser.lower() in {"y", "yes"}:
webbrowser.open(ONSHAPE_API_KEY_URL)
while (access_key_input := input("Enter your access key: ")) == "":
continue
while (secret_key_input := input("Enter your secret key: ")) == "":
continue
print("For future use, you can set the ONSHAPE_ACCESS_KEY and ONSHAPE_SECRET_KEY environment variables.")
print("For example:")
print(f"export ONSHAPE_ACCESS_KEY={access_key_input}")
print(f"export ONSHAPE_SECRET_KEY={secret_key_input}")
return access_key_input, secret_key_input


class OnshapeClient:
def __init__(
self,
Expand All @@ -54,24 +75,8 @@ def __init__(
) -> None:
super().__init__()

if access_key is None:
if "ONSHAPE_ACCESS_KEY" not in os.environ:
raise ValueError(
"The ONSHAPE_ACCESS_KEY environment variable is not set. You must first generate an API key "
f"by navigating to the following URL: {ONSHAPE_API_KEY_URL}, then set the ONSHAPE_ACCESS_KEY "
"and ONSHAPE_SECRET_KEY environment variables."
)
access_key = os.environ["ONSHAPE_ACCESS_KEY"]

if secret_key is None:
if "ONSHAPE_SECRET_KEY" not in os.environ:
raise ValueError(
"The ONSHAPE_SECRET_KEY environment variable is not set. You must first generate an API key "
f"by navigating to the following URL: {ONSHAPE_API_KEY_URL}, then set the ONSHAPE_ACCESS_KEY "
"and ONSHAPE_SECRET_KEY environment variables."
)
secret_key = os.environ["ONSHAPE_SECRET_KEY"]

if access_key is None or secret_key is None:
access_key, secret_key = get_access_and_secret_keys_interactively()
self.access_key = access_key.encode("utf-8")
self.secret_key = secret_key.encode("utf-8")
self.base_url = base_url
Expand Down Expand Up @@ -224,7 +229,10 @@ async def request(
logger.error("Got response %d for %s, details: %s", response.status_code, path, response.text)

if response.status_code == 403:
logger.warning("Check that your authentication information is correct")
logger.warning(
"Check that your access key and secret key are correct, "
"and that the document exists and is not private."
)

else:
logger.debug("Got response %d for %s", response.status_code, path)
Expand Down
8 changes: 4 additions & 4 deletions kol/onshape/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ def from_cli_args(cls, args: Sequence[str]) -> Self:
# First, parse the document URL and output directory.
parser = argparse.ArgumentParser()
parser.add_argument("document_url", help="The URL of the OnShape document.")
parser.add_argument("output_dir", nargs="?", default=None, help="The output directory.")
parser.add_argument("-o", "--output-dir", type=str, default="robot", help="The output directory.")
parser.add_argument("-c", "--config-path", type=Path, default=None, help="The path to the config file.")
parsed_args, remaining_args = parser.parse_known_args(args)
document_url: str = parsed_args.document_url
output_dir: str = "robot" if parsed_args.output_dir is None else parsed_args.output_dir
output_dir: str = parsed_args.output_dir
config_path: Path | None = parsed_args.config_path

# Next, parses additional config arguments.
Expand Down Expand Up @@ -191,11 +191,11 @@ def from_cli_args(cls, args: Sequence[str]) -> Self:
# First, parse the document URL and output directory.
parser = argparse.ArgumentParser()
parser.add_argument("document_url", help="The URL of the OnShape document.")
parser.add_argument("output_dir", nargs="?", default=None, help="The output directory.")
parser.add_argument("-o", "--output-dir", type=str, default="robot", help="The output directory.")
parser.add_argument("-c", "--config-path", type=Path, default=None, help="The path to the config file.")
parsed_args, remaining_args = parser.parse_known_args(args)
document_url: str = parsed_args.document_url
output_dir: str = "robot" if parsed_args.output_dir is None else parsed_args.output_dir
output_dir: str = parsed_args.output_dir
config_path: Path | None = parsed_args.config_path

# Next, parses additional config arguments.
Expand Down
60 changes: 52 additions & 8 deletions kol/onshape/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ async def gather_dict(d: dict[Tk, Coroutine[Any, Any, Tv]]) -> dict[Tk, Tv]:

class FailedCheckError(ValueError):
def __init__(self, msg: str, *suggestions: str, end_msg: str | None = None) -> None:
self.original_msg = msg
self.suggestions = suggestions
self.end_msg = end_msg
full_msg = f"{msg}" + "".join(f"\n * {s}" for s in suggestions)
if end_msg is not None:
full_msg += f"\n\n{end_msg}"
Expand Down Expand Up @@ -260,10 +263,17 @@ def get_graph(
key_to_mate_feature = get_key_to_mate_feature(key_to_feature)

# Adds all the graph nodes.
for key, _ in key_to_part_instance.items():
for key, instance in key_to_part_instance.items():
if instance.suppressed:
continue
graph.add_node(key)

def add_edge_safe(node_a: Key, node_b: Key, joint: Key) -> None:
part_instance_a = key_to_part_instance[node_a]
part_instance_b = key_to_part_instance[node_b]
if part_instance_a.suppressed or part_instance_b.suppressed:
return

# Here, we are considering a graph with no ignored joints
# to make sure the original graph is connected.
for node_lhs, node_rhs in ((node_a, node_b), (node_b, node_a)):
Expand Down Expand Up @@ -351,6 +361,7 @@ def get_digraph(graph: nx.Graph, override_central_node: Key | None = None) -> tu
def get_joint_list(
digraph: nx.DiGraph,
key_to_mate_feature: dict[Key, MateFeature],
key_namer: KeyNamer,
) -> list[Joint]:
# Creates a BFS ordering of the graph.
bfs_node_ordering = list(nx.topological_sort(digraph))
Expand All @@ -362,9 +373,24 @@ def get_joint_list(
if mate_feature.suppressed:
continue

if len(mate_feature.featureData.matedEntities) == 1:
logger.warning("Mate feature %s has only one entity", mate_feature.featureData.name)
continue

lhs_entity, rhs_entity = mate_feature.featureData.matedEntities
lhs_key, rhs_key = joint_key[:-1] + lhs_entity.key, joint_key[:-1] + rhs_entity.key

if lhs_key not in node_level or rhs_key not in node_level:
lhs_name = key_namer(lhs_key, None, " : ", False)
rhs_name = key_namer(rhs_key, None, " : ", False)
logger.warning(
'Mate feature "%s" has entities "%s" and "%s" that are not connected',
mate_feature.featureData.name,
lhs_name,
rhs_name,
)
continue

lhs_is_first = node_level[lhs_key] < node_level[rhs_key]
parent_key, child_key = (lhs_key, rhs_key) if lhs_is_first else (rhs_key, lhs_key)
parent_entity, child_entity = (lhs_entity, rhs_entity) if lhs_is_first else (rhs_entity, lhs_entity)
Expand Down Expand Up @@ -490,9 +516,9 @@ async def check_part(

if mass <= 0 and config.default_part_mass is None:
raise FailedCheckError(
f'Part "{part_name}" has a mass of {mass}. All parts should have a positive mass. To fix this, '
"either assign a material to the part or manually set the part mass.",
"Note that the Onshape API returns a mass of 0 for standard parts.",
f'Part "{part_name}" has a mass of {mass}',
"All parts should have a positive mass.",
"To fix this, either assign a material to the part or manually set the part mass",
)

return part_metadata, part_mass_properties
Expand Down Expand Up @@ -660,22 +686,40 @@ async def check_document(
) from e

# Checks all the parts in the assembly.
check_part_results = await asyncio.gather(*(catch_error(check_part(part, api)) for part in assembly.parts))
check_part_results = await asyncio.gather(
*(catch_error(check_part(part, api, config=config)) for part in assembly.parts)
)
checked_part_properties, errs = zip(*check_part_results)

if any(err is not None for err in errs):
extra_msgs = {suggestion for err in errs if isinstance(err, FailedCheckError) for suggestion in err.suggestions}
raise FailedCheckError(
f"Invalid parts for document {document_info.get_url()}",
*(f"{type(err).__name__}: {err}" for err in errs if err is not None),
*(
f"{err.original_msg}" if isinstance(err, FailedCheckError) else f"{type(err).__name__}: {err}"
for err in errs
if err is not None
),
*extra_msgs,
"Note that the Onshape API returns a mass of 0 for standard parts.",
"You can manually set the part mass for these parts using the `default_part_mass` option.",
)

part_metadata = {part.key: md for part, (md, _) in zip(assembly.parts, checked_part_properties)}
part_dynamics = {part.key: dyn for part, (_, dyn) in zip(assembly.parts, checked_part_properties)}

# Checks all the joints in the assembly.
joints = get_joint_list(digraph, key_to_mate_feature)
joints = get_joint_list(digraph, key_to_mate_feature, key_namer)
joint_limits = await get_joint_limits(assembly, api)

# Checks that all the document keys have joint limits.
missing_joint_limits = [key for key, euid in key_to_euid.items() if euid not in joint_limits]
if missing_joint_limits:
raise FailedCheckError(
f"Missing joint limits for document {document_info.get_url()}",
*(f"Missing joint limits for part {key_namer(key, None)}" for key in missing_joint_limits),
)

# Checks all the mate relations in the assembly.
try:
mate_relations = await get_mate_relations(key_to_mate_relation_feature, config=config)
Expand Down Expand Up @@ -1127,7 +1171,7 @@ async def main(args: Sequence[str] | None = None) -> DownloadedDocument:
if args is None:
args = sys.argv[1:]
config = DownloadConfig.from_cli_args(args)
configure_logging(level=logging.DEBUG if config.debug else logging.INFO)
configure_logging(level=logging.DEBUG if config.debug else logging.WARN)
return await download(
document_url=config.document_url,
output_dir=config.output_dir,
Expand Down
7 changes: 6 additions & 1 deletion kol/onshape/schema/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class MimicRelation:
multiplier: float


class EmptyInstance(BaseModel):
id: str
name: str


class BaseInstance(BaseModel):
name: str
suppressed: bool
Expand Down Expand Up @@ -64,7 +69,7 @@ def euid(self) -> ElementUid:
)


Instance = AssemblyInstance | PartInstance
Instance = AssemblyInstance | PartInstance | EmptyInstance


class Occurrence(BaseModel):
Expand Down
1 change: 1 addition & 0 deletions kol/onshape/schema/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
class FeatureType(str, Enum):
mate = "mate"
mateRelation = "mateRelation"
relation = "replicate"


class InferenceType(str, Enum):
Expand Down
Loading