vlambda博客
学习文章列表

Golang源码深入-Go1.15.6发起http请求流程-2

上一篇文章我们讲到go client的大概实现的大概思路,整理了相关client.go的核心源码,详情请翻阅:笔者这一篇分享一下transport.go相关核心的代码,整理相关核心的技术点,希望读者多交流学习。


1、go client的整理流程,主要函数调用和流程如下:NewRequestWithContext->client.Do->client.do->client.send->send->rt.RoundTrip->Transport.roundTrip->Transport.getConn->Transport.queueForDial->Transport.dialConnFor->Transport.dialConn->Transport.readLoop()/Transport.writeLoop()->persistConn.roundTrip。

2、http.Client对象保存着Transport连接对象,Transport里面是一个最核心的是tcp连接池,连接池是处理http的请求,相对一个服务来说是全局的。在不同的函数中实例化这个对象处理不同的请求,在不重写Transport对象时,一个服务的连接都是默认复用。为什么是复用呢?是因为transport有个全局变量DefaultTransport,默认都是使用DefaultTransport这个全局对象。

3、http.NewRequest针对于每个请求都是独立的,每个请求request都是从http.Client里面获取连接,每个请求request都开启一个写协程处理发送请求,一个读协程处理响应请求,这个request本身则调用roundTrip函数启动for select 来监听读协程的结果,到此则请求完成。

下面我们来看核心模块代码翻译:

1

Transport.RoundTrip实现RoundTripper的方法

1func (t *Transport) RoundTrip(req *Request) (*Response, error) {
2    return t.roundTrip(req)
3}


2

Transport.roundTrip是主入口

 1func (t *Transport) roundTrip(req *Request) (*Response, error) {
2    t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
3    ctx := req.Context()
4    trace := httptrace.ContextClientTrace(ctx)
5
6    if req.URL == nil {
7        req.closeBody()
8        return nil, errors.New("http: nil Request.URL")
9    }
10    if req.Header == nil {
11        req.closeBody()
12        return nil, errors.New("http: nil Request.Header")
13    }
14    scheme := req.URL.Scheme
15    isHTTP := scheme == "http" || scheme == "https"
16    // 下面判断request首部的有效性
17    if isHTTP {
18        for k, vv := range req.Header {
19            if !httpguts.ValidHeaderFieldName(k) {
20                req.closeBody()
21                return nil, fmt.Errorf("net/http: invalid header field name %q", k)
22            }
23            for _, v := range vv {
24                if !httpguts.ValidHeaderFieldValue(v) {
25                    req.closeBody()
26                    return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
27                }
28            }
29        }
30    }
31
32    origReq := req
33    cancelKey := cancelKey{origReq}
34    req = setupRewindBody(req)
35
36    if altRT := t.alternateRoundTripper(req); altRT != nil {
37        if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
38            return resp, err
39        }
40        var err error
41        req, err = rewindBody(req)
42        if err != nil {
43            return nil, err
44        }
45    }
46    if !isHTTP {
47        req.closeBody()
48        return nil, badStringError("unsupported protocol scheme", scheme)
49    }
50    if req.Method != "" && !validMethod(req.Method) {
51        req.closeBody()
52        return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
53    }
54    if req.URL.Host == "" {
55        req.closeBody()
56        return nil, errors.New("http: no Host in request URL")
57    }
58
59    // 下面for循环用于在request出现错误的时候进行请求重试。但不是所有的请求失败都会被尝试,如请求被取消(errRequestCanceled) 的情况是不会进行重试的。具体参见shouldRetryRequest函数
60    for {
61        select {
62        case <-ctx.Done():
63            req.closeBody()
64            return nil, ctx.Err()
65        default:
66        }
67
68        // treq gets modified by roundTrip, so we need to recreate for each retry.
69        treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
70        cm, err := t.connectMethodForRequest(treq)
71        if err != nil {
72            req.closeBody()
73            return nil, err
74        }
75
76        // 获取一条长连接,如果连接池中有现成的连接则直接返回,否则返回一条新建的连接。该连接可能是HTTP2格式的,存放在persistCnn.alt中,使用其自注册的RoundTrip处理,从getConn的实现中可以看到,一个请求只能在idle的连接上执行,反之一条连接只能同时处理一个请求。
77        if err != nil {
78            // 每个request都会在getConn中设置reqCanceler,获取连接失败,清空设置
79            t.setReqCanceler(cancelKey, nil)
80            req.closeBody()
81            return nil, err
82        }
83
84        var resp *Response
85        if pconn.alt != nil {
86            // HTTP2处理,使用HTTP2时,由于不缓存HTTP2连接,不对其做限制
87            t.setReqCanceler(cancelKey, nil// not cancelable with CancelRequest
88            resp, err = pconn.alt.RoundTrip(req)
89        } else {
90            // pconn.roundTrip中做了比较复杂的处理,该函数用于发送request并返回response。通过writeLoop发送request,通过readLoop返回response
91            resp, err = pconn.roundTrip(treq)
92        }
93        if err == nil {
94            resp.Request = origReq
95            return resp, nil
96        }
97
98        // Failed. Clean up and determine whether to retry.
99        if http2isNoCachedConnError(err) {
100            if t.removeIdleConn(pconn) {
101                t.decConnsPerHost(pconn.cacheKey)
102            }
103        } else if !pconn.shouldRetryRequest(req, err) {
104            // Issue 16465: return underlying net.Conn.Read error from peek,
105            // as we've historically done.
106            if e, ok := err.(transportReadFromServerError); ok {
107                err = e.err
108            }
109            return nil, err
110        }
111        testHookRoundTripRetried()
112
113        // 用于重定向场景
114        req, err = rewindBody(req)
115        if err != nil {
116            return nil, err
117        }
118    }
119}


