Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: module_executor support layer #8212

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 76 additions & 26 deletions crates/rspack_core/src/compiler/module_executor/ctrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ use rspack_collections::IdentifierMap;
use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
use tokio::sync::mpsc::{error::TryRecvError, UnboundedReceiver};

use super::{
entry::{EntryParam, EntryTask},
execute::ExecuteTask,
};
use super::{entry::EntryTask, execute::ExecuteTask};
use crate::{
compiler::make::repair::MakeTaskContext,
utils::task_loop::{Task, TaskResult, TaskType},
Dependency, DependencyId, ModuleIdentifier,
Dependency, DependencyId, LoaderImportDependency, ModuleIdentifier,
};

#[derive(Debug)]
Expand Down Expand Up @@ -45,6 +42,29 @@ impl UnfinishCounter {
}
}

#[derive(Default)]
struct ExecuteTaskList(Vec<Box<dyn Task<MakeTaskContext>>>);

impl ExecuteTaskList {
fn add_task(&mut self, task: ExecuteTask) {
self.0.push(Box::new(task));
if self.0.len() > 10000 {
// TODO change to Err
panic!("ExecuteTaskList exceeds limit and may contain circular build dependencies.")
}
}

fn into_vec(self) -> Vec<Box<dyn Task<MakeTaskContext>>> {
self.0
}
}

#[derive(Debug)]
pub enum ExecuteParam {
DependencyId(DependencyId),
Entry(Box<LoaderImportDependency>, Option<String>),
}

// send event can only use in sync task
#[derive(Debug)]
pub enum Event {
Expand All @@ -57,14 +77,13 @@ pub enum Event {
),
// current_module_identifier and sub dependency count
FinishModule(ModuleIdentifier, usize),
ExecuteModule(EntryParam, ExecuteTask),
ExecuteModule(ExecuteParam, ExecuteTask),
Stop(),
}

#[derive(Debug)]
pub struct CtrlTask {
pub event_receiver: UnboundedReceiver<Event>,
execute_task_map: HashMap<DependencyId, ExecuteTask>,
execute_task_map: HashMap<DependencyId, ExecuteTaskList>,
running_module_map: IdentifierMap<UnfinishCounter>,
}

