-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathapp.py
42 lines (33 loc) · 1.36 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import time
import sqlite3
from dotenv import load_dotenv
from src.channels.telegram import TelegramChannel
from src.agents.personal_assistant import PersonalAssistant
# Load .env variables
load_dotenv()
# Initialize sqlite3 DB for saving agent memory
conn = sqlite3.connect("db/checkpoints.sqlite", check_same_thread=False)
# Use telegram for communicating with the agent
telegram = TelegramChannel()
# Use Slack for communicating with the agent
# slack = SlackChannel()
# Initiate personal assistant
personal_assistant = PersonalAssistant(conn)
# Configuration for the Langgraph checkpoints, specifying thread ID
config = {"configurable": {"thread_id": "1"}}
def monitor_channel(after_timestamp, config):
while True:
new_messages = telegram.receive_messages(after_timestamp)
if new_messages:
for message in new_messages:
sent_message = (
f"Message: {message['text']}\n"
f"Current Date/time: {message['date']}"
)
answer = personal_assistant.invoke(sent_message, config=config)
telegram.send_message(answer)
after_timestamp = int(time.time())
time.sleep(5) # Sleep for 5 seconds before checking again
if __name__ == "__main__":
print("Personal Assistant Manager is running")
monitor_channel(int(time.time()), config)