3

getConn用于返回一条长连接。长连接的来源有2种路径:连接池中获取;当连接池中无法获取到时会新建一条连接。

 1func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
2    req := treq.Request
3    trace := treq.trace
4    ctx := req.Context()
5    if trace != nil && trace.GetConn != nil {
6        trace.GetConn(cm.addr())
7    }
8
9    w := &wantConn{
10        cm:         cm,
11        key:        cm.key(),
12        ctx:        ctx,
13        ready:      make(chan struct{}, 1),
14        beforeDial: testHookPrePendingDial,
15        afterDial:  testHookPostPendingDial,
16    }
17    defer func() {
18        if err != nil {
19            w.cancel(t, err)
20        }
21    }()
22
23    // 从连接池中找一条合适的连接,如果找到则返回该连接,否则新建连接
24    if delivered := t.queueForIdleConn(w); delivered {
25        pc := w.pc
26        // Trace only for HTTP/1.
27        // HTTP/2 calls trace.GotConn itself.
28        if pc.alt == nil && trace != nil && trace.GotConn != nil {
29            trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
30        }
31        // set request canceler to some non-nil function so we
32        // can detect whether it was cleared between now and when
33        // we enter roundTrip
34        t.setReqCanceler(treq.cancelKey, func(error) {})
35        return pc, nil
36    }
37
38    cancelc := make(chan error, 1)
39    t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
40
41    // 排队等待获取连接
42    t.queueForDial(w)
43
44    // 通过select监听获取连接完成或者取消
45    select {
46    case <-w.ready:
47        // Trace success but only for HTTP/1.
48        // HTTP/2 calls trace.GotConn itself.
49        if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
50            trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
51        }
52        if w.err != nil {
53            // If the request has been cancelled, that's probably
54            // what caused w.err; if so, prefer to return the
55            // cancellation error (see golang.org/issue/16049).
56            select {
57            case <-req.Cancel:
58                return nil, errRequestCanceledConn
59            case <-req.Context().Done():
60                return nil, req.Context().Err()
61            case err := <-cancelc:
62                if err == errRequestCanceled {
63                    err = errRequestCanceledConn
64                }
65                return nil, err
66            default:
67                // return below
68            }
69        }
70        return w.pc, w.err
71    case <-req.Cancel:
72        return nil, errRequestCanceledConn
73    case <-req.Context().Done():
74        return nil, req.Context().Err()
75    case err := <-cancelc:
76        if err == errRequestCanceled {
77            err = errRequestCanceledConn
78        }
79        return nil, err
80    }
81}


