Skip to content

Commit

Permalink
Merge pull request #5 from nlnwa/logwriter
Browse files Browse the repository at this point in the history
Refactor
  • Loading branch information
andrbo authored May 4, 2021
2 parents fa13355 + 6f66c01 commit 93a15ea
Show file tree
Hide file tree
Showing 18 changed files with 715 additions and 495 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1
github.com/testcontainers/testcontainers-go v0.10.0
github.com/uber/jaeger-client-go v2.27.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
google.golang.org/grpc v1.33.2
google.golang.org/protobuf v1.26.0
)
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,12 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/nlnwa/veidemann-api v1.0.0-beta23 h1:UhBbj8T4sQMn6hpZJyZLwA3ao4fUlhfK/8ueIrJRVuU=
github.com/nlnwa/veidemann-api v1.0.0-beta23/go.mod h1:PNjLnm8lZPGqCtyd+M5aKE2nTETiBAddlX+HCnU1nGs=
github.com/nlnwa/veidemann-api/go v0.0.0-20210413093311-7ff38e848604 h1:sWzOP0T3UbDPCZLmxwSalXGLfnZHpmYA7UFElCOmgyM=
github.com/nlnwa/veidemann-api/go v0.0.0-20210413093311-7ff38e848604/go.mod h1:UVGCJSmHATdV3Eohyq03lF3z86q9nRXRQNv3krrEC8I=
github.com/nlnwa/veidemann-api/go v0.0.0-20210414094839-b36ce92632fe h1:yaxQ13HIpCE+I1ZvcVhM1g+sUAopKAAxtt0k1NBmo2Q=
github.com/nlnwa/veidemann-api/go v0.0.0-20210414094839-b36ce92632fe/go.mod h1:UVGCJSmHATdV3Eohyq03lF3z86q9nRXRQNv3krrEC8I=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -605,6 +609,12 @@ github.com/testcontainers/testcontainers-go v0.10.0 h1:ASWe0nwTNg5z8K3WSQ8aBNB6j
github.com/testcontainers/testcontainers-go v0.10.0/go.mod h1:zFYk0JndthnMHEwtVRHCpLwIP/Ik1G7mvIAQ2MdZ+Ig=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/uber/jaeger-client-go v1.6.0 h1:3+zLlq+4npI5fg8IsgAje3YsP7TcEdNzJScyqFIzxEQ=
github.com/uber/jaeger-client-go v2.27.0+incompatible h1:6WVONolFJiB8Vx9bq4z9ddyV/SXSpfvvtb7Yl/TGHiE=
github.com/uber/jaeger-client-go v2.27.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v1.5.0 h1:OHbgr8l656Ub3Fw5k9SWnBfIEwvoHQ+W2y+Aa9D1Uyo=
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
Expand Down Expand Up @@ -634,6 +644,7 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
Expand Down
31 changes: 0 additions & 31 deletions internal/connection/server.go

This file was deleted.

16 changes: 16 additions & 0 deletions internal/logger/initlog.go → internal/logger/logger.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2021 National Library of Norway.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package logger

