hadoop - MapReduce Java heap space error during reduce phase
I have a simple MapReduce job that builds a TF-IDF index, but it ends with a Java heap space error when the reducer is at approximately 70%. I have tried different approaches: using different sorts of data structures, telling the job to use more memory on the command line, and running the job on a smaller sample, but nothing changed even slightly. I am at the end of my ideas and would appreciate any tips on what is going on.
The mapper produces the right output, but the reducer fails due to the Java heap space error.
This is the command I am running (I am trying to specify the amount of memory used):

hadoop jar wordcountmpv1.jar -D mapreduce.map.memory.mb=2048 -D mapreduce.reduce.memory.mb=2048 --input /user/myslima3/wiki2 --output /user/myslima3/index
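(A side note on the command: I am not sure whether mapreduce.reduce.memory.mb alone is enough, since as far as I understand it only sizes the YARN container, while the reducer's JVM heap is set through mapreduce.reduce.java.opts. The following is only my guess at how both would be passed, not something I have verified:)

hadoop jar wordcountmpv1.jar \
    -D mapreduce.reduce.memory.mb=2048 \
    -D mapreduce.reduce.java.opts=-Xmx1843m \
    --input /user/myslima3/wiki2 --output /user/myslima3/index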
My whole MapReduce code:
public class Indexer extends Configured implements Tool {

    /*
     * Vocabulary: key = term, value = index
     */
    private static Map<String, Integer> vocab = new HashMap<String, Integer>();
    private static Map<String, Double> mapIdf = new HashMap<String, Double>();
    private static final int DOC_COUNT = 751300; // total number of documents

    public static void main(String[] arguments) throws Exception {
        System.exit(ToolRunner.run(new Indexer(), arguments));
    }

    public static class Comparator extends WritableComparator {
        protected Comparator() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -a.compareTo(b);
        }
    }

    public static class IndexerMapper extends
            Mapper<Object, Text, IntWritable, Text> {

        private Text result = new Text();

        // load the vocab from the distributed cache
        public void setup(Context context) throws IOException {
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
            Path getPath = new Path(cacheFiles[0].getPath());

            BufferedReader bf = new BufferedReader(new InputStreamReader(
                    fs.open(getPath)));
            String line = null;
            while ((line = bf.readLine()) != null) {
                StringTokenizer st = new StringTokenizer(line, " \t");

                int index = Integer.parseInt(st.nextToken()); // first token is the line number - term id
                String word = st.nextToken();                 // second element is the term
                double idf = Integer.parseInt(st.nextToken()); // third token is the df

                // compute the idf
                idf = (Math.log(DOC_COUNT / idf));
                mapIdf.put(word, idf);

                // save the vocab
                vocab.put(word, index);
            }
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // init the tf map
            Map<String, Integer> mapTF = new HashMap<String, Integer>();

            // parse the input string
            StringTokenizer st = new StringTokenizer(value.toString(), " \t");

            // first element is the doc index
            int index = Integer.parseInt(st.nextToken());
            //sb.append(index + "\t");

            // count the term frequencies
            String word;
            while (st.hasMoreTokens()) {
                word = st.nextToken();

                // check if the word is in the vocabulary
                if (vocab.containsKey(word)) {
                    if (mapTF.containsKey(word)) {
                        int count = mapTF.get(word);
                        mapTF.put(word, count + 1);
                    } else {
                        mapTF.put(word, 1);
                    }
                }
            }

            // compute the tf-idf
            double idf;
            double tfidf;
            int wordIndex;
            for (String term : mapTF.keySet()) {
                int tf = mapTF.get(term);

                if (mapIdf.containsKey(term)) {
                    idf = mapIdf.get(term);
                    tfidf = tf * idf;
                    wordIndex = vocab.get(term);

                    context.write(new IntWritable(wordIndex), new Text(index + ":" + tfidf));
                }
            }
        }
    }

    public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // reset the vocab and maps to reduce memory
            vocab = null;
            mapIdf = null;

            StringBuilder sb = new StringBuilder();

            for (Text value : values) {
                sb.append(value.toString() + " ");
            }

            context.write(key, new Text(sb.toString()));
        }
    }

    @Override
    public int run(String[] arguments) throws Exception {
        ArgumentParser parser = new ArgumentParser("TextPreprocessor");

        parser.addArgument("input", true, true, "specify input directory");
        parser.addArgument("output", true, true, "specify output directory");

        parser.parseAndCheck(arguments);

        Path inputPath = new Path(parser.getString("input"));
        Path outputDir = new Path(parser.getString("output"));

        // Create configuration.
        Configuration conf = getConf();

        // add the distributed file with the vocabulary
        DistributedCache.addCacheFile(new URI("/user/myslima3/vocab.txt"), conf);

        // Create job.
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(IndexerMapper.class);

        // Setup MapReduce.
        job.setMapperClass(IndexerMapper.class);
        job.setReducerClass(IndexerReducer.class);

        // Sort the output words in reversed order.
        job.setSortComparatorClass(Comparator.class);

        job.setNumReduceTasks(1);

        // Specify (key, value).
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input.
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output.
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileSystem hdfs = FileSystem.get(conf);

        // Delete output directory (if exists).
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute the job.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Thanks for any help!
EDIT: Stacktrace
15/04/06 10:54:38 INFO mapreduce.Job:  map 0% reduce 0%
15/04/06 10:54:52 INFO mapreduce.Job:  map 25% reduce 0%
15/04/06 10:54:54 INFO mapreduce.Job:  map 31% reduce 0%
15/04/06 10:54:55 INFO mapreduce.Job:  map 50% reduce 0%
15/04/06 10:54:56 INFO mapreduce.Job:  map 55% reduce 0%
15/04/06 10:54:58 INFO mapreduce.Job:  map 58% reduce 0%
15/04/06 10:55:00 INFO mapreduce.Job:  map 63% reduce 0%
15/04/06 10:55:07 INFO mapreduce.Job:  map 69% reduce 0%
15/04/06 10:55:08 INFO mapreduce.Job:  map 82% reduce 0%
15/04/06 10:55:10 INFO mapreduce.Job:  map 88% reduce 0%
15/04/06 10:55:11 INFO mapreduce.Job:  map 96% reduce 0%
15/04/06 10:55:12 INFO mapreduce.Job:  map 100% reduce 0%
15/04/06 10:55:25 INFO mapreduce.Job:  map 100% reduce 29%
15/04/06 10:55:31 INFO mapreduce.Job:  map 100% reduce 36%
15/04/06 10:55:34 INFO mapreduce.Job:  map 100% reduce 48%
15/04/06 10:55:37 INFO mapreduce.Job:  map 100% reduce 61%
15/04/06 10:55:40 INFO mapreduce.Job:  map 100% reduce 68%
15/04/06 10:55:43 INFO mapreduce.Job:  map 100% reduce 71%
15/04/06 10:55:44 INFO mapreduce.Job: Task Id : attempt_1427101801879_0658_r_000000_0, Status : FAILED
Error: Java heap space
Look more closely at the StringBuilder that gets appended to in the reducer. You don't specify an initial size, so (I think) it defaults to 16. As it grows, it has to keep copying itself into a larger and larger buffer, so you end up allocating buffers of length 16, 32, 48, 64, ... (I'm not sure of the exact growth amounts, but you get the picture). Anyway, a big number of values passed to the reducer can cause a lot of memory to be used, and garbage collection can handle it until the StringBuilder gets so big that it can't grow any more. In other words, this doesn't scale well.
Given your chosen algorithm, however, all I can suggest is to try giving it a big initial size and see if you can get lucky and force the growth that happens to fit in the available memory.
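For example, a minimal sketch of that suggestion inside your reduce method (the 64M-character capacity is an arbitrary guess, not a tuned value):

// Pre-size the builder so it does not have to grow (and copy itself) over and over.
// 64 * 1024 * 1024 chars (~128 MB of char data) is an arbitrary starting point - tune it to your data.
StringBuilder sb = new StringBuilder(64 * 1024 * 1024);

for (Text value : values) {
    sb.append(value.toString()).append(' ');
}
context.write(key, new Text(sb.toString()));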
Failing that, you might be able to create a special OutputFormat that is able to concatenate the values as they're written, only starting a new line when the key changes, but I haven't thought that one all the way through. A rough, untested sketch of the idea follows.
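Something along these lines (the class name ConcatenatingOutputFormat is made up for illustration): the RecordWriter streams each value straight to the output file and only starts a new line when the key changes, so the reducer can simply call context.write(key, value) for every value instead of buffering them all in a StringBuilder.

public static class ConcatenatingOutputFormat extends FileOutputFormat<IntWritable, Text> {

    @Override
    public RecordWriter<IntWritable, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Path file = getDefaultWorkFile(job, "");
        final FSDataOutputStream out =
                file.getFileSystem(job.getConfiguration()).create(file, false);

        return new RecordWriter<IntWritable, Text>() {
            private IntWritable lastKey = null;

            @Override
            public void write(IntWritable key, Text value) throws IOException {
                if (lastKey == null) {
                    // very first record: start the line with the key
                    out.writeBytes(key.toString() + "\t");
                } else if (!lastKey.equals(key)) {
                    // key changed: finish the previous line and start a new one
                    out.writeBytes("\n" + key.toString() + "\t");
                } else {
                    // same key: just keep appending values to the current line
                    out.writeBytes(" ");
                }
                out.writeBytes(value.toString());
                // copy the key, since Hadoop reuses the same object between calls
                lastKey = new IntWritable(key.get());
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException {
                out.writeBytes("\n");
                out.close();
            }
        };
    }
}

The reducer body would then shrink to for (Text value : values) context.write(key, value); and the driver would use job.setOutputFormatClass(ConcatenatingOutputFormat.class) instead of TextOutputFormat, but again, this is only a sketch of the idea, not something I have run.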
java hadoop mapreduce