4

排队等待新建连接

 1func (t *Transport) queueForDial(w *wantConn) {
2    w.beforeDial()
3    // 如果没有限制最大连接数,直接建立连接
4    if t.MaxConnsPerHost <= 0 {
5        go t.dialConnFor(w)
6        return
7    }
8
9    t.connsPerHostMu.Lock()
10    defer t.connsPerHostMu.Unlock()
11
12    // 如果没超过连接数限制,直接建立连接
13    if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
14        if t.connsPerHost == nil {
15            t.connsPerHost = make(map[connectMethodKey]int)
16        }
17        t.connsPerHost[w.key] = n + 1
18        go t.dialConnFor(w)
19        return
20    }
21
22    if t.connsPerHostWait == nil {
23        t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
24    }
25    // 排队等待连接建立
26    q := t.connsPerHostWait[w.key]
27    q.cleanFront()
28    q.pushBack(w)
29    t.connsPerHostWait[w.key] = q
30}

5

调用t.dialConn获取一个真正的*persistConn

 1func (t *Transport) dialConnFor(w *wantConn) {
2    defer w.afterDial()
3    // 执行新建连接,拨号功能,如果新建连接成功,则添加当前连接到连接池
4    pc, err := t.dialConn(w.ctx, w.cm)
5    delivered := w.tryDeliver(pc, err)
6    if err == nil && (!delivered || pc.alt != nil) {
7        // pconn was not passed to w,
8        // or it is HTTP/2 and can be shared.
9        // Add to the idle connection pool.
10        t.putOrCloseIdleConn(pc)
11    }
12    // 如果建立连接或者获取连接失败,则删除连接池中的连接。
13    if err != nil {
14        t.decConnsPerHost(w.key)
15    }
16}


6

dialConn用于新创建一条连接,并为该连接启动readLoop和writeLoop

 1func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
