java - Map reduce: how to implement a custom writable with a whole file input reader -



java - Map reduce: how to implement a custom writable with a whole file input reader -

Some background: I'm facing the problem of analyzing a big amount of small text files. Each text file contains a lot of numbers, and I want to be able to read the whole file in order to access all of it within the mapper class. I also want to create a profile for each text file. The mapper input is a single file; the mapper output is the pair <driver-name, Profile>, where Profile is a custom Writable object. The reducer input is <driver-name, list of Profiles> and the reducer output is <driver-name, summary Profile>. These are the requirements. I know that reading whole files isn't efficient, but that doesn't matter to me here.

I wrote the code below, but I get a runtime error at the bold line (tripProfile = values.next(), within the reducer class).

I don't think it's a Writable issue — I believe the Profile writable is fine: its fields are Writable and it has the two required methods, readFields() and write(). I suspect the problem is in the WholeFileReader.

Thanks a lot,

public class examplefileinputformat extends configured implements tool { /** * class: "input format" responsible validating input * configuration. * 2. split input blocks , files logical chunks. * 3. create record reader used create key/value pairs * raw inputsplit. * these pairs sent 1 1 mapper. */ public static class wholefileinputformat extends fileinputformat<nullwritable, byteswritable> { // file isn't splittable: @override protected boolean issplitable(filesystem fs, path filename) { homecoming false; } // recordreader used create key/value pairs inputsplit. // pairs send mapper 1 one. @override public recordreader<nullwritable, byteswritable> getrecordreader( inputsplit split, jobconf job, reporter reporter) throws ioexception { homecoming new wholefilerecordreader((filesplit) split, job); } } // recordreader used create key/value pairs inputsplit. // pairs send mapper 1 one. public static class wholefilerecordreader implements recordreader<nullwritable, byteswritable> { private filesplit filesplit; private configuration conf; private boolean processed = false; public wholefilerecordreader(filesplit filesplit, configuration conf) throws ioexception { this.filesplit = filesplit; this.conf = conf; } @override public nullwritable createkey() { homecoming nullwritable.get(); } @override public byteswritable createvalue() { homecoming new byteswritable(); } @override public long getpos() throws ioexception { homecoming processed ? filesplit.getlength() : 0; } @override public float getprogress() throws ioexception { homecoming processed ? 
1.0f : 0.0f; } @override public boolean next(nullwritable key, byteswritable value) throws ioexception { if (!processed) { byte[] contents = new byte[(int) filesplit.getlength()]; path file = filesplit.getpath(); filesystem fs = file.getfilesystem(conf); fsdatainputstream in = null; seek { in = fs.open(file); ioutils.readfully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); } { ioutils.closestream(in); } processed = true; homecoming true; } homecoming false; } @override public void close() throws ioexception { // nil } } /* </generics> */ /* <task1>: */ public static class examplemap extends mapreducebase implements mapper<nullwritable, byteswritable, text, profile> { private final static intwritable 1 = new intwritable(1); private text drivernum; private tripprofilebuilder tprobuilder; public void map(nullwritable key, byteswritable value, outputcollector<text, profile> output, reporter reporter) throws ioexception { // path , file name: path path = ((filesplit) reporter.getinputsplit()).getpath(); tprobuilder = new tripprofilebuilder(path.getname()); drivernum = new text(path.getparent().getname()); system.out.println("mapper: file: " + path.getname() + " driver name: " + drivernum.tostring()); // bytes value of file content: byte[] content = value.getbytes(); inputstream = null; // reading content string line = null; seek { = new bytearrayinputstream(content); bufferedreader bufreader = new bufferedreader( new inputstreamreader(is)); line = bufreader.readline(); // not including '\n' or null. while (line != null) { // split , x,y values (in string form). string[] values = line.split(","); // if legal values: assign tripprofilebuilder if ((!values[0].equals("x") && (values.length == 2))) { // system.out.println("data recovered: " + values[0] + // " " // + values[1]); tprobuilder.assignnextxy(values[0], values[1]); } line = bufreader.readline(); // not including '\n' or null. 
} } grab (ioexception e) { e.printstacktrace(); } { seek { if (is != null) is.close(); } grab (exception ex) { system.out.println("exception while reading files"); } } profile summary = tprobuilder.getsummary(); output.collect(drivernum, summary); } } /** * need iterate value of driver. * than, combine values 1 profile each driver. */ public static class examplereduce extends mapreducebase implements reducer<text, profile, text, profile> { private profile driverprofile; private double mdriverdistance, mdriveravgspeed, mdrivertotaltime, mdrivermaxspeed; public void reduce(text drivername, iterator<profile> values, outputcollector<text, profile> output, reporter reporter) throws ioexception { profile tripprofile = null; // step 1: combining characteristics: system.out.println("reducer: driver: " + drivername); while (values.hasnext()) { *****tripprofile = values.next();***** system.out.println("profile value: " + tripprofile.tostring()); mdriverdistance += tripprofile.getmsofardistance(); mdriveravgspeed += tripprofile.getmavgspeed(); mdrivertotaltime += tripprofile.getmsofartime(); mdrivermaxspeed += tripprofile.getmmaxspeed(); } mdriveravgspeed /= mdrivertotaltime; // calc avg speed. // step 2: each driver, save pair <driver-name, summary-profile> driverprofile = new profile(new text("summary"), mdriverdistance, mdriveravgspeed, mdrivertotaltime, mdrivermaxspeed); output.collect(drivername, driverprofile); } } /* </task1> */ /* <run> */ public int run(string[] args) throws exception { string inputpath = args[0]; string outputpath = args[1]; deleteoldoutput(outputpath); // new job configuration: jobconf conf = new jobconf(examplefileinputformat.class); conf.setjobname("firstjob"); // define paths input , output: fileoutputformat.setoutputpath(conf, new path(outputpath)); fileinputformat.setinputpaths(conf, new path(inputpath)); // input / output format: conf.setinputformat(wholefileinputformat.class); conf.setoutputformat(textoutputformat.class); // format? 
conf.setmapperclass(examplemap.class); conf.setreducerclass(examplereduce.class); /* output: key:text -> value:integer */ conf.setoutputkeyclass(text.class); conf.setoutputvalueclass(profile.class); conf.setmapoutputvalueclass(profile.class); conf.setmapoutputkeyclass(text.class); jobclient.runjob(conf); homecoming 0; }

java hadoop

Comments

Popular posts from this blog

java - How to set log4j.defaultInitOverride property to false in jboss server 6 -

c - GStreamer 1.0 1.4.5 RTSP Example Server sends 503 Service unavailable -

Using ajax with sonata admin list view pagination -