package e2ethanos
import (
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"fmt"
"math/big"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/efficientgo/core/backoff"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
e2emon "github.com/efficientgo/e2e/monitoring"
"github.com/efficientgo/e2e/monitoring/promconfig/discovery/targetgroup"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
"github.com/thanos-io/objstore/client"
"github.com/thanos-io/objstore/exthttp"
"github.com/thanos-io/objstore/providers/s3"
)
const (
infoLogLevel = "info"
)
var defaultBackoffConfig = backoff.Config{
Min: 300 * time.Millisecond,
Max: 600 * time.Millisecond,
MaxRetries: 50,
}
func wrapWithDefaults(opt e2e.StartOptions) e2e.StartOptions {
if opt.User == "" {
opt.User = strconv.Itoa(os.Getuid())
}
if opt.WaitReadyBackoff == nil {
opt.WaitReadyBackoff = &defaultBackoffConfig
}
return opt
}
const (
FeatureExemplarStorage = "exemplar-storage"
)
func DefaultPrometheusImage() string {
return "quay.io/prometheus/prometheus:v2.29.2"
}
func DefaultAlertmanagerImage() string {
return "quay.io/prometheus/alertmanager:v0.20.0"
}
func DefaultImage() string {
if os.Getenv("THANOS_IMAGE") != "" {
return os.Getenv("THANOS_IMAGE")
}
return "thanos"
}
func defaultPromHttpConfig() string {
return `basic_auth:
username: test
password: test
`
}
func NewPrometheus(e e2e.Environment, name, promConfig, webConfig, promImage string, enableFeatures ...string) *e2emon.InstrumentedRunnable {
f := e.Runnable(name).
WithPorts(map[string]int{"http": 9090}).
Future()
if err := os.MkdirAll(f.Dir(), 0750); err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "create prometheus dir"))}
}
if err := os.WriteFile(filepath.Join(f.Dir(), "prometheus.yml"), []byte(promConfig), 0600); err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "creating prom config"))}
}
if len(webConfig) > 0 {
if err := os.WriteFile(filepath.Join(f.Dir(), "web-config.yml"), []byte(webConfig), 0600); err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "creating web-config"))}
}
}
probe := e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200)
args := e2e.BuildArgs(map[string]string{
"--config.file": filepath.Join(f.InternalDir(), "prometheus.yml"),
"--storage.tsdb.path": f.InternalDir(),
"--storage.tsdb.max-block-duration": "2h",
"--log.level": infoLogLevel,
"--web.listen-address": ":9090",
})
if len(enableFeatures) > 0 {
args = append(args, fmt.Sprintf("--enable-feature=%s", strings.Join(enableFeatures, ",")))
}
if len(webConfig) > 0 {
args = append(args, fmt.Sprintf("--web.config.file=%s", filepath.Join(f.InternalDir(), "web-config.yml")))
probe = e2e.NewHTTPReadinessProbe("http", "/-/ready", 401, 401)
}
return e2emon.AsInstrumented(f.Init(wrapWithDefaults(e2e.StartOptions{
Image: promImage,
Command: e2e.NewCommandWithoutEntrypoint("prometheus", args...),
Readiness: probe,
})), "http")
}
func NewPrometheusWithSidecar(e e2e.Environment, name, promConfig, webConfig, promImage, minTime string, enableFeatures ...string) (*e2emon.InstrumentedRunnable, *e2emon.InstrumentedRunnable) {
return NewPrometheusWithSidecarCustomImage(e, name, promConfig, webConfig, promImage, minTime, DefaultImage(), enableFeatures...)
}
func NewPrometheusWithSidecarCustomImage(e e2e.Environment, name, promConfig, webConfig, promImage, minTime string, sidecarImage string, enableFeatures ...string) (*e2emon.InstrumentedRunnable, *e2emon.InstrumentedRunnable) {
prom := NewPrometheus(e, name, promConfig, webConfig, promImage, enableFeatures...)
args := map[string]string{
"--debug.name": fmt.Sprintf("sidecar-%v", name),
"--grpc-address": ":9091",
"--grpc-grace-period": "0s",
"--http-address": ":8080",
"--prometheus.url": "http://" + prom.InternalEndpoint("http"),
"--tsdb.path": prom.InternalDir(),
"--log.level": "debug",
}
if len(webConfig) > 0 {
args["--prometheus.http-client"] = defaultPromHttpConfig()
}
if minTime != "" {
args["--min-time"] = minTime
}
sidecarRunnable := e.Runnable(fmt.Sprintf("sidecar-%s", name)).WithPorts(map[string]int{"http": 8080, "grpc": 9091}).Init(wrapWithDefaults(e2e.StartOptions{
Image: sidecarImage,
Command: e2e.NewCommand("sidecar", e2e.BuildArgs(args)...),
Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200),
}))
sidecar := e2emon.AsInstrumented(sidecarRunnable, "http")
return prom, sidecar
}
type QuerierBuilder struct {
name string
routePrefix string
externalPrefix string
image string
storeAddresses []string
fileSDStoreAddresses []string
ruleAddresses []string
metadataAddresses []string
targetAddresses []string
exemplarAddresses []string
enableFeatures []string
endpoints []string
replicaLabels []string
tracingConfig string
e2e.Linkable
f e2e.FutureRunnable
}
func NewQuerierBuilder(e e2e.Environment, name string, storeAddresses ...string) *QuerierBuilder {
f := e.Runnable(fmt.Sprintf("querier-%v", name)).
WithPorts(map[string]int{
"http": 8080,
"grpc": 9091,
}).
Future()
return &QuerierBuilder{
Linkable: f,
f: f,
name: name,
storeAddresses: storeAddresses,
image: DefaultImage(),
replicaLabels: []string{replicaLabel},
}
}
func (q *QuerierBuilder) WithEnabledFeatures(enableFeatures []string) *QuerierBuilder {
q.enableFeatures = enableFeatures
return q
}
func (q *QuerierBuilder) WithImage(image string) *QuerierBuilder {
q.image = image
return q
}
func (q *QuerierBuilder) WithStoreAddresses(storeAddresses ...string) *QuerierBuilder {
q.storeAddresses = storeAddresses
return q
}
func (q *QuerierBuilder) WithFileSDStoreAddresses(fileSDStoreAddresses ...string) *QuerierBuilder {
q.fileSDStoreAddresses = fileSDStoreAddresses
return q
}
func (q *QuerierBuilder) WithRuleAddresses(ruleAddresses ...string) *QuerierBuilder {
q.ruleAddresses = ruleAddresses
return q
}
func (q *QuerierBuilder) WithTargetAddresses(targetAddresses ...string) *QuerierBuilder {
q.targetAddresses = targetAddresses
return q
}
func (q *QuerierBuilder) WithExemplarAddresses(exemplarAddresses ...string) *QuerierBuilder {
q.exemplarAddresses = exemplarAddresses
return q
}
func (q *QuerierBuilder) WithMetadataAddresses(metadataAddresses ...string) *QuerierBuilder {
q.metadataAddresses = metadataAddresses
return q
}
func (q *QuerierBuilder) WithEndpoints(endpoints ...string) *QuerierBuilder {
q.endpoints = endpoints
return q
}
func (q *QuerierBuilder) WithRoutePrefix(routePrefix string) *QuerierBuilder {
q.routePrefix = routePrefix
return q
}
func (q *QuerierBuilder) WithExternalPrefix(externalPrefix string) *QuerierBuilder {
q.externalPrefix = externalPrefix
return q
}
func (q *QuerierBuilder) WithTracingConfig(tracingConfig string) *QuerierBuilder {
q.tracingConfig = tracingConfig
return q
}
func (q *QuerierBuilder) WithReplicaLabels(labels ...string) *QuerierBuilder {
q.replicaLabels = labels
return q
}
func (q *QuerierBuilder) Init() *e2emon.InstrumentedRunnable {
args, err := q.collectArgs()
if err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(q.name, err)}
}
return e2emon.AsInstrumented(q.f.Init(wrapWithDefaults(e2e.StartOptions{
Image: q.image,
Command: e2e.NewCommand("query", args...),
Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200),
})), "http")
}
const replicaLabel = "replica"
func (q *QuerierBuilder) collectArgs() ([]string, error) {
args := e2e.BuildArgs(map[string]string{
"--debug.name": fmt.Sprintf("querier-%v", q.name),
"--grpc-address": ":9091",
"--grpc-grace-period": "0s",
"--http-address": ":8080",
"--store.sd-dns-interval": "5s",
"--log.level": infoLogLevel,
"--query.max-concurrent": "1",
"--store.sd-interval": "5s",
})
for _, repl := range q.replicaLabels {
args = append(args, "--query.replica-label="+repl)
}
for _, addr := range q.storeAddresses {
args = append(args, "--store="+addr)
}
for _, addr := range q.ruleAddresses {
args = append(args, "--rule="+addr)
}
for _, addr := range q.targetAddresses {
args = append(args, "--target="+addr)
}
for _, addr := range q.metadataAddresses {
args = append(args, "--metadata="+addr)
}
for _, addr := range q.exemplarAddresses {
args = append(args, "--exemplar="+addr)
}
for _, feature := range q.enableFeatures {
args = append(args, "--enable-feature="+feature)
}
for _, addr := range q.endpoints {
args = append(args, "--endpoint="+addr)
}
if len(q.fileSDStoreAddresses) > 0 {
if err := os.MkdirAll(q.Dir(), 0750); err != nil {
return nil, errors.Wrap(err, "create query dir failed")
}
fileSD := []*targetgroup.Group{{}}
for _, a := range q.fileSDStoreAddresses {
fileSD[0].Targets = append(fileSD[0].Targets, model.LabelSet{model.AddressLabel: model.LabelValue(a)})
}
b, err := yaml.Marshal(fileSD)
if err != nil {
return nil, err
}
if err := os.WriteFile(q.Dir()+"/filesd.yaml", b, 0600); err != nil {
return nil, errors.Wrap(err, "creating query SD config failed")
}
args = append(args, "--store.sd-files="+filepath.Join(q.InternalDir(), "filesd.yaml"))
}
if q.routePrefix != "" {
args = append(args, "--web.route-prefix="+q.routePrefix)
}
if q.externalPrefix != "" {
args = append(args, "--web.external-prefix="+q.externalPrefix)
}
if q.tracingConfig != "" {
args = append(args, "--tracing.config="+q.tracingConfig)
}
return args, nil
}
func NewReverseProxy(e e2e.Environment, name, tenantID, target string) *e2emon.InstrumentedRunnable {
conf := fmt.Sprintf(`
events {
worker_connections 1024;
}
http {
server {
listen 80;
server_name _;
location / {
proxy_set_header THANOS-TENANT %s;
proxy_pass %s;
}
}
}
`, tenantID, target)
f := e.Runnable(fmt.Sprintf("nginx-%s", name)).
WithPorts(map[string]int{"http": 80}).
Future()
if err := os.MkdirAll(f.Dir(), 0750); err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "create store dir"))}
}
if err := os.WriteFile(filepath.Join(f.Dir(), "nginx.conf"), []byte(conf), 0600); err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "creating nginx config file failed"))}
}
return e2emon.AsInstrumented(f.Init(e2e.StartOptions{
Image: "docker.io/nginx:1.21.1-alpine",
Volumes: []string{filepath.Join(f.Dir(), "/nginx.conf") + ":/etc/nginx/nginx.conf:ro"},
WaitReadyBackoff: &defaultBackoffConfig,
}), "http")
}
func NewMinio(e e2e.Environment, name, bktName string) *e2emon.InstrumentedRunnable {
image := "minio/minio:RELEASE.2022-07-30T05-21-40Z"
minioKESGithubContent := "https://raw.githubusercontent.com/minio/kes/master"
httpsPort := 8090
consolePort := 8080
f := e.Runnable(fmt.Sprintf("minio-%s", name)).
WithPorts(map[string]int{"https": httpsPort, "console": consolePort}).
Future()
if err := os.MkdirAll(filepath.Join(f.Dir(), "certs", "CAs"), 0750); err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "create certs dir"))}
}
if err := genCerts(
filepath.Join(f.Dir(), "certs", "public.crt"),
filepath.Join(f.Dir(), "certs", "private.key"),
filepath.Join(f.Dir(), "certs", "CAs", "ca.crt"),
fmt.Sprintf("%s-minio-%s", e.Name(), name),
); err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "fail to generate certs"))}
}
commands := []string{
fmt.Sprintf("curl -sSL --tlsv1.2 -O '%s/root.key' -O '%s/root.cert'", minioKESGithubContent, minioKESGithubContent),
fmt.Sprintf("mkdir -p /data/%s && minio server --certs-dir %s/certs --address :%v --console-address :%v /data", bktName, f.InternalDir(), httpsPort, consolePort),
}
minio := e2emon.AsInstrumented(f.Init(e2e.StartOptions{
Image: image,
Command: e2e.NewCommandWithoutEntrypoint("sh", "-c", strings.Join(commands, " && ")),
Readiness: e2e.NewHTTPSReadinessProbe("console", "/", 200, 200),
EnvVars: map[string]string{
"MINIO_ROOT_USER": e2edb.MinioAccessKey,
"MINIO_ROOT_PASSWORD": e2edb.MinioSecretKey,
"MINIO_BROWSER": "on",
"ENABLE_HTTPS": "1",
"MINIO_KMS_KES_ENDPOINT": "https://play.min.io:7373",
"MINIO_KMS_KES_KEY_FILE": "root.key",
"MINIO_KMS_KES_CERT_FILE": "root.cert",
"MINIO_KMS_KES_KEY_NAME": "my-minio-key",
},
}), "https")
return minio
}
func NewMemcached(e e2e.Environment, name string) *e2emon.InstrumentedRunnable {
return e2emon.AsInstrumented(e.Runnable(fmt.Sprintf("memcached-%s", name)).
WithPorts(map[string]int{"memcached": 11211}).
Init(e2e.StartOptions{
Image: "docker.io/memcached:1.6.3-alpine",
Command: e2e.NewCommand("memcached", []string{"-m 1024", "-I 1m", "-c 1024", "-v"}...),
User: strconv.Itoa(os.Getuid()),
WaitReadyBackoff: &defaultBackoffConfig,
}), "memcached")
}
func NewToolsBucketWeb(
e e2e.Environment,
name string,
bucketConfig client.BucketConfig,
routePrefix,
externalPrefix string,
minTime string,
maxTime string,
relabelConfig string,
) *e2emon.InstrumentedRunnable {
bktConfigBytes, err := yaml.Marshal(bucketConfig)
if err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(name, errors.Wrapf(err, "generate tools bucket web config file: %v", bucketConfig))}
}
f := e.Runnable(fmt.Sprintf("toolsBucketWeb-%s", name)).
WithPorts(map[string]int{"http": 8080, "grpc": 9091}).
Future()
args := e2e.BuildArgs(map[string]string{
"--debug.name": fmt.Sprintf("toolsBucketWeb-%s", name),
"--http-address": ":8080",
"--log.level": infoLogLevel,
"--objstore.config": string(bktConfigBytes),
})
if routePrefix != "" {
args = append(args, "--web.route-prefix="+routePrefix)
}
if externalPrefix != "" {
args = append(args, "--web.external-prefix="+externalPrefix)
}
if minTime != "" {
args = append(args, "--min-time="+minTime)
}
if maxTime != "" {
args = append(args, "--max-time="+maxTime)
}
if relabelConfig != "" {
args = append(args, "--selector.relabel-config="+relabelConfig)
}
args = append([]string{"bucket", "web"}, args...)
return e2emon.AsInstrumented(f.Init(wrapWithDefaults(e2e.StartOptions{
Image: DefaultImage(),
Command: e2e.NewCommand("tools", args...),
Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200),
})), "http")
}
func genCerts(certPath, privkeyPath, caPath, serverName string) error {
var caRoot = &x509.Certificate{
SerialNumber: big.NewInt(2019),
NotAfter: time.Now().AddDate(10, 0, 0),
IsCA: true,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
}
var cert = &x509.Certificate{
SerialNumber: big.NewInt(1658),
DNSNames: []string{serverName},
IPAddresses: []net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")},
NotAfter: time.Now().AddDate(10, 0, 0),
SubjectKeyId: []byte{1, 2, 3},
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature,
}
caPrivKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return err
}
certPrivKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return err
}
caBytes, err := x509.CreateCertificate(rand.Reader, caRoot, caRoot, &caPrivKey.PublicKey, caPrivKey)
if err != nil {
return err
}
caPEM := pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: caBytes,
})
err = os.WriteFile(caPath, caPEM, 0644)
if err != nil {
return err
}
certBytes, err := x509.CreateCertificate(rand.Reader, cert, caRoot, &certPrivKey.PublicKey, caPrivKey)
if err != nil {
return err
}
certPEM := pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: certBytes,
})
err = os.WriteFile(certPath, certPEM, 0644)
if err != nil {
return err
}
certPrivKeyPEM := pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(certPrivKey),
})
err = os.WriteFile(privkeyPath, certPrivKeyPEM, 0644)
if err != nil {
return err
}
return nil
}
func NewS3Config(bucket, endpoint, basePath string) s3.Config {
return s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: endpoint,
Insecure: false,
HTTPConfig: exthttp.HTTPConfig{
TLSConfig: exthttp.TLSConfig{
CAFile: filepath.Join(basePath, "certs", "CAs", "ca.crt"),
CertFile: filepath.Join(basePath, "certs", "public.crt"),
KeyFile: filepath.Join(basePath, "certs", "private.key"),
},
},
BucketLookupType: s3.AutoLookup,
}
}
var QueryUpWithoutInstance = func() string { return "sum(up) without (instance)" }
const LocalPrometheusTarget = "localhost:9090"
func DefaultPromConfig(name string, replica int, remoteWriteEndpoint, ruleFile string, scrapeTargets ...string) string {
var targets string
if len(scrapeTargets) > 0 {
targets = strings.Join(scrapeTargets, ",")
}
config := fmt.Sprintf(`
global:
external_labels:
prometheus: %v
replica: %v
`, name, replica)
if targets != "" {
config = fmt.Sprintf(`
%s
scrape_configs:
- job_name: 'myself'
# Quick scrapes for test purposes.
scrape_interval: 1s
scrape_timeout: 1s
static_configs:
- targets: [%s]
relabel_configs:
- source_labels: ['__address__']
regex: '^.+:80$'
action: drop
`, config, targets)
}
if remoteWriteEndpoint != "" {
config = fmt.Sprintf(`
%s
remote_write:
- url: "%s"
# Don't spam receiver on mistake.
queue_config:
min_backoff: 2s
max_backoff: 10s
`, config, remoteWriteEndpoint)
}
if ruleFile != "" {
config = fmt.Sprintf(`
%s
rule_files:
- "%s"
`, config, ruleFile)
}
return config
}