Skip to content

Commit

Permalink
Merge branch 'master' into chore.removeDeprecatedMergedSchema
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Jun 14, 2023
2 parents b597db9 + af72e90 commit b662575
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 84 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## [1.9.6](https://github.com/rudderlabs/rudder-server/compare/v1.9.5...v1.9.6) (2023-06-09)


### Bug Fixes

* warehouse proxy endpoints ([#3476](https://github.com/rudderlabs/rudder-server/issues/3476)) ([fda977f](https://github.com/rudderlabs/rudder-server/commit/fda977f9e440aaa21337ff620c8fb8fe68385b2a))

## [1.9.5](https://github.com/rudderlabs/rudder-server/compare/v1.9.4...v1.9.5) (2023-06-07)


### Bug Fixes

* flag for warehouse altering ([#3460](https://github.com/rudderlabs/rudder-server/issues/3460)) ([472d310](https://github.com/rudderlabs/rudder-server/commit/472d3102fd04e91da2e832737dbb42ba2671dc3c))

## [1.9.4](https://github.com/rudderlabs/rudder-server/compare/v1.9.3...v1.9.4) (2023-06-06)


Expand Down
27 changes: 9 additions & 18 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,21 +956,6 @@ func (gateway *HandleT) beaconBatchHandler(w http.ResponseWriter, r *http.Reques
gateway.beaconHandler(w, r, "batch")
}

func warehouseHandler(w http.ResponseWriter, r *http.Request) {
origin, err := url.Parse(misc.GetWarehouseURL())
if err != nil {
http.Error(w, err.Error(), 404)
return
}
// gateway.logger.LogRequest(r)
director := func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = origin.Host
}
proxy := &httputil.ReverseProxy{Director: director}
proxy.ServeHTTP(w, r)
}

// ProcessRequest throws a webRequest into the queue and waits for the response before returning
func (*RegularRequestHandler) ProcessRequest(gateway *HandleT, w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string {
done := make(chan string, 1)
Expand Down Expand Up @@ -1327,11 +1312,17 @@ func (gateway *HandleT) StartWebHandler(ctx context.Context) error {
r.Post("/screen", gateway.webScreenHandler)
r.Post("/track", gateway.webTrackHandler)
r.Post("/webhook", gateway.webhookHandler.RequestHandler)
r.Post("/warehouse", warehouseHandler)
r.Post("/warehouse/pending-events", gateway.whProxy.ServeHTTP)

r.Get("/webhook", gateway.webhookHandler.RequestHandler)
r.Get("/warehouse", warehouseHandler)

r.Route("/warehouse", func(r chi.Router) {
r.Post("/pending-events", gateway.whProxy.ServeHTTP)
r.Post("/trigger-upload", gateway.whProxy.ServeHTTP)
r.Post("/jobs", gateway.whProxy.ServeHTTP)
r.Post("/fetch-tables", gateway.whProxy.ServeHTTP)

r.Get("/jobs/status", gateway.whProxy.ServeHTTP)
})
})

srvMux.Get("/health", WithContentType("application/json; charset=utf-8", app.LivenessHandler(gateway.jobsDB)))
Expand Down
78 changes: 28 additions & 50 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (
"net/http/httptest"
"net/url"
"reflect"
"strconv"
"testing"
"time"

kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -350,10 +353,18 @@ var _ = Describe("Gateway", func() {
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, "OK")
}))
serverURL = whServer.URL
parsedURL, err := url.Parse(serverURL)
WHURL := whServer.URL
parsedURL, err := url.Parse(WHURL)
Expect(err).To(BeNil())
whPort := parsedURL.Port()
GinkgoT().Setenv("RSERVER_WAREHOUSE_WEB_PORT", whPort)

serverPort, err := kithelper.GetFreePort()
Expect(err).To(BeNil())
config.Set("Warehouse.webPort", parsedURL.Port())
serverURL = fmt.Sprintf("http://localhost:%d", serverPort)
GinkgoT().Setenv("RSERVER_GATEWAY_WEB_PORT", strconv.Itoa(serverPort))

loadConfig()

gateway = &HandleT{}
err = gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
Expand Down Expand Up @@ -381,7 +392,6 @@ var _ = Describe("Gateway", func() {
}
verifyEndpoint := func(endpoints []string, method string) {
client := &http.Client{}
// baseURL := "http://localhost:8080"
for _, ep := range endpoints {
url := fmt.Sprintf("%s%s", serverURL, ep)
var req *http.Request
Expand Down Expand Up @@ -420,11 +430,14 @@ var _ = Describe("Gateway", func() {
close(wait)
}()
Eventually(func() bool {
resp, _ := http.Get(fmt.Sprintf("%s/version", serverURL))
resp, err := http.Get(fmt.Sprintf("%s/version", serverURL))
if err != nil {
return false
}
return resp.StatusCode == http.StatusOK
}, time.Second*10, time.Second).Should(BeTrue())

getEndpoint, postEndpoints, deleteEndpoints := getEndpointMethodMap()
getEndpoint, postEndpoints, deleteEndpoints := endpointsToVerify()
verifyEndpoint(getEndpoint, http.MethodGet)
verifyEndpoint(postEndpoints, http.MethodPost)
verifyEndpoint(deleteEndpoints, http.MethodDelete)
Expand Down Expand Up @@ -987,43 +1000,6 @@ var _ = Describe("Gateway", func() {
})
})

Context("Warehouse proxy", func() {
DescribeTable("forwarding requests to warehouse with different response codes",
func(url string, code int, payload string) {
gateway := &HandleT{}
whMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expect(r.URL.String()).To(Equal(url))
Expect(r.Body)
Expect(r.Body).To(Not(BeNil()))
defer func() { _ = r.Body.Close() }()
reqBody, err := io.ReadAll(r.Body)
Expect(err).To(BeNil())
Expect(string(reqBody)).To(Equal(payload))
w.WriteHeader(code)
}))
GinkgoT().Setenv("WAREHOUSE_URL", whMock.URL)
GinkgoT().Setenv("RSERVER_WAREHOUSE_MODE", config.OffMode)
err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())

defer func() {
err := gateway.Shutdown()
Expect(err).To(BeNil())
whMock.Close()
}()

req := httptest.NewRequest("POST", "http://rudder-server"+url, bytes.NewBufferString(payload))
w := httptest.NewRecorder()
gateway.whProxy.ServeHTTP(w, req)
resp := w.Result()
Expect(resp.StatusCode).To(Equal(code))
},
Entry("successful request", "/v1/warehouse/pending-events", http.StatusOK, `{"source_id": "1", "task_run_id":"2"}`),
Entry("failed request", "/v1/warehouse/pending-events", http.StatusBadRequest, `{"source_id": "3", "task_run_id":"4"}`),
Entry("request with query parameters", "/v1/warehouse/pending-events?triggerUpload=true", http.StatusOK, `{"source_id": "5", "task_run_id":"6"}`),
)
})

Context("jobDataFromRequest", func() {
var (
gateway *HandleT
Expand Down Expand Up @@ -1161,19 +1137,19 @@ func expectHandlerResponse(handler http.HandlerFunc, req *http.Request, response
}

// return all endpoints as key and method as value
func getEndpointMethodMap() (getEndpoints, postEndpoints, deleteEndpoints []string) {
getEndpoints = []string{
func endpointsToVerify() ([]string, []string, []string) {
getEndpoints := []string{
"/version",
"/robots.txt",
"/pixel/v1/track",
"/pixel/v1/page",
"/v1/warehouse",
"/v1/webhook",
"/v1/job-status/123",
"/v1/job-status/123/failed-records",
"/v1/warehouse/jobs/status",
}

postEndpoints = []string{
postEndpoints := []string{
"/v1/batch",
"/v1/identify",
"/v1/track",
Expand All @@ -1185,16 +1161,18 @@ func getEndpointMethodMap() (getEndpoints, postEndpoints, deleteEndpoints []stri
"/v1/import",
"/v1/audiencelist",
"/v1/webhook",
"/v1/warehouse",
"/beacon/v1/batch",
"/internal/v1/extract",
"/v1/warehouse/pending-events",
"/v1/warehouse/trigger-upload",
"/v1/warehouse/jobs",
"/v1/warehouse/fetch-tables",
}

deleteEndpoints = []string{
deleteEndpoints := []string{
"/v1/job-status/1234",
}
return
return getEndpoints, postEndpoints, deleteEndpoints
}

func allHandlers(gateway *HandleT) map[string]http.HandlerFunc {
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/allisson/go-pglock/v2 v2.0.1
github.com/apache/pulsar-client-go v0.10.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/aws/aws-sdk-go v1.44.275
github.com/aws/aws-sdk-go v1.44.281
github.com/bugsnag/bugsnag-go/v2 v2.2.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.2.1
Expand All @@ -46,7 +46,7 @@ require (
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.5
github.com/gofrs/uuid v4.4.0+incompatible
github.com/golang-migrate/migrate/v4 v4.16.1
github.com/golang-migrate/migrate/v4 v4.16.2
github.com/golang/mock v1.6.0
github.com/gomodule/redigo v1.8.9
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -82,7 +82,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.14.4
github.com/tidwall/sjson v1.2.5
github.com/urfave/cli/v2 v2.25.5
github.com/urfave/cli/v2 v2.25.6
github.com/viney-shih/go-lock v1.1.2
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20230312005205-fbbcdea5f512
Expand All @@ -94,7 +94,7 @@ require (
golang.org/x/exp v0.0.0-20230418202329-0354be287a23
golang.org/x/oauth2 v0.8.0
golang.org/x/sync v0.2.0
google.golang.org/api v0.125.0
google.golang.org/api v0.127.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
Expand Down Expand Up @@ -181,7 +181,7 @@ require (
github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
github.com/googleapis/gax-go/v2 v2.10.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
Expand Down Expand Up @@ -274,7 +274,7 @@ require (
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.9.3 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
22 changes: 12 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,8 @@ github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZve
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.43.31/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.275 h1:VqRULgqrigvQLll4e4hXuc568EQAtZQ6jmBzLlQHzSI=
github.com/aws/aws-sdk-go v1.44.275/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go v1.44.281 h1:z/ptheJvINaIAsKXthxONM+toTKw2pxyk700Hfm6yUw=
github.com/aws/aws-sdk-go v1.44.281/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
github.com/aws/aws-sdk-go-v2 v1.17.7 h1:CLSjnhJSTSogvqUGhIC6LqFKATMRexcxLZ0i/Nzk9Eg=
github.com/aws/aws-sdk-go-v2 v1.17.7/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
Expand Down Expand Up @@ -1140,8 +1140,8 @@ github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzw
github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-jwt/jwt/v4 v4.4.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
github.com/golang-migrate/migrate/v4 v4.16.1 h1:O+0C55RbMN66pWm5MjO6mw0px6usGpY0+bkSGW9zCo0=
github.com/golang-migrate/migrate/v4 v4.16.1/go.mod h1:qXiwa/3Zeqaltm1MxOCZDYysW/F6folYiBgBG03l9hc=
github.com/golang-migrate/migrate/v4 v4.16.2 h1:8coYbMKUyInrFk1lfGfRovTLAW7PhWp8qQDT2iKfuoA=
github.com/golang-migrate/migrate/v4 v4.16.2/go.mod h1:pfcJX4nPHaVdc5nmdCikFBWtm+UBpiZjRNNsyBbp0/o=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang-sql/sqlexp v0.0.0-20170517235910-f1bb20e5a188/go.mod h1:vXjM/+wXQnTPR4KqTKDgJukSZ6amVRtWMPEjE6sQoK8=
Expand Down Expand Up @@ -1268,8 +1268,9 @@ github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99
github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8=
github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/enterprise-certificate-proxy v0.2.3 h1:yk9/cqRKtT9wXZSsRH9aurXEpJX+U6FLtpYTdC3R06k=
github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/enterprise-certificate-proxy v0.2.4 h1:uGy6JWR/uMIILU8wbf+OkstIrNiMjGpEIyhx8f6W7s4=
github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0=
Expand Down Expand Up @@ -1868,8 +1869,8 @@ github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVM
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.25.5 h1:d0NIAyhh5shGscroL7ek/Ya9QYQE0KNabJgiUinIQkc=
github.com/urfave/cli/v2 v2.25.5/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc=
github.com/urfave/cli/v2 v2.25.6 h1:yuSkgDSZfH3L1CjF2/5fNNg2KbM47pY2EvjBq4ESQnU=
github.com/urfave/cli/v2 v2.25.6/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/viney-shih/go-lock v1.1.2 h1:3TdGTiHZCPqBdTvFbQZQN/TRZzKF3KWw2rFEyKz3YqA=
github.com/viney-shih/go-lock v1.1.2/go.mod h1:Yijm78Ljteb3kRiJrbLAxVntkUukGu5uzSxq/xV7OO8=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
Expand Down Expand Up @@ -2256,8 +2257,9 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -2437,8 +2439,8 @@ google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/
google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI=
google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0fdw0=
google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg=
google.golang.org/api v0.125.0 h1:7xGvEY4fyWbhWMHf3R2/4w7L4fXyfpRGE9g6lp8+DCk=
google.golang.org/api v0.125.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw=
google.golang.org/api v0.127.0 h1:v7rj0vA0imM3Ou81k1eyFxQNScLzn71EyGnJDr+V/XI=
google.golang.org/api v0.127.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down
15 changes: 15 additions & 0 deletions warehouse/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type UploadJob struct {
refreshPartitionBatchSize int
retryTimeWindow time.Duration
minRetryAttempts int
DisableAlter bool

errorHandler ErrorHandler
}
Expand Down Expand Up @@ -195,6 +196,7 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
refreshPartitionBatchSize: config.GetInt("Warehouse.refreshPartitionBatchSize", 100),
retryTimeWindow: retryTimeWindow,
minRetryAttempts: minRetryAttempts,
DisableAlter: config.GetBool("Warehouse.disableAlter", false),

alertSender: alerta.NewClient(
config.GetString("ALERTA_URL", "https://alerta.rudderstack.com/api/"),
Expand Down Expand Up @@ -790,6 +792,19 @@ func (job *UploadJob) UpdateTableSchema(tName string, tableSchemaDiff warehouseu
}

func (job *UploadJob) alterColumnsToWarehouse(tName string, columnsMap model.TableSchema) error {
if job.DisableAlter {
pkgLogger.Debugw("skipping alter columns to warehouse",
logfield.SourceID, job.warehouse.Source.ID,
logfield.SourceType, job.warehouse.Source.SourceDefinition.Name,
logfield.DestinationID, job.warehouse.Destination.ID,
logfield.DestinationType, job.warehouse.Destination.DestinationDefinition.Name,
logfield.WorkspaceID, job.warehouse.WorkspaceID,
logfield.TableName, tName,
"columns", columnsMap,
)
return nil
}

var responseToAlerta []model.AlterTableResponse
var errs []error

Expand Down

0 comments on commit b662575

Please sign in to comment.