2    pconn = &persistConn{
3        t:             t,
4        cacheKey:      cm.key(),
5        reqch:         make(chan requestAndChan, 1),
6        writech:       make(chan writeRequest, 1),
7        closech:       make(chan struct{}),
8        writeErrCh:    make(chan error, 1),
9        writeLoopDone: make(chan struct{}),
10    }
11    trace := httptrace.ContextClientTrace(ctx)
12    wrapErr := func(err error) error {
13        if cm.proxyURL != nil {
14            // Return a typed error, per Issue 16997
15            return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
16        }
17        return err
18    }
19    // 调用注册的DialTLS处理tls。使用自注册的TLS处理函数时,transport的TLSClientConfig和TLSHandshakeTimeout
20    if cm.scheme() == "https" && t.hasCustomTLSDialer() {
21        var err error
22        pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
23        if err != nil {
24            return nil, wrapErr(err)
25        }
26        // 如果连接类型是TLS的,则需要处理TLS协商
27        if tc, ok := pconn.conn.(*tls.Conn); ok {
28            // Handshake here, in case DialTLS didn't. TLSNextProto below
29            // depends on it for knowing the connection state.
30            if trace != nil && trace.TLSHandshakeStart != nil {
31                trace.TLSHandshakeStart()
32            }
33            // 启动TLS协商,如果协商失败需要 关闭连接
34            if err := tc.Handshake(); err != nil {
35                go pconn.conn.Close()
36                if trace != nil && trace.TLSHandshakeDone != nil {
37                    trace.TLSHandshakeDone(tls.ConnectionState{}, err)
38                }
39                return nil, err
40            }
41            cs := tc.ConnectionState()
42            if trace != nil && trace.TLSHandshakeDone != nil {
43                trace.TLSHandshakeDone(cs, nil)
44            }
45            pconn.tlsState = &cs
46        }
47    } else {
48        // 使用默认方式创建连接,此时会用到transport的TLSClientConfig和TLSHandshakeTimeout参数。同样注意cm.addr()
49        conn, err := t.dial(ctx, "tcp", cm.addr())
50        if err != nil {
51            return nil, wrapErr(err)
52        }
53        pconn.conn = conn
54        if cm.scheme() == "https" {
55            var firstTLSHost string
56            if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
57                return nil, wrapErr(err)
58            }
59            if err = pconn.addTLS(firstTLSHost, trace); err != nil {
60                return nil, wrapErr(err)
61            }
62        }
63    }
64
65    // 处理proxy的情况
66    switch {
67    case cm.proxyURL == nil:
68        // Do nothing. Not using a proxy.
69    case cm.proxyURL.Scheme == "socks5":
70        conn := pconn.conn
71        d := socksNewDialer("tcp", conn.RemoteAddr().String())
72        if u := cm.proxyURL.User; u != nil {
73            auth := &socksUsernamePassword{
74                Username: u.Username(),
75            }
76            auth.Password, _ = u.Password()
77            d.AuthMethods = []socksAuthMethod{
78                socksAuthMethodNotRequired,
79                socksAuthMethodUsernamePassword,
80            }
81            d.Authenticate = auth.Authenticate
82        }
83        if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
84            conn.Close()
85            return nil, err
86        }
87    case cm.targetScheme == "http":
88        pconn.isProxy = true
89        if pa := cm.proxyAuth(); pa != "" {
90            pconn.mutateHeaderFunc = func(h Header) {
91                h.Set("Proxy-Authorization", pa)
92            }
93        }
94    case cm.targetScheme == "https":
95        conn := pconn.conn
96        hdr := t.ProxyConnectHeader
97        if hdr == nil {
98            hdr = make(Header)
99        }
100        if pa := cm.proxyAuth(); pa != "" {
101            hdr = hdr.Clone()
102            hdr.Set("Proxy-Authorization", pa)
103        }
104        connectReq := &Request{
105            Method: "CONNECT",
106            URL:    &url.URL{Opaque: cm.targetAddr},
107            Host:   cm.targetAddr,
108            Header: hdr,
109        }
110
111        // If there's no done channel (no deadline or cancellation
112        // from the caller possible), at least set some (long)
113        // timeout here. This will make sure we don't block forever
114        // and leak a goroutine if the connection stops replying
115        // after the TCP connect.
116        connectCtx := ctx
117        if ctx.Done() == nil {
118            newCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
119            defer cancel()
120            connectCtx = newCtx
121        }
122
123        didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
124        var (
125            resp *Response
126            err  error // write or read error
127        )
128        // Write the CONNECT request & read the response.
129        go func() {
130            defer close(didReadResponse)
131            err = connectReq.Write(conn)
132            if err != nil {
133                return
134            }
135            // Okay to use and discard buffered reader here, because
136            // TLS server will not speak until spoken to.
137            br := bufio.NewReader(conn)
138            resp, err = ReadResponse(br, connectReq)
139        }()
140        select {
141        case <-connectCtx.Done():
142            conn.Close()
143            <-didReadResponse
144            return nil, connectCtx.Err()
145        case <-didReadResponse:
146            // resp or err now set
147        }
148        if err != nil {
149            conn.Close()
150            return nil, err
151        }
152        if resp.StatusCode != 200 {
153            f := strings.SplitN(resp.Status, " "2)
154            conn.Close()
155            if len(f) < 2 {
156                return nil, errors.New("unknown status code")
157            }
158            return nil, errors.New(f[1])
159        }
160    }
161
162    if cm.proxyURL != nil && cm.targetScheme == "https" {
163        if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
164            return nil, err
165        }
166    }
167
168    if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
169        if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
170            alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
171            if e, ok := alt.(http2erringRoundTripper); ok {
172                // pconn.conn was closed by next (http2configureTransport.upgradeFn).
173                return nil, e.err
174            }
175            return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
176        }
177    }
178
179    pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
180    pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
181    // 处理请求response
182    go pconn.readLoop()
183    // 开启协程处理请求
184    go pconn.writeLoop()
185    return pconn, nil
186}


