6.824 第二课 rpc

课程链接: mit 6.824 Schedule具体参考lec2 里面的Rpc and Threads,Crawler. 其中rpc and Threads 有人翻译了。这里就不拿出来了。参考链接
threads 主要提到了一些rpc模型的好处和可能存在的一些问题,需要我们注意和处理的。
RPC的目的:

  • 容易编写网络通信程序
  • 隐藏客户端服务器通信的细节
  • 客户端调用更加像传统的过程调用
  • 服务端处理更加像传统的过程调用

在rpc上,你可以通过调用本地函数一样的方法来调用远程的函数,特别方便。当然你同时也需要知道远端的函数的一些参数,不然是没法调用的。同时也是有些数据是没法传递的 例如channels和function是没法通过rpc传递的。
rpc 处理失败问题的:丢包,网络断线,服务器运行缓慢,服务器崩溃。
所以有了最简单的方案:

  • 至少一次行为:
    rpc等待回复一段时间,如果还没有回复达到,重新发送请求。重复多次,如果还是没有回复,那么返回错误给应用。
    至少一次应用程序处理会出现什么问题:会出现客户端同时出现对同一个资源的同一个操作,例如对同一个key设置值 。
    其实在至少一次行为对于回复操作,例如只读,或者程序有自己处理同时写冲突的处理能力时候可以很好的处理,但是真实场景中,这两点很难同时满足。

  • 至多一次:这是一个更好的rpc处理的方案,但是需要服务端识别客户端每次发送过来的请求,是新请求,还是之前 已经处理过的情况,如果是新请求,直接处理,如果是之前已经请求过了的,返回处理结果。
    同样如何确认一个请求是否与上一个请求相同:那就需要客户端请求的时候带上一个唯一ID,相同请求使用相同的ID。 server : if seen[xid]: r = old[xid] else r=handler() old[xid]= r seen[xid]=true

    • 怎么确认xid是唯一的?
      • 很大的随机数?
      • 将唯一的客户端ID(ip address?)和序列号组合起来?
    • 服务器最后必须丢弃老的RPC信息?
    • 什么时候丢弃是安全的?
      • idea:
        • 唯一的客户端id
        • 上一个rpc请求的序列号
        • 客户端的每一个RPC请求包含”seen all replies <=X”
        • 类似tcp中的seq和ack
        • 或者每次只允许一个RPC调用,到达的是seq+1,那么忽略其他小于seq
        • 客户端最多可以尝试5次,服务器会忽略大于5次的请求。
    • 当原来的请求还在执行,怎么样处理相同seq的请求?
      • 服务器不想运行两次,也不想回复。
      • 想法:给每个执行的RPC,pending标识;等待或者忽略。

附上一份课程代码,然后对一些类似并行处理的争用,采用最简单的锁或者通道的无缓冲性来解决并行冲突。
此段代码命名为crawler.go

What is a crawler?
  goal is to fetch all web pages, e.g. to feed to an indexer
  web pages form a graph
  multiple links to each page
  graph has cycles

Crawler challenges
  Arrange for I/O concurrency
    Fetch many URLs at the same time
    To increase URLs fetched per second
    Since network latency is much more of a limit than network capacity
  Fetch each URL only *once*
    avoid wasting network bandwidth
    be nice to remote servers
    =&gt; Need to remember which URLs visited 
  Know when finished
package main

import (
&quot;fmt&quot;
&quot;sync&quot;
)

//
// Several solutions to the crawler exercise from the Go tutorial
// https://tour.golang.org/concurrency/10
//

//
// Serial crawler
//这里是串行处理,一个一个来,就不存在争用了。。。

func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
// 这里进行是否处理的判断,如果处理了,就直接返回。保证每个 URL 只fetch一次。
    if fetched[url] {
        return
    }
    fetched[url] = true
    urls, err := fetcher.Fetch(url)
    if err != nil {
        return
    }
    for _, u := range urls {
        Serial(u, fetcher, fetched)
    }
    return
}

//
// Concurrent crawler with shared state and Mutex
//这里采用共享状态和锁来解决同时处理的冲突。fetchState 包含了一个锁,和一个fetch的kvmap,用来保存fetch状态。

type fetchState struct {
    mu      sync.Mutex
    fetched map[string]bool
}

