Run Asynchronous Function in R

Run asynchronous function in R

Eventually I stopped on the following solution:

Rpath <- Find(file.exists, c(commandArgs()[[1]], file.path(R.home("bin"), commandArgs()[[1]]),
file.path(R.home("bin"), "R"), file.path(R.home("bin"), "Rscript.exe")))
out <- system('%s --no-save --slave -e \"Your R code here\" ', wait=FALSE)

The first line searches for the path of R executable, and the second executes R code from the command-line environment not waiting for result.

Asynchronous programming in R

It seems to me your call is only blocking while the workers are created, but not for the duration of the actual work. E.g. if do the plan() first, the counter will not block:

library(future)

sampler <- function(){
for(s in 1:10000) sample(1000000)
}

plan(multiprocess)

for(i in 1:100){
message(i)
if(i == 50){
mySamples <- future({ sampler() })
}
}

Also note, that the runtime of sampler() is much longer than the duration of the blocking call in your code and that, after executing your code, mySamples still has the status resolved: FALSE and CPU usage is still high.

How to call asynchronous method from synchronous method in C#?

Asynchronous programming does "grow" through the code base. It has been compared to a zombie virus. The best solution is to allow it to grow, but sometimes that's not possible.

I have written a few types in my Nito.AsyncEx library for dealing with a partially-asynchronous code base. There's no solution that works in every situation, though.

Solution A

If you have a simple asynchronous method that doesn't need to synchronize back to its context, then you can use Task.WaitAndUnwrapException:

var task = MyAsyncMethod();
var result = task.WaitAndUnwrapException();

You do not want to use Task.Wait or Task.Result because they wrap exceptions in AggregateException.

This solution is only appropriate if MyAsyncMethod does not synchronize back to its context. In other words, every await in MyAsyncMethod should end with ConfigureAwait(false). This means it can't update any UI elements or access the ASP.NET request context.

Solution B

If MyAsyncMethod does need to synchronize back to its context, then you may be able to use AsyncContext.RunTask to provide a nested context:

var result = AsyncContext.RunTask(MyAsyncMethod).Result;

*Update 4/14/2014: In more recent versions of the library the API is as follows:

var result = AsyncContext.Run(MyAsyncMethod);

