Hadoop Made it easy
FAQS and Interview questions
What is Big Data?
A new technology or concept.
Handling large, complex datasets to create value for the business.
Large volumes of varied data arriving at high velocity give value to the business through analysis.
Handling: storage in a distributed file system and processing through programming techniques.
What is Hadoop ?
Open-source framework, cluster friendly, cross platform

Is Hadoop for ETL or ELT?
EsTsL (the s stands for store)
How does a jobtracker choose a (a) reduce task? (b) a map task?
(a) Reduce task - takes the next in the list. (No data locality considerations)
(b) Map task - Takes into account the tasktracker's network location and picks a task whose input split is as close as possible to the tasktracker.
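The locality preference in (b) can be sketched as follows. This is a minimal illustration, not the jobtracker's actual code: the function names and dictionary fields are hypothetical, and the three-level distance (same node, same rack, off-rack) is an assumed simplification of the cluster's network topology.

```python
def distance(tracker, replica):
    """Assumed 3-level network distance: 0 same node, 1 same rack, 2 off-rack."""
    if tracker["host"] == replica["host"]:
        return 0
    if tracker["rack"] == replica["rack"]:
        return 1
    return 2


def pick_map_task(tracker, pending_tasks):
    """Return the pending map task whose input split is closest to the tracker."""
    def closest_replica(task):
        return min(distance(tracker, r) for r in task["split_locations"])
    return min(pending_tasks, key=closest_replica) if pending_tasks else None


def pick_reduce_task(pending_tasks):
    """Reduce tasks have no locality preference: take the next one in the list."""
    return pending_tasks[0] if pending_tasks else None


tracker = {"host": "node3", "rack": "rackA"}
tasks = [
    {"id": "m0", "split_locations": [{"host": "node9", "rack": "rackB"}]},
    {"id": "m1", "split_locations": [{"host": "node1", "rack": "rackA"}]},
    {"id": "m2", "split_locations": [{"host": "node3", "rack": "rackA"}]},
]
print(pick_map_task(tracker, tasks)["id"])
```

Here the tracker on node3 is handed the task whose split lives on node3 itself, ahead of the rack-local and off-rack candidates.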
What happens when a jobtracker receives a notification that the last task for a job is complete?
1. The jobtracker changes the status of the job to "successful".
2. When the Job object polls for status, it prints a message to tell the user and returns from the waitForCompletion() method.
3. Job statistics and counters are printed.
4. (optional) HTTP job notification (set job.end.notification.url)
5. The jobtracker cleans up its working state for the job and instructs tasktrackers to do the same.
At how many nodes does MapReduce1 hit scaleability bottlenecks?
over 4,000 nodes
What are the YARN entities?
1. The Client - submits the MapReduce job(application)
2. resource manager - coordinates allocation of compute resources
3. node managers - launch and monitor the compute containers on machines in the cluster
4. application master - coordinates the tasks running the MapReduce application. The application master and MapReduce tasks run in containers that are scheduled by the resource manager and managed by node managers.
5. Distributed Filesystem
How does job submission work in YARN?
1. A new job ID is retrieved from the resource manager (called an application ID)
2. The job client checks the output specification, computes input splits and copies job resources to HDFS
3. The job is submitted by calling submitApplication() on the resource manager
How does Job Initialization work in MapReduce 1?
1. The job is put into a queue from which the job scheduler will pick it up and initialize it. Initialization creates an object to represent the job being run, which encapsulates its tasks and the bookkeeping information used to track their status.
2. The job scheduler retrieves the input splits computed by the client from the shared filesystem and creates one map task for each split. Reduce tasks are created.
The job setup task is created, which tasktrackers run to set up the job before any map tasks run. The job cleanup task is created to delete the temporary working space for the task output once the job is done.
3. Tasktrackers send periodic heartbeats to the jobtracker; a heartbeat also tells the jobtracker when the tasktracker is ready to run a new task. The jobtracker then chooses a job, and a task within it, to send in the response.
How does job initialization work in YARN?
1. The scheduler allocates a container, and the resource manager then launches the application master's process there, under the node manager's management.
2. The application master creates a map task object for each split, as well as a number of reduce tasks (configured: mapreduce.job.reduces)
3. The application master decides whether tasks will run in the same JVM as itself (uberized) or in parallel in separate containers.
Uberized - a small job: fewer than 10 mappers, 1 reducer, and an input size of less than one block.
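The uber decision in step 3 can be sketched as a simple predicate. This is an illustration under stated assumptions: the function name is hypothetical, the thresholds are the ones quoted above, "1 reducer" is read as "at most one", and the 128 MB block size is just an example value.

```python
def is_uber_job(num_maps, num_reduces, input_bytes, block_bytes):
    """True if the job is small enough to run inside the application master's JVM."""
    return num_maps < 10 and num_reduces <= 1 and input_bytes < block_bytes


BLOCK = 128 * 1024 * 1024  # assumed HDFS block size, for illustration only
print(is_uber_job(3, 1, 10 * 1024 * 1024, BLOCK))   # small job
print(is_uber_job(50, 1, 10 * 1024 * 1024, BLOCK))  # too many mappers
```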
How does task assignment work in YARN? (only if not ubertask)
1. The application master requests containers for all MapReduce tasks in the job from the resource manager. All requests, piggybacked on heartbeat calls, include information about each map task's data locality and the memory requirements for tasks.
2. The scheduler uses locality information to make placement decisions
How does memory utilization in MapReduce 2 get rid of previous memory issues?
Resources are more fine-grained: instead of a set number of slots, each with a fixed amount of memory, MR2 applications can request a memory capability anywhere between the minimum and maximum allocation. Default memory allocations are scheduler specific. This removes the previous problem of tasks getting too little or too much memory because they were forced to use a fixed amount.
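As a rough sketch of how a request might be fitted between the minimum and maximum allocation: the function name is hypothetical, the 1024 MB and 10240 MB bounds are example values (the real defaults depend on the scheduler), and both rounding up to a multiple of the minimum and clamping an oversized request are assumptions made for illustration.

```python
import math


def normalize_request(requested_mb, min_mb=1024, max_mb=10240):
    """Fit a memory request into [min_mb, max_mb].

    Assumption: grants are rounded up to a multiple of min_mb, and an
    oversized request is clamped rather than rejected.
    """
    clamped = max(min_mb, min(requested_mb, max_mb))
    return min(max_mb, math.ceil(clamped / min_mb) * min_mb)


for request in (100, 1500, 20000):
    print(request, "->", normalize_request(request))
```

Compared with a fixed-size slot, a task asking for 1500 MB gets 2048 MB here instead of being squeezed into, or wasting most of, a one-size-fits-all slot.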
How does Job Submission work in MapReduce 1?
Job.submit() creates JobSubmitter instance.
The JobSubmitter:
1. calls submitJobInternal()
2. Asks the jobtracker for a new job ID (JobTracker.getNewJobId()) and computes the input splits; if they can't be computed, the job is cancelled. It also checks the output specification to make sure the output directory does not already exist.
3. Copies the resources needed to run the job to the jobtracker's filesystem. The job JAR is copied with a high replication factor (mapred.submit.replication, default 10). Why? So that plenty of copies are readily available for the tasktrackers to access.
4. Tells the jobtracker that the job is ready for execution. JobTracker.submitJob().
How does task execution work in YARN?
1. Once a task has been assigned a container by the resource manager's scheduler, the application master starts the container by contacting the node manager. The task is executed by a Java application, YarnChild, which localizes the resources the task needs and then runs the map or reduce task. YARN does not support JVM reuse.
How do status updates work with YARN?
A Task reports its progress and status back to the application master which has an aggregate view. It is sent every 3 seconds over the umbilical interface.
How often does the client poll the application master?
Every second (mapreduce.client.progress.monitor.pollinterval)
What is the difference between the MR1 and MR2 web UIs?
JobTracker web UI - a list of jobs
Resource Manager web UI - a list of running applications with links to their respective application masters, which show progress and further information.
What happens during a Job Completion in MR2?
(a) Before
(b) During
(c) After
(a) The client calls Job.waitForCompletion() and also polls the application master for completion every 5 seconds (mapreduce.client.completion.pollinterval)
(b) Notification via HTTP callback from the application master
(c) Application master and task containers clean up their working state. Job information is archived by the job history server.
How are hanging tasks dealt with in MR1 ?
Once the timeout has been reached (default = 10 mins), the tasktracker marks the task as failed. The child JVM will be killed automatically.
The timeout is set with mapred.task.timeout. It can be set to 0 to disable it, although this is not advised because a hung task would never free its slot, which slows down slot allocation over time.
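The timeout check can be sketched as below. The function name and the millisecond timestamps are hypothetical; the 10-minute default and the meaning of 0 follow the description above.

```python
def is_hung(last_progress_ms, now_ms, timeout_ms=600_000):
    """A task counts as hung if it has reported no progress for timeout_ms.

    A timeout of 0 disables the check, so a hung task is never failed.
    """
    if timeout_ms == 0:
        return False
    return now_ms - last_progress_ms >= timeout_ms


print(is_hung(last_progress_ms=0, now_ms=599_999))            # still within 10 minutes
print(is_hung(last_progress_ms=0, now_ms=600_000))            # timed out
print(is_hung(last_progress_ms=0, now_ms=10**9, timeout_ms=0))  # check disabled
```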
How might a job fail? (MR1)
1. User code throws a runtime exception - the child JVM reports the error to the tasktracker before it exits. The tasktracker marks the task as failed. The error ends up in the user logs.
2. Streaming - if the streaming process ends with a nonzero exit code, the task is marked as failed (stream.non.zero.exit.is.failure = true).
3. Sudden exit of the child JVM - the tasktracker notices the exit and marks the task as failed.
What does a Jobtracker do when it is notified of a task attempt which has failed?
How many times will a task be re-tried before job failure?
What are 2 ways to configure failure conditions?
1. It will reschedule the task on a new tasktracker (node)
2. 4 times (default)
3. mapred.map.max.attempts
mapred.reduce.max.attempts
If tasks are allowed to fail to a certain percentage
mapred.max.map.failures.percentage
mapred.max.reduce.failures.percentage
Note: Killed tasks do not count as failures.
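The interaction between the attempt limit and the allowed failure percentage can be sketched like this. It is a simplified model, not Hadoop's code: the function name is hypothetical, map and reduce tasks are not distinguished, and killed attempts are assumed to be excluded from the counts by the caller.

```python
def job_failed(failed_attempts_per_task, max_attempts=4, max_failures_percent=0):
    """Decide whether a job fails.

    failed_attempts_per_task: one failed-attempt count per task
    (killed attempts are not failures and must not be counted).
    A task fails once it has used up max_attempts; the job fails when the
    share of failed tasks exceeds max_failures_percent.
    """
    if not failed_attempts_per_task:
        return False
    failed_tasks = sum(1 for n in failed_attempts_per_task if n >= max_attempts)
    percent_failed = 100.0 * failed_tasks / len(failed_attempts_per_task)
    return percent_failed > max_failures_percent


print(job_failed([0, 1, 3]))                                   # retries succeeded
print(job_failed([0, 4, 0]))                                   # one task exhausted its attempts
print(job_failed([4] + [0] * 9, max_failures_percent=10))      # 10% failed, 10% tolerated
```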
How is tasktracker failure handled?
If the jobtracker receives no heartbeat from a tasktracker for 10 minutes (mapred.tasktracker.expiry.interval, in milliseconds), it removes the tasktracker from its pool. Any tasks that were running on it when it was removed have to be re-run.
When are tasktrackers blacklisted? How do blacklisted tasktrackers behave?
1. If more than 4 tasks from the same job fail on a particular tasktracker (mapred.max.tracker.failures), the jobtracker records this as a fault. If the number of faults is over the minimum threshold (mapred.max.tracker.blacklists, default 4), the tasktracker is blacklisted.
2. They are not assigned tasks, but they still communicate with the jobtracker. Faults expire over time (1 per day), so a blacklisted tasktracker will get a chance to run tasks again. If the underlying fault can be fixed (e.g. hardware), the tasktracker is re-added when it restarts.
How does MR1 handle jobtracker failure?
It is a single point of failure, however it is unlikely that particular machine will go down. After restarting, all jobs need to be resubmitted.
How does MR2 handle runtime exception/failure and sudden JVM exits?
Hanging tasks?
What are criteria for job failure?
1. The application master marks them as failed.
2. The application master notices an absence of ping over umbilical channel, task attempt is marked as failed.
3. Same as MR1, with the same config options. A task is marked as failed after 4 attempts, and the job fails if more than the allowed percentage of map/reduce tasks fail.
Application Master Failure.
1. Applications are marked as failed if they fail ____ .
2. What happens during failure?
3. How does the MapReduce application manager recover state of which tasks were run successfully?
4. How does the client find a new application master?
1. Once
2. Resource manager notices missing heartbeat from application master and starts a new instance of the master running in a new container. (managed by the node manager)
3. If recovery is enabled (yarn.app.mapreduce.am.job.recovery.enable set to true), the new application master recovers the state of the tasks that had already run, so they don't have to be rerun.
4. During job initialization, the client asks the resource manager for the application master's address and caches it. On failure, the client experiences a timeout when it issues a status request, at which point it goes back to the resource manager to find the new address.
When are node managers blacklisted? By what?
If more than 3 tasks fail (mapreduce.job.maxtaskfailures.per.tracker)
by the application master
How is resource manager failure handled?
It is designed to recover by using a checkpoint mechanism to save state. After a crash, a new instance is brought up (by the administrator) and it recovers from the saved state, which consists of the node managers and applications but not the tasks, since those are managed by the application master. The storage the resource manager uses is configurable; the default (org.apache.hadoop.yarn.server.resourcemanager.recovery.memstore) keeps it in memory, so it is not highly available.
How does a FIFO Scheduler work?
Typically each job would use the whole cluster, so jobs had to wait their turn. A job's priority can be set (very high, high, normal, low, very low). The scheduler chooses the highest-priority job first, but there is no preemption (once a job is running, it can't be displaced).
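A minimal sketch of priority-aware FIFO selection, assuming hypothetical job records with a priority label and a submission timestamp. It only models which waiting job is picked next, which is also why there is no preemption: running jobs never re-enter the queue.

```python
PRIORITY_ORDER = {"VERY_HIGH": 0, "HIGH": 1, "NORMAL": 2, "LOW": 3, "VERY_LOW": 4}


def next_job(queue):
    """Pick the highest-priority waiting job; ties go to the earliest submission."""
    if not queue:
        return None
    return min(queue, key=lambda job: (PRIORITY_ORDER[job["priority"]], job["submitted"]))


waiting = [
    {"id": "job_1", "priority": "NORMAL", "submitted": 100},
    {"id": "job_2", "priority": "HIGH", "submitted": 300},
    {"id": "job_3", "priority": "HIGH", "submitted": 200},
]
print(next_job(waiting)["id"])
```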
How does the Fair Scheduler work?
Every user gets a fair share of the cluster capacity over time.
A single job running on the cluster would use full capacity.
A short job belonging to one user will complete in a reasonable time, even while another user's long job is running.
Jobs are placed in pools, and by default each user gets their own pool. It's possible to create custom pools with a guaranteed minimum capacity. Supports preemption - if a pool hasn't received its fair share over time, the scheduler will kill tasks in pools running over capacity in order to give more slots to under-capacity pools.
How do you set a MapReduce taskscheduler?
mapred.jobtracker.taskscheduler = org.apache.hadoop.mapred.FairScheduler
How does the Capacity Scheduler work?
A cluster is made up of a number of queues, which may be hierarchical, and each queue has a capacity. Within each queue, jobs are scheduled using FIFO scheduling (with priorities). This allows users (defined by queues) to simulate separate clusters. It does not enforce fair sharing like the Fair Scheduler.
How does the map portion of the MapReduce write output?
1. Each map task has a circular memory buffer that it writes its output to, 100 MB by default (io.sort.mb)
2. When contents of the buffer meet threshold size (80% default io.sort.spill.percent) a background thread will start to spill the contents to disk. Map outputs continue to be written to the buffer while the spill is taking place. If the buffer fills up before the spill is complete, it will wait.
3. Spills are written round robin to directories specified (mapred.local.dir) Before it writes to disk, the thread divides the data into partitions based on reducer and then the partition is sorted by key. If a combiner exists, it is then run.
4. Each time the memory buffer reaches the spill threshold, a new spill file is created. Before the task finishes, all spill files are merged into a single partitioned and sorted output file. If there are at least 3 spill files (min.num.spills.for.combine), the combiner is run again before the output file is written.
5. It is a good idea to compress output (Not set by default)
6. Output file partitions are made available to reducers over HTTP. The maximum number of worker threads used to serve partitions is controlled by tasktracker.http.threads (default 40; the setting is per tasktracker, not per map task). In MR2 this is set automatically from the number of processors on the machine (2 x the number of processors).
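The buffer and threshold settings in steps 1 and 2 give a back-of-the-envelope way to estimate how many spill files a map task produces. The sketch below is an approximation only: the function name is hypothetical, and it ignores the record metadata that also occupies the buffer, so real tasks may spill somewhat earlier.

```python
import math


def estimate_spills(map_output_mb, io_sort_mb=100, spill_percent=80):
    """Rough number of spill files: one spill per threshold's worth of output."""
    threshold_mb_x100 = io_sort_mb * spill_percent  # threshold in MB, times 100
    return max(1, math.ceil(map_output_mb * 100 / threshold_mb_x100))


print(estimate_spills(50))                   # fits under the 80 MB threshold
print(estimate_spills(200))                  # several spills with the default buffer
print(estimate_spills(200, io_sort_mb=256))  # a larger buffer avoids extra spills
```

With the defaults, 200 MB of map output needs about three spills, while raising io.sort.mb to 256 brings it back to a single spill.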
How does the reduce side of the Shuffle work?
(Copy Phase)
1. Copy Phase - after a map task completes, the reduce task starts copying their outputs. Small numbers of copier threads are used so it can fetch output in parallel. (default = 5 mapred.reduce.parallel.copies)
The output is copied to the reduce task JVM's memory if it's small enough; otherwise, it's copied to disk. When the in-memory buffer reaches a threshold size or a threshold number of map outputs, it is merged and spilled to disk.
mapred.job.shuffle.merge.percent
mapred.inmem.merge.threshold
A combiner would be run here if specified. Any map outputs that were compressed have to be decompressed in memory. When all map outputs have been copied we continue to the Sort phase.
How does the reduce side of the Shuffle work?
(Sort Phase & Reduce phase)
2. Sort Phase (really a merge phase) - The map outputs are merged in rounds. The number of rounds is the number of map outputs divided by the merge factor (io.sort.factor, default 10).
For example, 50 map outputs / 10 = 5 rounds (5 intermediate files).
3. Reduce Phase - the reduce function is invoked for every key in the output. The result is written to the output filesystem, typically HDFS.
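The round count in the sort phase is simple arithmetic, sketched below with a hypothetical helper. It follows the simplified "outputs divided by merge factor" description; the way Hadoop actually sizes individual rounds is more subtle.

```python
import math


def merge_rounds(num_map_outputs, merge_factor=10):
    """Number of merge rounds when each round merges up to merge_factor files."""
    return math.ceil(num_map_outputs / merge_factor)


print(merge_rounds(50))   # the example from the text
print(merge_rounds(45))   # a partial last round still counts as a round
```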
Configuration Tuning Principles
(General & Map side)
General - Give the shuffle as much memory as possible, however you must make sure your map/reduce functions get enough memory to operate
The amount of memory given to the JVM in which map/reduce tasks run is set by mapred.child.java.opts - Make this as large as possible for the amount of memory on your task node.
Map-Side - Best performance by avoiding multiple spills to disk. One spill is optimal. io.sort.mb (increase) There is a counter that counts both map and reduce spills that is helpful.
Configuration Tuning Principles
(Reduce Side & Buffer Size)
Reduce-Side - Best performance when the intermediate data can reside entirely in memory. If your reduce function has light memory requirements, you can set mapred.inmem.merge.threshold to 0 and mapred.job.reduce.input.buffer.percent to 1.0 (or a lower value).
Buffer Size - 4 KB by default; increase it with io.file.buffer.size
How do you access task execution from Streaming?
environment variables (ex in python: os.environ["mapred_job_id"] )
-cmdenv set environment variables via command line
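A small sketch of reading a job property from inside a Python Streaming script. Streaming exposes properties as environment variables with non-alphanumeric characters replaced by underscores, which is why mapred.job.id appears as mapred_job_id; the two helper names are hypothetical.

```python
import os
import re


def streaming_env_name(property_name):
    """Map a job property name to the environment variable Streaming exposes."""
    return re.sub(r"[^A-Za-z0-9]", "_", property_name)


def get_job_property(property_name, default=None):
    """Look up a job property from the task's environment, if it is set."""
    return os.environ.get(streaming_env_name(property_name), default)


print(streaming_env_name("mapred.job.id"))
print(get_job_property("mapred.job.id", default="(not running under Streaming)"))
```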
What is Speculative Execution of tasks?
Hadoop detects when a task is running slower than expected and launches another equivalent task as backup. When a task completes successfully, the duplicate tasks are killed. Turned on by default. It is an optimization and not used to make tasks run more reliably.
What are reasons to turn off Speculative Execution?
1. On a busy cluster it can reduce overall throughput since there are duplicate tasks running. Admins can turn it off and have users override it per job if necessary.
2. For reduce tasks, since duplicate tasks have to fetch the same map outputs as the original, which increases network traffic.
3. Tasks that are not idempotent. You can make tasks idempotent using an OutputCommitter. Idempotent (def): applying the operation multiple times doesn't change the result.
What are OutputCommitters used for?
How do you implement it?
(a) A commit protocol to ensure that jobs and tasks either succeed or fail cleanly. The framework ensures that, in the event of multiple task attempts for a particular task, only one will be committed and the others will be aborted.
(b) old - JobConf.setOutputCommitter() or mapred.output.committer.class
new - the OutputCommitter is decided by the OutputFormat, using getOutputCommitter() (default FileOutputCommitter)
How do we ensure that multiple instances of the same task don't try to write to the same file?
Each task attempt writes to its own working directory. When the attempt is committed, its working directory is promoted to the output directory.
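The write-then-promote idea can be imitated on a local filesystem as below. This is only a sketch of the principle: the directory layout, file names, and the "first commit wins" check are assumptions for illustration, whereas in Hadoop the framework itself decides which attempt is allowed to commit.

```python
import os
import tempfile


def run_attempt(output_dir, task_id, attempt, data):
    """Write the attempt's output into its own private working directory."""
    work_dir = os.path.join(output_dir, "_temporary", f"{task_id}_{attempt}")
    os.makedirs(work_dir)
    with open(os.path.join(work_dir, "part"), "w") as f:
        f.write(data)
    return work_dir


def commit(output_dir, task_id, work_dir):
    """Promote the attempt's file into the output directory; first commit wins."""
    target = os.path.join(output_dir, f"part-{task_id}")
    if os.path.exists(target):
        return False  # another attempt already committed; this one is aborted
    os.rename(os.path.join(work_dir, "part"), target)
    return True


out = tempfile.mkdtemp()
a0 = run_attempt(out, "r-00000", 0, "from attempt 0\n")
a1 = run_attempt(out, "r-00000", 1, "from attempt 1\n")  # speculative duplicate
first = commit(out, "r-00000", a1)
second = commit(out, "r-00000", a0)
print(first, second)
```

Because the two attempts never share a working directory, they cannot clobber each other's files, and only one result ever appears in the output directory.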
How do you configure JVM reuse?
1. mapred.job.reuse.jvm.num.tasks - the maximum number of tasks to run for a given job for each JVM launched (default = 1). There is no distinction between map and reduce tasks; however, tasks from different jobs are always run in separate JVMs. If set to -1, there is no limit and the same JVM may be used for all tasks of a job.
2. JobConf.setNumTasksToExecutePerJVM()
How do you handle corrupt records that are failing in the mapper and reducer code?
Detect and ignore
Abort job, throwing an Exception
Count the total number of bad records in the jobs using Counters to see how widespread the problem is.
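The "detect, ignore, and count" approach might look like the sketch below. It uses a plain collections.Counter as a stand-in for Hadoop counters, and the tab-separated record format and the counter name are hypothetical.

```python
from collections import Counter

counters = Counter()  # stand-in for the job's counters


def map_record(line):
    """Parse 'key<TAB>integer'; count and skip records that don't parse."""
    try:
        key, value = line.split("\t")
        return key, int(value)
    except ValueError:
        counters["BAD_RECORDS"] += 1
        return None


records = ["a\t1", "garbage", "b\tx", "c\t3"]
good = [r for r in (map_record(line) for line in records) if r is not None]
print(good, dict(counters))
```

Checking the counter after the job shows whether bad records are a rare nuisance or a sign of a wider data problem.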
What can you use when 3rd party library software is causing bad records that can't be intercepted in the mapper/reducer code?
Skipping Mode - automatically skips bad records. Enable it and use the SkipBadRecords class. In skipping mode, a task reports the records it is processing back to the tasktracker. Because of the extra network traffic and bookkeeping needed to maintain the failed record ranges, skipping mode is only enabled after 2 failed task attempts. Skipping mode can only detect one bad record per task attempt (good for catching occasional record errors). To give skipping mode enough attempts to detect and skip all bad records in an input split, increase
mapred.map.max.attempts
mapred.reduce.max.attempts
Where are bad records stored in Hadoop?
Saved as SequenceFiles in the jobs output directory under _logs/skip
Skipping mode is not supported in the new MapReduce API.
When running under the local jobrunner, how many reducers are supported?
0 or 1
What are MapReduce defaults?
(a) job.setInputFormatClass()
(b) job.setMapperClass()
(c) job.setOutputKeyClass()
(d) job.setOutputValueClass()
(a) TextInputFormat.class
(b) Mapper.class
(c) LongWritable.class
(d) Text.class
What are MapReduce defaults?
(e) job.setPartitionerClass()
(f) job.setNumReduceTasks()
(g) job.setReducerClass()
(h) job.setOutputFormatClass()
(e) HashPartitioner.class - hashes a records key to determine which partition the record belongs in
(f) 1
(g) Reducer.class
(h) TextOutputFormat.class (tab delimited)
The number of map tasks is driven by what?
The number of partitions is equal to?
The number of input splits, which is dictated by the size of inputs and block size.
The number of reduce tasks for the job.
What is the behavior of the HashPartitioner?
With multiple reducers, records will be allocated evenly across reduce tasks, with all records that share the same key being processed by the same reduce task.
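The partitioning rule can be sketched as "hash the key, clear the sign bit, take the remainder". In this illustration zlib.crc32 is a stand-in for the key's Java hashCode(), chosen only because it is deterministic across Python runs; the function name is hypothetical.

```python
import zlib


def partition(key, num_reduce_tasks):
    """Assign a key to a reduce task: same key, same partition, every time."""
    h = zlib.crc32(key.encode("utf-8"))  # stand-in for Java's key.hashCode()
    return (h & 0x7FFFFFFF) % num_reduce_tasks


keys = ["apple", "banana", "apple", "cherry", "banana"]
print([partition(k, 4) for k in keys])
```

How evenly records spread across reducers depends on the hash function and on how the keys themselves are distributed.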
How do you calculate the optimal number of reducers?
Total number of available reducer slots = # of nodes in the cluster x # of slots per node (mapred.tasktracker.reduce.tasks.maximum)
Then use slightly fewer reducers than total slots, which gives you one wave of reduce tasks.
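The calculation above as a short sketch. The helper names are hypothetical, and the 95% figure is only one assumed reading of "slightly fewer"; pick whatever margin suits the cluster.

```python
def reducer_slots(nodes, slots_per_node):
    """Total reduce slots in the cluster."""
    return nodes * slots_per_node


def suggested_reducers(nodes, slots_per_node, percent_of_slots=95):
    """Slightly fewer reducers than slots, so all reducers run in one wave."""
    return max(1, reducer_slots(nodes, slots_per_node) * percent_of_slots // 100)


print(reducer_slots(10, 2), suggested_reducers(10, 2))
```

Leaving a few slots free means a failed or slow reduce task can be rerun without forcing a second wave.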
What are the default Streaming attributes?
(a) input
(b) output
(c) inputFormat
(d) mapper
(a) input/ncdc/sample.txt
(b) output
(c) org.apache.hadoop.mapred.TextInputFormat
(d) /bin/cat
What are the default Streaming attributes?
(e) partitioner
(f) numReduceTasks
(g) reducer
(h) outputFormat
(e) org.apache.hadoop.mapred.lib.HashPartitioner
(f) 1
(g) org.apache.hadoop.mapred.lib.IdentityReducer
(h) org.apache.hadoop.mapred.TextOutputFormat