The rworker package establishes the interface for running asynchronous tasks sent by Celery. It works by listening for new task execution requests (TERs) coming from the message broker and handing them to background worker processes for execution.
The rworker function

This is the main function of the package. It creates a new instance of the Rworker object, which is responsible for managing TERs coming from the message broker.
library(rworker)
url <- 'redis://localhost:6379'
rwork <- rworker(name='celery', queue=url, backend=url, workers=2)
The name argument defines the name of the queue we listen to for new TERs; by default, the Celery queue name is 'celery'. The queue and backend arguments follow the provider://address:port format and define the provider and address of the message queue and of the task results backend, respectively. The last argument, workers, defines the number of background processes responsible for executing incoming TERs.
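To illustrate the provider://address:port format, the connection URL can be decomposed with Python's standard urllib.parse module (a quick standalone check, not part of rworker; the URL string is the same one passed to rworker() above):

```python
from urllib.parse import urlparse

# Decompose the broker/backend URL used above into its
# provider (scheme), address (hostname) and port parts.
parts = urlparse('redis://localhost:6379')

print(parts.scheme)    # provider -> 'redis'
print(parts.hostname)  # address  -> 'localhost'
print(parts.port)      # port     -> 6379
```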
Before you start receiving TERs, you have to define your tasks. Tasks are simply the functions you want to execute remotely. Every task you may want to execute from Celery needs to be registered in the Rworker instance.
library(magrittr)
(function(){
# Simulating long running function
Sys.sleep(10)
}) %>% rwork$task(name='long_running_task')
(function(){
# Another dummy function
print('Hello world')
}) %>% rwork$task(name='hello_world')
The name argument must be unique, since it is used to identify the correct task to execute.
Sometimes it is nice to know at which point of execution your task currently is. You can report this using the task_progress function.
(function(){
Sys.sleep(5)
task_progress(50) # 50% progress
Sys.sleep(5)
task_progress(100) # 100% progress
}) %>% rwork$task(name='task_with_progress')
On the Celery side, the progress information is stored in the .result['progress'] attribute of the AsyncResult object.
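On the Python side, a client typically polls that attribute until the task completes. The helper below is a hypothetical sketch of that polling pattern (wait_for_progress is not part of rworker or Celery); it only relies on the .state and .info attributes, so in real use you would pass it the AsyncResult returned by worker.send_task(...):

```python
import time

def wait_for_progress(result, poll_interval=0.1, timeout=10.0):
    """Poll an AsyncResult-like object until it reports completion.

    `result` only needs `.state` and `.info` attributes, matching the
    interface of celery.result.AsyncResult.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if result.state == 'SUCCESS':
            return 100
        # While the task runs, the progress meta dict holds 'progress'.
        info = result.info or {}
        if info.get('progress', 0) >= 100:
            return info['progress']
        time.sleep(poll_interval)
    raise TimeoutError('task did not report completion in time')
```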
Now that the desired tasks are registered, we just need to listen for new task execution requests:
rwork$consume()
Now, every time you send a task from Python using Celery, the rwork$consume() method will receive it and execute it in the background.
Now let’s review the whole idea. We have three players in the game: Celery (Python), rworker (R) and the message broker (Redis, in this case).
library(rworker)
library(magrittr)
url <- 'redis://localhost:6379'
rwork <- rworker(name='celery', queue=url, backend=url, workers=2)
(function(){
# Simulating long running function
Sys.sleep(10)
}) %>% rwork$task(name='long_running_task')
(function(){
# Another dummy function
print('Hello world')
}) %>% rwork$task(name='hello_world')
(function(){
Sys.sleep(5)
task_progress(50) # 50% progress
Sys.sleep(5)
task_progress(100) # 100% progress
}) %>% rwork$task(name='task_with_progress')
rwork$consume()
from celery import Celery
url = 'redis://localhost:6379/0'
worker = Celery('app', broker=url, backend=url)
async_result = worker.send_task('task_with_progress')
# Check task progress
async_result.info['progress']
# Check task state
async_result.state