func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
//首先争用锁,啥时候获得锁了,然后判断判断是否为fetch过,fetch过就直接返回,并且释放锁。
    f.mu.Lock()
    if f.fetched[url] {
        f.mu.Unlock()
        return
    }
    //如果没有fetch过就将fetch当前的url状态改为true(这个时候是享有锁的),并且同时释放锁。。。这里可能会有问题,因为并没有真正的fetch 就改变了当前url的状态。
    f.fetched[url] = true
    f.mu.Unlock()

    urls, err := fetcher.Fetch(url)
    if err != nil {
        return
    }
    //同时这里采用sync.WaiteGroup 阻塞多携程同时处理..
    var done sync.WaitGroup
    for _, u := range urls {
        done.Add(1)
        go func(u string) {
            defer done.Done()
            ConcurrentMutex(u, fetcher, f)
        }(u)
    }
    done.Wait()
    return
}

func makeState() *fetchState {
    f := &amp;fetchState{}
    f.fetched = make(map[string]bool)
    return f
}

//
// Concurrent crawler with channels
// 这里就是用通道来处理了。。首先定义的是worker,如果获取到正确的urls  就将urls 往通道里面送,如果没有获取到就发送空的数据过去。

func worker(url string, ch chan []string, fetcher Fetcher) {
    urls, err := fetcher.Fetch(url)
    if err != nil {
        ch &lt;- []string{}
    } else {
        ch &lt;- urls
    }
}
//这里master进行处理,可能会更简单,会提取通道里面的urllist进行循环处理,直到所有的url 都处理完,就退出。
func master(ch chan []string, fetcher Fetcher) {
    n := 1
    fetched := make(map[string]bool)
    for urls := range ch {
        for _, u := range urls {
            if fetched[u] == false {
                fetched[u] = true
                n += 1
                go worker(u, ch, fetcher)
            }
        }
        n -= 1
        if n == 0 {
            break
        }
    }
}

func ConcurrentChannel(url string, fetcher Fetcher) {
    ch := make(chan []string)
    go func() {
        ch &lt;- []string{url}
    }()
    master(ch, fetcher)
}

//
// main
//

func main() {
    fmt.Printf(&quot;=== Serial===\n&quot;)
    Serial(&quot;http://golang.org/&quot;, fetcher, make(map[string]bool))

    fmt.Printf(&quot;=== ConcurrentMutex ===\n&quot;)
    ConcurrentMutex(&quot;http://golang.org/&quot;, fetcher, makeState())

    fmt.Printf(&quot;=== ConcurrentChannel ===\n&quot;)
    ConcurrentChannel(&quot;http://golang.org/&quot;, fetcher)
}

//
// Fetcher
//

type Fetcher interface {
    // Fetch returns a slice of URLs found on the page.
    Fetch(url string) (urls []string, err error)
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
    body string
    urls []string
}

func (f fakeFetcher) Fetch(url string) ([]string, error) {
    if res, ok := f[url]; ok {
        fmt.Printf(&quot;found:   %s\n&quot;, url)
        return res.urls, nil
    }
    fmt.Printf(&quot;missing: %s\n&quot;, url)
    return nil, fmt.Errorf(&quot;not found: %s&quot;, url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
    &quot;http://golang.org/&quot;: &amp;fakeResult{
        &quot;The Go Programming Language&quot;,
        []string{
            &quot;http://golang.org/pkg/&quot;,
            &quot;http://golang.org/cmd/&quot;,
        },
    },
    &quot;http://golang.org/pkg/&quot;: &amp;fakeResult{
        &quot;Packages&quot;,
        []string{
            &quot;http://golang.org/&quot;,
            &quot;http://golang.org/cmd/&quot;,
            &quot;http://golang.org/pkg/fmt/&quot;,
            &quot;http://golang.org/pkg/os/&quot;,
        },
    },
    &quot;http://golang.org/pkg/fmt/&quot;: &amp;fakeResult{
        &quot;Package fmt&quot;,
        []string{
            &quot;http://golang.org/&quot;,
            &quot;http://golang.org/pkg/&quot;,
        },
    },
    &quot;http://golang.org/pkg/os/&quot;: &amp;fakeResult{
        &quot;Package os&quot;,
        []string{
            &quot;http://golang.org/&quot;,
            &quot;http://golang.org/pkg/&quot;,
        },
    },
}

课程给了一点小结:

When to use sharing and locks, versus channels?
  Most problems can be solved in either style
  What makes the most sense depends on how the programmer thinks
    state -- sharing and locks
    communication -- channels
    waiting for events -- channels
  Use Go&#039;s race detector:
    https://golang.org/doc/articles/race_detector.html
    go test -race 

About: loony


发表评论

电子邮件地址不会被公开。 必填项已用*标注

Captcha Code