7

readLoop循环接收response响应,成功获得response后会将连接返回连接池,便于后续复用。

 1func (pc *persistConn) readLoop() {
2    // 当writeLoop或readLoop(异常)跳出循环后,都需要关闭底层连接。即一条连接包含writeLoop和readLoop两个处理,任何一个loop退出(协议升级除外)则该连接不可用,readLoop跳出循环的正常原因是连接上没有待处理的请求,此时关闭连接,释放资源
3    closeErr := errReadLoopExiting // default value, if not changed below
4    defer func() {
5        pc.close(closeErr)
6        pc.t.removeIdleConn(pc)
7    }()
8
9    // 尝试将连接放回连接池
10    tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
11        if err := pc.t.tryPutIdleConn(pc); err != nil {
12            closeErr = err
13            if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
14                trace.PutIdleConn(err)
15            }
16            return false
17        }
18        if trace != nil && trace.PutIdleConn != nil {
19            trace.PutIdleConn(nil)
20        }
21        return true
22    }
23
24    // 变量主要用于阻塞调用者协程读取EOF的resp.body,直到该连接重新放入连接池中。处理逻辑与上面先尝试放入连接池,然后返回response一样,便于连接快速重用
25    eofc := make(chan struct{})
26    defer close(eofc) // unblock reader on errors
27
28    // Read this once, before loop starts. (to avoid races in tests)
29    testHookMu.Lock()
30    testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
31    testHookMu.Unlock()
32
33    alive := true
34    for alive {
35        // 获取允许的response首部的最大字节数
36        pc.readLimit = pc.maxHeaderResponseSize()
37        _, err := pc.br.Peek(1)
38
39        pc.mu.Lock()
40        if pc.numExpectedResponses == 0 {
41            pc.readLoopPeekFailLocked(err)
42            pc.mu.Unlock()
43            return
44        }
45        pc.mu.Unlock()
46
47        rc := <-pc.reqch
48        trace := httptrace.ContextClientTrace(rc.req.Context())
49
50        var resp *Response
51        // 如果有response数据,则读取并解析为Response格式
52        if err == nil {
53            resp, err = pc.readResponse(rc, trace)
54        } else {
55            // 可能的错误如server端关闭,发送EOF
56            err = transportReadFromServerError{err}
57            closeErr = err
58        }
59
60        if err != nil {
61            if pc.readLimit <= 0 {
62                err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
63            }
64
65            select {
66            case rc.ch <- responseAndError{err: err}:
67            case <-rc.callerGone:
68                return
69            }
70            return
71        }
72        pc.readLimit = maxInt64 // effectively no limit for response bodies
73
74        pc.mu.Lock()
75        pc.numExpectedResponses--
76        pc.mu.Unlock()
77
78        bodyWritable := resp.bodyIsWritable()
79        hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
80
81        if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
82            // Don't do keep-alive on error if either party requested a close
83            // or we get an unexpected informational (1xx) response.
84            // StatusCode 100 is already handled above.
85            alive = false
86        }
87
88        if !hasBody || bodyWritable {
89            pc.t.setReqCanceler(rc.cancelKey, nil)
90
91            // Put the idle conn back into the pool before we send the response
92            // so if they process it quickly and make another request, they'll
93            // get this same conn. But we use the unbuffered channel 'rc'
94            // to guarantee that persistConn.roundTrip got out of its select
95            // potentially waiting for this persistConn to close.
96            // but after
97            alive = alive &&
98                !pc.sawEOF &&
99                pc.wroteRequest() &&
100                tryPutIdleConn(trace)
101
102            if bodyWritable {
103                closeErr = errCallerOwnsConn
104            }
105
106            select {
107            case rc.ch <- responseAndError{res: resp}:
108            case <-rc.callerGone:
109                return
110            }
111
112            // Now that they've read from the unbuffered channel, they're safely
113            // out of the select that also waits on this goroutine to die, so
114            // we're allowed to exit now if needed (if alive is false)
115            testHookReadLoopBeforeNextRead()
116            continue
117        }
118
119        waitForBodyRead := make(chan bool2)
120        body := &bodyEOFSignal{
121            body: resp.Body,
122            earlyCloseFn: func() error {
123                waitForBodyRead <- false
124                <-eofc // will be closed by deferred call at the end of the function
125                return nil
126
127            },
128            fn: func(err error) error {
129                isEOF := err == io.EOF
130                waitForBodyRead <- isEOF
131                if isEOF {
132                    <-eofc // see comment above eofc declaration
133                } else if err != nil {
134                    if cerr := pc.canceled(); cerr != nil {
135                        return cerr
136                    }
137                }
138                return err
139            },
140        }
141
142        // 返回的resp.Body类型变为了bodyEOFSignal,如果调用者在读取resp.Body后没有关闭,会导致readLoop阻塞在下面"case bodyEOF := <-waitForBodyRead:"中
143        resp.Body = body
144        if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
145            resp.Body = &gzipReader{body: body}
146            resp.Header.Del("Content-Encoding")
147            resp.Header.Del("Content-Length")
148            resp.ContentLength = -1
149            resp.Uncompressed = true
150        }
151
152        // 此处与处理不带resp.body的场景相同
153        select {
154        case rc.ch <- responseAndError{res: resp}:
155        case <-rc.callerGone:
156            return
157        }
158
159        // Before looping back to the top of this function and peeking on
160        // the bufio.Reader, wait for the caller goroutine to finish
161        // reading the response body. (or for cancellation or death)
162        select {
163        case bodyEOF := <-waitForBodyRead:
164            // 如果读取完response的数据,则该连接可以被重用,否则直接释放。释放一个未读取完数据的连接会导致数据丢失。注意区分bodyEOF和pc.sawEOF的区别,一个是上层通道(http response.Body)关闭,一个是底层通道(TCP)关闭。
165            pc.t.setReqCanceler(rc.cancelKey, nil// before pc might return to idle pool
166            alive = alive &&
167                bodyEOF &&
168                !pc.sawEOF &&
169                pc.wroteRequest() &&
170                tryPutIdleConn(trace)
171                // 释放阻塞的读操作
172            if bodyEOF {
173                eofc <- struct{}{}
174            }
175        case <-rc.req.Cancel:
176            alive = false
177            pc.t.CancelRequest(rc.req)
178        case <-rc.req.Context().Done():
179            alive = false
180            pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
181        case <-pc.closech:
182            alive = false
183        }
184
185        testHookReadLoopBeforeNextRead()
186    }
187}


