Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Airtable improvement #3583

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 45 additions & 43 deletions backend/onyx/connectors/airtable/airtable_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ def load_from_state(self) -> GenerateDocumentsOutput:

table = self.airtable_client.table(self.base_id, self.table_name_or_id)
table_id = table.id
record_pages = table.iterate()
# due to https://community.airtable.com/t5/development-apis/pagination-returns-422-error/td-p/54778,
# we can't user the `iterate()` method - we need to get everything up front
# this also means we can't handle tables that won't fit in memory
records = table.all()

table_schema = table.schema()
# have to get the name from the schema, since the table object will
Expand All @@ -216,51 +219,50 @@ def load_from_state(self) -> GenerateDocumentsOutput:
break

record_documents: list[Document] = []
for page in record_pages:
for record in page:
record_id = record["id"]
fields = record["fields"]
sections: list[Section] = []
metadata: dict[str, Any] = {}

# Possibly retrieve the primary field's value
primary_field_value = (
fields.get(primary_field_name) if primary_field_name else None
)
for field_schema in table_schema.fields:
field_name = field_schema.name
field_val = fields.get(field_name)
field_type = field_schema.type

field_sections, field_metadata = self._process_field(
field_name=field_name,
field_info=field_val,
field_type=field_type,
table_id=table_id,
record_id=record_id,
)

sections.extend(field_sections)
metadata.update(field_metadata)

semantic_id = (
f"{table_name}: {primary_field_value}"
if primary_field_value
else table_name
for record in records:
record_id = record["id"]
fields = record["fields"]
sections: list[Section] = []
metadata: dict[str, Any] = {}

# Possibly retrieve the primary field's value
primary_field_value = (
fields.get(primary_field_name) if primary_field_name else None
)
for field_schema in table_schema.fields:
field_name = field_schema.name
field_val = fields.get(field_name)
field_type = field_schema.type

field_sections, field_metadata = self._process_field(
field_name=field_name,
field_info=field_val,
field_type=field_type,
table_id=table_id,
record_id=record_id,
)

record_document = Document(
id=f"airtable__{record_id}",
sections=sections,
source=DocumentSource.AIRTABLE,
semantic_identifier=semantic_id,
metadata=metadata,
)
record_documents.append(record_document)
sections.extend(field_sections)
metadata.update(field_metadata)

semantic_id = (
f"{table_name}: {primary_field_value}"
if primary_field_value
else table_name
)

record_document = Document(
id=f"airtable__{record_id}",
sections=sections,
source=DocumentSource.AIRTABLE,
semantic_identifier=semantic_id,
metadata=metadata,
)
record_documents.append(record_document)

if len(record_documents) >= self.batch_size:
yield record_documents
record_documents = []
if len(record_documents) >= self.batch_size:
yield record_documents
record_documents = []

if record_documents:
yield record_documents
Loading