Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/danswer-ai/danswer into bug…
Browse files Browse the repository at this point in the history
…fix/light_cpu
  • Loading branch information
Richard Kuo (Danswer) committed Jan 10, 2025
2 parents 9622400 + 1470b7e commit 8e25c3c
Show file tree
Hide file tree
Showing 13 changed files with 322 additions and 144 deletions.
4 changes: 3 additions & 1 deletion backend/ee/onyx/external_permissions/confluence/doc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
def _get_server_space_permissions(
confluence_client: OnyxConfluence, space_key: str
) -> ExternalAccess:
space_permissions = confluence_client.get_space_permissions(space_key=space_key)
space_permissions = confluence_client.get_all_space_permissions_server(
space_key=space_key
)

viewspace_permissions = []
for permission_category in space_permissions:
Expand Down
24 changes: 16 additions & 8 deletions backend/onyx/background/celery/celery_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.models import Document
from onyx.db.connector_credential_pair import get_connector_credential_pair
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import TaskStatus
from onyx.db.models import TaskQueueState
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
Expand Down Expand Up @@ -41,14 +42,21 @@ def _get_deletion_status(
return None

redis_connector = RedisConnector(tenant_id, cc_pair.id)
if not redis_connector.delete.fenced:
return None

return TaskQueueState(
task_id="",
task_name=redis_connector.delete.fence_key,
status=TaskStatus.STARTED,
)
if redis_connector.delete.fenced:
return TaskQueueState(
task_id="",
task_name=redis_connector.delete.fence_key,
status=TaskStatus.STARTED,
)

if cc_pair.status == ConnectorCredentialPairStatus.DELETING:
return TaskQueueState(
task_id="",
task_name=redis_connector.delete.fence_key,
status=TaskStatus.PENDING,
)

return None


def get_deletion_attempt_snapshot(
Expand Down
27 changes: 27 additions & 0 deletions backend/onyx/connectors/confluence/onyx_confluence.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,33 @@ def paginated_group_members_retrieval(
group_name = quote(group_name)
yield from self._paginate_url(f"rest/api/group/{group_name}/member", limit)

def get_all_space_permissions_server(
    self,
    space_key: str,
) -> list[dict[str, Any]]:
    """
    Fetch the permission sets of a space through the JSON-RPC endpoint.

    This is a Confluence Server specific method. It posts a
    `getSpacePermissionSets` call for `space_key` and logs the raw JSON-RPC
    response, which gives better logging than calling the
    get_space_permissions method.

    Returns the `result` member of the response. When the response is not a
    dict, has no `result`, or has a null/empty `result`, a warning is logged
    and an empty list is returned, so callers can always iterate the return
    value.
    """
    url = "rpc/json-rpc/confluenceservice-v2"
    data = {
        "jsonrpc": "2.0",
        "method": "getSpacePermissionSets",
        # NOTE(review): looks like an arbitrary request id; the id echoed in
        # the response is not checked here.
        "id": 7,
        "params": [space_key],
    }
    response = self.post(url, data=data)
    logger.debug(f"jsonrpc response: {response}")

    # `result` may be absent or explicitly null (e.g. on an error reply), and
    # the response itself may not be a dict at all; treat every such case as
    # "no permissions" instead of returning None or raising AttributeError.
    result = response.get("result") if isinstance(response, dict) else None
    if not result:
        logger.warning(
            f"No jsonrpc response for space permissions for space {space_key}"
            f"\nResponse: {response}"
        )
        return []

    return result


def _validate_connector_configuration(
credentials: dict[str, Any],
Expand Down
14 changes: 14 additions & 0 deletions backend/onyx/connectors/google_utils/google_kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from onyx.configs.constants import KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY
from onyx.connectors.google_utils.resources import get_drive_service
from onyx.connectors.google_utils.resources import get_gmail_service
from onyx.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_AUTHENTICATION_METHOD,
)
from onyx.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_SERVICE_ACCOUNT_KEY,
)
Expand All @@ -29,6 +32,9 @@
from onyx.connectors.google_utils.shared_constants import (
GOOGLE_SCOPES,
)
from onyx.connectors.google_utils.shared_constants import (
GoogleOAuthAuthenticationMethod,
)
from onyx.connectors.google_utils.shared_constants import (
MISSING_SCOPES_ERROR_STR,
)
Expand Down Expand Up @@ -96,6 +102,7 @@ def update_credential_access_tokens(
user: User,
db_session: Session,
source: DocumentSource,
auth_method: GoogleOAuthAuthenticationMethod,
) -> OAuthCredentials | None:
app_credentials = get_google_app_cred(source)
flow = InstalledAppFlow.from_client_config(
Expand All @@ -119,6 +126,7 @@ def update_credential_access_tokens(
new_creds_dict = {
DB_CREDENTIALS_DICT_TOKEN_KEY: token_json_str,
DB_CREDENTIALS_PRIMARY_ADMIN_KEY: email,
DB_CREDENTIALS_AUTHENTICATION_METHOD: auth_method.value,
}

if not update_credential_json(credential_id, new_creds_dict, user, db_session):
Expand All @@ -129,6 +137,7 @@ def update_credential_access_tokens(
def build_service_account_creds(
source: DocumentSource,
primary_admin_email: str | None = None,
name: str | None = None,
) -> CredentialBase:
service_account_key = get_service_account_key(source=source)

Expand All @@ -138,10 +147,15 @@ def build_service_account_creds(
if primary_admin_email:
credential_dict[DB_CREDENTIALS_PRIMARY_ADMIN_KEY] = primary_admin_email

credential_dict[
DB_CREDENTIALS_AUTHENTICATION_METHOD
] = GoogleOAuthAuthenticationMethod.UPLOADED.value

return CredentialBase(
credential_json=credential_dict,
admin_public=True,
source=source,
name=name,
)


Expand Down
107 changes: 29 additions & 78 deletions backend/onyx/server/documents/cc_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,12 @@ def update_cc_pair_status(
db_session: Session = Depends(get_session),
tenant_id: str | None = Depends(get_current_tenant_id),
) -> JSONResponse:
"""This method may wait up to 30 seconds if pausing the connector due to the need to
terminate tasks in progress. Tasks are not guaranteed to terminate within the
timeout.
"""This method returns nearly immediately. It simply sets some signals and
optimistically assumes any running background processes will clean themselves up.
This is done to improve the perceived end user experience.
Returns HTTPStatus.OK if everything finished.
Returns HTTPStatus.ACCEPTED if the connector is being paused, but background tasks
did not finish within the timeout.
"""
WAIT_TIMEOUT = 15.0
still_terminating = False

cc_pair = get_connector_credential_pair_from_id(
cc_pair_id=cc_pair_id,
db_session=db_session,
Expand All @@ -188,73 +183,37 @@ def update_cc_pair_status(
detail="Connection not found for current user's permissions",
)

redis_connector = RedisConnector(tenant_id, cc_pair_id)
if status_update_request.status == ConnectorCredentialPairStatus.PAUSED:
redis_connector.stop.set_fence(True)

search_settings_list: list[SearchSettings] = get_active_search_settings(
db_session
)

redis_connector = RedisConnector(tenant_id, cc_pair_id)

try:
redis_connector.stop.set_fence(True)
while True:
logger.debug(
f"Wait for indexing soft termination starting: cc_pair={cc_pair_id}"
)
wait_succeeded = redis_connector.wait_for_indexing_termination(
search_settings_list, WAIT_TIMEOUT
)
if wait_succeeded:
logger.debug(
f"Wait for indexing soft termination succeeded: cc_pair={cc_pair_id}"
)
break

logger.debug(
"Wait for indexing soft termination timed out. "
f"Moving to hard termination: cc_pair={cc_pair_id} timeout={WAIT_TIMEOUT:.2f}"
)

for search_settings in search_settings_list:
redis_connector_index = redis_connector.new_index(
search_settings.id
)
if not redis_connector_index.fenced:
continue

index_payload = redis_connector_index.payload
if not index_payload:
continue

if not index_payload.celery_task_id:
continue

# Revoke the task to prevent it from running
primary_app.control.revoke(index_payload.celery_task_id)

# If it is running, then signaling for termination will get the
# watchdog thread to kill the spawned task
redis_connector_index.set_terminate(index_payload.celery_task_id)

logger.debug(
f"Wait for indexing hard termination starting: cc_pair={cc_pair_id}"
)
wait_succeeded = redis_connector.wait_for_indexing_termination(
search_settings_list, WAIT_TIMEOUT
)
if wait_succeeded:
logger.debug(
f"Wait for indexing hard termination succeeded: cc_pair={cc_pair_id}"
)
break

logger.debug(
f"Wait for indexing hard termination timed out: cc_pair={cc_pair_id}"
)
still_terminating = True
break
finally:
redis_connector.stop.set_fence(False)
while True:
for search_settings in search_settings_list:
redis_connector_index = redis_connector.new_index(search_settings.id)
if not redis_connector_index.fenced:
continue

index_payload = redis_connector_index.payload
if not index_payload:
continue

if not index_payload.celery_task_id:
continue

# Revoke the task to prevent it from running
primary_app.control.revoke(index_payload.celery_task_id)

# If it is running, then signaling for termination will get the
# watchdog thread to kill the spawned task
redis_connector_index.set_terminate(index_payload.celery_task_id)

break
else:
redis_connector.stop.set_fence(False)

update_connector_credential_pair_from_id(
db_session=db_session,
Expand All @@ -264,14 +223,6 @@ def update_cc_pair_status(

db_session.commit()

if still_terminating:
return JSONResponse(
status_code=HTTPStatus.ACCEPTED,
content={
"message": "Request accepted, background task termination still in progress"
},
)

return JSONResponse(
status_code=HTTPStatus.OK, content={"message": str(HTTPStatus.OK)}
)
Expand Down
50 changes: 47 additions & 3 deletions backend/onyx/server/documents/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@
upsert_service_account_key,
)
from onyx.connectors.google_utils.google_kv import verify_csrf
from onyx.connectors.google_utils.shared_constants import DB_CREDENTIALS_DICT_TOKEN_KEY
from onyx.connectors.google_utils.shared_constants import (
DB_CREDENTIALS_DICT_TOKEN_KEY,
GoogleOAuthAuthenticationMethod,
)
from onyx.db.connector import create_connector
from onyx.db.connector import delete_connector
Expand Down Expand Up @@ -314,6 +315,7 @@ def upsert_service_account_credential(
credential_base = build_service_account_creds(
DocumentSource.GOOGLE_DRIVE,
primary_admin_email=service_account_credential_request.google_primary_admin,
name="Service Account (uploaded)",
)
except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e))
Expand Down Expand Up @@ -408,6 +410,38 @@ def upload_files(
return FileUploadResponse(file_paths=deduped_file_paths)


# Lists every connector except the INGESTION_API one. When the optional
# `credential` query parameter is given, only connectors that have at least
# one entry in `connector.credentials` with that credential id are returned.
@router.get("/admin/connector")
def get_connectors_by_credential(
    _: User = Depends(current_curator_or_admin_user),
    db_session: Session = Depends(get_session),
    credential: int | None = None,
) -> list[ConnectorSnapshot]:
    """Get a list of connectors. Allow filtering by a specific credential id."""

    snapshots = []
    for db_connector in fetch_connectors(db_session):
        # INGESTION_API is a system level connector that the user cannot
        # manage, so it is never listed
        if db_connector.source == DocumentSource.INGESTION_API:
            continue

        # The credential filter is applied only when one was requested; the
        # connector is kept if any of its cc_pairs uses that credential
        if credential is not None and not any(
            credential == linked_pair.credential_id
            for linked_pair in db_connector.credentials
        ):
            continue

        snapshots.append(ConnectorSnapshot.from_connector_db_model(db_connector))

    return snapshots


# Retrieves most recent failure cases for connectors that are currently failing
@router.get("/admin/connector/failed-indexing-status")
def get_currently_failed_indexing_status(
Expand Down Expand Up @@ -987,7 +1021,12 @@ def gmail_callback(
credential_id = int(credential_id_cookie)
verify_csrf(credential_id, callback.state)
credentials: Credentials | None = update_credential_access_tokens(
callback.code, credential_id, user, db_session, DocumentSource.GMAIL
callback.code,
credential_id,
user,
db_session,
DocumentSource.GMAIL,
GoogleOAuthAuthenticationMethod.UPLOADED,
)
if credentials is None:
raise HTTPException(
Expand All @@ -1013,7 +1052,12 @@ def google_drive_callback(
verify_csrf(credential_id, callback.state)

credentials: Credentials | None = update_credential_access_tokens(
callback.code, credential_id, user, db_session, DocumentSource.GOOGLE_DRIVE
callback.code,
credential_id,
user,
db_session,
DocumentSource.GOOGLE_DRIVE,
GoogleOAuthAuthenticationMethod.UPLOADED,
)
if credentials is None:
raise HTTPException(
Expand Down
3 changes: 0 additions & 3 deletions backend/onyx/server/documents/credential.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from onyx.auth.users import current_user
from onyx.db.credentials import alter_credential
from onyx.db.credentials import cleanup_gmail_credentials
from onyx.db.credentials import cleanup_google_drive_credentials
from onyx.db.credentials import create_credential
from onyx.db.credentials import CREDENTIAL_PERMISSIONS_TO_IGNORE
from onyx.db.credentials import delete_credential
Expand Down Expand Up @@ -133,8 +132,6 @@ def create_credential_from_model(
# Temporary fix for empty Google App credentials
if credential_info.source == DocumentSource.GMAIL:
cleanup_gmail_credentials(db_session=db_session)
if credential_info.source == DocumentSource.GOOGLE_DRIVE:
cleanup_google_drive_credentials(db_session=db_session)

credential = create_credential(credential_info, user, db_session)
return ObjectCreationIdResponse(
Expand Down
Loading

0 comments on commit 8e25c3c

Please sign in to comment.