Skip to content

Commit

Permalink
Airtable improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
Weves committed Jan 3, 2025
1 parent 2232702 commit 3b21413
Showing 1 changed file with 45 additions and 43 deletions.
88 changes: 45 additions & 43 deletions backend/onyx/connectors/airtable/airtable_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ def load_from_state(self) -> GenerateDocumentsOutput:

table = self.airtable_client.table(self.base_id, self.table_name_or_id)
table_id = table.id
record_pages = table.iterate()
# due to https://community.airtable.com/t5/development-apis/pagination-returns-422-error/td-p/54778,
# we can't user the `iterate()` method - we need to get everything up front
# this also means we can't handle tables that won't fit in memory
records = table.all()

table_schema = table.schema()
# have to get the name from the schema, since the table object will
Expand All @@ -216,51 +219,50 @@ def load_from_state(self) -> GenerateDocumentsOutput:
break

record_documents: list[Document] = []
for page in record_pages:
for record in page:
record_id = record["id"]
fields = record["fields"]
sections: list[Section] = []
metadata: dict[str, Any] = {}

# Possibly retrieve the primary field's value
primary_field_value = (
fields.get(primary_field_name) if primary_field_name else None
)
for field_schema in table_schema.fields:
field_name = field_schema.name
field_val = fields.get(field_name)
field_type = field_schema.type

field_sections, field_metadata = self._process_field(
field_name=field_name,
field_info=field_val,
field_type=field_type,
table_id=table_id,
record_id=record_id,
)

sections.extend(field_sections)
metadata.update(field_metadata)

semantic_id = (
f"{table_name}: {primary_field_value}"
if primary_field_value
else table_name
for record in records:
record_id = record["id"]
fields = record["fields"]
sections: list[Section] = []
metadata: dict[str, Any] = {}

# Possibly retrieve the primary field's value
primary_field_value = (
fields.get(primary_field_name) if primary_field_name else None
)
for field_schema in table_schema.fields:
field_name = field_schema.name
field_val = fields.get(field_name)
field_type = field_schema.type

field_sections, field_metadata = self._process_field(
field_name=field_name,
field_info=field_val,
field_type=field_type,
table_id=table_id,
record_id=record_id,
)

record_document = Document(
id=f"airtable__{record_id}",
sections=sections,
source=DocumentSource.AIRTABLE,
semantic_identifier=semantic_id,
metadata=metadata,
)
record_documents.append(record_document)
sections.extend(field_sections)
metadata.update(field_metadata)

semantic_id = (
f"{table_name}: {primary_field_value}"
if primary_field_value
else table_name
)

record_document = Document(
id=f"airtable__{record_id}",
sections=sections,
source=DocumentSource.AIRTABLE,
semantic_identifier=semantic_id,
metadata=metadata,
)
record_documents.append(record_document)

if len(record_documents) >= self.batch_size:
yield record_documents
record_documents = []
if len(record_documents) >= self.batch_size:
yield record_documents
record_documents = []

if record_documents:
yield record_documents

0 comments on commit 3b21413

Please sign in to comment.