Introduction

At some point, you might need to go beyond your local computer for increased performance and scalability. Luckily, because furrr depends on the future framework, this is still possible to accomplish with furrr. In this vignette, you will learn how to scale furrr with AWS EC2 instances in two ways:

  1. Running code remotely on a single EC2 instance
  2. Running code in parallel on multiple EC2 instances

AWS EC2? What?

If you know exactly what AWS EC2 is, and what AMIs are, feel free to skip this section! EC2 is Amazon’s Elastic Compute Cloud service. It is a way for people like you and me to essentially rent a computer (or multiple) in the cloud for a variable amount of time. The computer can be incredibly powerful, or really weak (and cheap!). It can run Linux or Windows. With furrr, we will run our code on these EC2 “instances” pre-loaded with R.

How do we get an instance pre-loaded with R? Great question. We will use an AMI. AMIs are “Amazon Machine Images”, in other words, a custom computer that already has software loaded onto it, rather than one that starts with nothing. A kind soul, Louis Aslett, keeps up-to-date RStudio AMIs on his website. We will use this for our instance.

At this point, I encourage you to look elsewhere for exactly how to set up an AWS instance based on this AMI. I have a blog post dedicated to this, located at my website blog.davisvaughan.com.

Running code remotely on a single EC2 instance

Imagine that you have some models that are going to take a long time to run, and you’d rather not run them on your laptop. Ideally, you’d be able to test them locally, but then you’d like to run them on a more powerful EC2 instance in the cloud. That’s what you’ll learn how to do here. This example won’t actually run the models in parallel. Instead, it will be focused on sending the data to a single EC2 instance so it can run all of the models sequentially. The next example after this one will go one step further to do work in parallel on multiple EC2 instances.

Modeling code

First, we need code to run that we want to run on the instance. For simplicity, say we want to run 3 separate linear models on mtcars, split up by gear.

by_gear <- mtcars %>%
  group_split(gear) 

models <- map(by_gear, ~lm(mpg ~ cyl + hp + wt, data = .))

models
#> [[1]]
#> 
#> Call:
#> lm(formula = mpg ~ cyl + hp + wt, data = .)
#> 
#> Coefficients:
#> (Intercept)          cyl           hp           wt  
#>    30.48956     -0.31883     -0.02326     -2.03083  
#> 
#> 
#> [[2]]
#> 
#> Call:
#> lm(formula = mpg ~ cyl + hp + wt, data = .)
#> 
#> Coefficients:
#> (Intercept)          cyl           hp           wt  
#>    43.69353      0.05647     -0.12331     -3.20537  
#> 
#> 
#> [[3]]
#> 
#> Call:
#> lm(formula = mpg ~ cyl + hp + wt, data = .)
#> 
#> Coefficients:
#> (Intercept)          cyl           hp           wt  
#>    45.29099     -2.03268      0.02655     -6.42290

With furrr, we can run this in parallel locally using the following:

plan(multisession, workers = 2)

models <- future_map(by_gear, ~lm(mpg ~ cyl + hp + wt, data = .))

models
#> [[1]]
#> 
#> Call:
#> lm(formula = mpg ~ cyl + hp + wt, data = .)
#> 
#> Coefficients:
#> (Intercept)          cyl           hp           wt  
#>    30.48956     -0.31883     -0.02326     -2.03083  
#> 
#> 
#> [[2]]
#> 
#> Call:
#> lm(formula = mpg ~ cyl + hp + wt, data = .)
#> 
#> Coefficients:
#> (Intercept)          cyl           hp           wt  
#>    43.69353      0.05647     -0.12331     -3.20537  
#> 
#> 
#> [[3]]
#> 
#> Call:
#> lm(formula = mpg ~ cyl + hp + wt, data = .)
#> 
#> Coefficients:
#> (Intercept)          cyl           hp           wt  
#>    45.29099     -2.03268      0.02655     -6.42290

Note that this is NOT faster than the sequential code, this is just to demonstrate how one might run the models in parallel.

Connecting to an EC2 instance

Now, what if these models took hours to run? Maybe we’d want to run them on a different or more powerful computer, and then have the results returned back to our local R session. In that case, go start up your favorite AWS EC2 instance, pre-loaded with R, and come back when you’ve finished. Then, you’ll need to:

  • Get the Public IP of your EC2 instance. This is located under the Instances section of the EC2 console. Specifically it is the IPv4 Public IP of your instance.

  • Make sure that your Security Group allows for SSH access either from Anywhere or My IP.

  • Find the path to your .pem file that is used to connect to the EC2 instance. This was created when you created the EC2 instance, and hopefully you know where you saved it!

# A t2.micro AWS instance
# Created from http://www.louisaslett.com/RStudio_AMI/
public_ip <- "34.230.28.118"

