Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Refactor. Separate server initialization from server options
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun committed Sep 2, 2019
commit 1bf2966bcfcd651b435520ae5ae50c685215be40
77 changes: 42 additions & 35 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/tracing/client"
"go.uber.org/automaxprocs/maxprocs"
Expand Down Expand Up @@ -244,41 +245,8 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{}))
}

// defaultGRPCServerOpts returns default gRPC server opts that includes:
// - request histogram
// - tracing
// - panic recovery with panic counter
func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, met *grpc_prometheus.ServerMetrics, cert, key, clientCA string) ([]grpc.ServerOption, error) {
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4,
}),
)

panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
})
grpcPanicRecoveryHandler := func(p interface{}) (err error) {
panicsTotal.Inc()
level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}

reg.MustRegister(met, panicsTotal)
opts := []grpc.ServerOption{
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
tracing.UnaryServerInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc_middleware.WithStreamServerChain(
met.StreamServerInterceptor(),
tracing.StreamServerInterceptor(tracer),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
}
func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
opts := []grpc.ServerOption{}

if key == "" && cert == "" {
if clientCA != "" {
Expand Down Expand Up @@ -325,6 +293,45 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
}

func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4,
}),
)
panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
})
reg.MustRegister(met, panicsTotal)

grpcPanicRecoveryHandler := func(p interface{}) (err error) {
panicsTotal.Inc()
level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
opts = append(opts,
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
tracing.UnaryServerInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc_middleware.WithStreamServerChain(
met.StreamServerInterceptor(),
tracing.StreamServerInterceptor(tracer),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
)

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, srv)
met.InitializeMetrics(s)

return s
}

// TODO Remove once all components are migrated to the new defaultHTTPListener.
// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
v1 "github.com/thanos-io/thanos/pkg/query/api"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
"google.golang.org/grpc"
Expand Down Expand Up @@ -441,15 +440,11 @@ func runQuery(
}
logger := log.With(logger, "component", component.Query.String())

met := grpc_prometheus.NewServerMetrics()
opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, srvCert, srvKey, srvClientCA)
opts, err := defaultGRPCServerOpts(logger, srvCert, srvKey, srvClientCA)
if err != nil {
return errors.Wrapf(err, "build gRPC server")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, proxy)
met.InitializeMetrics(s)
s := newStoreGRPCServer(logger, reg, tracer, proxy, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -25,7 +24,6 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
Expand Down Expand Up @@ -268,14 +266,11 @@ func runReceive(
db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset)

met := grpc_prometheus.NewServerMetrics()
opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s = grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, tsdbStore)
met.InitializeMetrics(s)
s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts)

level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -51,7 +50,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -494,14 +492,11 @@ func runRule(

store := store.NewTSDBStore(logger, reg, db, component.Rule, lset)

met := grpc_prometheus.NewServerMetrics()
opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)
met.InitializeMetrics(s)
s := newStoreGRPCServer(logger, reg, tracer, store, opts)

g.Add(func() error {
return errors.Wrap(s.Serve(l), "serve gRPC")
Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -221,14 +219,11 @@ func runSidecar(
return errors.Wrap(err, "create Prometheus store")
}

met := grpc_prometheus.NewServerMetrics()
opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, promStore)
met.InitializeMetrics(s)
s := newStoreGRPCServer(logger, reg, tracer, promStore, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
Expand Down
11 changes: 2 additions & 9 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -17,8 +16,6 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -188,15 +185,11 @@ func runStore(
return errors.Wrap(err, "listen API address")
}

met := grpc_prometheus.NewServerMetrics()
opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, bs)
met.InitializeMetrics(s)
s := newStoreGRPCServer(logger, reg, tracer, bs, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
Expand Down