(It's OK to use Task.Result in this example because RunTask will propagate Task exceptions).

The reason you may need AsyncContext.RunTask instead of Task.WaitAndUnwrapException is because of a rather subtle deadlock possibility that happens on WinForms/WPF/SL/ASP.NET:

  1. A synchronous method calls an async method, obtaining a Task.
  2. The synchronous method does a blocking wait on the Task.
  3. The async method uses await without ConfigureAwait.
  4. The Task cannot complete in this situation because it only completes when the async method is finished; the async method cannot complete because it is attempting to schedule its continuation to the SynchronizationContext, and WinForms/WPF/SL/ASP.NET will not allow the continuation to run because the synchronous method is already running in that context.

This is one reason why it's a good idea to use ConfigureAwait(false) within every async method as much as possible.

Solution C

AsyncContext.RunTask won't work in every scenario. For example, if the async method awaits something that requires a UI event to complete, then you'll deadlock even with the nested context. In that case, you could start the async method on the thread pool:

var task = Task.Run(async () => await MyAsyncMethod());
var result = task.WaitAndUnwrapException();

However, this solution requires a MyAsyncMethod that will work in the thread pool context. So it can't update UI elements or access the ASP.NET request context. And in that case, you may as well add ConfigureAwait(false) to its await statements, and use solution A.

Update, 2019-05-01: The current "least-worst practices" are in an MSDN article here.

Call async/await functions in parallel

You can await on Promise.all():

await Promise.all([someCall(), anotherCall()]);

To store the results:

let [someResult, anotherResult] = await Promise.all([someCall(), anotherCall()]);

Note that Promise.all fails fast, which means that as soon as one of the promises supplied to it rejects, then the entire thing rejects.

const happy = (v, ms) => new Promise((resolve) => setTimeout(() => resolve(v), ms))
const sad = (v, ms) => new Promise((_, reject) => setTimeout(() => reject(v), ms))

Promise.all([happy('happy', 100), sad('sad', 50)])
.then(console.log).catch(console.log) // 'sad'

R Shiny Async with Progress Bar

check out the package ipc:

## Only run examples in interactive R sessions
if (interactive()) {
library(shiny)
library(future)
plan(multiprocess)
ui <- fluidPage(
actionButton("run","Run"),
tableOutput("dataset")
)

server <- function(input, output, session) {

dat <- reactiveVal()
observeEvent(input$run, {
progress <- AsyncProgress$new(session, min=1, max=15)
future({
for (i in 1:15) {
progress$set(value = i)
Sys.sleep(0.5)
}
progress$close()
cars
}) %...>% dat
NULL
})

output$dataset <- renderTable({
req(dat())
})
}

shinyApp(ui, server)
}

How can I do a async function over and over?

Your wait function is not a promise-based async function and you need to change it.
Also, you need to await your getFee() function to make an async execution.

import fs from "fs-extra";
import fetch from "node-fetch";

const wait = ms => new Promise((resolve, reject) => setTimeout(resolve, ms));

async function gasFee() {
console.log("fetching ETH Price");
var ethprice = await fetch(
"https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd"
);
var ethPriceJSON = await ethprice.json();
console.log("fetching Ethermine GWEI");
var etherminegwei = await fetch("https://api.ethermine.org/poolStats");
var ethermineGweiJSON = await etherminegwei.json();
var ethPrice = ethPriceJSON.ethereum.usd;
var ethermineGwei = ethermineGweiJSON.data.estimates.gasPrice;
var gweiPrice = ethPrice / 1000000000;
var price = ethermineGwei * gweiPrice * (21000).toFixed(2);
var timeNow = new Date();
if (price > 5) {
console.log("Gas Price Logged");
fs.appendFileSync("gasPrice.txt", "$" + price + " | " + timeNow + "\r\n");
} else {
return;
}
if (price <= 5) {
console.log(`Gas Price is $${price} at ${timeNow}`);
fs.appendFileSync(
"lowGasPrice.txt",
"$" + price + " | " + timeNow + "\r\n"
);
} else {
return;
}
}

(async function run() {
while (true) {
await gasFee();
await wait(1500);
}
})();

How do I implement an async Drop in Rust?

It's not clear how to do this, and I can't find anything in the docs

That's because it's not possible; there is no "async Drop". Drop must be synchronous.

See also:

  • How do I synchronously return a value calculated in an asynchronous Future in stable Rust?

Python Async Error: await only allowed in async function

The entry point to an async program should be asyncio.run, you should wrap your code in an async method then call it

from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO

class AsyncContainerClient(ContainerClient):

async def read_blob(self,
blob_name: str,
add_blob_name_col=False,
add_blob_date_col=False,
preprocessing_func=None,
zip_regex=r'.+\.gz$',
csv_regex='.+\.csv(\.gz)?$',
parquet_regex='.+\.parquet$',
regex_string=None,
**kwargs):

assert isinstance(blob_name, str), f'{blob_name} is not a string'

try:
blob = (await self.download_blob(blob_name))

with BytesIO() as byte_stream:
await blob.readinto(byte_stream)
byte_stream.seek(0)
return pd.read_parquet(byte_stream, engine='pyarrow')

except ResourceNotFoundError:
return 0

async def main():
blob_sas_url = "https://proan.blob"
acc = AsyncContainerClient.from_container_url(blob_sas_url)

test_dirs = ["models1/model.parquet", "models2/model.parquet",
"models3/model.parquet"]
return await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))

asyncio.run(main())


Related Topics



Leave a reply



Submit