mirror of https://github.com/caddyserver/caddy
554 lines
16 KiB
Go
554 lines
16 KiB
Go
package reverseproxy
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
weakrand "math/rand"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
|
|
"github.com/caddyserver/caddy/v2"
|
|
)
|
|
|
|
func init() {
|
|
caddy.RegisterModule(SRVUpstreams{})
|
|
caddy.RegisterModule(AUpstreams{})
|
|
caddy.RegisterModule(MultiUpstreams{})
|
|
}
|
|
|
|
// SRVUpstreams provides upstreams from SRV lookups.
|
|
// The lookup DNS name can be configured either by
|
|
// its individual parts (that is, specifying the
|
|
// service, protocol, and name separately) to form
|
|
// the standard "_service._proto.name" domain, or
|
|
// the domain can be specified directly in name by
|
|
// leaving service and proto empty. See RFC 2782.
|
|
//
|
|
// Lookups are cached and refreshed at the configured
|
|
// refresh interval.
|
|
//
|
|
// Returned upstreams are sorted by priority and weight.
|
|
type SRVUpstreams struct {
|
|
// The service label.
|
|
Service string `json:"service,omitempty"`
|
|
|
|
// The protocol label; either tcp or udp.
|
|
Proto string `json:"proto,omitempty"`
|
|
|
|
// The name label; or, if service and proto are
|
|
// empty, the entire domain name to look up.
|
|
Name string `json:"name,omitempty"`
|
|
|
|
// The interval at which to refresh the SRV lookup.
|
|
// Results are cached between lookups. Default: 1m
|
|
Refresh caddy.Duration `json:"refresh,omitempty"`
|
|
|
|
// If > 0 and there is an error with the lookup,
|
|
// continue to use the cached results for up to
|
|
// this long before trying again, (even though they
|
|
// are stale) instead of returning an error to the
|
|
// client. Default: 0s.
|
|
GracePeriod caddy.Duration `json:"grace_period,omitempty"`
|
|
|
|
// Configures the DNS resolver used to resolve the
|
|
// SRV address to SRV records.
|
|
Resolver *UpstreamResolver `json:"resolver,omitempty"`
|
|
|
|
// If Resolver is configured, how long to wait before
|
|
// timing out trying to connect to the DNS server.
|
|
DialTimeout caddy.Duration `json:"dial_timeout,omitempty"`
|
|
|
|
// If Resolver is configured, how long to wait before
|
|
// spawning an RFC 6555 Fast Fallback connection.
|
|
// A negative value disables this.
|
|
FallbackDelay caddy.Duration `json:"dial_fallback_delay,omitempty"`
|
|
|
|
resolver *net.Resolver
|
|
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// CaddyModule returns the Caddy module information.
|
|
func (SRVUpstreams) CaddyModule() caddy.ModuleInfo {
|
|
return caddy.ModuleInfo{
|
|
ID: "http.reverse_proxy.upstreams.srv",
|
|
New: func() caddy.Module { return new(SRVUpstreams) },
|
|
}
|
|
}
|
|
|
|
func (su *SRVUpstreams) Provision(ctx caddy.Context) error {
|
|
su.logger = ctx.Logger()
|
|
if su.Refresh == 0 {
|
|
su.Refresh = caddy.Duration(time.Minute)
|
|
}
|
|
|
|
if su.Resolver != nil {
|
|
err := su.Resolver.ParseAddresses()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d := &net.Dialer{
|
|
Timeout: time.Duration(su.DialTimeout),
|
|
FallbackDelay: time.Duration(su.FallbackDelay),
|
|
}
|
|
su.resolver = &net.Resolver{
|
|
PreferGo: true,
|
|
Dial: func(ctx context.Context, _, _ string) (net.Conn, error) {
|
|
//nolint:gosec
|
|
addr := su.Resolver.netAddrs[weakrand.Intn(len(su.Resolver.netAddrs))]
|
|
return d.DialContext(ctx, addr.Network, addr.JoinHostPort(0))
|
|
},
|
|
}
|
|
}
|
|
if su.resolver == nil {
|
|
su.resolver = net.DefaultResolver
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (su SRVUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
|
|
suAddr, service, proto, name := su.expandedAddr(r)
|
|
|
|
// first, use a cheap read-lock to return a cached result quickly
|
|
srvsMu.RLock()
|
|
cached := srvs[suAddr]
|
|
srvsMu.RUnlock()
|
|
if cached.isFresh() {
|
|
return allNew(cached.upstreams), nil
|
|
}
|
|
|
|
// otherwise, obtain a write-lock to update the cached value
|
|
srvsMu.Lock()
|
|
defer srvsMu.Unlock()
|
|
|
|
// check to see if it's still stale, since we're now in a different
|
|
// lock from when we first checked freshness; another goroutine might
|
|
// have refreshed it in the meantime before we re-obtained our lock
|
|
cached = srvs[suAddr]
|
|
if cached.isFresh() {
|
|
return allNew(cached.upstreams), nil
|
|
}
|
|
|
|
if c := su.logger.Check(zapcore.DebugLevel, "refreshing SRV upstreams"); c != nil {
|
|
c.Write(
|
|
zap.String("service", service),
|
|
zap.String("proto", proto),
|
|
zap.String("name", name),
|
|
)
|
|
}
|
|
|
|
_, records, err := su.resolver.LookupSRV(r.Context(), service, proto, name)
|
|
if err != nil {
|
|
// From LookupSRV docs: "If the response contains invalid names, those records are filtered
|
|
// out and an error will be returned alongside the remaining results, if any." Thus, we
|
|
// only return an error if no records were also returned.
|
|
if len(records) == 0 {
|
|
if su.GracePeriod > 0 {
|
|
if c := su.logger.Check(zapcore.ErrorLevel, "SRV lookup failed; using previously cached"); c != nil {
|
|
c.Write(zap.Error(err))
|
|
}
|
|
cached.freshness = time.Now().Add(time.Duration(su.GracePeriod) - time.Duration(su.Refresh))
|
|
srvs[suAddr] = cached
|
|
return allNew(cached.upstreams), nil
|
|
}
|
|
return nil, err
|
|
}
|
|
if c := su.logger.Check(zapcore.WarnLevel, "SRV records filtered"); c != nil {
|
|
c.Write(zap.Error(err))
|
|
}
|
|
}
|
|
|
|
upstreams := make([]Upstream, len(records))
|
|
for i, rec := range records {
|
|
if c := su.logger.Check(zapcore.DebugLevel, "discovered SRV record"); c != nil {
|
|
c.Write(
|
|
zap.String("target", rec.Target),
|
|
zap.Uint16("port", rec.Port),
|
|
zap.Uint16("priority", rec.Priority),
|
|
zap.Uint16("weight", rec.Weight),
|
|
)
|
|
}
|
|
addr := net.JoinHostPort(rec.Target, strconv.Itoa(int(rec.Port)))
|
|
upstreams[i] = Upstream{Dial: addr}
|
|
}
|
|
|
|
// before adding a new one to the cache (as opposed to replacing stale one), make room if cache is full
|
|
if cached.freshness.IsZero() && len(srvs) >= 100 {
|
|
for randomKey := range srvs {
|
|
delete(srvs, randomKey)
|
|
break
|
|
}
|
|
}
|
|
|
|
srvs[suAddr] = srvLookup{
|
|
srvUpstreams: su,
|
|
freshness: time.Now(),
|
|
upstreams: upstreams,
|
|
}
|
|
|
|
return allNew(upstreams), nil
|
|
}
|
|
|
|
func (su SRVUpstreams) String() string {
|
|
if su.Service == "" && su.Proto == "" {
|
|
return su.Name
|
|
}
|
|
return su.formattedAddr(su.Service, su.Proto, su.Name)
|
|
}
|
|
|
|
// expandedAddr expands placeholders in the configured SRV domain labels.
|
|
// The return values are: addr, the RFC 2782 representation of the SRV domain;
|
|
// service, the service; proto, the protocol; and name, the name.
|
|
// If su.Service and su.Proto are empty, name will be returned as addr instead.
|
|
func (su SRVUpstreams) expandedAddr(r *http.Request) (addr, service, proto, name string) {
|
|
repl := r.Context().Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
|
|
name = repl.ReplaceAll(su.Name, "")
|
|
if su.Service == "" && su.Proto == "" {
|
|
addr = name
|
|
return
|
|
}
|
|
service = repl.ReplaceAll(su.Service, "")
|
|
proto = repl.ReplaceAll(su.Proto, "")
|
|
addr = su.formattedAddr(service, proto, name)
|
|
return
|
|
}
|
|
|
|
// formattedAddr the RFC 2782 representation of the SRV domain, in
|
|
// the form "_service._proto.name".
|
|
func (SRVUpstreams) formattedAddr(service, proto, name string) string {
|
|
return fmt.Sprintf("_%s._%s.%s", service, proto, name)
|
|
}
|
|
|
|
type srvLookup struct {
|
|
srvUpstreams SRVUpstreams
|
|
freshness time.Time
|
|
upstreams []Upstream
|
|
}
|
|
|
|
func (sl srvLookup) isFresh() bool {
|
|
return time.Since(sl.freshness) < time.Duration(sl.srvUpstreams.Refresh)
|
|
}
|
|
|
|
// IPVersions selects which IP versions to resolve. Each field is
// optional; a nil field means no explicit choice was made for that
// version.
type IPVersions struct {
	IPv4 *bool `json:"ipv4,omitempty"`
	IPv6 *bool `json:"ipv6,omitempty"`
}

// resolveIpVersion maps the configured IP versions to the network name
// passed to the resolver: "ip4", "ip6", or "ip" (both).
//
// "ip" is returned when versions is nil, when neither field is set, when
// both are enabled, and also when neither version ends up enabled (for
// example, one is explicitly disabled and the other is left unset).
func resolveIpVersion(versions *IPVersions) string {
	// no explicit preference at all: resolve both
	if versions == nil || (versions.IPv4 == nil && versions.IPv6 == nil) {
		return "ip"
	}

	wantV4 := versions.IPv4 != nil && *versions.IPv4
	wantV6 := versions.IPv6 != nil && *versions.IPv6

	// both enabled, or neither enabled: no single version to pin
	if wantV4 == wantV6 {
		return "ip"
	}
	if wantV4 {
		return "ip4"
	}
	return "ip6"
}
|
|
|
|
// AUpstreams provides upstreams from A/AAAA lookups.
|
|
// Results are cached and refreshed at the configured
|
|
// refresh interval.
|
|
type AUpstreams struct {
|
|
// The domain name to look up.
|
|
Name string `json:"name,omitempty"`
|
|
|
|
// The port to use with the upstreams. Default: 80
|
|
Port string `json:"port,omitempty"`
|
|
|
|
// The interval at which to refresh the A lookup.
|
|
// Results are cached between lookups. Default: 1m
|
|
Refresh caddy.Duration `json:"refresh,omitempty"`
|
|
|
|
// Configures the DNS resolver used to resolve the
|
|
// domain name to A records.
|
|
Resolver *UpstreamResolver `json:"resolver,omitempty"`
|
|
|
|
// If Resolver is configured, how long to wait before
|
|
// timing out trying to connect to the DNS server.
|
|
DialTimeout caddy.Duration `json:"dial_timeout,omitempty"`
|
|
|
|
// If Resolver is configured, how long to wait before
|
|
// spawning an RFC 6555 Fast Fallback connection.
|
|
// A negative value disables this.
|
|
FallbackDelay caddy.Duration `json:"dial_fallback_delay,omitempty"`
|
|
|
|
// The IP versions to resolve for. By default, both
|
|
// "ipv4" and "ipv6" will be enabled, which
|
|
// correspond to A and AAAA records respectively.
|
|
Versions *IPVersions `json:"versions,omitempty"`
|
|
|
|
resolver *net.Resolver
|
|
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// CaddyModule returns the Caddy module information.
|
|
func (AUpstreams) CaddyModule() caddy.ModuleInfo {
|
|
return caddy.ModuleInfo{
|
|
ID: "http.reverse_proxy.upstreams.a",
|
|
New: func() caddy.Module { return new(AUpstreams) },
|
|
}
|
|
}
|
|
|
|
func (au *AUpstreams) Provision(ctx caddy.Context) error {
|
|
au.logger = ctx.Logger()
|
|
if au.Refresh == 0 {
|
|
au.Refresh = caddy.Duration(time.Minute)
|
|
}
|
|
if au.Port == "" {
|
|
au.Port = "80"
|
|
}
|
|
|
|
if au.Resolver != nil {
|
|
err := au.Resolver.ParseAddresses()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d := &net.Dialer{
|
|
Timeout: time.Duration(au.DialTimeout),
|
|
FallbackDelay: time.Duration(au.FallbackDelay),
|
|
}
|
|
au.resolver = &net.Resolver{
|
|
PreferGo: true,
|
|
Dial: func(ctx context.Context, _, _ string) (net.Conn, error) {
|
|
//nolint:gosec
|
|
addr := au.Resolver.netAddrs[weakrand.Intn(len(au.Resolver.netAddrs))]
|
|
return d.DialContext(ctx, addr.Network, addr.JoinHostPort(0))
|
|
},
|
|
}
|
|
}
|
|
if au.resolver == nil {
|
|
au.resolver = net.DefaultResolver
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (au AUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
|
|
repl := r.Context().Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
|
|
|
|
// Map ipVersion early, so we can use it as part of the cache-key.
|
|
// This should be fairly inexpensive and comes and the upside of
|
|
// allowing the same dynamic upstream (name + port combination)
|
|
// to be used multiple times with different ip versions.
|
|
//
|
|
// It also forced a cache-miss if a previously cached dynamic
|
|
// upstream changes its ip version, e.g. after a config reload,
|
|
// while keeping the cache-invalidation as simple as it currently is.
|
|
ipVersion := resolveIpVersion(au.Versions)
|
|
|
|
auStr := repl.ReplaceAll(au.String()+ipVersion, "")
|
|
|
|
// first, use a cheap read-lock to return a cached result quickly
|
|
aAaaaMu.RLock()
|
|
cached := aAaaa[auStr]
|
|
aAaaaMu.RUnlock()
|
|
if cached.isFresh() {
|
|
return allNew(cached.upstreams), nil
|
|
}
|
|
|
|
// otherwise, obtain a write-lock to update the cached value
|
|
aAaaaMu.Lock()
|
|
defer aAaaaMu.Unlock()
|
|
|
|
// check to see if it's still stale, since we're now in a different
|
|
// lock from when we first checked freshness; another goroutine might
|
|
// have refreshed it in the meantime before we re-obtained our lock
|
|
cached = aAaaa[auStr]
|
|
if cached.isFresh() {
|
|
return allNew(cached.upstreams), nil
|
|
}
|
|
|
|
name := repl.ReplaceAll(au.Name, "")
|
|
port := repl.ReplaceAll(au.Port, "")
|
|
|
|
if c := au.logger.Check(zapcore.DebugLevel, "refreshing A upstreams"); c != nil {
|
|
c.Write(
|
|
zap.String("version", ipVersion),
|
|
zap.String("name", name),
|
|
zap.String("port", port),
|
|
)
|
|
}
|
|
|
|
ips, err := au.resolver.LookupIP(r.Context(), ipVersion, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
upstreams := make([]Upstream, len(ips))
|
|
for i, ip := range ips {
|
|
if c := au.logger.Check(zapcore.DebugLevel, "discovered A record"); c != nil {
|
|
c.Write(zap.String("ip", ip.String()))
|
|
}
|
|
upstreams[i] = Upstream{
|
|
Dial: net.JoinHostPort(ip.String(), port),
|
|
}
|
|
}
|
|
|
|
// before adding a new one to the cache (as opposed to replacing stale one), make room if cache is full
|
|
if cached.freshness.IsZero() && len(aAaaa) >= 100 {
|
|
for randomKey := range aAaaa {
|
|
delete(aAaaa, randomKey)
|
|
break
|
|
}
|
|
}
|
|
|
|
aAaaa[auStr] = aLookup{
|
|
aUpstreams: au,
|
|
freshness: time.Now(),
|
|
upstreams: upstreams,
|
|
}
|
|
|
|
return allNew(upstreams), nil
|
|
}
|
|
|
|
// String returns the configured (unexpanded) name and port joined
// as "host:port".
func (au AUpstreams) String() string {
	hostPort := net.JoinHostPort(au.Name, au.Port)
	return hostPort
}
|
|
|
|
type aLookup struct {
|
|
aUpstreams AUpstreams
|
|
freshness time.Time
|
|
upstreams []Upstream
|
|
}
|
|
|
|
func (al aLookup) isFresh() bool {
|
|
return time.Since(al.freshness) < time.Duration(al.aUpstreams.Refresh)
|
|
}
|
|
|
|
// MultiUpstreams is a single dynamic upstream source that
|
|
// aggregates the results of multiple dynamic upstream sources.
|
|
// All configured sources will be queried in order, with their
|
|
// results appended to the end of the list. Errors returned
|
|
// from individual sources will be logged and the next source
|
|
// will continue to be invoked.
|
|
//
|
|
// This module makes it easy to implement redundant cluster
|
|
// failovers, especially in conjunction with the `first` load
|
|
// balancing policy: if the first source returns an error or
|
|
// no upstreams, the second source's upstreams will be used
|
|
// naturally.
|
|
type MultiUpstreams struct {
|
|
// The list of upstream source modules to get upstreams from.
|
|
// They will be queried in order, with their results appended
|
|
// in the order they are returned.
|
|
SourcesRaw []json.RawMessage `json:"sources,omitempty" caddy:"namespace=http.reverse_proxy.upstreams inline_key=source"`
|
|
sources []UpstreamSource
|
|
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// CaddyModule returns the Caddy module information.
|
|
func (MultiUpstreams) CaddyModule() caddy.ModuleInfo {
|
|
return caddy.ModuleInfo{
|
|
ID: "http.reverse_proxy.upstreams.multi",
|
|
New: func() caddy.Module { return new(MultiUpstreams) },
|
|
}
|
|
}
|
|
|
|
func (mu *MultiUpstreams) Provision(ctx caddy.Context) error {
|
|
mu.logger = ctx.Logger()
|
|
|
|
if mu.SourcesRaw != nil {
|
|
mod, err := ctx.LoadModule(mu, "SourcesRaw")
|
|
if err != nil {
|
|
return fmt.Errorf("loading upstream source modules: %v", err)
|
|
}
|
|
for _, src := range mod.([]any) {
|
|
mu.sources = append(mu.sources, src.(UpstreamSource))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mu MultiUpstreams) GetUpstreams(r *http.Request) ([]*Upstream, error) {
|
|
var upstreams []*Upstream
|
|
|
|
for i, src := range mu.sources {
|
|
select {
|
|
case <-r.Context().Done():
|
|
return upstreams, context.Canceled
|
|
default:
|
|
}
|
|
|
|
up, err := src.GetUpstreams(r)
|
|
if err != nil {
|
|
if c := mu.logger.Check(zapcore.ErrorLevel, "upstream source returned error"); c != nil {
|
|
c.Write(
|
|
zap.Int("source_idx", i),
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
} else if len(up) == 0 {
|
|
if c := mu.logger.Check(zapcore.WarnLevel, "upstream source returned 0 upstreams"); c != nil {
|
|
c.Write(zap.Int("source_idx", i))
|
|
}
|
|
} else {
|
|
upstreams = append(upstreams, up...)
|
|
}
|
|
}
|
|
|
|
return upstreams, nil
|
|
}
|
|
|
|
// UpstreamResolver holds the set of addresses of DNS resolvers of
|
|
// upstream addresses
|
|
type UpstreamResolver struct {
|
|
// The addresses of DNS resolvers to use when looking up the addresses of proxy upstreams.
|
|
// It accepts [network addresses](/docs/conventions#network-addresses)
|
|
// with port range of only 1. If the host is an IP address, it will be dialed directly to resolve the upstream server.
|
|
// If the host is not an IP address, the addresses are resolved using the [name resolution convention](https://golang.org/pkg/net/#hdr-Name_Resolution) of the Go standard library.
|
|
// If the array contains more than 1 resolver address, one is chosen at random.
|
|
Addresses []string `json:"addresses,omitempty"`
|
|
netAddrs []caddy.NetworkAddress
|
|
}
|
|
|
|
// ParseAddresses parses all the configured network addresses
|
|
// and ensures they're ready to be used.
|
|
func (u *UpstreamResolver) ParseAddresses() error {
|
|
for _, v := range u.Addresses {
|
|
addr, err := caddy.ParseNetworkAddressWithDefaults(v, "udp", 53)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if addr.PortRangeSize() != 1 {
|
|
return fmt.Errorf("resolver address must have exactly one address; cannot call %v", addr)
|
|
}
|
|
u.netAddrs = append(u.netAddrs, addr)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func allNew(upstreams []Upstream) []*Upstream {
|
|
results := make([]*Upstream, len(upstreams))
|
|
for i := range upstreams {
|
|
results[i] = &Upstream{Dial: upstreams[i].Dial}
|
|
}
|
|
return results
|
|
}
|
|
|
|
var (
|
|
srvs = make(map[string]srvLookup)
|
|
srvsMu sync.RWMutex
|
|
|
|
aAaaa = make(map[string]aLookup)
|
|
aAaaaMu sync.RWMutex
|
|
)
|
|
|
|
// Interface guards
|
|
var (
|
|
_ caddy.Provisioner = (*SRVUpstreams)(nil)
|
|
_ UpstreamSource = (*SRVUpstreams)(nil)
|
|
_ caddy.Provisioner = (*AUpstreams)(nil)
|
|
_ UpstreamSource = (*AUpstreams)(nil)
|
|
)
|