mit6.824学习 lab1 mapreduce part 3

  • Map/Reduce一个大的卖点是开发者不需要在多台机器上面并行运行它们的代码。在理论上,我们可以运在本地运行word count,然后可以自动并行化。

  • 我们现在的实现是在同一个master上面一个接着一个的运行全部的map和reduce任务。虽然在概念上这种方法十分简单,但是性能不是很好。在这部分实验中,你将会完成另外一个版本的MapReduce,将工作分散到一系列的工作线程,为了充分利用多核的作用。虽然工作不是分布在真正的多机上面,不过你的实现可以使用RPC和Channel来模拟一个真正的分布式实现。

  • 为了协调任务的并行执行,我们将会使用一个特殊的线程,它将会给workers分配工作,然后等待它们完成。为了让实验更加真实,master只能通过RPC和workers交互。我们在mapreduce/worker.go文件中提供了worker的代码,代码会启动workers,然后处理RPC消息(mapreduce/common_rpc.go)。

  • 你的工作是完成mapreduce包中的schedule.go。尤其,你需要修改schedule.go文件中的schedule()函数用于分配map和reduce任务给workers,直到全部任务完成才返回。
    所以我们先来看看测试里面的方法来对脉络进行梳理:

func TestParallelBasic(t *testing.T) {
    mr := setup()
    for i := 0; i < 2; i++ {
        go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
            MapFunc, ReduceFunc, -1, nil)
    }
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

首先执行setp函数 然后再丢两个协程进行RunWorker.
看下setup 它会调用Distributed

func setup() *Master {
    files := makeInputs(nMap)
    master := port("master")
    mr := Distributed("test", files, nReduce, master)
    return mr
}
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
    mr = newMaster(master)
    mr.startRPCServer()
    go mr.run(jobName, files, nreduce,
        func(phase jobPhase) {
            ch := make(chan string)
            go mr.forwardRegistrations(ch)
            //此处调用我们写的schedule函数。。所以这里主要是完成这个工作。 并且会把相应的jobname 文件,nreduce个数。还有最重要的ch(mr转发的worker的注册信息,可以直接通过rpc链接到worker,进行发布task。),并且这里面的ch 是一个多级无缓冲的并且阻塞的一个通道,而且这些worker 进程可以是在调度进程起来之前,也可能实在调度进程起来之后发送这个消息。所以要注意,不停的要 从里面取worker的信息。同时分配task.
            schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
        },
        func() {
            mr.stats = mr.killWorkers()
            mr.stopRPCServer()
        })
    return
}
func (mr *Master) run(jobName string, files []string, nreduce int,
    schedule func(phase jobPhase),
    finish func(),
) {
    mr.jobName = jobName
    mr.files = files
    mr.nReduce = nreduce

    fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)

    schedule(mapPhase)
    schedule(reducePhase)
    finish()
    mr.merge()

    fmt.Printf("%s: Map/Reduce task completed\n", mr.address)

    mr.doneChannel <- true
}
func RunWorker(MasterAddress string, me string,
    MapFunc func(string, string) []KeyValue,
    ReduceFunc func(string, []string) string,
    nRPC int, parallelism *Parallelism,
) {
    debug("RunWorker %s\n", me)
    wk := new(Worker)
    wk.name = me
    wk.Map = MapFunc
    wk.Reduce = ReduceFunc
    wk.nRPC = nRPC
    wk.parallelism = parallelism
    // 这里,worker也起了一个rpc的server,供服务端调用,并且把链接的信息ch传给服务端。
    rpcs := rpc.NewServer()
    rpcs.Register(wk)
    os.Remove(me) // only needed for "unix"
    l, e := net.Listen("unix", me)
    if e != nil {
        log.Fatal("RunWorker: worker ", me, " error: ", e)
    }
    wk.l = l
    wk.register(MasterAddress)

    // DON'T MODIFY CODE BELOW
    for {
        wk.Lock()
        if wk.nRPC == 0 {
            wk.Unlock()
            break
        }
        wk.Unlock()
        conn, err := wk.l.Accept()
        if err == nil {
            wk.Lock()
            wk.nRPC--
            wk.Unlock()
            go rpcs.ServeConn(conn)
        } else {
            break
        }
    }
    wk.l.Close()
    debug("RunWorker %s exit\n", me)
}
//看下ch转发的函数。这个非常重要。客户端链接到server,然后客户端也起个server 供server链接,调用dotask函数进行处理。
func (mr *Master) forwardRegistrations(ch chan string) {
    i := 0
    for {
        mr.Lock()
        if len(mr.workers) > i {
            // there's a worker that we haven't told schedule() about.
            w := mr.workers[i]
            go func() { ch <- w }() // send without holding the lock.
            i = i + 1
        } else {
            // wait for Register() to add an entry to workers[]
            // in response to an RPC from a new worker.
            mr.newCond.Wait()
        }
        mr.Unlock()
    }
}

