JobQueueMPI.jl
Installation
You can install JobQueueMPI.jl using the Julia package manager. From the Julia REPL, type ] to enter the Pkg REPL mode and run:
pkg> add JobQueueMPI
How it works
First, when running a program using MPI, the user has to set the number of processes that will parallelize the computation. One of these processes will be the controller, and the others will be the workers.
Code that should run only on the controller or only on the workers can be delimited with role checks such as is_controller_process() and is_worker_process().
JobQueueMPI.jl has the following components:
- Controller: The controller is responsible for managing the jobs and the workers. It keeps track of the jobs that have been sent and received and sends the jobs to the available workers.
- Worker: The worker is responsible for executing the jobs. It receives the jobs from the controller, executes them, and sends the results back to the controller.
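The controller/worker split described above can be sketched as follows. This is a minimal illustration, not the package's canonical usage: it relies only on functions that appear elsewhere on this page (mpi_init, is_controller_process, is_worker_process, num_workers, mpi_finalize), the JQM alias and the role variable are names chosen for the example, and it assumes the script is started through an MPI launcher (for instance something like mpiexec -n 4 julia script.jl).

```julia
using JobQueueMPI
const JQM = JobQueueMPI  # alias chosen for this sketch

JQM.mpi_init()

# Every process runs this same script; the role checks decide which branch runs.
role = JQM.is_controller_process() ? "controller" : "worker"

if JQM.is_controller_process()
    # Only the controller process reaches this branch.
    println("controller: coordinating ", JQM.num_workers(), " worker(s)")
elseif JQM.is_worker_process()
    # Only worker processes reach this branch.
    println("worker: ready to receive jobs")
end

JQM.mpi_finalize()
```

With a single process there are no workers, so only the controller branch runs.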
Users can call functions to compute jobs in parallel in two ways:
- Building a function and using a pmap implementation that will put the function in the job queue and send it to the workers.
using JobQueueMPI
function sum_100(value)
    return value + 100
end
sum_100_answer = JobQueueMPI.pmap(sum_100, collect(1:10))
- Building the jobs and sending them to workers explicitly. The test folder contains examples of this structure. This approach is more flexible than the first, but it requires more code and some knowledge of how MPI works.
using JobQueueMPI
const JQM = JobQueueMPI  # short alias used throughout this example

mutable struct Message
    value::Int
    vector_idx::Int
end

all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller)

function sum_100(message::Message)
    message.value += 100
    return message
end

function update_data(new_data, message::Message)
    idx = message.vector_idx
    value = message.value
    return new_data[idx] = value
end

function workers_loop()
    if JQM.is_worker_process()
        worker = JQM.Worker()
        while true
            job = JQM.receive_job(worker)
            message = JQM.get_message(job)
            # Stop once the controller signals that no jobs remain.
            if message == JQM.TerminationMessage()
                break
            end
            return_message = sum_100(message)
            JQM.send_job_answer_to_controller(worker, return_message)
        end
        exit(0)
    end
end

function job_queue(data)
    JQM.mpi_init()
    JQM.mpi_barrier()

    T = eltype(data)
    N = length(data)

    if JQM.is_controller_process()
        new_data = Array{T}(undef, N)
        controller = JQM.Controller(JQM.num_workers())

        # Enqueue one job per element, tagged with its position in the vector.
        for i in eachindex(data)
            message = Message(data[i], i)
            JQM.add_job_to_queue!(controller, message)
        end

        # Dispatch jobs and collect answers until nothing is queued or pending.
        while !all_jobs_done(controller)
            if !JQM.is_job_queue_empty(controller)
                JQM.send_jobs_to_any_available_workers(controller)
            end
            if JQM.any_pending_jobs(controller)
                job_answer = JQM.check_for_job_answers(controller)
                if !isnothing(job_answer)
                    message = JQM.get_message(job_answer)
                    update_data(new_data, message)
                end
            end
        end

        JQM.send_termination_message()
        return new_data
    end

    workers_loop()
    JQM.mpi_barrier()
    JQM.mpi_finalize()
    return nothing
end

data = collect(1:10)
new_data = job_queue(data)
API
JobQueueMPI.Controller — Type
Controller
The controller struct is used to manage the workers and the jobs. It keeps track of the workers' status, the job queue, and the pending jobs. It also keeps track of the last job id that was sent to the workers.
JobQueueMPI.Worker — Type
Worker
A worker process.
JobQueueMPI.add_job_to_queue! — Function
add_job_to_queue!(controller::Controller, message::Any)
Add a job to the controller's job queue.
JobQueueMPI.send_jobs_to_any_available_workers — Function
send_jobs_to_any_available_workers(controller::Controller)
Send jobs to any available workers.
JobQueueMPI.send_termination_message — Function
send_termination_message()
Send a termination message to all workers.
JobQueueMPI.check_for_job_answers — Function
check_for_job_answers(controller::Controller)
Check if any worker has completed a job and return the answer.
JobQueueMPI.send_job_answer_to_controller — Function
send_job_answer_to_controller(worker::Worker, message)
Send a job answer to the controller process.
JobQueueMPI.receive_job — Function
receive_job(worker::Worker)
Receive a job from the controller process.
JobQueueMPI.get_message — Function
get_message(job::AbstractJob)
Get the message from a job.
JobQueueMPI.pmap — Function
pmap(
    f::Function,
    jobs::Vector,
    data_defined_in_process = nothing;
    return_result_in_all_processes::Bool = false
)
Parallel map function that works with MPI. If the function is called in parallel, it will distribute the jobs to the workers and collect the results. If the function is called in serial, it will just map the function to the jobs.
The function f should take one argument, which is the message to be processed. If data_defined_in_process is not nothing, the function f should take two arguments, the first one being data_defined_in_process.
The return_result_in_all_processes argument is used to broadcast the result to all processes. If it is left at its default of false, the controller process will return the answers in the same order as the jobs were given, and the workers will return nothing.
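As an illustration of the two-argument form described above, here is a minimal sketch. The function scale_and_shift, the params named tuple, and the is_controller and answers variables are hypothetical names introduced for this example. The explicit mpi_init and mpi_finalize calls mirror the explicit job-queue example on this page and are an assumption here, since the simpler pmap example above does not show them.

```julia
using JobQueueMPI
const JQM = JobQueueMPI  # alias chosen for this sketch

JQM.mpi_init()

# Two-argument form: data_defined_in_process comes first, the job second.
scale_and_shift(params, value) = params.scale * value + params.shift

params = (scale = 2, shift = 100)  # hypothetical data available in each process
jobs = collect(1:10)

answers = JQM.pmap(scale_and_shift, jobs, params)

# With the default return_result_in_all_processes = false, only the
# controller is documented to hold the answers, so guard any use of them.
is_controller = JQM.is_controller_process()
if is_controller
    println(answers)
end

JQM.mpi_finalize()
```

Passing return_result_in_all_processes = true should make the guard unnecessary, at the cost of an extra broadcast to every process.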