import (
Expand Down
176 changes: 84 additions & 92 deletions internal/scylla/logserver.go → internal/logservice/logserver.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
package scylla
/*
* Copyright 2021 National Library of Norway.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package logservice

import (
"fmt"
"github.com/gocql/gocql"
"github.com/nlnwa/veidemann-api/go/commons/v1"
logV1 "github.com/nlnwa/veidemann-api/go/log/v1"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/gocqlx/v2/qb"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -148,82 +162,54 @@ func (p *PageLog) toProto() *logV1.PageLog {
}
}

type logServer struct {
// logServer is a scylladb client
*Client
type LogServer struct {
logV1.UnimplementedLogServer

// metric channels
crawlLogMetric chan *logV1.CrawlLog
pageLogMetric chan *logV1.PageLog

// prepared queries
writeCrawlLog *gocqlx.Queryx
writePageLog *gocqlx.Queryx
listCrawlLogsByWarcId *gocqlx.Queryx
listPageLogsByWarcId *gocqlx.Queryx
listCrawlLogsByExecutionId *gocqlx.Queryx
listPageLogsByExecutionId *gocqlx.Queryx
}

// New creates a new client with the specified address and apiKey.
func New(options Options) *logServer {
crawlLogMetric := make(chan *logV1.CrawlLog, 100)
pageLogMetric := make(chan *logV1.PageLog, 100)

go func() {
for crawlLog := range crawlLogMetric {
CollectCrawlLog(crawlLog)
}
}()
go func() {
for pageLog := range pageLogMetric {
CollectPageLog(pageLog)
}
}()

return &logServer{
Client: &Client{
config: createCluster(gocql.LocalQuorum, options.Keyspace, options.Hosts...),
},
crawlLogMetric: crawlLogMetric,
pageLogMetric: pageLogMetric,
}
// pool of prepared queries
insertCrawlLog *Pool
insertPageLog *Pool
listCrawlLogsByWarcId *Pool
listPageLogsByWarcId *Pool
listCrawlLogsByExecutionId *Pool
listPageLogsByExecutionId *Pool
}

// Connect connects to a scylladb cluster.
func (l *logServer) Connect() error {
err := l.Client.Connect()
if err != nil {
return err
func New(session gocqlx.Session, readPoolSize int, writePoolSize int) *LogServer {
return &LogServer{
insertCrawlLog: NewPool(writePoolSize, func() *gocqlx.Queryx {
return qb.Insert("crawl_log").Json().Query(session)
}),
insertPageLog: NewPool(writePoolSize, func() *gocqlx.Queryx {
return qb.Insert("page_log").Json().Query(session)
}),
listPageLogsByExecutionId: NewPool(readPoolSize, func() *gocqlx.Queryx {
return qb.Select("page_log").Where(qb.Eq("execution_id")).Query(session)
}),
listCrawlLogsByExecutionId: NewPool(readPoolSize, func() *gocqlx.Queryx {
return qb.Select("crawl_log").Where(qb.Eq("execution_id")).Query(session)
}),
listPageLogsByWarcId: NewPool(readPoolSize, func() *gocqlx.Queryx {
return qb.Select("page_log").Where(qb.Eq("warc_id")).Query(session)
}),
listCrawlLogsByWarcId: NewPool(readPoolSize, func() *gocqlx.Queryx {
return qb.Select("crawl_log").Where(qb.Eq("warc_id")).Query(session)
}),
}

// setup prepared queries
l.writeCrawlLog = qb.Insert("crawl_log").Json().Query(l.session)
l.writePageLog = qb.Insert("page_log").Json().Query(l.session)
l.listPageLogsByExecutionId = qb.Select("page_log").Where(qb.Eq("execution_id")).Query(l.session)
l.listCrawlLogsByExecutionId = qb.Select("crawl_log").Where(qb.Eq("execution_id")).Query(l.session)
l.listCrawlLogsByWarcId = qb.Select("crawl_log").Where(qb.Eq("warc_id")).Query(l.session)
l.listPageLogsByWarcId = qb.Select("page_log").Where(qb.Eq("warc_id")).Query(l.session)

return nil
}

// Close closes the connection to database session
func (l *logServer) Close() {
l.writeCrawlLog.Release()
l.writePageLog.Release()
l.listCrawlLogsByWarcId.Release()
l.listPageLogsByWarcId.Release()
l.listCrawlLogsByExecutionId.Release()
l.listPageLogsByExecutionId.Release()
l.session.Close()

close(l.crawlLogMetric)
close(l.pageLogMetric)
// Close drains the query pools.
func (l *LogServer) Close() {
l.insertCrawlLog.Drain()
l.insertPageLog.Drain()
l.listCrawlLogsByWarcId.Drain()
l.listPageLogsByWarcId.Drain()
l.listCrawlLogsByExecutionId.Drain()
l.listPageLogsByExecutionId.Drain()
}

func (l *logServer) WriteCrawlLog(stream logV1.Log_WriteCrawlLogServer) error {
func (l *LogServer) WriteCrawlLog(stream logV1.Log_WriteCrawlLogServer) error {
q := l.insertCrawlLog.Borrow()
defer l.insertCrawlLog.Return(q)
for {
req, err := stream.Recv()
if err == io.EOF {
Expand All @@ -232,23 +218,20 @@ func (l *logServer) WriteCrawlLog(stream logV1.Log_WriteCrawlLogServer) error {
if err != nil {
return err
}

cl := req.GetCrawlLog()
l.crawlLogMetric <- cl
if err := writeCrawlLog(l.writeCrawlLog.WithContext(stream.Context()), cl); err != nil {
log.Error().Err(err).Msg("Error writing crawl log")
return err
crawlLog := req.GetCrawlLog()
CollectCrawlLog(crawlLog)
if err := writeCrawlLog(q, crawlLog); err != nil {
return fmt.Errorf("error writing crawl log: %w", err)
}
}
}

func writeCrawlLog(query *gocqlx.Queryx, crawlLog *logV1.CrawlLog) error {
// Generate nanosecond timestamp with millisecond precision.
// Generate timestamp with millisecond precision.
// (ScyllaDB does not allow storing timestamps with better than millisecond precision)
ns := (time.Now().UnixNano() / 1e6) * 1e6
crawlLog.TimeStamp = timestamppb.New(time.Unix(0, ns))
// Convert fetchstimestamp to have millisecond precision
crawlLog.FetchTimeStamp.Nanos = crawlLog.FetchTimeStamp.Nanos - (crawlLog.FetchTimeStamp.Nanos % 1e6)
crawlLog.TimeStamp = timestamppb.New(time.Now().UTC().Truncate(time.Millisecond))
// Convert FetchTimeStamp to millisecond precision
crawlLog.FetchTimeStamp = timestamppb.New(crawlLog.FetchTimeStamp.AsTime().Truncate(time.Millisecond))

cl, err := protojson.MarshalOptions{UseProtoNames: true}.Marshal(crawlLog)
if err != nil {
Expand All @@ -262,15 +245,16 @@ func writeCrawlLog(query *gocqlx.Queryx, crawlLog *logV1.CrawlLog) error {
return nil
}

func (l *logServer) WritePageLog(stream logV1.Log_WritePageLogServer) error {
func (l *LogServer) WritePageLog(stream logV1.Log_WritePageLogServer) error {
q := l.insertPageLog.Borrow()
defer l.insertPageLog.Return(q)
pageLog := &logV1.PageLog{}
for {
req, err := stream.Recv()
if err == io.EOF {
l.pageLogMetric <- pageLog
if err := writePageLog(l.writePageLog.WithContext(stream.Context()), pageLog); err != nil {
log.Error().Err(err).Msg("Error writing page log")
return err
CollectPageLog(pageLog)
if err := writePageLog(q, pageLog); err != nil {
return fmt.Errorf("error writing page log: %w", err)
}
return stream.SendAndClose(&emptypb.Empty{})
}
Expand Down Expand Up @@ -303,12 +287,16 @@ func writePageLog(query *gocqlx.Queryx, pageLog *logV1.PageLog) error {
return query.Bind(pl).Exec()
}

func (l *logServer) ListPageLogs(req *logV1.PageLogListRequest, stream logV1.Log_ListPageLogsServer) error {
func (l *LogServer) ListPageLogs(req *logV1.PageLogListRequest, stream logV1.Log_ListPageLogsServer) error {
if len(req.GetWarcId()) > 0 {
return listPageLogsByWarcId(l.listPageLogsByWarcId.WithContext(stream.Context()), req, stream.Send)
q := l.listPageLogsByWarcId.Borrow()
defer l.listPageLogsByWarcId.Return(q)
return listPageLogsByWarcId(q.WithContext(stream.Context()), req, stream.Send)
}
if len(req.GetQueryTemplate().GetExecutionId()) > 0 {
return listPageLogsByExecutionId(l.listPageLogsByExecutionId.WithContext(stream.Context()), req, stream.Send)
q := l.listPageLogsByExecutionId.Borrow()
defer l.listPageLogsByExecutionId.Return(q)
return listPageLogsByExecutionId(q.WithContext(stream.Context()), req, stream.Send)
}
return fmt.Errorf("request must provide warcId or executionId")
}
Expand Down Expand Up @@ -358,12 +346,16 @@ func listPageLogsByExecutionId(query *gocqlx.Queryx, req *logV1.PageLogListReque
return iter.Close()
}

func (l *logServer) ListCrawlLogs(req *logV1.CrawlLogListRequest, stream logV1.Log_ListCrawlLogsServer) error {
func (l *LogServer) ListCrawlLogs(req *logV1.CrawlLogListRequest, stream logV1.Log_ListCrawlLogsServer) error {
if len(req.GetWarcId()) > 0 {
return listCrawlLogsByWarcId(l.listCrawlLogsByWarcId.WithContext(stream.Context()), req, stream.Send)
q := l.listCrawlLogsByWarcId.Borrow()
defer l.listCrawlLogsByWarcId.Return(q)
return listCrawlLogsByWarcId(q.WithContext(stream.Context()), req, stream.Send)
}
if len(req.GetQueryTemplate().GetExecutionId()) > 0 {
return listCrawlLogsByExecutionId(l.listCrawlLogsByExecutionId.WithContext(stream.Context()), req, stream.Send)
q := l.listCrawlLogsByExecutionId.Borrow()
defer l.listCrawlLogsByExecutionId.Return(q)
return listCrawlLogsByExecutionId(q.WithContext(stream.Context()), req, stream.Send)
}
return fmt.Errorf("request must provide warcId or executionId")
}
Expand Down
Loading

0 comments on commit 93a15ea

Please sign in to comment.