Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(example): search agent example #647

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions python/examples/21-search/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
This example consists of the following agents:

* charger_search_agents: An agent offering a search protocol to run detailed queries regarding EV charging stations that go beyond the capability of the search api. There is both a sync and an async implementation of the agent to compare implementational differences for the same solution.
* searching_agent: A user agent showcasing how to execute a search with the search agent.
* hit_agent: An agent satisfying the general parameters and the specific attributes
* miss_agent: An agent satisfying the general parameters but not the specific attributes

How to run the example:

1. Start the charger_search_agent, hit_agent and miss_agent
2. Put the search_agents's address as `SEARCH_AGENT = ...` in searching_agent
2. (?) Wait a couple of minutes to let the information propagate through agentverse etc.
3. Run the searching agent
4. It should only get the hit_agent's address as a result

```
title General Search Agent Flow

participant "User Agent" as U
participant "Search Agent" as S
participant "Search Engine" as E
participant "Hit Agent" as RP
participant "Miss Agent" as RN

note over RP, RN: Assuming all agents representing \nthe desired information / entities \nare supporting the same protocol

U->S: search for agents with filters
note over S: check cache here or only after search engine?
S->E: query general agent group
S<-E: list of agents
loop list of agents
S->RP: query attributes
S->RN: query attributes
end
S<-RP: return attributes
S<-RN: return empty list
loop responses
S->S: add to cache
alt attribute match
S->S: add to result list
end
end
U<-S: result list
U->RP: consume
```
137 changes: 137 additions & 0 deletions python/examples/21-search/charger_search_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import uuid
from datetime import datetime, timedelta
from logging import DEBUG

from protocol.search_protocol import (
AttributeQuery,
AttributeResponse,
SearchRequest,
SearchResponse,
)

# from fetchai import fetch
from uagents import Agent, Context, Protocol
from uagents.experimental import search

# search scenarios (beyond basic search engine search)
# find agents where attribute == value
# find agents where attribute in range(min, max) / [val_1, ..., val_n]
# find agent where attribute is highest/lowest limit num_of_choices

MY_TIMEOUT = 10

agent = Agent(
name="Search Agent for EV Chargers",
# seed = "<put unique seed>",
port=8100,
endpoint="http://localhost:8100/submit",
agentverse="https://staging.agentverse.ai",
log_level=DEBUG,
)

search_proto = Protocol(name="agent-search-protocol", version="0.1.0")


@search_proto.on_message(SearchRequest)
async def handle_search_request(ctx: Context, sender: str, msg: SearchRequest):
ctx.logger.info(f"Received search request from {sender}")
# create a new session to eventually link incoming responses to the original caller
search_session = uuid.uuid4()
storage_dict = {
str(search_session): {
"searcher": sender,
"timeout": str(datetime.now() + timedelta(seconds=MY_TIMEOUT)),
"request": msg.model_dump_json(),
"responses": {},
}
}
sessions = ctx.storage.get("sessions") or {}
sessions.update(storage_dict)
ctx.storage.set("sessions", sessions)

ctx.logger.info(f"starting new session {search_session} for agent {sender}")

agents = search.geosearch_agents_by_text(
latitude=msg.geolocation.latitude,
longitude=msg.geolocation.longitude,
radius=msg.geolocation.radius,
search_text=msg.query,
)

query = AttributeQuery(
search_id=search_session, attributes=list(msg.attribute_filter)
)

for agent in agents:
ctx.logger.info(f"querying agent: {agent.address}")
await ctx.send(agent.address, query)

## broadcast currently doesn't allow for additional filters, so it might reach agents
## that wouldn't be returned by the search engine (e.g., outside of geo search area)
# ctx.broadcast(msg.protocol_digest, query)
## this could be a convenience function to wrap the above loop internally
# ctx.broadcast(msg.protocol_digest, query, msg.query, msg.geolocation, msg.tags)


@agent.on_message(AttributeResponse)
async def handle_attribute_response(ctx: Context, sender: str, msg: AttributeResponse):
"""
Collect responses and map them to the corresponding search sessions so they can be handled
in bulk after session timeout
"""
sessions = ctx.storage.get("sessions")
if msg.attributes is None or len(msg.attributes) == 0:
ctx.logger.info(f"no matching attribute from agent {sender[:8]}...")
return

if not msg.search_id or str(msg.search_id) not in sessions:
ctx.logger.info("AttributeResponse for unknown search_id")
return

# if the agent features the queried attribute, include it in the responses
ctx.logger.info(f"Got a matching response from agent {sender[:8]}...")
responses = sessions[str(msg.search_id)]["responses"]
response = {sender: msg.attributes}
responses.update(response)
ctx.storage.set("sessions", sessions)


@agent.on_interval(6)
async def check_queries(ctx: Context):
"""Check for each active session, if timeout is reached. If so finish the corresponding query"""
sessions_sync = ctx.storage.get("sessions") or {}
sessions = sessions_sync.copy()
ctx.logger.info(f"active sessions: {len(sessions)}")
if len(sessions) > 0:
for session, session_info in sessions.items():
if datetime.fromisoformat(session_info["timeout"]) < datetime.now():
ctx.logger.info(f"finish session {session}")
await finish_query(ctx, session)


async def finish_query(ctx: Context, session: str):
"""
Collect all responses and only return those agents that match the search terms of the
initial request
"""
sessions = ctx.storage.get("sessions")
session_info = sessions.pop(session)
ctx.storage.set("sessions", sessions)
matching_agents = []
# evaluate all replies given the initial query
for agent, attributes in session_info["responses"].items():
# parse into model for better handling
search_request = SearchRequest.model_validate_json(session_info["request"])
# for a first test simply check attribute equality
for attribute, value in search_request.attribute_filter.items():
if attributes[attribute] == value:
matching_agents.append(agent)
await ctx.send(
session_info["searcher"], SearchResponse(result_list=matching_agents)
)