# This is where my pem file lives (password file to connect).
ssh_private_key_file <- "path/to/file.pem"

With all of this in hand, the next step is to use future::makeClusterPSOCK() to connect to the instance. Traditionally, one would use parallel::makePSOCKcluster() to connect, but the future version has a few additional helpful arguments that allow us to add extra options when connecting to the worker. If the connection is successful, the code below should start outputting package installation messages into your local console.

connect_to_ec2 <- function(public_ip, ssh_private_key_file) {
  makeClusterPSOCK(
    
    # Public IP number of EC2 instance
    workers = public_ip,
    
    # User name (always 'ubuntu')
    user = "ubuntu",
    
    # Use private SSH key registered with AWS
    rshopts = c(
      "-o", "StrictHostKeyChecking=no",
      "-o", "IdentitiesOnly=yes",
      "-i", ssh_private_key_file
    ),
    
    rscript_args = c(
      # Set up .libPaths() for the 'ubuntu' user
      "-e", shQuote(paste0(
        "local({",
        "p <- Sys.getenv('R_LIBS_USER'); ",
        "dir.create(p, recursive = TRUE, showWarnings = FALSE); ",
        ".libPaths(p)",
        "})"
      )),
      # Install furrr
      "-e", shQuote("install.packages('furrr')")
    ),
    
    # Switch this to TRUE to see the code that is run on the workers without
    # making the connection
    dryrun = FALSE
  )
}

cl <- connect_to_ec2(public_ip, ssh_private_key_file)

cl
#> Socket cluster with 1 nodes where 1 node is on host ‘34.230.28.118’ (R version 3.6.0 (2019-04-26), platform x86_64-pc-linux-gnu)

Let’s step through this a little.

  • workers - The public ip addresses of the workers you want to connect to. If you have multiple, you can list them here.

  • user - Because we used the RStudio AMI, this is always "ubuntu".

  • rshopts - These are options that are run on the command line of your local computer when connecting to the instance by ssh.

    • StrictHostKeyChecking=no - This is required because by default when connecting to the AWS instance for the first time you are asked if you want to “continue connecting” because authenticity of the AWS instance can’t be verified. Setting this option to no means we won’t have to answer this question.

    • IdentitiesOnly=yes - This is not necessarily required, but specifies that we only want to connect using the identity we supply with -i, which ends up being the .pem file.

  • rscript_args - This very helpful argument allows you to specify R code to run when the command line executable Rscript is called on your worker. Essentially, it allows you to run “start up code” on each worker. In this case, it is used to create package paths for the ubuntu user and to install a few packages that are required to work with furrr.

  • dryrun - This is already set to FALSE by default, but it’s useful to point this argument out as setting it to TRUE allows you to verify that the code that should run on each worker is correct.

Running the code

Now that we have a connection to an EC2 instance loaded with R, we’ll need to tell future and furrr how to use it. Since we already have a cluster object, all we have to do is change the plan() to use this cluster. Rather than using the multisession plan, we use the cluster plan with the extra argument, workers, set to the cluster connection (see ?future::cluster for more info).

plan(cluster, workers = cl)

models <- future_map(by_gear, ~lm(mpg ~ cyl + hp + wt, data = .))

models
#> [[1]]
#> 
#> Call:
#> lm(formula = mpg ~ cyl + hp + wt, data = .)
#> 
#> Coefficients:
#> (Intercept)          cyl           hp           wt  
#>    30.48956     -0.31883     -0.02326     -2.03083  
#> 
#> 
#> [[2]]
#> 
#> Call:
#> lm(formula = mpg ~ cyl + hp + wt, data = .)
#> 
#> Coefficients:
#> (Intercept)          cyl           hp           wt  
#>    43.69353      0.05647     -0.12331     -3.20537  
#> 
#> 
#> [[3]]
#> 
#> Call:
#> lm(formula = mpg ~ cyl + hp + wt, data = .)
#> 
#> Coefficients:
#> (Intercept)          cyl           hp           wt  
#>    45.29099     -2.03268      0.02655     -6.42290  

And that’s it! Your code just ran on an EC2 instance!

It is good practice to always disconnect from your cluster when you are finished working with it. Don’t forget to terminate the instance as well!

# Revert back to a sequential plan
plan(sequential)

parallel::stopCluster(cl)

Running code in parallel on each EC2 instance

Let’s crank it up a notch. In the previous example, code was run sequentially on a single EC2 instance. What if you had multiple EC2 instances, and each of those instances had multiple cores that you could use? For maximum efficiency, you’d want to:

  1. First, parallelize across the EC2 instances.
  2. Then, parallelize across the cores of each EC2 instance.