8

writeLoop用于发送request请求

 1func (pc *persistConn) writeLoop() {
2    defer close(pc.writeLoopDone)
3    // writeLoop会阻塞等待两个IO case 循环等待并处理roundTrip发来的writeRequest数据,此时需要发送request;如果底层连接关闭,则退出writeLoop
4    for {
5        select {
6        case wr := <-pc.writech:
7            startBytesWritten := pc.nwrite
8            // 构造request并发送request请求。waitForContinue用于处理首部含"Expect: 100-continue"的request
9            err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
10            if bre, ok := err.(requestBodyReadError); ok {
11                err = bre.error
12                // Errors reading from the user's
13                // Request.Body are high priority.
14                // Set it here before sending on the
15                // channels below or calling
16                // pc.close() which tears town
17                // connections and causes other
18                // errors.
19                wr.req.setError(err)
20            }
21            // 请求失败时,需要关闭request和底层连接
22            if err == nil {
23                err = pc.bw.Flush()
24            }
25            if err != nil {
26                wr.req.Request.closeBody()
27                if pc.nwrite == startBytesWritten {
28                    err = nothingWrittenError{err}
29                }
30            }
31            // 将结果发送给readLoop的pc.wroteRequest()函数处理
32            pc.writeErrCh <- err 
33            // 将结果返回给roundTrip处理,防止响应超时
34            wr.ch <- err         
35            // 如果发送request失败,需要关闭连接。writeLoop退出时会关闭pc.conn和pc.closech,同时会导致readLoop退出
36            if err != nil {
37                pc.close(err)
38                return
39            }
40        case <-pc.closech:
41            return
42        }
43    }
44}


