Skip to content

Commit

Permalink
Merge pull request #153 from lanl/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
MaksimEkin authored Apr 27, 2024
2 parents 3207768 + 6fe30b6 commit 014900f
Show file tree
Hide file tree
Showing 102 changed files with 2,773 additions and 1,161 deletions.
2 changes: 1 addition & 1 deletion CITATION.cff
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ authors:
- family-names: Alexandrov
given-names: Boian
title: "Tensor Extraction of Latent Features (T-ELF)"
version: 0.0.16
version: 0.0.17
url: https://github.com/lanl/T-ELF
doi: 10.5281/zenodo.10257897
date-released: 2023-12-04
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ conda install cudnn
| BNMFk | | | | | | | Boolean NMFk | | :soon: |
| SPLIT NMFk | | | | | | | Joint NMFk factorization of multiple data via SPLIT | | :soon: |
| SPLIT Transfer Classifier | | | | | | | Supervised transfer learning method via SPLIT and NMFk | | :soon: |
| CP-ALS | | | | | | | Alternating least squares algorithm for canonical polyadic decomposition | | :soon: |
| CP-APR | | | | | | | Alternating Poisson regression algorithm for canonical polyadic decomposition | | :soon: |
| NTDS_FAPG | | | | | | | Non-negative Tucker Tensor Decomposition | | :soon: |

### TELF.pre_processing

Expand Down Expand Up @@ -156,7 +153,7 @@ If you use T-ELF please cite.

**APA:**
```latex
Eren, M., Solovyev, N., Barron, R., Bhattarai, M., Truong, D., Boureima, I., Skau, E., Rasmussen, K., & Alexandrov, B. (2023). Tensor Extraction of Latent Features (T-ELF) (Version 0.0.16) [Computer software]. https://doi.org/10.5281/zenodo.10257897
Eren, M., Solovyev, N., Barron, R., Bhattarai, M., Truong, D., Boureima, I., Skau, E., Rasmussen, K., & Alexandrov, B. (2023). Tensor Extraction of Latent Features (T-ELF) (Version 0.0.17) [Computer software]. https://doi.org/10.5281/zenodo.10257897
```

**BibTeX:**
Expand Down
80 changes: 14 additions & 66 deletions TELF/factorization/HNMFk.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import pickle
import warnings

from .utilities.hpc_comm_helpers import signal_workers_exit, worker_check_exit_status, get_next_job_at_worker, collect_results_from_workers, send_job_to_worker_nodes

try:
from mpi4py import MPI
except:
Expand Down Expand Up @@ -277,27 +279,30 @@ def fit(self, X, Ks, from_checkpoint=False, save_checkpoint=False):
# check exit status
if len(self.target_jobs) == 0 and rank == 0 and all([info["free"] for _, info in self.node_status.items()]):
if self.n_nodes > 1:
self._signal_workers_exit(comm)
signal_workers_exit(comm, self.n_nodes)
break

#
# worker nodes check exit status
#
if self.n_nodes > 1:
self._worker_check_exit_status(rank, comm)
worker_check_exit_status(rank, comm)

#
# send job to worker nodes
#
if rank == 0 and self.n_nodes > 1:
self._send_job_to_worker_nodes(comm)
send_job_to_worker_nodes(
comm, self.target_jobs, self.node_status
)

#
# recieve jobs from rank 0 at worker nodes
#
elif rank != 0 and self.n_nodes > 1:
job_data, job_flag = self._get_next_job_at_worker(
rank, comm)
job_data, job_flag = get_next_job_at_worker(
rank, comm, self.comm_buff_size
)

#
# single node job schedule
Expand All @@ -322,7 +327,10 @@ def fit(self, X, Ks, from_checkpoint=False, save_checkpoint=False):

# collect results at root
elif rank == 0 and self.n_nodes > 1:
all_node_results = self._collect_results_from_workers(rank, comm)
all_node_results = collect_results_from_workers(
rank, comm, self.n_nodes,
self.node_status, self.comm_buff_size
)
if len(all_node_results) == 0:
continue

Expand Down Expand Up @@ -676,66 +684,6 @@ def _organize_nmfk_params(self, params):


return params

def _signal_workers_exit(self, comm):
for job_rank in range(1, self.n_nodes, 1):
req = comm.isend(np.array([True]),
dest=job_rank, tag=int(f'400{job_rank}'))
req.wait()

def _worker_check_exit_status(self, rank, comm):
if comm.iprobe(source=0, tag=int(f'400{rank}')):
sys.exit(0)

def _get_next_job_at_worker(self, rank, comm):
job_flag = True
if comm.iprobe(source=0, tag=int(f'200{rank}')):
req = comm.irecv(buf=bytearray(b" " * self.comm_buff_size),
source=0, tag=int(f'200{rank}'))
data = req.wait()

else:
job_flag = False
data = {}

return data, job_flag

def _collect_results_from_workers(self, rank, comm):

all_results = []
# collect results at root
if self.n_nodes > 1 and rank == 0:
for job_rank, status_info in self.node_status.items():
if self.node_status[job_rank]["free"] == False and comm.iprobe(source=job_rank, tag=int(f'300{job_rank}')):
req = comm.irecv(buf=bytearray(b" " * self.comm_buff_size),
source=job_rank, tag=int(f'300{job_rank}'))
node_results = req.wait()
self.node_status[job_rank]["free"] = True
self.node_status[job_rank]["job"] = None
all_results.append(node_results)

return all_results

def _send_job_to_worker_nodes(self, comm):
scheduled = 0
available_jobs = list(self.target_jobs.keys())
# remove the jobs that are in the nodes from the available jobs
for _, status_info in self.node_status.items():
if status_info["job"] is not None and status_info["free"] is False:
try:
_ = available_jobs.pop(available_jobs.index(status_info["job"]))
except Exception as e:
print(e)

for job_rank, status_info in self.node_status.items():
if len(available_jobs) > 0 and status_info["free"]:
next_job = available_jobs.pop(0)
req = comm.isend(self.target_jobs[next_job],
dest=job_rank, tag=int(f'200{job_rank}'))
req.wait()
self.node_status[job_rank]["free"] = False
self.node_status[job_rank]["job"] = next_job
scheduled += 1

def _process_results(self, node_results, save_checkpoint):
# remove the job
Expand Down
Loading

0 comments on commit 014900f

Please sign in to comment.