How to Download Data from Multiple URLs Concurrently

How to download multiple files from different URLs using RecursiveAction?

You are not using concurrency properly here.

What you should do is something like this:

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;

public class Parallel implements Runnable {

    private final String link;
    private final File file;

    public Parallel(String link, File file) {
        this.link = link;
        this.file = file;
    }

    @Override
    public void run() {
        try {
            URL url = new URL(link);
            HttpURLConnection http = (HttpURLConnection) url.openConnection();
            double fileSize = (double) http.getContentLengthLong();
            BufferedInputStream bis = new BufferedInputStream(http.getInputStream());
            FileOutputStream fos = new FileOutputStream(file);
            BufferedOutputStream bos = new BufferedOutputStream(fos, 600000);
            byte[] buffer = new byte[1024];
            double downloadedData = 0.00;
            int readData = 0;

            while ((readData = bis.read(buffer, 0, 1024)) >= 0) {
                bos.write(buffer, 0, readData);
                downloadedData += readData; // could drive a progress indicator
            }
            bos.close();
            bis.close();
            System.out.println(file + " -> done");
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

And then:


String[] links;
File[] files;

//...

ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

for (int i = 0; i < links.length; i++) {
    Runnable worker = new Parallel(links[i], files[i]);
    executor.execute(worker);
}
executor.shutdown();

Each download then actually gets its own thread.

In your case, all downloads share a single thread, so everything happens sequentially.

How to download multiple files simultaneously and join them in Python?

There are 2 ways to do things simultaneously. Or, really, 2-3/4 or so:

  • Multiple threads
    • Or multiple processes, especially if the "things" take a lot of CPU power
    • Or coroutines or greenlets, especially if there are thousands of "things"
    • Or pools of one of the above
  • Event loops (coded manually)
    • Or hybrid greenlet/event loop systems like gevent.

If you have 1000 URLs, you probably don't want to do 1000 requests at the same time. For example, web browsers typically only do something like 8 requests at a time. A pool is a nice way to do only 8 things at a time, so let's do that.

And, since you're only doing 8 things at a time, and those things are primarily I/O bound, threads are perfect.


I'll implement it with futures. (If you're using Python 2.x, or 3.0-3.1, you will need to install the backport, futures.)

import concurrent.futures

urls = ['http://example.com/foo',
        'http://example.com/bar']

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    result = b''.join(executor.map(download, urls))

with open('output_file', 'wb') as f:
    f.write(result)

Of course you need to write the download function, but that's exactly the same function you'd write if you were doing these one at a time.

For example, using urlopen (if you're using Python 2.x, use urllib2 instead of urllib.request):

import urllib.request

def download(url):
    with urllib.request.urlopen(url) as f:
        return f.read()

If you want to learn how to build a thread pool executor yourself, the source is actually pretty simple, and multiprocessing.pool is another nice example in the stdlib.

However, both of those have a lot of excess code (handling weak references to improve memory usage, shutting down cleanly, offering different ways of waiting on the results, propagating exceptions properly, etc.) that may get in your way.

If you look around PyPI and ActiveState, you will find simpler designs like threadpool that you may find easier to understand.

But here's the simplest joinable threadpool:

import queue
import threading

class ThreadPool(object):
    def __init__(self, max_workers):
        self.queue = queue.Queue()
        self.workers = [threading.Thread(target=self._worker)
                        for _ in range(max_workers)]
    def start(self):
        for worker in self.workers:
            worker.start()
    def stop(self):
        for _ in self.workers:  # one sentinel per worker
            self.queue.put(None)
        for worker in self.workers:
            worker.join()
    def submit(self, job):
        self.queue.put(job)
    def _worker(self):
        while True:
            job = self.queue.get()
            if job is None:
                break
            job()

Of course the downside of a dead-simple implementation is that it's not as friendly to use as concurrent.futures.ThreadPoolExecutor:

import functools
import threading
import urllib.request

urls = ['http://example.com/foo',
        'http://example.com/bar']
results = [None] * len(urls)
results_lock = threading.Lock()

def download(url, i):
    with urllib.request.urlopen(url) as f:
        result = f.read()
    with results_lock:
        results[i] = result

pool = ThreadPool(max_workers=8)
pool.start()
for i, url in enumerate(urls):
    pool.submit(functools.partial(download, url, i))
pool.stop()

result = b''.join(results)

with open('output_file', 'wb') as f:
    f.write(result)

Concurrent download and processing of large files in Python

I'd simply use threading.Thread(target=process, args=(fname,)) and start a new thread for each processing job.

But before starting it, wait for the previous processing thread to end:

t = None
for fname in download(urls):
    if t is not None:  # wait for last processing thread to end
        t.join()
    t = threading.Thread(target=process, args=(fname,))
    t.start()
    print('[i] thread started for %s' % fname)

See https://docs.python.org/3/library/threading.html

Is there a way to request multiple distinct resources in parallel using URLSession.shared.dataTask

You ask:

Is there a way to request multiple distinct resources in parallel using URLSession.shared.dataTask

By default, it does perform requests in parallel.

Let’s step back for a second: In your prior question, you were asking how to implement a Kingfisher-like UIImageView extension. In my answer, I mentioned using objc_getAssociatedObject and objc_setAssociatedObject to achieve that. But in your question here, you’ve taken that associated object logic and put it in your DataRequest object.

Your instinct to pull the asynchronous image-retrieval logic out of the UIImageView is a good one: you may want to request images for buttons, or you might want a general "fetch image asynchronously" routine, completely separate from any UIKit objects. So abstracting the network-layer code out of the extension is an excellent idea.

But the whole idea behind asynchronous image-retrieval UIImageView/UIButton extensions is that the control should not only perform asynchronous requests, but also, when the cell containing it is reused, cancel the prior request (if any) before starting the next one. That way, if we scroll quickly down to images 80 through 99, the requests for cells 0 through 79 are canceled, and the visible images don't get backlogged behind all of those stale requests.

To achieve that, the control needs some way to keep track of the prior request for that reused cell. And because we can't add stored properties in a UIImageView extension, we use the objc_getAssociatedObject and objc_setAssociatedObject pattern. But that state has to live in the image view.

Unfortunately, in your code above, the associated object is in your DataRequest object. First, as I've tried to outline, the whole idea is that the image view must keep track of the prior request for that control. Putting the "keep track of the prior request" logic inside the DataRequest object defeats that purpose. Second, it's worth noting that you don't need associated objects in your own types, like DataRequest; you'd just use a stored property. You only need this associated-object dance when extending another type, such as UIImageView.

Below is a quick example I whipped together showing a UIImageView extension for asynchronous image retrieval. It doesn't abstract the network code out of the extension, but note that the associated-object logic that keeps track of the prior request must remain with the extension.

private var taskKey: Void?

extension UIImageView {
    private static let imageProcessingQueue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".imageprocessing", attributes: .concurrent)

    private var savedTask: URLSessionTask? {
        get { return objc_getAssociatedObject(self, &taskKey) as? URLSessionTask }
        set { objc_setAssociatedObject(self, &taskKey, newValue, .OBJC_ASSOCIATION_RETAIN) }
    }

    /// Set image asynchronously.
    ///
    /// - Parameters:
    ///   - url:          `URL` for image resource.
    ///   - placeholder:  `UIImage` of placeholder image. If not supplied, `image` will be set to `nil` while request is underway.
    ///   - shouldResize: Whether the image should be scaled to the size of the image view. Defaults to `true`.

    func setImage(_ url: URL, placeholder: UIImage? = nil, shouldResize: Bool = true) {
        savedTask?.cancel()
        savedTask = nil

        image = placeholder
        if let image = ImageCache.shared[url] {
            DispatchQueue.main.async {
                UIView.transition(with: self, duration: 0.1, options: .transitionCrossDissolve, animations: {
                    self.image = image
                }, completion: nil)
            }
            return
        }

        var task: URLSessionTask!
        let size = bounds.size * UIScreen.main.scale
        task = URLSession.shared.dataTask(with: url) { [weak self] data, response, error in
            guard
                error == nil,
                let httpResponse = response as? HTTPURLResponse,
                (200..<300) ~= httpResponse.statusCode,
                let data = data
            else {
                return
            }

            UIImageView.imageProcessingQueue.async { [weak self] in
                var image = UIImage(data: data)
                if shouldResize {
                    image = image?.scaledAspectFit(to: size)
                }

                ImageCache.shared[url] = image

                DispatchQueue.main.async {
                    guard
                        let self = self,
                        let savedTask = self.savedTask,
                        savedTask.taskIdentifier == task.taskIdentifier
                    else {
                        return
                    }
                    self.savedTask = nil

                    UIView.transition(with: self, duration: 0.1, options: .transitionCrossDissolve, animations: {
                        self.image = image
                    }, completion: nil)
                }
            }
        }
        task.resume()
        savedTask = task
    }
}

class ImageCache {
    static let shared = ImageCache()

    private let cache = NSCache<NSURL, UIImage>()
    private var observer: NSObjectProtocol?

    init() {
        observer = NotificationCenter.default.addObserver(forName: UIApplication.didReceiveMemoryWarningNotification, object: nil, queue: nil) { [weak self] _ in
            self?.cache.removeAllObjects()
        }
    }

    deinit {
        NotificationCenter.default.removeObserver(observer!)
    }

    subscript(url: URL) -> UIImage? {
        get {
            return cache.object(forKey: url as NSURL)
        }

        set {
            if let image = newValue {
                cache.setObject(image, forKey: url as NSURL)
            } else {
                cache.removeObject(forKey: url as NSURL)
            }
        }
    }
}

And this is my resizing routine:

extension UIImage {

    /// Resize the image to be the required size, stretching it as needed.
    ///
    /// - parameter newSize:     The new size of the image.
    /// - parameter contentMode: The `UIView.ContentMode` to be applied when resizing image.
    ///                          Either `.scaleToFill`, `.scaleAspectFill`, or `.scaleAspectFit`.
    ///
    /// - returns: Return `UIImage` of resized image.

    func scaled(to newSize: CGSize, contentMode: UIView.ContentMode = .scaleToFill) -> UIImage? {
        switch contentMode {
        case .scaleToFill:
            return filled(to: newSize)

        case .scaleAspectFill, .scaleAspectFit:
            let horizontalRatio = size.width / newSize.width
            let verticalRatio = size.height / newSize.height

            let ratio: CGFloat
            if contentMode == .scaleAspectFill {
                ratio = min(horizontalRatio, verticalRatio)
            } else {
                ratio = max(horizontalRatio, verticalRatio)
            }

            let sizeForAspectScale = CGSize(width: size.width / ratio, height: size.height / ratio)
            let image = filled(to: sizeForAspectScale)
            let doesAspectFitNeedCropping = contentMode == .scaleAspectFit && (newSize.width > sizeForAspectScale.width || newSize.height > sizeForAspectScale.height)
            if contentMode == .scaleAspectFill || doesAspectFitNeedCropping {
                let subRect = CGRect(
                    x: floor((sizeForAspectScale.width - newSize.width) / 2.0),
                    y: floor((sizeForAspectScale.height - newSize.height) / 2.0),
                    width: newSize.width,
                    height: newSize.height)
                return image?.cropped(to: subRect)
            }
            return image

        default:
            return nil
        }
    }

    /// Resize the image to be the required size, stretching it as needed.
    ///
    /// - parameter newSize: The new size of the image.
    ///
    /// - returns: Resized `UIImage`.

    func filled(to newSize: CGSize) -> UIImage? {
        let format = UIGraphicsImageRendererFormat()
        format.opaque = false
        format.scale = scale

        return UIGraphicsImageRenderer(size: newSize, format: format).image { _ in
            draw(in: CGRect(origin: .zero, size: newSize))
        }
    }

    /// Crop the image to be the required size.
    ///
    /// - parameter bounds: The bounds to which the new image should be cropped.
    ///
    /// - returns: Cropped `UIImage`.

    func cropped(to bounds: CGRect) -> UIImage? {
        // if bounds is entirely within image, do simple CGImage `cropping` ...

        if CGRect(origin: .zero, size: size).contains(bounds) {
            return cgImage?.cropping(to: bounds * scale).flatMap {
                UIImage(cgImage: $0, scale: scale, orientation: imageOrientation)
            }
        }

        // ... otherwise, manually render whole image, only drawing what we need

        let format = UIGraphicsImageRendererFormat()
        format.opaque = false
        format.scale = scale

        return UIGraphicsImageRenderer(size: bounds.size, format: format).image { _ in
            let origin = CGPoint(x: -bounds.minX, y: -bounds.minY)
            draw(in: CGRect(origin: origin, size: size))
        }
    }

    /// Resize the image to fill the rectangle of the specified size, preserving the aspect ratio, trimming if needed.
    ///
    /// - parameter newSize: The new size of the image.
    ///
    /// - returns: Return `UIImage` of resized image.

    func scaledAspectFill(to newSize: CGSize) -> UIImage? {
        return scaled(to: newSize, contentMode: .scaleAspectFill)
    }

    /// Resize the image to fit within the required size, preserving the aspect ratio, with no trimming taking place.
    ///
    /// - parameter newSize: The new size of the image.
    ///
    /// - returns: Return `UIImage` of resized image.

    func scaledAspectFit(to newSize: CGSize) -> UIImage? {
        return scaled(to: newSize, contentMode: .scaleAspectFit)
    }

    /// Create smaller image from `Data`.
    ///
    /// - Parameters:
    ///   - data:    The image `Data`.
    ///   - maxSize: The maximum edge size.
    ///   - scale:   The scale of the image (defaults to device scale if 0 or omitted).
    /// - Returns: The scaled `UIImage`.

    class func thumbnail(from data: Data, maxSize: CGFloat, scale: CGFloat = 0) -> UIImage? {
        guard let imageSource = CGImageSourceCreateWithData(data as CFData, nil) else {
            return nil
        }

        return thumbnail(from: imageSource, maxSize: maxSize, scale: scale)
    }

    /// Create smaller image from file `URL`.
    ///
    /// - Parameters:
    ///   - fileURL: The image file URL.
    ///   - maxSize: The maximum edge size.
    ///   - scale:   The scale of the image (defaults to device scale if 0 or omitted).
    /// - Returns: The scaled `UIImage`.

    class func thumbnail(from fileURL: URL, maxSize: CGFloat, scale: CGFloat = 0) -> UIImage? {
        guard let imageSource = CGImageSourceCreateWithURL(fileURL as CFURL, nil) else {
            return nil
        }

        return thumbnail(from: imageSource, maxSize: maxSize, scale: scale)
    }

    private class func thumbnail(from imageSource: CGImageSource, maxSize: CGFloat, scale: CGFloat) -> UIImage? {
        let scale = scale == 0 ? UIScreen.main.scale : scale
        let options: [NSString: Any] = [
            kCGImageSourceThumbnailMaxPixelSize: maxSize * scale,
            kCGImageSourceCreateThumbnailFromImageAlways: true
        ]
        if let scaledImage = CGImageSourceCreateThumbnailAtIndex(imageSource, 0, options as CFDictionary) {
            return UIImage(cgImage: scaledImage, scale: scale, orientation: .up)
        }
        return nil
    }

}

extension CGSize {
    static func * (lhs: CGSize, rhs: CGFloat) -> CGSize {
        return CGSize(width: lhs.width * rhs, height: lhs.height * rhs)
    }
}

extension CGPoint {
    static func * (lhs: CGPoint, rhs: CGFloat) -> CGPoint {
        return CGPoint(x: lhs.x * rhs, y: lhs.y * rhs)
    }
}

extension CGRect {
    static func * (lhs: CGRect, rhs: CGFloat) -> CGRect {
        return CGRect(origin: lhs.origin * rhs, size: lhs.size * rhs)
    }
}

That having been said, we really should constrain our concurrent requests to something reasonable (4-6 at a time), so that new requests don't start until prior ones finish (or are canceled), to avoid timeouts. The typical solution is to wrap the requests in asynchronous Operation subclasses, add them to an operation queue, and constrain the queue's maxConcurrentOperationCount to whatever value you choose.
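
As a language-agnostic illustration of that bounded-concurrency idea (sketched in Python to match the rest of this page, with placeholder URLs), a counting semaphore can play the role of maxConcurrentOperationCount:

import threading
import urllib.request

# At most 4 downloads may be in flight at once, analogous to
# an operation queue with maxConcurrentOperationCount = 4.
gate = threading.BoundedSemaphore(4)

def fetch(url):
    with gate:  # blocks while 4 downloads are already running
        with urllib.request.urlopen(url) as f:
            return f.read()

# Placeholder URLs, for illustration only.
urls = ['http://example.com/image1.png', 'http://example.com/image2.png']
threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()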

Downloading Multiple Files in Parallel or Asynchronously in Java

Actually, after looking carefully, Boris's code is faulty: it can return before some attachments have been updated. Here's a better version that fixes that by waiting for each Future to complete:

public List<Attachment> download(List<Attachment> attachments) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    List<Future<Attachment>> futures = new ArrayList<Future<Attachment>>();
    for (final Attachment attachment : attachments) {
        futures.add(executorService.submit(new Callable<Attachment>() {
            @Override
            public Attachment call() throws Exception {
                return doDownload(attachment);
            }
        }));
    }
    for (Future<Attachment> future : futures) {
        try {
            future.get(); // wait for each download to finish (or fail)
        } catch (Exception ex) {
            // Do something
        }
    }
    executorService.shutdown(); // no new tasks; let the worker threads wind down
    return attachments;
}

private Attachment doDownload(Attachment attachment) throws Exception {
    attachment.setDownStatus("Failed");
    attachment.setDestLocation("C:\\Users\\attachments");
    String attUrl = attachment.getUrl();
    String fileName = attachment.getFileName();
    URL url = new URL(attUrl);
    File fileLocation = new File(attachment.getDestLocation(), fileName);
    FileUtils.copyURLToFile(url, fileLocation);
    if (fileLocation.exists()) {
        attachment.setDownStatus("Completed");
    }
    return attachment;
}

However, this is absolutely not optimal given your structure of Attachment and how you use it. I did not fix that: I only answered the question as it was asked.

How can I download a single file from multiple locations via HTTP?

Assuming this is a programming question (given that this is StackOverflow), I am going to explain how, instead of just linking to a download accelerator that takes advantage of this.

What is needed in terms of the server to do this?

  • A server that supports Range HTTP header.
  • A server that allows for concurrent connections. It is possible to support Range while not allowing multiple simultaneous connections, by using either endpoint- or IP-based restrictions server-side. For this reason, I recommend you set up a simple test server instead of downloading from a file-sharing site while testing this.

What is the Range Header?

If the Range header is not set, data transmission over HTTP is sent in order, starting from the beginning of the file: the first byte of the file on the server will be the first byte of the HTTP response, and the last byte of the file on the server will be the last byte of the HTTP response. The Range header allows you to specify which bytes the server should send, letting you "skip" part of the response.
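
For illustration, here's a minimal sketch of a single ranged request using Python's standard library (the URL is just a placeholder; any server that honors Range will do):

import urllib.request

# Hypothetical URL; replace with a server known to support Range.
req = urllib.request.Request('http://example.com/file.bin')
req.add_header('Range', 'bytes=0-4')  # request only the first five bytes

with urllib.request.urlopen(req) as resp:
    # A server honoring the header replies with 206 Partial Content.
    print(resp.status, resp.read())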

Actual Answer Example

Our Situation

The response is plain text. The response content is just one word, "StackOverflow!!", encoded in ASCII, meaning each character is one byte. Therefore, the Content-Length header's value is 15 octets (octet is another term for byte).

We are going to download this file using 3 requests. For the sake of this example, we are going to say it will be 3 times faster, but you should realize that this method makes downloads slower for very small files, because the HTTP headers, as well as the 3-way handshake, must be repeated for each request. We will also assume that the server supports HEAD requests and that the Content-Length header is sent with the download response. Finally, the requests will be performed using GET, since HEAD is defined as GET without a response body (though there are workarounds for POST).

Juicy Details

First, perform an HTTP HEAD request. Take the Content-Length header and divide its value by the number of parallel connections you wish to make. For this example, the Content-Length is 15 and we wish to make 3 connections, so the divided value will be 5.

Now perform, in parallel, the number of requests you decided on. With each request, set the Range header to "Range: bytes=" followed by the number of requests already made times the divided value found above, then "-", then that start value plus the divided value minus one (byte ranges are inclusive on both ends).
For this example, each request should have the header set as follows.

  1. Range: bytes=0-4
  2. Range: bytes=5-9
  3. Range: bytes=10-14

The response of each of these requests should be

  1. Stack
  2. Overf
  3. low!!

In essence, we are just conforming to the Range specification (section 3.12 of RFC 2616) as well as the Byte Ranges specification (section 14.35 of RFC 2616).

Finally, concatenate the bytes of each response, in order, to form the final data (see the sketch below).

Disclaimer: I've never actually tried this but it should work in theory
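
To make the recipe concrete, here's a minimal, untested Python sketch of the whole procedure, using only the standard library (the URL is a placeholder, and it assumes the server honors Range and reports Content-Length):

import urllib.request
from concurrent.futures import ThreadPoolExecutor

URL = 'http://example.com/file.bin'  # placeholder
CONNECTIONS = 3

def fetch_range(start, end):
    # Byte ranges are inclusive on both ends.
    req = urllib.request.Request(URL, headers={'Range': f'bytes={start}-{end}'})
    with urllib.request.urlopen(req) as resp:
        return resp.read()

# Step 1: HEAD request to learn the total size.
head = urllib.request.Request(URL, method='HEAD')
with urllib.request.urlopen(head) as resp:
    length = int(resp.headers['Content-Length'])

# Step 2: split into CONNECTIONS ranges and fetch them in parallel.
chunk = -(-length // CONNECTIONS)  # ceiling division
ranges = [(start, min(start + chunk, length) - 1)
          for start in range(0, length, chunk)]

with ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    parts = list(executor.map(lambda r: fetch_range(*r), ranges))

# Step 3: concatenate the parts in order to form the final data.
data = b''.join(parts)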

Downloading a large file in parts using multiple parallel threads

Here is a version using Python 3 with asyncio. It's just an example; it can be improved, but it should give you everything you need.

  • get_size: Send a HEAD request to get the size of the file
  • download_range: Download a single chunk
  • download: Download all the chunks and merge them

import asyncio
import concurrent.futures
import functools
import requests
import os

# WARNING:
# Here I'm pointing to a publicly available sample video.
# If you are planning on running this code, make sure the
# video is still available as it might change location or get deleted.
# If necessary, replace it with a URL you know is working.
URL = 'https://download.samplelib.com/mp4/sample-30s.mp4'
OUTPUT = 'video.mp4'

async def get_size(url):
    response = requests.head(url)
    size = int(response.headers['Content-Length'])
    return size

def download_range(url, start, end, output):
    headers = {'Range': f'bytes={start}-{end}'}
    response = requests.get(url, headers=headers)

    with open(output, 'wb') as f:
        for part in response.iter_content(1024):
            f.write(part)

async def download(run, loop, url, output, chunk_size=1000000):
    file_size = await get_size(url)
    chunks = range(0, file_size, chunk_size)

    tasks = [
        run(
            download_range,
            url,
            start,
            start + chunk_size - 1,
            f'{output}.part{i}',
        )
        for i, start in enumerate(chunks)
    ]

    await asyncio.wait(tasks)

    with open(output, 'wb') as o:
        for i in range(len(chunks)):
            chunk_path = f'{output}.part{i}'

            with open(chunk_path, 'rb') as s:
                o.write(s.read())

            os.remove(chunk_path)

if __name__ == '__main__':
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    loop = asyncio.new_event_loop()
    run = functools.partial(loop.run_in_executor, executor)

    asyncio.set_event_loop(loop)

    try:
        loop.run_until_complete(
            download(run, loop, URL, OUTPUT)
        )
    finally:
        loop.close()

