首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Mapreduce过滤器数据

Mapreduce过滤器数据
EN

Stack Overflow用户
提问于 2021-04-01 13:49:14
回答 1查看 41关注 0票数 0

假设我在CSV文件中有一个如下所示的数据集:

代码语言:javascript
复制
Id, Patient cycle no, counseling
2345, 1, No
1234, 22, Yes
4567, 1, No
2378, 10, No

对于患者周期= 1的记录,我想使用map-reduce过滤出咨询为“No”的记录。

我如何才能做到这一点?

EN

回答 1

Stack Overflow用户

发布于 2021-04-01 21:49:07

过滤记录的方式有两种:

  • 仅使用Map函数,其中我们检查每一行的Patient cycle no是否等于1以及counseling是否等于Yes,而不需要任何Reduce函数实现,以及

H19使用D10函数将D11作为关键字,将D12和D13作为组合值,以便按循环编号对数据进行分组,以便我们可以检查D14是否等于Yes.

由于您可能刚刚开始学习MapReduce范式,因此我将遵循这两种方法中的第二种方法,以使MapReduce函数更加清晰(,尽管这个答案的逻辑在可伸缩性和执行时间的整体速度方面似乎效率不高,这取决于输入数据的大小和形式)。

第一步是实现Map函数,其中所要做的就是读取每个记录并重新排列其中的列,以将患者的周期作为关键字。为了防止我们的手被复合值弄得“脏”,Idcounseling字段将被放在由_字符分隔的value字段中,作为分隔符。因此,映射器的输出键-值对将遵循以下模式:

<Patient cycle no, (Id, counseling)>

对于Reduce函数,所有的键值对都按键进行了分组,具有相同键的数据被放在同一个reducer中进行处理。这基本上意味着对于每个患者周期,所有的(Id, counseling)值都存储在Reduce函数的同一实例中,我们可以使用一个简单的for循环逐个访问它们。这就是我们要做的事情:

  1. 使用_分隔符拆分合成值,
  2. 检查我们当前是否在缩减程序上,并将1患者的数据分组为周期数,如果是,则检查counseling是否等于Yes以将其存储到输出
  3. 存储具有不同周期数的所有其他记录

但是,当我们存储记录时,最好让它们与输入数据的形式相似,因此在这里,我们可以通过将patient Id作为关键字,将带有counselingPatient cycle no作为复合值(这里只使用一个空格字符的分隔符)来重新排列数据。

<Id, (Patient cycle no, counseling)>

所有这些都是使用类似下面的源代码完成的(这里的输入数据放在一个名为/patients的文件夹中):

代码语言:javascript
复制
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class PatientFilter 
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <patient_cycle, (ID, counceling)>
     */
    public static class Map extends Mapper<LongWritable, Text, IntWritable, Text> 
    {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        {
            try 
            { 
                if(value.toString().contains("Patient cycle no"))   // remove header
                    return;
                else 
                {
                    String[] columns = value.toString().split(", ");   // split the columns

                    String id = columns[0];
                    int patient_cycle = Integer.parseInt(columns[1]);
                    String counceling = columns[2];

                    // rearrange the columns to put the cycles as key and the rest as a composite value
                    context.write(new IntWritable(patient_cycle), new Text(id + "_" + counceling));
                }
            } 
            catch (Exception e) 
            {
                e.printStackTrace();
            }
        }
    }

    /* input: <patient_cycle, (ID, counceling)>
     * output: <ID, (patient_cycle, counceling)>
     */
    public static class Reduce extends Reducer<IntWritable, Text, Text, Text>
    {
        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        {
            // for every value grouped by key...
            for(Text value : values)
            {
                String[] split_value = value.toString().split("_");     // split the composite value

                if(key.get() == 1)      // check if the key/patient cycle in this reducer is 1
                {
                    if(split_value[1].equals("Yes"))    // check if counseling for this record is equal to "Yes"
                    {
                        // for patients with just 1 cycle, only store the records that have "counseling" equal to "Yes"
                        context.write(new Text(split_value[0]), new Text(Integer.toString(key.get()) + " " + split_value[1]));
                    }
                }
                else    // store all the other records as well
                    context.write(new Text(split_value[0]), new Text(Integer.toString(key.get()) + " " + split_value[1]));
            }
        }
    }


    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("patients");
        Path output_dir = new Path("patients_out");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job job = Job.getInstance(conf, "Patient Filter");
        job.setJarByClass(PatientFilter.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);    
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        TextInputFormat.addInputPath(job, input_dir);
        TextOutputFormat.setOutputPath(job, output_dir);
        job.waitForCompletion(true);
    }
}

因此,对于如下所示的输入数据:

代码语言:javascript
复制
Id, Patient cycle no, counseling
2345, 1, No
1234, 22, Yes
4567, 1, No
2378, 10, No
4852, 2, Yes
2510, 9, Yes
6564, 5, No
3000, 1, No
6958, 45, No
1500, 1, No

输出如下所示:

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66899039

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档