A concrete example would be if you had 2 t2.xlarge instances, each with 4 physical cores. Technically this means you have 8 logical cores due to hyperthreading, but I rarely see any actual benefits over just using the maximum number of physical cores (the exception might be if you are hitting an API, and most of the time is spent waiting for it to respond).

In the future world, this is dubbed “future topology”. There is an entire vignette about this that you can find here.

Connecting to multiple EC2 instances

So, just like before, start up your EC2 instances (Make sure to check out the EC2 instance type reference to see how many virtual cores (vCPUs) each one has).

To launch multiple, after clicking on the AMI you want to use from Louis’s page, under “Configure Instance Details” change the “Number of instances” box to whatever you require.

You might also consider changing the Purchasing option to “Request Spot instances” for cheaper instances if you don’t mind the possibility that Amazon could take the instance away from you temporarily at any time (this hasn’t ever happened to me).

Note that you now have a vector of public ip addresses.

# Two t2.xlarge AWS instances
# Created from http://www.louisaslett.com/RStudio_AMI/
public_ip <- c("54.157.169.96", "18.210.19.243")

# This is where my pem file lives (password file to connect).
ssh_private_key_file <- "path/to/file.pem"

Otherwise, the code remains the same to make the connection!

cl_multi <- connect_to_ec2(public_ip, ssh_private_key_file)

cl_multi
#> Socket cluster with 2 nodes where 1 node is on host ‘18.210.19.243’ (R version 3.6.0 (2019-04-26), platform x86_64-pc-linux-gnu), 1 node is on host ‘54.157.169.96’ (R version 3.6.0 (2019-04-26), platform x86_64-pc-linux-gnu)

Running multi-level parallel code

Now for the fun part. How do we tell future to first distribute our code over the 2 instances, and then run in parallel on each instance? You pass in a list of plans to plan(), where you also have the option to tweak() each plan individually (which will be required to set the workers argument!).

plan(list(
  # The outer plan tells future to distribute over the 2 instances
  tweak(cluster, workers = cl_multi), 
  
  # The inner plan says to run in parallel on each instance
  multisession
))

How do we know this is working? Let’s try doing something that would require a fixed amount of time when run locally, then try it in parallel. We are just going to wait for 2 seconds on each iteration, and then return the instance we are on and the core we are using. In total this should take 16 seconds.

t1 <- proc.time()

res <- map(
  
  # Map over the two instances
  .x = c(1, 2), 
  
  .f = ~ {
    
    outer_idx <- .x
    
    map_chr(
      
      # Each instance has 4 cores we can utilize
      .x = c(1, 2, 3, 4), 
      
      .f = ~ {
        inner_idx <- .x
        Sys.sleep(2)
        paste0("Instance: ", outer_idx, " and core: ", inner_idx)
      }
    )
    
  }
)

t2 <- proc.time()

res
#> [[1]]
#> [1] "Instance: 1 and core: 1" "Instance: 1 and core: 2" "Instance: 1 and core: 3" #> "Instance: 1 and core: 4"
#> 
#> [[2]]
#> [1] "Instance: 2 and core: 1" "Instance: 2 and core: 2" "Instance: 2 and core: 3" #> "Instance: 2 and core: 4"
t2 - t1
#>   user  system elapsed 
#>  0.055   0.051  16.022 

Now, in parallel with our cluster. The outer future_map() call distributes over the two instances, and the inner future_map_chr() call distributes over the cores of each instance. This should take ~2 seconds, with some overhead (16 seconds sequentially, split between 2 instances, and then each instance has 4 physical cores. So 2 seconds on each of the 8 available cores).

t1 <- proc.time()

res <- future_map(
  
  # Map over the two instances
  .x = c(1, 2), 
  
  .f = ~{
    
    outer_idx <- .x
    
    future_map_chr(
      
      # Each instance has 4 cores we can utilize
      .x = c(1, 2, 3, 4), 
      
      .f = ~{
        inner_idx <- .x
        Sys.sleep(2)
        paste0("Instance: ", outer_idx, " and core: ", inner_idx)
      }
    )
    
  }
)

t2 <- proc.time()
t2 - t1
#>   user  system elapsed 
#>  0.075   0.018   2.728 

Not bad! The extra time is due to the overhead of communicating with the AWS workers, but with a large model this would not be as relevant.

Don’t forget to close the connection, and then to terminate the EC2 instance!

plan(sequential)

parallel::stopCluster(cl_multi)

Conclusion

In this vignette, you learned how to distribute your code over AWS EC2 instances, and run code in parallel on each instance using future and furrr. Note that the code used here can also be used to run code on platforms such as Google Cloud Compute, or other remote clusters. You will just have to figure out the correct way to connect to those clusters. Additionally, once you have the connection in place you could just run basic future() commands to distribute code as well. This has the added benefit of not locking up your computer until you request the result with value().