Commit

refactor(Tool): Tidy argument parsing etc (#249)

bartelink authored Feb 19, 2024
1 parent cfdcfe3 commit a9fba70
Showing 7 changed files with 323 additions and 337 deletions.
2 changes: 1 addition & 1 deletion src/Propulsion/Pipeline.fs
@@ -39,7 +39,7 @@ type Pipeline(task: Task<unit>, triggerStop) =

member _.Monitor = monitor.Value
member _.FlushAsync(): Task<'P> = flush ()
- member x.Flush(): Async<'P> = x.FlushAsync() |> Async.AwaitTaskCorrect
+ member x.Flush(): Async<'P> = x.FlushAsync() |> Async.ofTask

type SinkPipeline<'Ingester> internal (task: Task<unit>, triggerStop, startIngester) =
inherit Pipeline(task, triggerStop)
@@ -9,7 +9,7 @@
</ItemGroup>

<ItemGroup>
- <PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.14.1" />
+ <PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.16" />
<PackageReference Include="TypeShape" Version="10.0.0" />
</ItemGroup>
<ItemGroup>
159 changes: 153 additions & 6 deletions tools/Propulsion.Tool/Args.fs
@@ -51,6 +51,7 @@ type Configuration(tryGet: string -> string option, get: string -> string) =
module Cosmos =

open Configuration.Cosmos

type [<NoEquality; NoComparison>] Parameters =
| [<AltCommandLine "-m">] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode
| [<AltCommandLine "-s">] Connection of string
@@ -88,13 +89,64 @@ module Cosmos =
let containerId = p.GetResult(Container, fun () -> c.CosmosContainer)
let leasesContainerName = p.GetResult(LeaseContainer, fun () -> containerId + p.GetResult(Suffix, "-aux"))
let checkpointInterval = TimeSpan.hours 1.
- member val MaybeLogLagInterval = p.TryPostProcessResult(LagFreqM, TimeSpan.minutes)
+ member val MaybeLogLagInterval = p.TryGetResult(LagFreqM, TimeSpan.minutes)
member _.CreateLeasesContainer() = connector.CreateLeasesContainer(databaseId, leasesContainerName)
member _.ConnectFeed() = connector.ConnectFeed(databaseId, containerId, leasesContainerName)
member x.CreateCheckpointStore(group, cache, storeLog) = async {
let! context = connector.ConnectContext("Checkpoints", databaseId, containerId, 256)
return Propulsion.Feed.ReaderCheckpoint.CosmosStore.create storeLog (group, checkpointInterval) (context, cache) }

open Equinox.CosmosStore.Core.Initialization
type [<NoEquality; NoComparison; RequireSubcommand>] InitParameters =
| [<AltCommandLine "-ru"; Unique>] Rus of int
| [<AltCommandLine "-A"; Unique>] Autoscale
| [<AltCommandLine "-m"; Unique>] Mode of ModeType
| [<AltCommandLine "-s">] Suffix of string
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<Parameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| Rus _ -> "Specify RU/s level to provision for the Aux Container. (with AutoScale, the value represents the maximum RU/s to AutoScale based on)."
| Autoscale -> "Autoscale provisioned throughput. Use --rus to specify the maximum RU/s."
| Mode _ -> "Configure RU mode to use Container-level RU, Database-level RU, or Serverless allocations (Default: Use Container-level allocation)."
| Suffix _ -> "Specify Container Name suffix (default: `-aux`)."
| Cosmos _ -> "Cosmos Connection parameters."
and ModeType = Container | Db | Serverless
type InitArguments(p: ParseResults<InitParameters>) =
let rusOrDefault (value: int) = p.GetResult(Rus, value)
let throughput auto = if auto then Throughput.Autoscale (rusOrDefault 4000)
else Throughput.Manual (rusOrDefault 400)
member val ProvisioningMode =
match p.GetResult(Mode, ModeType.Container), p.Contains Autoscale with
| ModeType.Container, auto -> Provisioning.Container (throughput auto)
| ModeType.Db, auto -> Provisioning.Database (throughput auto)
| ModeType.Serverless, auto when auto || p.Contains Rus -> p.Raise "Cannot specify RU/s or Autoscale in Serverless mode"
| ModeType.Serverless, _ -> Provisioning.Serverless

let initAux (c, p: ParseResults<InitParameters>) =
match p.GetSubCommand() with
| InitParameters.Cosmos sa ->
let mode, a = (InitArguments p).ProvisioningMode, Arguments(c, sa)
let container = a.CreateLeasesContainer()
match mode with
| Provisioning.Container throughput ->
match throughput with
| Throughput.Autoscale rus ->
Log.Information("Provisioning Leases Container with Autoscale throughput of up to {rus:n0} RU/s", rus)
| Throughput.Manual rus ->
Log.Information("Provisioning Leases Container with {rus:n0} RU/s", rus)
| Provisioning.Database throughput ->
let modeStr = "Database"
match throughput with
| Throughput.Autoscale rus ->
Log.Information("Provisioning Leases Container at {modeStr:l} level with Autoscale throughput of up to {rus:n0} RU/s", modeStr, rus)
| Throughput.Manual rus ->
Log.Information("Provisioning Leases Container at {modeStr:l} level with {rus:n0} RU/s", modeStr, rus)
| Provisioning.Serverless ->
let modeStr = "Serverless"
Log.Information("Provisioning Leases Container in {modeStr:l} mode with automatic throughput RU/s as configured in account", modeStr)
initAux container.Database.Client (container.Database.Id, container.Id) mode
| x -> p.Raise $"unexpected subcommand %A{x}"
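
As a quick sketch of how the new init flags compose (the programName is just a label and the sample values are invented; assumes Argu's standard parsing API):

    open Argu
    // "-A -ru 10000" selects Autoscale with a 10,000 RU/s ceiling; Mode defaults to
    // ModeType.Container, so the throughput is applied at Container level
    let parser = ArgumentParser.Create<InitParameters>(programName = "propulsion init")
    let results = parser.ParseCommandLine [| "-A"; "-ru"; "10000"; "cosmos" |]
    let mode = InitArguments(results).ProvisioningMode // Provisioning.Container (Throughput.Autoscale 10000)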

module Dynamo =

open Configuration.Dynamo
@@ -144,7 +196,6 @@ module Dynamo =
| IndexSuffix _ -> "specify a suffix for the index store. (not relevant if `Table` or `IndexTable` specified. default: \"-index\")"
| StreamsDop _ -> "parallelism when loading events from Store Feed Source. Default: Don't load events"
| IndexPartition _ -> "Constrain Index Partitions to load. Default: Load all indexed partitions"

type Arguments(c: Configuration, p: ParseResults<Parameters>) =
let conn = match p.TryGetResult RegionProfile |> Option.orElseWith (fun () -> c.DynamoRegion) with
| Some systemName ->
@@ -198,9 +249,105 @@ module Dynamo =
member _.CreateCheckpointStore(group, cache, storeLog) =
Propulsion.Feed.ReaderCheckpoint.DynamoStore.create storeLog (group, checkpointInterval) (indexReadContext.Value, cache)

type [<NoEquality; NoComparison; RequireSubcommand>] IndexParameters =
| [<AltCommandLine "-p"; Unique>] IndexPartitionId of int
| [<AltCommandLine "-j"; MainCommand>] DynamoDbJson of string
| [<AltCommandLine "-m"; Unique>] MinSizeK of int
| [<AltCommandLine "-b"; Unique>] EventsPerBatch of int
| [<AltCommandLine "-g"; Unique>] GapsLimit of int
| [<CliPrefix(CliPrefix.None)>] Dynamo of ParseResults<Parameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| IndexPartitionId _ -> "PartitionId to verify/import into. (optional, omitting displays partitions->epochs list)"
| DynamoDbJson _ -> "Source DynamoDB JSON filename(s) to import (optional, omitting displays current state)"
| MinSizeK _ -> "Index Stream minimum Item size in KiB. Default 48"
| EventsPerBatch _ -> "Maximum Events to Ingest as a single batch. Default 10000"
| GapsLimit _ -> "Max Number of gaps to output to console. Default 10"
| Dynamo _ -> "Specify DynamoDB parameters."

open Propulsion.DynamoStore

type IndexerArguments(c, p: ParseResults<IndexParameters>) =
member val GapsLimit = p.GetResult(IndexParameters.GapsLimit, 10)
member val ImportJsonFiles = p.GetResults IndexParameters.DynamoDbJson
member val TrancheId = p.TryGetResult(IndexParameters.IndexPartitionId, string >> AppendsPartitionId.parse)
// Larger optimizes for not needing to use TransactWriteItems as frequently
// Smaller will trigger more items and reduce read costs for Sources reading from the tail
member val MinItemSize = p.GetResult(IndexParameters.MinSizeK, 48)
member val EventsPerBatch = p.GetResult(IndexParameters.EventsPerBatch, 10000)

member val StoreArgs =
match p.GetSubCommand() with
| IndexParameters.Dynamo p -> Arguments (c, p)
| x -> p.Raise $"unexpected subcommand %A{x}"
member x.CreateContext() = x.StoreArgs.CreateContext x.MinItemSize

let dumpSummary gapsLimit streams spanCount =
let mutable totalS, totalE, queuing, buffered, gapped = 0, 0L, 0, 0, 0
for KeyValue (stream, v: DynamoStoreIndex.BufferStreamState) in streams do
totalS <- totalS + 1
totalE <- totalE + int64 v.writePos
if v.spans.Length > 0 then
match v.spans[0].Index - v.writePos with
| 0 ->
if v.spans.Length > 1 then queuing <- queuing + 1 // There's a gap within the queue
else buffered <- buffered + 1 // Everything is fine, just not written yet
| gap ->
gapped <- gapped + 1
if gapped < gapsLimit then
Log.Warning("Gapped stream {stream}@{wp}: Missing {gap} events before {successorEventTypes}", stream, v.writePos, gap, v.spans[0].c)
elif gapped = gapsLimit then
Log.Error("Gapped Streams Dump limit ({gapsLimit}) reached; use commandline flag to show more", gapsLimit)
let level = if gapped > 0 then LogEventLevel.Warning else LogEventLevel.Information
Log.Write(level, "Index {events:n0} events {streams:n0} streams ({spans:n0} spans) Buffered {buffered} Queueing {queuing} Gapped {gapped:n0}",
totalE, totalS, spanCount, buffered, queuing, gapped)
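
For intuition, a tiny worked example of the classification above (values invented):

    // writePos 5 with a first buffered span starting at index 8 means events 5..7 are missing
    let writePos, firstSpanIndex = 5, 8
    let gap = firstSpanIndex - writePos // 3 -> the stream counts as Gapped
    // gap = 0 with further disjoint spans -> Queuing; a single contiguous span -> Buffered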

let index (c: Configuration, p: ParseResults<IndexParameters>) = async {
let a = IndexerArguments(c, p)
let context = a.CreateContext()

match a.TrancheId with
| None when (not << List.isEmpty) a.ImportJsonFiles ->
p.Raise "Must specify a trancheId parameter to import into"
| None ->
let index = AppendsIndex.Reader.create Metrics.log context
let! state = index.Read()
Log.Information("Current Partitions / Active Epochs {summary}",
seq { for kvp in state -> struct (kvp.Key, kvp.Value) } |> Seq.sortBy (fun struct (t, _) -> t))

let storeSpecFragment = $"dynamo -t {a.StoreArgs.IndexTable}"
let dumpCmd sn opts = $"eqx -C dump '{sn}' {opts}{storeSpecFragment}"
Log.Information("Inspect Index Partitions list events 👉 {cmd}",
dumpCmd (AppendsIndex.Stream.name ()) "")

let pid, eid = AppendsPartitionId.wellKnownId, FSharp.UMX.UMX.tag<appendsEpochId> 2
Log.Information("Inspect Batches in Epoch {epoch} of Index Partition {partition} 👉 {cmd}",
eid, pid, dumpCmd (AppendsEpoch.Stream.name (pid, eid)) "-B ")
| Some trancheId ->
let! buffer, indexedSpans = DynamoStoreIndex.Reader.loadIndex (Log.Logger, Metrics.log, context) trancheId a.GapsLimit
let dump ingestedCount = dumpSummary a.GapsLimit buffer.Items (indexedSpans + ingestedCount)
dump 0

match a.ImportJsonFiles with
| [] -> ()
| files ->

Log.Information("Ingesting {files}...", files)

let ingest =
let ingester = DynamoStoreIngester(Log.Logger, context, storeLog = Metrics.log)
fun batch -> ingester.Service.IngestWithoutConcurrency(trancheId, batch)
let import = DynamoDbExport.Importer(buffer, ingest, dump)
for file in files do
let! stats = import.IngestDynamoDbJsonFile(file, a.EventsPerBatch)
Log.Information("Merged {file}: {items:n0} items {events:n0} events", file, stats.items, stats.events)
do! import.Flush()
Equinox.DynamoStore.Core.Log.InternalMetrics.dump Log.Logger }
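
A hedged sketch of exercising the index verb end to end (the file name and programName are invented; assumes the Configuration signature from the top of Args.fs, with table/region details resolved via its environment-backed fallbacks):

    open System
    open Argu
    let c = Configuration(Environment.GetEnvironmentVariable >> Option.ofObj, Environment.GetEnvironmentVariable)
    let parser = ArgumentParser.Create<IndexParameters>(programName = "propulsion index")
    // Import one DynamoDB JSON export chunk into partition 0, reporting up to 100 gapped streams
    let results = parser.ParseCommandLine [| "-p"; "0"; "-g"; "100"; "chunk-0001.json"; "dynamo" |]
    index (c, results) |> Async.RunSynchronously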

module Mdb =

open Configuration.Mdb
open Npgsql

type [<NoEquality; NoComparison>] Parameters =
| [<AltCommandLine "-c">] ConnectionString of string
| [<AltCommandLine "-cc">] CheckpointConnectionString of string
@@ -218,14 +365,14 @@ module Mdb =
let checkpointConnectionString () = p.GetResult(CheckpointConnectionString, connectionString)
let schema = p.GetResult(CheckpointSchema, fun () -> c.MdbSchema)

- member x.CreateClient() =
+ member _.CreateClient() =
Array.ofList (p.GetResults Category), connectionString ()

- member x.CreateCheckpointStore(group) =
+ member _.CreateCheckpointStore(group) =
Propulsion.MessageDb.ReaderCheckpoint.CheckpointStore(checkpointConnectionString (), schema, group)

member x.CreateCheckpointStoreTable([<O; D null>] ?ct) = task {
- let connStringWithoutPassword = NpgsqlConnectionStringBuilder(checkpointConnectionString (), Password = null)
+ let connStringWithoutPassword = Npgsql.NpgsqlConnectionStringBuilder(checkpointConnectionString (), Password = null)
Log.Information("Authenticating with postgres using {connectionString}", connStringWithoutPassword.ToString())
Log.Information("Creating checkpoints table as {table}", $"{schema}.{Propulsion.MessageDb.ReaderCheckpoint.TableName}")
let checkpointStore = x.CreateCheckpointStore("nil")
2 changes: 1 addition & 1 deletion tools/Propulsion.Tool/Infrastructure.fs
@@ -8,7 +8,7 @@ module Metrics =
let [<Literal>] PropertyTag = "isMetric"
let log = Log.ForContext(PropertyTag, true)
/// Allow logging to filter out emission of log messages whose information is also surfaced as metrics
- let logEventIsMetric e = Serilog.Filters.Matching.WithProperty(PropertyTag).Invoke e
+ let logEventIsMetric = Serilog.Filters.Matching.WithProperty(PropertyTag).Invoke
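
One natural consumer of the now point-free predicate (a sketch; the sink wiring is assumed rather than part of this commit, and WriteTo.Console needs the Serilog.Sinks.Console package):

    open Serilog
    // Keep metric-tagged events out of the console sink while other sinks can still see them
    let log =
        LoggerConfiguration()
            .Filter.ByExcluding(fun e -> Metrics.logEventIsMetric e)
            .WriteTo.Console()
            .CreateLogger()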

module EnvVar =

Expand Down