9

一个roundTrip用于处理一个request,通过for select来监听结果。

 1func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
2    testHookEnterRoundTrip()
3    // 此处与getConn中的"t.setReqCanceler(req, func(error) {})"相对应,用于判断request是否被取消, 返回false表示request被取消,不必继续后续请求,关闭连接并返回错误
4    if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) {
5        pc.t.putOrCloseIdleConn(pc)
6        return nil, errRequestCanceled
7    }
8    pc.mu.Lock()
9    // 与readLoop配合使用,表示期望的响应的个数
10    pc.numExpectedResponses++
11    headerFn := pc.mutateHeaderFunc
12    pc.mu.Unlock()
13
14    if headerFn != nil {
15        headerFn(req.extraHeaders())
16    }
17
18    // Ask for a compressed version if the caller didn't set their
19    // own value for Accept-Encoding. We only attempt to
20    // uncompress the gzip stream if we were the layer that
21    // requested it.
22    requestedGzip := false
23    // 如果需要在request中设置可接受的解码方法,则在request中添加对应的首部。仅支持gzip方式且仅在调用者没有设置这些首部时设置
24    if !pc.t.DisableCompression &&
25        req.Header.Get("Accept-Encoding") == "" &&
26        req.Header.Get("Range") == "" &&
27        req.Method != "HEAD" {
28        // Request gzip only, not deflate. Deflate is ambiguous and
29        // not as universally supported anyway.
30        // See: https://zlib.net/zlib_faq.html#faq39
31        //
32        // Note that we don't request this for HEAD requests,
33        // due to a bug in nginx:
34        //   https://trac.nginx.org/nginx/ticket/358
35        //   https://golang.org/issue/5522
36        //
37        // We don't request gzip if the request is for a range, since
38        // auto-decoding a portion of a gzipped document will just fail
39        // anyway. See https://golang.org/issue/8923
40        requestedGzip = true
41        req.extraHeaders().Set("Accept-Encoding""gzip")
42    }
43
44    var continueCh chan struct{}
45    if req.ProtoAtLeast(11) && req.Body != nil && req.expectsContinue() {
46        continueCh = make(chan struct{}, 1)
47    }
48
49    // HTTP1.1默认使用长连接,当transport设置DisableKeepAlives时会导致处理每个request时都会新建一个连接。此处的处理逻辑是:如果transport设置了DisableKeepAlives,而request没有设置
50    if pc.t.DisableKeepAlives && !req.wantsClose() {
51        req.extraHeaders().Set("Connection""close")
52    }
53
54    // 用于在异常场景(如request取消)下通知readLoop,roundTrip是否已经退出,防止ReadLoop发送response阻塞
55    gone := make(chan struct{})
56    defer close(gone)
57
58    defer func() {
59        if err != nil {
60            pc.t.setReqCanceler(req.cancelKey, nil)
61        }
62    }()
63
64    const debugRoundTrip = false
65
66    // Write the request concurrently with waiting for a response,
67    // in case the server decides to reply before reading our full
68    // request body.
69    startBytesWritten := pc.nwrite
70    // 给writeLoop封装并发送信息,注意此处的先后顺序。首先给writeLoop发送数据,阻塞等待writeLoop接收,待writeLoop接收后才能发送数据给readLoop,因此发送request总会优先接收response
71    writeErrCh := make(chan error, 1)
72    pc.writech <- writeRequest{req, writeErrCh, continueCh}
73
74    resc := make(chan responseAndError)
75    pc.reqch <- requestAndChan{
76        req:        req.Request,
77        cancelKey:  req.cancelKey,
78        ch:         resc,
79        addedGzip:  requestedGzip,
80        continueCh: continueCh,
81        callerGone: gone,
82    }
83
84    var respHeaderTimer <-chan time.Time
85    cancelChan := req.Request.Cancel
86    ctxDoneChan := req.Context().Done()
87    // 该循环主要用于处理获取response超时和request取消时的条件跳转。正常情况下收到reponse, 退出roundtrip函数
88    for {
89        testHookWaitResLoop()
90        select {
91        // writeLoop返回发送request后的结果
92        case err := <-writeErrCh:
93            if debugRoundTrip {
94                req.logf("writeErrCh resv: %T/%#v", err, err)
95            }
96            if err != nil {
97                pc.close(fmt.Errorf("write error: %v", err))
98                return nil, pc.mapRoundTripError(req, startBytesWritten, err)
99            }
100            if d := pc.t.ResponseHeaderTimeout; d > 0 {
101                if debugRoundTrip {
102                    req.logf("starting timer for %v", d)
103                }
104                timer := time.NewTimer(d)
105                defer timer.Stop() // prevent leaks
106                respHeaderTimer = timer.C
107            }
108        // 处理底层连接关闭。"case <-cancelChan:"和”case <-ctxDoneChan:“为request关闭,request关闭也会导致底层连接关闭,但必须处理非上层协议导致底层连接关闭的情况。
109        case <-pc.closech:
110            if debugRoundTrip {
111                req.logf("closech recv: %T %#v", pc.closed, pc.closed)
112            }
113            return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
114        // 等待获取response超时,关闭连接
115        case <-respHeaderTimer:
116            if debugRoundTrip {
117                req.logf("timeout waiting for response headers.")
118            }
119            pc.close(errTimeout)
120            return nil, errTimeout
121        // 接收到readLoop返回的response结果
122        case re := <-resc:
123            // 极异常情况,直接程序panic
124            if (re.res == nil) == (re.err == nil) {
125                panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
126            }
127            if debugRoundTrip {
128                req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
129            }
130            if re.err != nil {
131                return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
132            }
133            // 到这里是最终的成功返回的结果。
134            return re.res, nil
135        // request取消
136        case <-cancelChan:
137            pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
138            // 将关闭之后的chan置为nil,用来防止select一直进入该case(close的chan不会阻塞读,读取的数据为0)
139            cancelChan = nil
140        case <-ctxDoneChan:
141            pc.t.cancelRequest(req.cancelKey, req.Context().Err())
142            cancelChan = nil
143            ctxDoneChan = nil
144        }
145    }
146}

