Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use http.client instead of urllib3 #12660

Merged
merged 17 commits into from
Nov 1, 2023
2 changes: 1 addition & 1 deletion libs/cli/langchain_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from langchain_cli.namespaces import app as app_namespace
from langchain_cli.namespaces import template as template_namespace

__version__ = "0.0.10"
__version__ = "0.0.11"

app = typer.Typer(no_args_is_help=True, add_completion=False)
app.add_typer(
Expand Down
68 changes: 28 additions & 40 deletions libs/cli/langchain_cli/utils/events.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import http.client
import json
from typing import Any, Dict, List, Optional, TypedDict

import urllib3

WRITE_KEY = "310apTK0HUFl4AOv"


Expand All @@ -11,43 +10,32 @@ class EventDict(TypedDict):
properties: Optional[Dict[str, Any]]


def create_event(event: EventDict) -> None:
"""
Creates a new event with the given type and payload.
"""
data = {
"write_key": WRITE_KEY,
"event": event["event"],
"properties": event.get("properties"),
}
try:
urllib3.request(
"POST",
"https://app.firstpartyhq.com/events/v1/track",
body=json.dumps(data),
headers={"Content-Type": "application/json"},
)
except Exception:
pass


def create_events(events: List[EventDict]) -> None:
data = {
"events": [
{
"write_key": WRITE_KEY,
"event": event["event"],
"properties": event.get("properties"),
}
for event in events
]
}
def create_events(events: List[EventDict]) -> Optional[Any]:
try:
urllib3.request(
"POST",
"https://app.firstpartyhq.com/events/v1/track/bulk",
body=json.dumps(data),
headers={"Content-Type": "application/json"},
)
data = {
"events": [
{
"write_key": WRITE_KEY,
"name": event["event"],
"properties": event.get("properties"),
}
for event in events
]
}

conn = http.client.HTTPSConnection("app.firstpartyhq.com")

payload = json.dumps(data)

headers = {
"Content-Type": "application/json",
"Accept": "application/json",
}

conn.request("POST", "/events/v1/track/bulk", payload, headers)

res = conn.getresponse()

return json.loads(res.read())
except Exception:
pass
return None
6 changes: 3 additions & 3 deletions libs/cli/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion libs/cli/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "langchain-cli"
version = "0.0.10"
version = "0.0.11"
description = "CLI for interacting with LangChain"
authors = ["Erick Friis <erick@langchain.dev>"]
readme = "README.md"
Expand Down
7 changes: 7 additions & 0 deletions libs/cli/tests/test_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from langchain_cli.utils.events import EventDict, create_events


def test_create_events() -> None:
assert create_events(
[EventDict(event="Test Event", properties={"test": "test"})]
) == {"status": "success"}
Loading