Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelOwenDyer committed Oct 15, 2024
1 parent a5167be commit fb007c3
Show file tree
Hide file tree
Showing 28 changed files with 427 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import de.tum.cit.aet.artemis.atlas.domain.competency.CompetencyTaxonomy;
import de.tum.cit.aet.artemis.core.domain.Course;
import de.tum.cit.aet.artemis.core.domain.User;
import de.tum.cit.aet.artemis.iris.service.pyris.PyrisJobService;
import de.tum.cit.aet.artemis.iris.service.pyris.PyrisPipelineService;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.competency.PyrisCompetencyExtractionPipelineExecutionDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.competency.PyrisCompetencyRecommendationDTO;
Expand All @@ -27,12 +26,9 @@ public class IrisCompetencyGenerationService {

private final IrisWebsocketService websocketService;

private final PyrisJobService pyrisJobService;

public IrisCompetencyGenerationService(PyrisPipelineService pyrisPipelineService, IrisWebsocketService websocketService, PyrisJobService pyrisJobService) {
public IrisCompetencyGenerationService(PyrisPipelineService pyrisPipelineService, IrisWebsocketService websocketService) {
this.pyrisPipelineService = pyrisPipelineService;
this.websocketService = websocketService;
this.pyrisJobService = pyrisJobService;
}

/**
Expand All @@ -48,7 +44,7 @@ public void executeCompetencyExtractionPipeline(User user, Course course, String
pyrisPipelineService.executePipeline(
"competency-extraction",
"default",
pyrisJobService.createTokenForJob(token -> new CompetencyExtractionJob(token, course.getId(), user.getLogin())),
jobId -> new CompetencyExtractionJob(jobId, course.getId(), user.getLogin()),
executionDto -> new PyrisCompetencyExtractionPipelineExecutionDTO(executionDto, courseDescription, currentCompetencies, CompetencyTaxonomy.values(), 5),
stages -> websocketService.send(user.getLogin(), websocketTopic(course.getId()), new PyrisCompetencyStatusUpdateDTO(stages, null))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import de.tum.cit.aet.artemis.core.exception.AccessForbiddenException;
import de.tum.cit.aet.artemis.core.exception.ConflictException;
import de.tum.cit.aet.artemis.iris.service.pyris.job.CourseChatJob;
import de.tum.cit.aet.artemis.iris.service.pyris.job.ExerciseChatJob;
import de.tum.cit.aet.artemis.iris.service.pyris.job.IngestionWebhookJob;
import de.tum.cit.aet.artemis.iris.service.pyris.job.PyrisJob;

Expand Down Expand Up @@ -69,27 +67,13 @@ public void init() {
* @param tokenToJobFunction the function to run with the token
* @return the generated token
*/
public String createTokenForJob(Function<String, PyrisJob> tokenToJobFunction) {
public String registerJob(Function<String, PyrisJob> tokenToJobFunction) {
var token = generateJobIdToken();
var job = tokenToJobFunction.apply(token);
jobMap.put(token, job);
return token;
}

public String addExerciseChatJob(Long courseId, Long exerciseId, Long sessionId) {
var token = generateJobIdToken();
var job = new ExerciseChatJob(token, courseId, exerciseId, sessionId);
jobMap.put(token, job);
return token;
}

public String addCourseChatJob(Long courseId, Long sessionId) {
var token = generateJobIdToken();
var job = new CourseChatJob(token, courseId, sessionId);
jobMap.put(token, job);
return token;
}

/**
* Adds a new ingestion webhook job to the job map with a timeout.
*
Expand Down Expand Up @@ -129,7 +113,7 @@ public PyrisJob getJob(String token) {
* 2. Retrieves the PyrisJob object associated with the provided token.
* 3. Throws an AccessForbiddenException if the token is invalid or not provided.
* <p>
* The token was previously generated via {@link #createTokenForJob(Function)}
* The token was previously generated via {@link #registerJob(Function)}
*
* @param request the HttpServletRequest object representing the incoming request
* @param jobClass the class of the PyrisJob object to cast the retrieved job to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@

import static de.tum.cit.aet.artemis.core.config.Constants.PROFILE_IRIS;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -17,27 +12,11 @@
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

import de.tum.cit.aet.artemis.atlas.domain.competency.CompetencyJol;
import de.tum.cit.aet.artemis.atlas.dto.CompetencyJolDTO;
import de.tum.cit.aet.artemis.core.domain.Course;
import de.tum.cit.aet.artemis.core.repository.CourseRepository;
import de.tum.cit.aet.artemis.exercise.domain.participation.StudentParticipation;
import de.tum.cit.aet.artemis.exercise.repository.StudentParticipationRepository;
import de.tum.cit.aet.artemis.exercise.service.LearningMetricsService;
import de.tum.cit.aet.artemis.iris.domain.session.IrisCourseChatSession;
import de.tum.cit.aet.artemis.iris.domain.session.IrisExerciseChatSession;
import de.tum.cit.aet.artemis.iris.exception.IrisException;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.PyrisPipelineExecutionDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.PyrisPipelineExecutionSettingsDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.chat.course.PyrisCourseChatPipelineExecutionDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.chat.exercise.PyrisExerciseChatPipelineExecutionDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.data.PyrisCourseDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.data.PyrisExtendedCourseDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.data.PyrisUserDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.status.PyrisStageDTO;
import de.tum.cit.aet.artemis.iris.service.websocket.IrisChatWebsocketService;
import de.tum.cit.aet.artemis.programming.domain.ProgrammingExercise;
import de.tum.cit.aet.artemis.programming.domain.ProgrammingSubmission;
import de.tum.cit.aet.artemis.iris.service.pyris.job.PyrisJob;

/**
* Service responsible for executing the various Pyris pipelines in a type-safe manner.
Expand All @@ -53,34 +32,18 @@ public class PyrisPipelineService {

private final PyrisJobService pyrisJobService;

private final PyrisDTOService pyrisDTOService;

private final IrisChatWebsocketService irisChatWebsocketService;

private final CourseRepository courseRepository;

private final StudentParticipationRepository studentParticipationRepository;

private final LearningMetricsService learningMetricsService;

@Value("${server.url}")
private String artemisBaseUrl;

public PyrisPipelineService(PyrisConnectorService pyrisConnectorService, PyrisJobService pyrisJobService, PyrisDTOService pyrisDTOService,
IrisChatWebsocketService irisChatWebsocketService, CourseRepository courseRepository, LearningMetricsService learningMetricsService,
StudentParticipationRepository studentParticipationRepository) {
public PyrisPipelineService(PyrisConnectorService pyrisConnectorService, PyrisJobService pyrisJobService) {
this.pyrisConnectorService = pyrisConnectorService;
this.pyrisJobService = pyrisJobService;
this.pyrisDTOService = pyrisDTOService;
this.irisChatWebsocketService = irisChatWebsocketService;
this.courseRepository = courseRepository;
this.learningMetricsService = learningMetricsService;
this.studentParticipationRepository = studentParticipationRepository;
}

/**
* Executes a pipeline on Pyris, identified by the given name and variant.
* The pipeline execution is tracked by a unique job token, which must be provided by the caller.
* The pipeline execution is tracked by a unique job token, which is created for each new job automatically.
* The caller must provide a mapper function to take this job token and produce a {@code PyrisJob} object to be registered.
* The caller must additionally provide a mapper function to create the concrete DTO type for this pipeline from the base DTO.
* The status of the pipeline execution is updated via a consumer that accepts a list of stages. This method will
* call the consumer with the initial stages of the pipeline execution. Later stages will be sent back from Pyris,
Expand All @@ -89,11 +52,12 @@ public PyrisPipelineService(PyrisConnectorService pyrisConnectorService, PyrisJo
*
* @param name the name of the pipeline to be executed
* @param variant the variant of the pipeline
* @param jobToken a unique job token for tracking the pipeline execution
* @param dtoMapper a function to create the concrete DTO type for this pipeline from the base DTO
* @param statusUpdater a consumer to update the status of the pipeline execution
* @param jobFunction a function from job ID to job. Creates a new {@code PyrisJob} which will be registered in Hazelcast
* @param dtoMapper a function to create the concrete DTO type for this pipeline from the base execution DTO
* @param statusUpdater a consumer of stages to send status updates while the pipeline is being prepared
*/
public void executePipeline(String name, String variant, String jobToken, Function<PyrisPipelineExecutionDTO, Object> dtoMapper, Consumer<List<PyrisStageDTO>> statusUpdater) {
public void executePipeline(String name, String variant, Function<String, PyrisJob> jobFunction, Function<PyrisPipelineExecutionDTO, Object> dtoMapper,
Consumer<List<PyrisStageDTO>> statusUpdater) {
// Define the preparation stages of pipeline execution with their initial states
// There will be more stages added in Pyris later
var preparing = new PyrisStageDTO("Preparing", 10, null, null);
Expand All @@ -102,6 +66,8 @@ public void executePipeline(String name, String variant, String jobToken, Functi
// Send initial status update indicating that the preparation stage is in progress
statusUpdater.accept(List.of(preparing.inProgress(), executing.notStarted()));

String jobToken = pyrisJobService.registerJob(jobFunction);

var baseDto = new PyrisPipelineExecutionDTO(new PyrisPipelineExecutionSettingsDTO(jobToken, List.of(), artemisBaseUrl), List.of(preparing.done()));
var pipelineDto = dtoMapper.apply(baseDto);

Expand All @@ -123,106 +89,4 @@ public void executePipeline(String name, String variant, String jobToken, Functi
statusUpdater.accept(List.of(preparing.error("An internal error occurred"), executing.notStarted()));
}
}

/**
* Execute the exercise chat pipeline for the given session.
* It provides specific data for the exercise chat pipeline, including:
* - The latest submission of the student
* - The programming exercise
* - The course the exercise is part of
* <p>
*
* @param variant the variant of the pipeline
* @param latestSubmission the latest submission of the student
* @param exercise the programming exercise
* @param session the chat session
* @see PyrisPipelineService#executePipeline for more details on the pipeline execution process.
*/
public void executeExerciseChatPipeline(String variant, Optional<ProgrammingSubmission> latestSubmission, ProgrammingExercise exercise, IrisExerciseChatSession session) {
// @formatter:off
executePipeline(
"tutor-chat", // TODO: Rename this to 'exercise-chat' with next breaking Pyris version
variant,
pyrisJobService.addExerciseChatJob(exercise.getCourseViaExerciseGroupOrCourseMember().getId(), exercise.getId(), session.getId()),
executionDto -> {
var course = exercise.getCourseViaExerciseGroupOrCourseMember();
return new PyrisExerciseChatPipelineExecutionDTO(
latestSubmission.map(pyrisDTOService::toPyrisSubmissionDTO).orElse(null),
pyrisDTOService.toPyrisProgrammingExerciseDTO(exercise),
new PyrisCourseDTO(course),
pyrisDTOService.toPyrisMessageDTOList(session.getMessages()),
new PyrisUserDTO(session.getUser()),
executionDto.settings(),
executionDto.initialStages()
);
},
stages -> irisChatWebsocketService.sendStatusUpdate(session, stages)
);
// @formatter:on
}

/**
* Execute the course chat pipeline for the given session.
* It provides specific data for the course chat pipeline, including:
* - The full course with the participation of the student
* - The metrics of the student in the course
* - The competency JoL if this is due to a JoL set event
* <p>
*
* @param variant the variant of the pipeline
* @param session the chat session
* @param competencyJol if this is due to a JoL set event, this must be the newly created competencyJoL
* @see PyrisPipelineService#executePipeline for more details on the pipeline execution process.
*/
public void executeCourseChatPipeline(String variant, IrisCourseChatSession session, CompetencyJol competencyJol) {
// @formatter:off
var courseId = session.getCourse().getId();
var studentId = session.getUser().getId();
executePipeline(
"course-chat",
variant,
pyrisJobService.addCourseChatJob(courseId, session.getId()),
executionDto -> {
var fullCourse = loadCourseWithParticipationOfStudent(courseId, studentId);
return new PyrisCourseChatPipelineExecutionDTO(
PyrisExtendedCourseDTO.of(fullCourse),
learningMetricsService.getStudentCourseMetrics(session.getUser().getId(), courseId),
competencyJol == null ? null : CompetencyJolDTO.of(competencyJol),
pyrisDTOService.toPyrisMessageDTOList(session.getMessages()),
new PyrisUserDTO(session.getUser()),
executionDto.settings(), // flatten the execution dto here
executionDto.initialStages()
);
},
stages -> irisChatWebsocketService.sendStatusUpdate(session, stages)
);
// @formatter:on
}

/**
* Load the course with the participation of the student and set the participations on the exercises.
* <p>
* Spring Boot 3 does not support conditional left joins, so we have to load the participations separately.
*
* @param courseId the id of the course
* @param studentId the id of the student
*/
private Course loadCourseWithParticipationOfStudent(long courseId, long studentId) {
Course course = courseRepository.findWithEagerExercisesAndLecturesAndAttachmentsAndLectureUnitsAndCompetenciesAndExamsById(courseId).orElseThrow();
List<StudentParticipation> participations = studentParticipationRepository.findByStudentIdAndIndividualExercisesWithEagerSubmissionsResultIgnoreTestRuns(studentId,
course.getExercises());

Map<Long, Set<StudentParticipation>> participationMap = new HashMap<>();
for (StudentParticipation participation : participations) {
Long exerciseId = participation.getExercise().getId();
participationMap.computeIfAbsent(exerciseId, k -> new HashSet<>()).add(participation);
}

course.getExercises().forEach(exercise -> {
Set<StudentParticipation> exerciseParticipations = participationMap.getOrDefault(exercise.getId(), Set.of());
exercise.setStudentParticipations(exerciseParticipations);
});

return course;
}
}
Loading

0 comments on commit fb007c3

Please sign in to comment.