From a42038eac7cc22484b700628a9317efa5840d3d8 Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Thu, 15 Aug 2024 12:42:24 -0400 Subject: [PATCH] Fix workflow parsing for log artifact Signed-off-by: droctothorpe Co-authored-by: quinnovator --- frontend/server/workflow-helper.ts | 77 ++++++++++++++++---------- frontend/src/pages/RunDetails.test.tsx | 4 +- frontend/src/pages/RunDetails.tsx | 2 +- 3 files changed, 50 insertions(+), 33 deletions(-) diff --git a/frontend/server/workflow-helper.ts b/frontend/server/workflow-helper.ts index 2ba253f5f5f4..0ba4460aa91a 100644 --- a/frontend/server/workflow-helper.ts +++ b/frontend/server/workflow-helper.ts @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -import path from 'path'; import { PassThrough, Stream } from 'stream'; import { ClientOptions as MinioClientOptions } from 'minio'; import { getK8sSecret, getArgoWorkflow, getPodLogs } from './k8s-helper'; @@ -19,10 +18,20 @@ import { createMinioClient, MinioRequestConfig, getObjectStream } from './minio- export interface PartialArgoWorkflow { status: { + artifactRepositoryRef?: ArtifactRepositoryRef; nodes?: ArgoWorkflowStatusNode; }; } +export interface ArtifactRepositoryRef { + artifactRepository?: ArtifactRepository; +} + +export interface ArtifactRepository { + archiveLogs?: boolean; + s3?: S3Artifact; +} + export interface ArgoWorkflowStatusNode { [key: string]: ArgoWorkflowStatusNodeInfo; } @@ -34,9 +43,12 @@ export interface ArgoWorkflowStatusNodeInfo { } export interface ArtifactRecord { - archiveLogs?: boolean; - name: string; - s3?: S3Artifact; + name?: string; + s3: S3Key; +} + +export interface S3Key { + key: string; } export interface S3Artifact { @@ -80,6 +92,7 @@ export function composePodLogsStreamHandler( /** * Returns a stream containing the pod logs using kubernetes api. * @param podName name of the pod. + * @param createdAt YYYY-MM-DD run was created. Not used. * @param namespace namespace of the pod (uses the same namespace as the server if not provided). * @param containerName container's name of the pod, the default value is 'main'. */ @@ -91,7 +104,9 @@ export async function getPodLogsStreamFromK8s( ) { const stream = new PassThrough(); stream.end(await getPodLogs(podName, namespace, containerName)); - console.log(`Getting logs for pod:${podName} in namespace ${namespace}.`); + console.log( + `Getting logs for pod, ${podName}, in namespace, ${namespace}, by calling the Kubernetes API.`, + ); return stream; } @@ -99,6 +114,7 @@ export async function getPodLogsStreamFromK8s( * Returns a stream containing the pod logs using the information provided in the * workflow status (uses k8s api to retrieve the workflow and secrets). * @param podName name of the pod. + * @param createdAt YYYY-MM-DD run was created. Not used. * @param namespace namespace of the pod (uses the same namespace as the server if not provided). */ export const getPodLogsStreamFromWorkflow = toGetPodLogsStream( @@ -121,7 +137,7 @@ export function toGetPodLogsStream( ) { return async (podName: string, createdAt: string, namespace?: string) => { const request = await getMinioRequestConfig(podName, createdAt, namespace); - console.log(`Getting logs for pod:${podName} from ${request.bucket}/${request.key}.`); + console.log(`Getting logs for pod, ${podName}, from ${request.bucket}/${request.key}.`); return await getObjectStream(request); }; } @@ -193,33 +209,42 @@ export async function getPodLogsMinioRequestConfigfromWorkflow( podName: string, ): Promise { let workflow: PartialArgoWorkflow; + // We should probably parameterize this replace statement. It's brittle to + // changes in implementation. But brittle is better than completely broken. + let workflowName = podName.replace(/-system-container-impl-.*/, ''); try { - workflow = await getArgoWorkflow(workflowNameFromPodName(podName)); + workflow = await getArgoWorkflow(workflowName); } catch (err) { throw new Error(`Unable to retrieve workflow status: ${err}.`); } + // archiveLogs can be set globally for the workflow as a whole and / or for + // each individual task. The compiler sets it globally so we look for it in + // the global field, which is documented here: + // https://argo-workflows.readthedocs.io/en/release-3.4/fields/#workflow + if (!workflow.status.artifactRepositoryRef?.artifactRepository?.archiveLogs) { + throw new Error('Unable to retrieve logs from artifact store; archiveLogs is disabled.'); + } + let artifacts: ArtifactRecord[] | undefined; - // check if required fields are available if (workflow.status && workflow.status.nodes) { - const node = workflow.status.nodes[podName]; - if (node && node.outputs && node.outputs.artifacts) { - artifacts = node.outputs.artifacts; - } + const nodeName = podName.replace('-system-container-impl', ''); + const node = workflow.status.nodes[nodeName]; + artifacts = node?.outputs?.artifacts || undefined; } if (!artifacts) { - throw new Error('Unable to find pod info in workflow status to retrieve logs.'); + throw new Error('Unable to find corresponding log artifact in node.'); } - const archiveLogs: ArtifactRecord[] = artifacts.filter((artifact: any) => artifact.archiveLogs); - - if (archiveLogs.length === 0) { - throw new Error('Unable to find pod log archive information from workflow status.'); + const logKey = + artifacts.find((artifact: ArtifactRecord) => artifact.name === 'main-logs')?.s3.key || false; + if (!logKey) { + throw new Error('No artifact named "main-logs" for node.'); } - const s3Artifact = archiveLogs[0].s3; + const s3Artifact = workflow.status.artifactRepositoryRef.artifactRepository.s3 || false; if (!s3Artifact) { - throw new Error('Unable to find s3 artifact info from workflow status.'); + throw new Error('Unable to find artifact repository information from workflow status.'); } const { host, port } = urlSplit(s3Artifact.endpoint, s3Artifact.insecure); @@ -228,6 +253,8 @@ export async function getPodLogsMinioRequestConfigfromWorkflow( const client = await createMinioClient( { accessKey, + // TODO: endPoint must be set to 'localhost' for local development. + // Validate that the parameterization is actually working. endPoint: host, port, secretKey, @@ -238,7 +265,7 @@ export async function getPodLogsMinioRequestConfigfromWorkflow( return { bucket: s3Artifact.bucket, client, - key: s3Artifact.key, + key: logKey, }; } @@ -268,13 +295,3 @@ function urlSplit(uri: string, insecure: boolean) { } return { host: chunks[0], port: parseInt(chunks[1], 10) }; } - -/** - * Infers workflow name from pod name. - * @param podName name of the pod. - */ -function workflowNameFromPodName(podName: string) { - const chunks = podName.split('-'); - chunks.pop(); - return chunks.join('-'); -} diff --git a/frontend/src/pages/RunDetails.test.tsx b/frontend/src/pages/RunDetails.test.tsx index fa70e8917c00..0962f9c5ccd4 100644 --- a/frontend/src/pages/RunDetails.test.tsx +++ b/frontend/src/pages/RunDetails.test.tsx @@ -1165,6 +1165,7 @@ describe('RunDetails', () => { 'test-run-id', 'workflow1-template1-node1', 'ns', + '', ); expect(tree).toMatchSnapshot(); }); @@ -1205,7 +1206,6 @@ describe('RunDetails', () => { className="" > Logs can also be viewed in - { 'test-run-id', 'workflow1-template1-node1', 'username', + '', ); }); @@ -1302,7 +1303,6 @@ describe('RunDetails', () => { className="" > Logs can also be viewed in - { try { const nodeName = getNodeNameFromNodeId(this.state.workflow!, selectedNodeDetails.id); - selectedNodeDetails.logs = await Apis.getPodLogs(runId, nodeName, namespace); + selectedNodeDetails.logs = await Apis.getPodLogs(runId, nodeName, namespace, ''); } catch (err) { let errMsg = await errorToMessage(err); logsBannerMessage = 'Failed to retrieve pod logs.';