关于源码就分享到这里。


主要核心流程以及功能梳理图解如下



总结

( 1 )
Golang源码深入-Go1.15.6发起http请求流程-2

go发起http1.1请求,遇到不关心的请求,请务必完整读取响应内容以保障连接复用性。

( 2 )
Golang源码深入-Go1.15.6发起http请求流程-2

果在http.client 中没有设置transport熟悉,则会使用文章开头说的DefaultTransport,这里设置的默认最大空闲连接数MaxIdleConns,每个host最大空闲连数MaxIdleConnsPerHost是2,每个host的最大连接数MaxConnsPerHost是0。在大量并发情况下,默认配置会造成很多链接,进而性能急剧下降。果需控制合适的连接数,就需要使用自定义的client和transport。配置方式如下:

1t := http.DefaultTransport.(*http.Transport).Clone()
2t.MaxIdleConns = 100
3t.MaxConnsPerHost = 100
4t.MaxIdleConnsPerHost = 100
5
6httpClient = &http.Client{
7  Timeout:   10 * time.Second,
8  Transport: t,
9}
( 3 )
Golang源码深入-Go1.15.6发起http请求流程-2

http1.1线头阻塞:http中一个连接上的请求,需要等这个请求处理完了才能继续下一个请求。

参照文献

https://www.jb51.net/article/193675.htm

https://www.cnblogs.com/charlieroro/p/11409153.html

点个在看你最好看