// 这里的mr.workers可以看成是worker进程把自己的名字(其实名字就是unix的套接字。传过来了。)
func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
    mr.Lock()
    defer mr.Unlock()
    debug("Register: worker %s\n", args.Worker)
    mr.workers = append(mr.workers, args.Worker)

    // tell forwardRegistrations() that there's a new workers[] entry.
    mr.newCond.Broadcast()

    return nil
}
/// worker线程的定义。其中me就是组成了mr.workers的[]。这个me就是由开局的port(**)这个传递进来,也就是其worker rpcserver的 地址,可以直接调用。

//func TestParallelBasic(t *testing.T) {
    //mr := setup()
    //for i := 0; i < 2; i++ {
    //  go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
        //  MapFunc, ReduceFunc, -1, nil)
    //}
    //mr.Wait()
    //check(t, mr.files)
    //checkWorker(t, mr.stats)
    //cleanup(mr)
//}

func RunWorker(MasterAddress string, me string,
    MapFunc func(string, string) []KeyValue,
    ReduceFunc func(string, []string) string,
    nRPC int, parallelism *Parallelism,
) {
    debug("RunWorker %s\n", me)
    wk := new(Worker)
    wk.name = me
    wk.Map = MapFunc
    wk.Reduce = ReduceFunc
    wk.nRPC = nRPC
    wk.parallelism = parallelism
    rpcs := rpc.NewServer()
    rpcs.Register(wk)
    os.Remove(me) // only needed for "unix"
    l, e := net.Listen("unix", me)
    if e != nil {
        log.Fatal("RunWorker: worker ", me, " error: ", e)
    }
    wk.l = l
    wk.register(MasterAddress)

上个函数的work.name这里在注册的时候赋值给args.Workers了。所以其实上面的me 就是 args.Worker,也就是mr.workers[i] 也就是被传到ch通道里面去了。这点很重要。。这个ch最终被传到调度函数的registerChan
///

func (wk *Worker) register(master string) {
    args := new(RegisterArgs)
    args.Worker = wk.name
    ok := call(master, "Master.Register", args, new(struct{}))
    if ok == false {
        fmt.Printf("Register: RPC %s register error\n", master)
    }
}

所以我们需要完成的调度函数,其实需要链接的work线程的rpc服务端已经准备好了,当然可能是在我们调度函数之前起的,也有可能是在我们调度函数之后起的。所以调度函数要注意接收registerChan里面的数据,有了新的,就可以使用work线程了。

package mapreduce

import (
    "fmt"
    "sync"
)

//
// schedule() starts and waits for all tasks in the given phase (mapPhase
// or reducePhase). the mapFiles argument holds the names of the files that
// are the inputs to the map phase, one per map task. nReduce is the
// number of reduce tasks. the registerChan argument yields a stream
// of registered workers; each item is the worker's RPC address,
// suitable for passing to call(). registerChan will yield all
// existing registered workers (if any) and new ones as they register.
//
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce

    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)


        taskwaitgroup := new(sync.WaitGroup)
        taskwaitgroup.Add(ntasks)

        taskChan := make(chan int, ntasks)
        for i:=0;i<ntasks;i++  {
            taskChan <- i
        }

       go func() {
        for {
            ch :=<- registerChan
            go func(c string) {
                for{
                    i:=<- taskChan
                    if call(ch,"Worker.DoTask",&DoTaskArgs{jobName,mapFiles[i],phase,i,n_other},new(struct{})){
                        taskwaitgroup.Done()
                    } else {
                        taskChan <- i
                        }

            }

        }(ch)
        }
       }()
        taskwaitgroup.Wait()

        fmt.Printf("Schedule: %v phase done\n", phase)
    }




    // All ntasks tasks have to be scheduled on workers. Once all tasks
    // have completed successfully, schedule() should return.
    //
    // Your code here (Part III, Part IV).
    //


最后才发现,上面的实现,在课程指导里面都有。mit 6.824 rpc notes

About: loony


发表评论

电子邮件地址不会被公开。 必填项已用*标注

Captcha Code