Expand Down Expand Up @@ -105,11 +124,13 @@ impl Task<MakeTaskContext> for CtrlTask {
// target module finished
let Some(origin_module_identifier) = origin_module_identifier else {
// origin_module_identifier is none means entry dep
let execute_task = self
let mut tasks = self
.execute_task_map
.remove(&dep_id)
.expect("should have execute task");
return Ok(vec![Box::new(execute_task), self]);
.expect("should have execute task")
.into_vec();
tasks.push(self);
return Ok(tasks);
};

let value = self
Expand Down Expand Up @@ -138,12 +159,26 @@ impl Task<MakeTaskContext> for CtrlTask {
}
}
Event::ExecuteModule(param, execute_task) => {
let dep_id = match &param {
EntryParam::DependencyId(id, _) => *id,
EntryParam::Entry(dep) => *dep.id(),
match param {
ExecuteParam::Entry(dep, layer) => {
let dep_id = dep.id();
if let Some(tasks) = self.execute_task_map.get_mut(dep_id) {
tasks.add_task(execute_task)
} else {
let mut list = ExecuteTaskList::default();
list.add_task(execute_task);
self.execute_task_map.insert(*dep_id, list);
return Ok(vec![Box::new(EntryTask { dep, layer }), self]);
}
}
ExecuteParam::DependencyId(dep_id) => {
if let Some(tasks) = self.execute_task_map.get_mut(&dep_id) {
tasks.add_task(execute_task)
} else {
return Ok(vec![Box::new(execute_task), self]);
}
}
};
self.execute_task_map.insert(dep_id, execute_task);
return Ok(vec![Box::new(EntryTask { param }), self]);
}
Event::Stop() => {
return Ok(vec![]);
Expand All @@ -155,7 +190,6 @@ impl Task<MakeTaskContext> for CtrlTask {
}
}

#[derive(Debug)]
struct FinishModuleTask {
ctrl_task: Box<CtrlTask>,
module_identifier: ModuleIdentifier,
Expand Down Expand Up @@ -212,8 +246,9 @@ impl Task<MakeTaskContext> for FinishModuleTask {
let execute_task = ctrl_task
.execute_task_map
.remove(&dep_id)
.expect("should have execute task");
res.push(Box::new(execute_task));
.expect("should have execute task")
.into_vec();
res.extend(execute_task);
continue;
};

Expand All @@ -237,12 +272,26 @@ impl Task<MakeTaskContext> for FinishModuleTask {
}
}
Event::ExecuteModule(param, execute_task) => {
let dep_id = match &param {
EntryParam::DependencyId(id, _) => *id,
EntryParam::Entry(dep) => *dep.id(),
match param {
ExecuteParam::Entry(dep, layer) => {
let dep_id = dep.id();
if let Some(tasks) = ctrl_task.execute_task_map.get_mut(dep_id) {
tasks.add_task(execute_task)
} else {
let mut list = ExecuteTaskList::default();
list.add_task(execute_task);
ctrl_task.execute_task_map.insert(*dep_id, list);
res.push(Box::new(EntryTask { dep, layer }));
}
}
ExecuteParam::DependencyId(dep_id) => {
if let Some(tasks) = ctrl_task.execute_task_map.get_mut(&dep_id) {
tasks.add_task(execute_task)
} else {
res.push(Box::new(execute_task));
}
}
};
ctrl_task.execute_task_map.insert(dep_id, execute_task);
res.push(Box::new(EntryTask { param }));
}
Event::Stop() => {
return Ok(vec![]);
Expand Down Expand Up @@ -273,8 +322,9 @@ impl Task<MakeTaskContext> for FinishModuleTask {
let execute_task = ctrl_task
.execute_task_map
.remove(&connection.dependency_id)
.expect("should have execute task");
res.push(Box::new(execute_task));
.expect("should have execute task")
.into_vec();
res.extend(execute_task);
}
}

Expand Down
80 changes: 29 additions & 51 deletions crates/rspack_core/src/compiler/module_executor/entry.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
use tokio::sync::mpsc::UnboundedSender;

use super::ctrl::Event;
use crate::{
compiler::make::repair::{factorize::FactorizeTask, MakeTaskContext},
utils::task_loop::{Task, TaskResult, TaskType},
Dependency, DependencyId, LoaderImportDependency, ModuleProfile,
Dependency, LoaderImportDependency, ModuleProfile,
};

#[derive(Debug)]
pub enum EntryParam {
DependencyId(DependencyId, UnboundedSender<Event>),
Entry(Box<LoaderImportDependency>),
}

#[derive(Debug)]
pub struct EntryTask {
pub param: EntryParam,
pub dep: Box<LoaderImportDependency>,
pub layer: Option<String>,
}

impl Task<MakeTaskContext> for EntryTask {
Expand All @@ -24,48 +16,34 @@ impl Task<MakeTaskContext> for EntryTask {
}

fn sync_run(self: Box<Self>, context: &mut MakeTaskContext) -> TaskResult<MakeTaskContext> {
let Self { param } = *self;
let Self { dep, layer } = *self;
let mut module_graph =
MakeTaskContext::get_module_graph_mut(&mut context.artifact.module_graph_partial);

match param {
EntryParam::DependencyId(dep_id, sender) => {
if let Some(module_identifier) = module_graph.module_identifier_by_dependency_id(&dep_id) {
sender
.send(Event::FinishDeps(None, dep_id, Some(*module_identifier)))
.expect("should success");
} else {
// no module_identifier means the factorize task not run, do nothing
}
Ok(vec![])
}
EntryParam::Entry(dep) => {
module_graph.add_dependency(dep.clone());
Ok(vec![Box::new(FactorizeTask {
module_factory: context
.dependency_factories
.get(dep.dependency_type())
.unwrap_or_else(|| {
panic!(
"should have dependency_factories for dependency_type: {}",
dep.dependency_type()
)
})
.clone(),
original_module_identifier: None,
original_module_source: None,
issuer: None,
issuer_layer: None,
original_module_context: None,
dependencies: vec![dep],
resolve_options: None,
options: context.compiler_options.clone(),
current_profile: context
.compiler_options
.profile
.then(Box::<ModuleProfile>::default),
})])
}
}
module_graph.add_dependency(dep.clone());
Ok(vec![Box::new(FactorizeTask {
module_factory: context
.dependency_factories
.get(dep.dependency_type())
.unwrap_or_else(|| {
panic!(
"should have dependency_factories for dependency_type: {}",
dep.dependency_type()
)
})
.clone(),
original_module_identifier: None,
original_module_source: None,
issuer: None,
issuer_layer: layer,
original_module_context: None,
dependencies: vec![dep],
resolve_options: None,
options: context.compiler_options.clone(),
current_profile: context
.compiler_options
.profile
.then(Box::<ModuleProfile>::default),
})])
}
}
3 changes: 2 additions & 1 deletion crates/rspack_core/src/compiler/module_executor/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl Task<MakeTaskContext> for ExecuteTask {
let id = EXECUTE_MODULE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

let mg = compilation.get_module_graph_mut();
// TODO remove expect and return Err
let entry_module_identifier = mg
.get_module_by_dependency_id(&entry_dep_id)
.expect("should have module")
Expand All @@ -93,7 +94,7 @@ impl Task<MakeTaskContext> for ExecuteTask {
}
}

tracing::info!("modules: {:?}", &modules.iter().collect::<Vec<_>>());
tracing::info!("modules: {:?}", &modules);

let mut chunk_graph = ChunkGraph::default();

Expand Down
11 changes: 5 additions & 6 deletions crates/rspack_core/src/compiler/module_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use tokio::sync::{
};

use self::{
ctrl::{CtrlTask, Event},
entry::EntryParam,
ctrl::{CtrlTask, Event, ExecuteParam},
execute::{ExecuteModuleResult, ExecuteTask},
overwrite::OverwriteTask,
};
Expand All @@ -29,7 +28,7 @@ use crate::{

#[derive(Debug, Default)]
pub struct ModuleExecutor {
request_dep_map: DashMap<String, DependencyId>,
request_dep_map: DashMap<(String, Option<String>), DependencyId>,
pub make_artifact: MakeArtifact,

event_sender: Option<UnboundedSender<Event>>,
Expand Down Expand Up @@ -192,19 +191,19 @@ impl ModuleExecutor {
.event_sender
.as_ref()
.expect("should have event sender");
let (param, dep_id) = match self.request_dep_map.entry(request.clone()) {
let (param, dep_id) = match self.request_dep_map.entry((request.clone(), layer.clone())) {
Entry::Vacant(v) => {
let dep = LoaderImportDependency::new(
request.clone(),
original_module_context.unwrap_or(Context::from("")),
);
let dep_id = *dep.id();
v.insert(dep_id);
(EntryParam::Entry(Box::new(dep)), dep_id)
(ExecuteParam::Entry(Box::new(dep), layer.clone()), dep_id)
}
Entry::Occupied(v) => {
let dep_id = *v.get();
(EntryParam::DependencyId(dep_id, sender.clone()), dep_id)
(ExecuteParam::DependencyId(dep_id), dep_id)
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = "data";
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
it("should compile", () => {
console.log(123);
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/** @type {import("../../../../").PitchLoaderDefinitionFunction} */
module.exports = async function (remaining) {
try {
const result = await this.importModule("./banner.js", {
layer: "loader"
});
expect(result).toEqual("data");
return `export default ${result}`;
} catch (e) {
console.error(e);
throw e;
}
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/** @type {import("@rspack/core").Configuration} */
module.exports = {
entry: "./index.js",
module: {
rules: [
{
test: /\.js/,
loader: "./loader",
issuerLayer: "main",
options: {}
}
]
},
experiments: {
layers: true
}
};
Loading