Tuesday, May 27, 2008

Bulk Upload to Amazon SimpleDB

This weekend I was helping a friend load some data into Amazon's SimpleDB. The problem was fairly simple. He had a flat file with 170K lines of data, each line representing a YouTube video along with some metadata about it. He wanted to turn that file into a "table" on SimpleDB, where each line (video) from the file would become a "row" in the table.

I decided to use Java for the task and found a useful Java library for SimpleDB. Some users of the library don't like it, since it uses JAXB to turn Amazon's XML-based API directly into a Java API. That didn't bother me, so I used it.

I wrote a quick program to do the upload. I knew it would take a while to run, but didn't think too much about it. I had some other things to do, so I set it running. Some three hours later, it was still going. I felt pretty silly. I should have done some math on how long this was going to take. So I scrapped it and adjusted my program.
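The back-of-the-envelope math I should have done: each record costs a full HTTP round trip. If a single sequential PutAttributes call takes, say, 70 ms end to end (a guess, but the right order of magnitude), that is about 14 records per second, so 170,000 records need 170,000 / 14 ≈ 12,000 seconds, well over three hours for the whole file.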

The source of the problem is that Amazon has no bulk API, so you literally have to add one item at a time to SimpleDB. The best I could do was parallelize the upload, i.e. load multiple items simultaneously, one per thread. Java's concurrency APIs made this very easy. Here is the code that I wrote.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.amazonaws.sdb.AmazonSimpleDB;
import com.amazonaws.sdb.AmazonSimpleDBClient;
import com.amazonaws.sdb.AmazonSimpleDBException;
import com.amazonaws.sdb.model.CreateDomain;
import com.amazonaws.sdb.model.CreateDomainResponse;
import com.amazonaws.sdb.model.PutAttributes;
import com.amazonaws.sdb.model.ReplaceableAttribute;
import com.amazonaws.sdb.util.AmazonSimpleDBUtil;

public class Parser {

    private static final String DATA_FILE = "your file here";
    private static final String ACCESS_KEY_ID = "your key here";
    private static final String SECRET_ACCESS_KEY = "your key here";
    private static final String DOMAIN = "videos";
    private static final int THREAD_COUNT = 40;

    public static void main(String[] args) throws Exception {
        List<Video> videos = loadVideos();
        AmazonSimpleDB service =
            new AmazonSimpleDBClient(ACCESS_KEY_ID, SECRET_ACCESS_KEY);
        setupDomain(service);
        addVideos(videos, service);
    }

    // Reads the flat file off the classpath, one video per line
    private static List<Video> loadVideos() throws IOException {
        InputStream stream =
            Thread.currentThread().getContextClassLoader().getResourceAsStream(DATA_FILE);
        BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
        List<Video> videos = new ArrayList<Video>();
        try {
            String line = reader.readLine();
            while (line != null) {
                videos.add(Video.parseVideo(line));
                line = reader.readLine();
            }
        } finally {
            reader.close();
        }
        return videos;
    }

    // This creates a "table" (domain) in SimpleDB
    private static void setupDomain(AmazonSimpleDB service) {
        CreateDomain request = new CreateDomain();
        request.setDomainName(DOMAIN);
        try {
            CreateDomainResponse response = service.createDomain(request);
            System.out.println(response);
        } catch (AmazonSimpleDBException e) {
            e.printStackTrace();
        }
    }

    // Adds all videos to SimpleDB, one task per video
    private static void addVideos(List<Video> videos, final AmazonSimpleDB service)
            throws Exception {
        // Create a thread pool with a queue big enough to hold every task
        ThreadPoolExecutor pool =
            new ThreadPoolExecutor(THREAD_COUNT, THREAD_COUNT, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(videos.size()));
        // Create a task for each video and hand it to the pool
        for (final Video v : videos) {
            pool.execute(new Runnable() {
                public void run() {
                    addVideo(v, service);
                }
            });
        }
        // Without a shutdown, the pool's non-daemon threads keep the JVM alive
        // after the upload finishes
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    // This adds a single item to SimpleDB
    private static void addVideo(Video v, AmazonSimpleDB service) {
        PutAttributes request = new PutAttributes();
        request.setDomainName(DOMAIN);
        request.setItemName(v.getVideoId());
        request.setAttribute(videoToAttrs(v));
        try {
            service.putAttributes(request);
        } catch (AmazonSimpleDBException e) {
            e.printStackTrace();
        }
    }

    // Turns a video into a list of name-value pairs
    private static List<ReplaceableAttribute> videoToAttrs(Video v) {
        ReplaceableAttribute author = new ReplaceableAttribute();
        author.setName("author");
        author.setValue(v.getAuthor());
        ReplaceableAttribute date = new ReplaceableAttribute();
        date.setName("date");
        date.setValue(Long.toString(v.getDate().getTime()));
        // SimpleDB compares values lexicographically, so we zero-pad votes
        // to make that order match numeric order
        ReplaceableAttribute votes = new ReplaceableAttribute();
        votes.setName("votes");
        votes.setValue(AmazonSimpleDBUtil.encodeZeroPadding(v.getVotes(), 4));
        return Arrays.asList(author, date, votes);
    }
}
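One design note on the pool: the ArrayBlockingQueue is sized to hold every task up front, so pool.execute never trips the executor's rejection policy. With a smaller bounded queue, the submit loop would have to handle RejectedExecutionException or block until space opened up.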



And for completeness, here is the Video class:

import java.util.Date;

public class Video {

    private final String videoId;
    private final int votes;
    private final Date date;
    private final String author;

    private Video(String videoId, int votes, long date, String author) {
        this.videoId = videoId;
        this.votes = votes;
        this.date = new Date(date);
        this.author = author;
    }

    public String getVideoId() {
        return videoId;
    }

    public int getVotes() {
        return votes;
    }

    public Date getDate() {
        return date;
    }

    public String getAuthor() {
        return author;
    }

    // Parses one space-separated line: votes, video id, epoch seconds, author
    public static Video parseVideo(String data) {
        String[] fields = data.split(" ");
        return new Video(fields[1], Integer.parseInt(fields[0]),
            1000 * Long.parseLong(fields[2]), fields[3]);
    }
}
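For reference, parseVideo expects each line to carry four space-separated fields: votes, video id, a Unix timestamp in seconds, and author. A made-up example line (not from the real data set) would look like:

42 dQw4w9WgXcQ 1211500800 someuser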


Some interesting things... I played around with the number of threads to use. Throughput seemed to max out at around 3-4 threads, regardless of whether I ran it on my two-core laptop or my four-core workstation. Something seemed amiss, so I opened up the Amazon Java client code. I was pleased to see it used the multithreaded version of the Apache HttpClient, but it hard-coded the maximum number of connections per host to... 3. I switched to compiling against source so I could set the maximum number of connections to match the number of threads I was using.

Now I was able to achieve much better throughput. I kept the number of threads and the maximum number of HTTP connections the same: on my two-core laptop, throughput peaked at 16 threads and connections; on my four-core workstation, at 40. I think I will refactor the Amazon Java API and offer it to the author as a patch. There is no reason to hard-code the number of connections to three; just make it configurable. The underlying HttpClient code is highly optimized to allow for this.
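For the curious, here is a minimal sketch of the kind of configuration I mean. The HttpClient 3.x calls below are standard; the Amazon client does not expose them, which is why I had to compile against its source, and the PooledHttpClientFactory name here is mine, not the library's.

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.params.HttpConnectionManagerParams;

public class PooledHttpClientFactory {

    // Builds an HttpClient whose per-host connection cap matches the worker
    // thread count, instead of a hard-coded limit of 3
    public static HttpClient newClient(int maxConnections) {
        MultiThreadedHttpConnectionManager manager =
            new MultiThreadedHttpConnectionManager();
        HttpConnectionManagerParams params = manager.getParams();
        params.setDefaultMaxConnectionsPerHost(maxConnections);
        params.setMaxTotalConnections(maxConnections);
        return new HttpClient(manager);
    }
}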

3 comments:

Mocky said...

From your code it doesn't look like you implemented a way to explicitly monitor your throughput. Were you satisfied with the speed you were able to get in the end?

Unknown said...

Actually, I had a more instrumented version of the Parser class that I used to figure out the optimal number of threads and connections, and then removed the instrumentation for the large upload. In the end, I uploaded the 172K records in 43 minutes on my four-core workstation (roughly 172,000 / 2,580 seconds ≈ 67 records per second). I was pleased that it was much faster than the original approach, though I had to do a lot of work to get it that fast. Amazon certainly needs a bulk API, but even a bulk API would benefit from using multiple simultaneous HTTP connections.

SDB Explorer said...

The SDB Explorer 2011.05.01.02 release has come up with a bulk upload feature. Users can now upload large amounts of data to Amazon SimpleDB in a few quick steps through an easy interface: you can upload your MySQL data, edit cells directly, or import data from a CSV file. Bulk uploads are queued, so you can watch the progress of an upload, and SDB Explorer provides visualization and statistics for the data being uploaded. It can also generate item names automatically for bulk-uploaded data.