diff --git a/Makefile b/Makefile index b331afd8..ab7efcc4 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,6 @@ pkgs = $(shell go list ./...) +gofiles := $(shell find . -name "*.go" -type f -not -path "./vendor/*") + BUILD_TAG = $(shell git tag --points-at HEAD) @@ -13,9 +15,8 @@ update: dep ensure -update format: - dep ensure go fmt $(pkgs) - gofmt -w -s . + gofmt -w -s $(gofiles) build: go build diff --git a/README.md b/README.md index e410f6ff..c21f440a 100644 --- a/README.md +++ b/README.md @@ -327,7 +327,7 @@ an instant cache flush may be built on top of cache namespaces - just switch to to flush the cache. ### Security -`Chproxy` removes all the query params from input requests (except the `query`, `database`, `default_format`, `compress`, `decompress`, `enable_http_compression`) +`Chproxy` removes all the query params from input requests (except the user's [params](https://github.com/Vertamedia/chproxy/blob/master/config#param_groups_config) and the `query`, `database`, `default_format`, `compress`, `decompress`, `enable_http_compression`) before proxying them to `ClickHouse` nodes. This prevents from unsafe overriding of various `ClickHouse` [settings](http://clickhouse-docs.readthedocs.io/en/latest/interfaces/http_interface.html). @@ -391,6 +391,30 @@ network_groups: - name: "reporting-apps" networks: ["10.10.10.0/24"] +# Optional lists of query params to send with each proxied request to ClickHouse. +# These lists may be used for overriding ClickHouse settings on a per-user basis. +param_groups: + # Group name, which may be passed into `params` option on the `user` level. 
+ - name: "cron-job" + # List of key-value params to send + params: + - key: "max_memory_usage" + value: "40000000000" + + - key: "max_bytes_before_external_group_by" + value: "20000000000" + + - name: "web" + params: + - key: "max_memory_usage" + value: "5000000000" + + - key: "max_columns_to_read" + value: "30" + + - key: "max_execution_time" + value: "30" + # Settings for `chproxy` input interfaces. server: # Configs for input http interface. @@ -467,6 +491,12 @@ users: # By default responses aren't cached. cache: "longterm" + # An optional group of params to send to ClickHouse with each proxied request. + # These params may be set in param_groups block. + # + # By default no additional params are sent to ClickHouse. + params: "web" + # The maximum number of requests that may wait for their chance # to be executed because they cannot run now due to the current limits. # @@ -488,6 +518,8 @@ users: to_user: "default" allowed_networks: ["office", "1.2.3.0/24"] + params: "cron-job" + # The maximum number of concurrently running queries for the user. # # By default there is no limit on the number of concurrently diff --git a/cache/cache.go b/cache/cache.go index d5ab1473..79ffec7b 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -79,12 +79,15 @@ type Key struct { // Namespace is an optional cache namespace. Namespace string + + // UserParamsHash must contain hashed value of users params + UserParamsHash uint32 } // String returns string representation of the key. 
func (k *Key) String() string { - s := fmt.Sprintf("V%d; Query=%q; AcceptEncoding=%q; DefaultFormat=%q; Database=%q; Compress=%q; EnableHTTPCompression=%q; Namespace=%q", - cacheVersion, k.Query, k.AcceptEncoding, k.DefaultFormat, k.Database, k.Compress, k.EnableHTTPCompression, k.Namespace) + s := fmt.Sprintf("V%d; Query=%q; AcceptEncoding=%q; DefaultFormat=%q; Database=%q; Compress=%q; EnableHTTPCompression=%q; Namespace=%q; UserParams=%d", + cacheVersion, k.Query, k.AcceptEncoding, k.DefaultFormat, k.Database, k.Compress, k.EnableHTTPCompression, k.Namespace, k.UserParamsHash) h := sha256.Sum256([]byte(s)) // The first 16 bytes of the hash should be enough diff --git a/cache/cache_test.go b/cache/cache_test.go index 3d1280e7..75cb598d 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -48,14 +48,14 @@ func TestKeyString(t *testing.T) { key: &Key{ Query: []byte("SELECT 1 FROM system.numbers LIMIT 10"), }, - expected: "cd8ba039ef5df8287ff91a3907e21fcf", + expected: "b84443ea3b7651f8eed84ad70cc17d55", }, { key: &Key{ Query: []byte("SELECT 1 FROM system.numbers LIMIT 10"), AcceptEncoding: "gzip", }, - expected: "e30d9db1d8f6a0844b7c6047d14a6c8b", + expected: "baece3cc15d1aa1516e2729409ece703", }, { key: &Key{ @@ -63,7 +63,7 @@ func TestKeyString(t *testing.T) { AcceptEncoding: "gzip", DefaultFormat: "JSON", }, - expected: "70bd83ab8def6df2c65214b37410050c", + expected: "c238bde938f93419e93b5b7b0341f1ef", }, { key: &Key{ @@ -72,7 +72,7 @@ func TestKeyString(t *testing.T) { DefaultFormat: "JSON", Database: "foobar", }, - expected: "3fb481313cd5a0061832de383183ac3a", + expected: "e55de3951f08688a34e589caaeed437f", }, { key: &Key{ @@ -82,7 +82,7 @@ func TestKeyString(t *testing.T) { Database: "foobar", Namespace: "ns123", }, - expected: "556f114207e2f8bc4cbd574b5035b25e", + expected: "a8676b65119982a1fa135005e0583a07", }, { key: &Key{ @@ -93,7 +93,7 @@ func TestKeyString(t *testing.T) { Compress: "1", Namespace: "ns123", }, - expected: 
"5fe3d904e7e76c5fe7ba617567a4cb7a", + expected: "9a2ad211524d5c8983d43784fd59677d", }, } diff --git a/config/README.md b/config/README.md index a543c110..1db32e6d 100644 --- a/config/README.md +++ b/config/README.md @@ -20,6 +20,10 @@ hack_me_please | default = false [optional] caches: - ... +# Named list of parameters to apply to each query +param_groups: + - ... + # Named network lists network_groups: ... [optional] @@ -72,6 +76,16 @@ expire: grace_time: ``` +### +```yml +# Group name, which may be passed into `params` option on the `user` level. +- name: +# List of key-value params to send +params: + - key: + value: +``` + ### ```yml # HTTP server configuration @@ -179,9 +193,13 @@ allow_cors: | optional | default = false # Each list item could be IP address or subnet mask allowed_networks: , ... | optional -# Optioanl reponse cache name from +# Optional response cache name from # By default responses aren't cached. cache: | optional + +# Optional group of params name to send to ClickHouse with each proxied request from +# By default no additional params are sent to ClickHouse. 
+params: | optional ``` ### diff --git a/config/config.go b/config/config.go index 29e9e3f1..8faae86d 100644 --- a/config/config.go +++ b/config/config.go @@ -41,6 +41,8 @@ type Config struct { Caches []Cache `yaml:"caches,omitempty"` + ParamGroups []ParamGroup `yaml:"param_groups,omitempty"` + // Catches all undefined fields XXX map[string]interface{} `yaml:",inline"` @@ -411,6 +413,9 @@ type User struct { // Name of Cache configuration to use for responses of this user Cache string `yaml:"cache,omitempty"` + // Name of ParamGroup to use + Params string `yaml:"params,omitempty"` + // Catches all undefined fields XXX map[string]interface{} `yaml:",inline"` } @@ -515,6 +520,42 @@ func (c *Cache) UnmarshalYAML(unmarshal func(interface{}) error) error { return checkOverflow(c.XXX, fmt.Sprintf("cache %q", c.Name)) } +// ParamGroup describes named group of GET params +// for sending with each query +type ParamGroup struct { + // Name of configuration for further assign + Name string `yaml:"name"` + + // Params contains a list of GET params + Params []Param `yaml:"params"` + + // Catches all undefined fields + XXX map[string]interface{} `yaml:",inline"` +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. 
+func (pg *ParamGroup) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain ParamGroup + if err := unmarshal((*plain)(pg)); err != nil { + return err + } + if len(pg.Name) == 0 { + return fmt.Errorf("`param_group.name` must be specified") + } + if len(pg.Params) == 0 { + return fmt.Errorf("`param_group.params` must contain at least one param") + } + return checkOverflow(pg.XXX, fmt.Sprintf("param_group %q", pg.Name)) +} + +// Param describes a single URL param as a key-value pair +type Param struct { + // Key is the name of the param + Key string `yaml:"key"` + // Value is the value of the param + Value string `yaml:"value"` +} + // ClusterUser describes simplest configuration type ClusterUser struct { // User name in ClickHouse users.xml config diff --git a/config/config_test.go b/config/config_test.go index fd4cbaa7..c6123eee 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -104,6 +104,40 @@ func TestLoadConfig(t *testing.T) { HeartBeatInterval: Duration(5 * time.Second), }, }, + + ParamGroups: []ParamGroup{ + { + Name: "cron-job", + Params: []Param{ + { + Key: "max_memory_usage", + Value: "40000000000", + }, + { + Key: "max_bytes_before_external_group_by", + Value: "20000000000", + }, + }, + }, + { + Name: "web", + Params: []Param{ + { + Key: "max_memory_usage", + Value: "5000000000", + }, + { + Key: "max_columns_to_read", + Value: "30", + }, + { + Key: "max_execution_time", + Value: "30", + }, + }, + }, + }, + Users: []User{ { Name: "web", @@ -116,6 +150,7 @@ func TestLoadConfig(t *testing.T) { MaxQueueSize: 100, MaxQueueTime: Duration(35 * time.Second), Cache: "longterm", + Params: "web", }, { Name: "default", diff --git a/config/testdata/full.yml b/config/testdata/full.yml index 6da12c4d..0a3c02df 100644 --- a/config/testdata/full.yml +++ b/config/testdata/full.yml @@ -51,6 +51,30 @@ network_groups: - name: "reporting-apps" networks: ["10.10.10.0/24"] +# Optional lists of query params to send with each proxied request to ClickHouse. 
+# These lists may be used for overriding ClickHouse settings on a per-user basis. +param_groups: + # Group name, which may be passed into `params` option on the `user` level. + - name: "cron-job" + # List of key-value params to send + params: + - key: "max_memory_usage" + value: "40000000000" + + - key: "max_bytes_before_external_group_by" + value: "20000000000" + + - name: "web" + params: + - key: "max_memory_usage" + value: "5000000000" + + - key: "max_columns_to_read" + value: "30" + + - key: "max_execution_time" + value: "30" + # Settings for `chproxy` input interfaces. server: # Configs for input http interface. @@ -80,6 +104,7 @@ server: # Certificates are automatically issued and renewed if this section # is present. # There is no need in cert_file and key_file if this section is present. + # Autocert requires application to listen on :80 port for certificate generation autocert: # Path to the directory where autocert certs are cached. cache_dir: "certs_dir" @@ -126,6 +151,12 @@ users: # By default responses aren't cached. cache: "longterm" + # An optional group of params to send to ClickHouse with each proxied request. + # These params may be set in param_groups block. + # + # By default no additional params are sent to ClickHouse. + params: "web" + # The maximum number of requests that may wait for their chance # to be executed because they cannot run now due to the current limits. 
# diff --git a/proxy.go b/proxy.go index 7825c32e..95cfe830 100644 --- a/proxy.go +++ b/proxy.go @@ -216,6 +216,7 @@ func (rp *reverseProxy) proxyRequest(s *scope, rw http.ResponseWriter, srw *stat err = fmt.Errorf("%s: %s; query: %q", s, timeoutErrMsg, q) respondWith(rw, err, http.StatusGatewayTimeout) srw.statusCode = http.StatusGatewayTimeout + default: panic(fmt.Sprintf("BUG: context.Context.Err() returned unexpected error: %s", err)) } @@ -250,6 +251,10 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h "cluster_user": s.labels["cluster_user"], } + var paramsHash uint32 + if s.user.params != nil { + paramsHash = s.user.params.key + } key := &cache.Key{ Query: skipLeadingComments(q), // sort `Accept-Encoding` header to get the same combination for different browsers @@ -259,6 +264,7 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h Compress: origParams.Get("compress"), EnableHTTPCompression: origParams.Get("enable_http_compression"), Namespace: origParams.Get("cache_namespace"), + UserParamsHash: paramsHash, } startTime := time.Now() @@ -346,7 +352,24 @@ func (rp *reverseProxy) applyConfig(cfg *config.Config) error { caches[cc.Name] = tmpCache } - users, err := newUsers(cfg.Users, clusters, caches) + params := make(map[string]*paramsRegistry, len(cfg.ParamGroups)) + for _, p := range cfg.ParamGroups { + if _, ok := params[p.Name]; ok { + return fmt.Errorf("duplicate config for ParamGroups %q", p.Name) + } + params[p.Name], err = newParamsRegistry(p.Params) + if err != nil { + return fmt.Errorf("cannot initialize params %q: %s", p.Name, err) + } + } + + profile := &usersProfile{ + cfg: cfg.Users, + clusters: clusters, + caches: caches, + params: params, + } + users, err := profile.newUsers() if err != nil { return err } diff --git a/scope.go b/scope.go index 7ff579e9..6936e40f 100644 --- a/scope.go +++ b/scope.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "hash/fnv" "io/ioutil" "net" 
"net/http" @@ -308,8 +309,12 @@ func (s *scope) decorateRequest(req *http.Request) (*http.Request, url.Values) { // Make new params to purify URL. params := make(url.Values) - // Set query_id as scope_id to have possibility to kill query if needed. - params.Set("query_id", s.id.String()) + // Set user params + if s.user.params != nil { + for _, param := range s.user.params.params { + params.Set(param.Key, param.Value) + } + } // Keep allowed params. origParams := req.URL.Query() @@ -332,9 +337,13 @@ func (s *scope) decorateRequest(req *http.Request) (*http.Request, url.Values) { // disable cache for external_data queries params.Set("no_cache", "1") + log.Debugf("external data params detected - cache will be disabled") } } + // Set query_id as scope_id to have possibility to kill query if needed. + params.Set("query_id", s.id.String()) + req.URL.RawQuery = params.Encode() // Rewrite possible previous Basic Auth and send request @@ -382,6 +391,28 @@ func (s *scope) maxQueueTime() time.Duration { return d } +type paramsRegistry struct { + // key is a hashed concatenation of the params list + key uint32 + + params []config.Param +} + +func newParamsRegistry(params []config.Param) (*paramsRegistry, error) { + if len(params) == 0 { + return nil, fmt.Errorf("params can't be empty") + } + h := fnv.New32a() + for _, p := range params { + str := fmt.Sprintf("%s=%s&", p.Key, p.Value) + h.Write([]byte(str)) + } + return &paramsRegistry{ + key: h.Sum32(), + params: params, + }, nil +} + type user struct { name string password string @@ -406,11 +437,34 @@ type user struct { denyHTTPS bool allowCORS bool - cache *cache.Cache + cache *cache.Cache + params *paramsRegistry +} + +type usersProfile struct { + cfg []config.User + clusters map[string]*cluster + caches map[string]*cache.Cache + params map[string]*paramsRegistry } -func newUser(u config.User, clusters map[string]*cluster, caches map[string]*cache.Cache) (*user, error) { - c, ok := clusters[u.ToCluster] +func (up usersProfile) 
newUsers() (map[string]*user, error) { + users := make(map[string]*user, len(up.cfg)) + for _, u := range up.cfg { + if _, ok := users[u.Name]; ok { + return nil, fmt.Errorf("duplicate config for user %q", u.Name) + } + tmpU, err := up.newUser(u) + if err != nil { + return nil, fmt.Errorf("cannot initialize user %q: %s", u.Name, err) + } + users[u.Name] = tmpU + } + return users, nil +} + +func (up usersProfile) newUser(u config.User) (*user, error) { + c, ok := up.clusters[u.ToCluster] if !ok { return nil, fmt.Errorf("unknown `to_cluster` %q", u.ToCluster) } @@ -425,12 +479,20 @@ func newUser(u config.User, clusters map[string]*cluster, caches map[string]*cac var cc *cache.Cache if len(u.Cache) > 0 { - cc = caches[u.Cache] + cc = up.caches[u.Cache] if cc == nil { return nil, fmt.Errorf("unknown `cache` %q", u.Cache) } } + var params *paramsRegistry + if len(u.Params) > 0 { + params = up.params[u.Params] + if params == nil { + return nil, fmt.Errorf("unknown `params` %q", u.Params) + } + } + return &user{ name: u.Name, password: u.Password, @@ -446,24 +508,10 @@ func newUser(u config.User, clusters map[string]*cluster, caches map[string]*cac denyHTTPS: u.DenyHTTPS, allowCORS: u.AllowCORS, cache: cc, + params: params, }, nil } -func newUsers(cfg []config.User, clusters map[string]*cluster, caches map[string]*cache.Cache) (map[string]*user, error) { - users := make(map[string]*user, len(cfg)) - for _, u := range cfg { - if _, ok := users[u.Name]; ok { - return nil, fmt.Errorf("duplicate config for user %q", u.Name) - } - tmpU, err := newUser(u, clusters, caches) - if err != nil { - return nil, fmt.Errorf("cannot initialize user %q: %s", u.Name, err) - } - users[u.Name] = tmpU - } - return users, nil -} - type clusterUser struct { name string password string @@ -598,9 +646,7 @@ func (h *host) runHeartbeat(done <-chan struct{}) { } } -func (h *host) isActive() bool { - return atomic.LoadUint32(&h.active) == 1 -} +func (h *host) isActive() bool { return 
atomic.LoadUint32(&h.active) == 1 } func (r *replica) isActive() bool { // The replica is active if at least a single host is active. diff --git a/scope_test.go b/scope_test.go index 817bd212..addfd87d 100644 --- a/scope_test.go +++ b/scope_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/Vertamedia/chproxy/config" "github.com/prometheus/client_golang/prometheus" ) @@ -321,42 +322,77 @@ func TestDecorateRequest(t *testing.T) { request string contentType string method string + userParams *paramsRegistry expectedParams []string }{ { "http://127.0.0.1?user=default&password=default&query=SELECT&max_result_bytes=4000000&buffer_size=3000000&wait_end_of_query=1", "text/plain", "GET", + nil, []string{"query_id", "query"}, }, { "http://127.0.0.1?user=default&password=default&query=SELECT&database=default&wait_end_of_query=1", "text/plain", "GET", + nil, []string{"query_id", "query", "database"}, }, { "http://127.0.0.1?user=default&password=default&query=SELECT&testdata_structure=id+UInt32&testdata_format=TSV", "application/x-www-form-urlencoded", "POST", - []string{"query_id", "query"}, + &paramsRegistry{ + key: uint32(1), + params: []config.Param{ + { + Key: "max_threads", + Value: "1", + }, + }, + }, + []string{"query_id", "query", "max_threads"}, }, { "http://127.0.0.1?user=default&password=default&query=SELECT&testdata_structure=id+UInt32&testdata_format=TSV", "multipart/form-data", "PUT", + &paramsRegistry{ + key: uint32(1), + params: []config.Param{ + { + Key: "query", + Value: "1", + }, + }, + }, []string{"query_id", "query"}, }, { "http://127.0.0.1?user=default&password=default&query=SELECT&testdata_type_buzz=1&testdata_structure_foo=id+UInt32&testdata_format-bar=TSV", "multipart/form-data; boundary=foobar", "POST", - []string{"query_id", "query", "no_cache"}, + &paramsRegistry{ + key: uint32(1), + params: []config.Param{ + { + Key: "max_threads", + Value: "1", + }, + { + Key: "background_pool_size", + Value: "10", + }, + }, + }, + []string{"query_id", "query", 
"no_cache", "max_threads", "background_pool_size"}, }, { "http://127.0.0.1?user=default&password=default&query=SELECT&testdata_structure=id+UInt32&testdata_format=TSV", "multipart/form-data; boundary=foobar", "POST", + nil, []string{"query_id", "testdata_structure", "testdata_format", "query", "no_cache"}, }, } @@ -370,7 +406,9 @@ func TestDecorateRequest(t *testing.T) { s := &scope{ id: newScopeID(), clusterUser: &clusterUser{}, - user: &user{}, + user: &user{ + params: tc.userParams, + }, host: &host{ addr: &url.URL{Host: "127.0.0.1"}, },