agent.include(search_proto)

if __name__ == "__main__":
agent.run()
91 changes: 91 additions & 0 deletions python/examples/21-search/charger_search_agent_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import uuid
from logging import DEBUG

from protocol.search_protocol import (
AttributeQuery,
AttributeResponse,
SearchRequest,
SearchResponse,
)

# from fetchai import fetch
from uagents import Agent, Context, Protocol
from uagents.experimental import search

# search scenarios (beyond basic search engine search)
# find agents where attribute == value
# find agents where attribute in range(min, max) / [val_1, ..., val_n]
# find agent where attribute is highest/lowest limit num_of_choices

MY_TIMEOUT = 10

agent = Agent(
name="Search Agent for EV Chargers sync",
# seed = "<put unique seed>",
port=8099,
endpoint="http://localhost:8099/submit",
agentverse="https://staging.agentverse.ai",
log_level=DEBUG,
)

search_proto = Protocol(name="agent-search-protocol", version="0.1.0")


@search_proto.on_message(SearchRequest)
async def handle_search_request(ctx: Context, sender: str, msg: SearchRequest):
ctx.logger.info(f"Received search request from {sender}")
# create a new session to eventually link incoming responses to the original caller
search_session = uuid.uuid4()
ctx.logger.info(f"starting new session {search_session} for agent {sender}")

agents = search.geosearch_agents_by_text(
latitude=msg.geolocation.latitude,
longitude=msg.geolocation.longitude,
radius=msg.geolocation.radius,
search_text=msg.query,
)

query = AttributeQuery(
search_id=search_session, attributes=list(msg.attribute_filter)
)

responses = {}
for agent in agents:
ctx.logger.info(f"querying agent: {agent.address}")
reply: AttributeResponse
reply, status = await ctx.send_and_receive(
agent.address,
query,
response_type=AttributeResponse,
timeout=MY_TIMEOUT,
)
if not isinstance(reply, AttributeResponse):
ctx.logger.info(f"Received unexpected response from {agent.address[:8]}..")
continue

if reply.attributes is None or len(reply.attributes) == 0:
ctx.logger.info(f"no matching attribute from agent {agent.address[:8]}...")
continue

if not reply.search_id or reply.search_id != search_session:
ctx.logger.info("AttributeResponse for unknown search_id")
continue

# if the agent features the queried attribute, include it in the responses
ctx.logger.info(f"Got a matching response from agent {agent.address[:8]}...")
responses.update({agent.address: reply.attributes})

matching_agents = []
# evaluate all replies given the initial query
for agent, attributes in responses.items():
# for a first test simply check attribute equality
for attribute, value in msg.attribute_filter.items():
if attributes[attribute] == value:
matching_agents.append(agent)
await ctx.send(sender, SearchResponse(result_list=matching_agents))


agent.include(search_proto)

if __name__ == "__main__":
agent.run()
35 changes: 35 additions & 0 deletions python/examples/21-search/hit_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from protocol.search_protocol import AttributeQuery, AttributeResponse

from uagents import Context
from uagents.experimental.mobility import MobilityAgent
from uagents.types import AgentGeolocation

SEARCH_AGENT = ""
MY_ATTRIBUTES = {"charging": "yes", "food": "no", "colour": "blue"}
# TODO add agentverse registration so readme can get uploaded
agent = MobilityAgent(
name="uniquechargingquery hit",
port=8102,
endpoint="http://localhost:8102/submit",
location=AgentGeolocation(latitude=48.76, longitude=9.12, radius=2),
mobility_type="vehicle", # completely irrelevant for demo
static_signal="charger",
)


@agent.on_message(AttributeQuery)
async def handle_search_response(ctx: Context, sender: str, msg: AttributeQuery):
attribute_values = {}
for attribute in msg.attributes:
if attribute in MY_ATTRIBUTES:
ctx.logger.info(f"query contains matching attribute: {attribute}")
attribute_values.update({attribute: MY_ATTRIBUTES[attribute]})
else:
ctx.logger.info(f"query contains unknown attribute: {attribute}")
await ctx.send(
sender, AttributeResponse(search_id=msg.search_id, attributes=attribute_values)
)


if __name__ == "__main__":
agent.run()
36 changes: 36 additions & 0 deletions python/examples/21-search/miss_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from protocol.search_protocol import AttributeQuery, AttributeResponse

from uagents import Context
from uagents.experimental.mobility import MobilityAgent
from uagents.types import AgentGeolocation

SEARCH_AGENT = ""
MY_ATTRIBUTES = {"charging": "no", "food": "yes", "colour": "blue"}

agent = MobilityAgent(
name="uniquechargingquery miss",
port=8103,
endpoint="http://localhost:8103/submit",
location=AgentGeolocation(latitude=48.77, longitude=9.11, radius=2),
mobility_type="vehicle", # completely irrelevant for demo
static_signal="food truck",
)


@agent.on_message(AttributeQuery)
async def handle_search_response(ctx: Context, sender: str, msg: AttributeQuery):
attribute_values = {}
for attribute in msg.attributes:
if attribute in MY_ATTRIBUTES:
ctx.logger.info(f"query contains matching attribute: {attribute}")
attribute_values.update({attribute: MY_ATTRIBUTES[attribute]})
else:
ctx.logger.info(f"query contains unknown attribute: {attribute}")
await ctx.send(
sender,
AttributeResponse(search_id=msg.search_id, attributes=attribute_values),
)


if __name__ == "__main__":
agent.run()
Loading