Skip to content

Commit

Permalink
Indexing / Updated image
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandPheasant committed Nov 29, 2015
1 parent eee0790 commit 061a150
Showing 1 changed file with 9 additions and 45 deletions.
54 changes: 9 additions & 45 deletions Source/TailBlazer.Domain/FileHandling/SparseIndexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using DynamicData;
using DynamicData.Binding;
using TailBlazer.Domain.Annotations;
Expand All @@ -17,16 +15,13 @@ namespace TailBlazer.Domain.FileHandling
public class SparseIndexer: IDisposable
{
private readonly IDisposable _cleanUp;
private int _endOfFile;
private readonly ISourceList<SparseIndex> _indicies = new SourceList<SparseIndex>();

public Encoding Encoding { get; }
public FileInfo Info { get; }
public int Compression { get; set; }
public int Compression { get; }
public int TailSize { get; }


private int _endOfFile;

private readonly ISourceList<SparseIndex> _indicies= new SourceList<SparseIndex>();

public IObservable<SparseIndicies> Result { get; }

Expand All @@ -47,15 +42,12 @@ public SparseIndexer([NotNull] FileInfo info,
TailSize = tailSize;
Encoding = encoding ?? info.GetEncoding();

//create a resulting index object from the collection of index fragments
//0. create a resulting index object from the collection of index fragments
Result = _indicies
.Connect()
.Sort(SortExpressionComparer<SparseIndex>.Ascending(si => si.Start))
.ToCollection()
.Scan((SparseIndicies)null, (previous, notification) =>
{
return new SparseIndicies(notification, previous, Encoding);
});
.Scan((SparseIndicies)null, (previous, notification) => new SparseIndicies(notification, previous, Encoding));


//1. Get full length of file
Expand Down Expand Up @@ -88,13 +80,8 @@ public SparseIndexer([NotNull] FileInfo info,
});
});

////3. Scan the remainder of the file when the first one has started
//scheduler.Schedule(() =>
//{

//})

var xxx = tailScanner.FirstAsync()
////3. Scan the remainder of the file when the tail has been scanned
var headSubscriber = tailScanner.FirstAsync()
.Subscribe(head =>
{
var estimateLines = EstimateNumberOfLines(head, info);
Expand All @@ -112,35 +99,17 @@ public SparseIndexer([NotNull] FileInfo info,
});
});




_cleanUp = new CompositeDisposable(_indicies,
tailSubscriber,
headSubscriber,
tailScanner.Connect());
}

private IObservable<SparseIndex> ScanHead(SparseIndex head,FileInfo info, int compression)
{
return Observable.Create<SparseIndex>(observer =>
{
var estimateLines = EstimateNumberOfLines(head, info);
var estimate = new SparseIndex(0, head.Start, compression, estimateLines, SpareIndexType.Page);
observer.OnNext(estimate);

var scan = Scan(0, head.Start, compression);
observer.OnNext(estimate);

return Observable.StartAsync( () => ScanAsync(0, head.Start, compression))
.SubscribeSafe(observer);
});
}

private int EstimateNumberOfLines(SparseIndex tail, FileInfo info)
{
//Calculate estimate line count
var averageLineLength = tail.Size/tail.LineCount;
var estimatedLines = info.Length/averageLineLength;
var estimatedLines = (info.Length - tail.Size) /averageLineLength;
return (int) estimatedLines;
}

Expand All @@ -149,11 +118,6 @@ private SparseIndex ScanTail(int start)
return Scan(start,-1, 1);
}

private async Task<SparseIndex> ScanAsync(int start, int end, int compression)
{
return await Task.Run(()=> Scan( start, end, compression));
}

private SparseIndex Scan(int start, int end, int compression)
{
int count = 0;
Expand Down

0 comments on commit 061a150

Please sign in to comment.