
Commit

Update to fix some issues with data parallel runs, use that for minrf.
liuliu committed May 24, 2024
1 parent 28ab43d commit 7a222c9
Showing 5 changed files with 260 additions and 166 deletions.
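The bulk of the change is in examples/minrf/main.swift: the fixed batch size of 256 becomes a global batch size split evenly across the visible GPUs, and every per-device tensor is wrapped in a `DynamicGraph.Group` so one training step drives all devices. A minimal sketch of that pattern, using only calls that appear in the diff below and assuming a build where `DeviceKind.GPUs` reports at least one device:

```swift
import NNC

let graph = DynamicGraph()

// Each device handles an equal slice of the global batch.
let globalBatchSize = 256
let deviceCount = DeviceKind.GPUs.count
let batchSize = globalBatchSize / deviceCount

// A DynamicGraph.Group holds one variable per GPU; operations on the group
// apply to every member, which is how the training loop below drives all
// devices from a single code path.
let z = DynamicGraph.Group(
  (0..<deviceCount).map {
    graph.variable(.GPU($0), .NCHW(batchSize, 3, 32, 32), of: Float.self)
  })
z.randn()  // fills each per-device tensor with Gaussian noise
```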
122 changes: 83 additions & 39 deletions examples/minrf/main.swift
@@ -1,5 +1,5 @@
import NNC
import Foundation
import NNC
import TensorBoard

public func timeEmbedding(timesteps: [Float], embeddingSize: Int, maxPeriod: Int)
@@ -32,7 +32,9 @@ public func TimestepEmbedder(hiddenSize: Int) -> Model {
return Model([x], [out])
}

public func LabelEmbedder<T: TensorNumeric>(_ dataType: T.Type, numClasses: Int, hiddenSize: Int) -> Model {
public func LabelEmbedder<T: TensorNumeric>(_ dataType: T.Type, numClasses: Int, hiddenSize: Int)
-> Model
{
let labelEmbed = Embedding(
T.self, vocabularySize: numClasses + 1, embeddingSize: hiddenSize, name: "label_embedder")
return labelEmbed
@@ -101,7 +103,8 @@ func DiT(batchSize: Int, hiddenSize: Int, layers: Int) -> Model {
hint: Hint(stride: [1, 1], border: Hint.Border(begin: [2, 2], end: [2, 2])), name: "conv1")
let norm1 = GroupNorm(axis: 1, groups: 32, epsilon: 1e-5, reduce: [2, 3], name: "norm1")
out = norm1(conv1(out).swish())
out = out.reshaped([batchSize, hiddenSize / 2, 16, 2, 16, 2]).permuted(0, 2, 4, 3, 5, 1).contiguous().reshaped([batchSize, 16 * 16, hiddenSize * 2])
out = out.reshaped([batchSize, hiddenSize / 2, 16, 2, 16, 2]).permuted(0, 2, 4, 3, 5, 1)
.contiguous().reshaped([batchSize, 16 * 16, hiddenSize * 2])
let xEmbedder = Dense(count: hiddenSize, name: "x_embedder")
out = xEmbedder(out)

@@ -110,18 +113,23 @@ func DiT(batchSize: Int, hiddenSize: Int, layers: Int) -> Model {
let timestepEmbedder = TimestepEmbedder(hiddenSize: hiddenSize)
let y = Input()
let labelEmbedder = LabelEmbedder(Float.self, numClasses: 10, hiddenSize: hiddenSize)
let adaLNInput = (timestepEmbedder(t) + labelEmbedder(y)).reshaped([batchSize, 1, hiddenSize]).swish()
let adaLNInput = (timestepEmbedder(t) + labelEmbedder(y)).reshaped([batchSize, 1, hiddenSize])
.swish()
for _ in 0..<layers {
let transformer = TransformerBlock(k: hiddenSize / 8, h: 8, hk: 8, b: batchSize, t: 256)
out = transformer(out, rot, adaLNInput)
}
let norm = LayerNorm(epsilon: 1e-6, axis: [2], elementwiseAffine: false)
out = norm(out)
let adaLNs = [Dense(count: hiddenSize, name: "ada_ln_final_0"), Dense(count: hiddenSize, name: "ada_ln_final_1")]
let adaLNs = [
Dense(count: hiddenSize, name: "ada_ln_final_0"),
Dense(count: hiddenSize, name: "ada_ln_final_1"),
]
let chunks = adaLNs.map { $0(adaLNInput) }
out = out .* (1 + chunks[1]) + chunks[0]
let convOut = Dense(count: 3 * 2 * 2, name: "final")
out = convOut(out).reshaped([batchSize, 16, 16, 2, 2, 3]).permuted(0, 5, 1, 3, 2, 4).contiguous().reshaped([batchSize, 3, 32, 32])
out = convOut(out).reshaped([batchSize, 16, 16, 2, 2, 3]).permuted(0, 5, 1, 3, 2, 4).contiguous()
.reshaped([batchSize, 3, 32, 32])
return Model([x, rot, t, y], [out])
}

@@ -131,7 +139,7 @@ let dataBatchPath = "/fast/Data/cifar-10/cifar-10-batches-bin/data_batch.bin"

let dataBatchSize = 50_000

let batchSize = 256
let globalBatchSize = 256

/// MARK - Loading Data from Disk

@@ -169,8 +177,16 @@ trainDataDf["c"] = trainDataDf["main", CIFARData.self].map {
Tensor<Int32>([Int32($0.label)], .CPU, .C(1))
}

var batchedTrainData = trainDataDf["tensor", "c"].combine(size: batchSize)
batchedTrainData["imageGPU"] = batchedTrainData["tensor"]!.toGPU()
let deviceCount = DeviceKind.GPUs.count
let batchSize = globalBatchSize / deviceCount
var batchedTrainData = trainDataDf["tensor", "c"].combine(size: batchSize, repeating: deviceCount)
if deviceCount > 1 {
for i in 0..<deviceCount {
batchedTrainData["imageGPU_\(i)"] = batchedTrainData["tensor_\(i)"]!.toGPU(i)
}
} else {
batchedTrainData["imageGPU"] = batchedTrainData["tensor"]!.toGPU(0)
}

/// MARK - Training Loop

@@ -192,34 +208,51 @@ for i in 0..<(16 * 16) {
}
}
}
let rotG = rot.toGPU(0)
let rotG = DynamicGraph.Group((0..<deviceCount).map { rot.toGPU($0) })
var optimizer = AdamWOptimizer(graph, rate: 0.0005)
optimizer.parameters = [dit.parameters]
var isLoaded = false
var columns = [String]()
if deviceCount > 1 {
for i in 0..<deviceCount {
columns += ["imageGPU_\(i)", "c_\(i)"]
}
} else {
columns += ["imageGPU", "c"]
}
for epoch in 0..<10000 {
batchedTrainData.shuffle()
var overallLoss: Double = 0
var overallSamples = 0
var lossCount: [Int] = Array(repeating: 0, count: 10)
var lossBins: [Double] = Array(repeating: 0, count: 10)
for (_, batch) in batchedTrainData["imageGPU", "c"].enumerated() {
let x = graph.variable(batch[0] as! Tensor<Float>)
let y = graph.variable(batch[1] as! Tensor<Int32>).reshaped(.C(batchSize))
let labelDrop = graph.variable(.CPU, .C(batchSize), of: Float.self)
for (_, batch) in batchedTrainData[columns].enumerated() {
let x = graph.variable((0..<deviceCount).map { batch[$0 * 2] as! Tensor<Float> })
let y = graph.variable(
(0..<deviceCount).map { (batch[$0 * 2 + 1] as! Tensor<Int32>).reshaped(.C(batchSize)) })
let labelDrop = graph.variable(.CPU, .NC(deviceCount, batchSize), of: Float.self)
labelDrop.rand()
for i in 0..<batchSize {
// 10% chance of dropping the label.
if labelDrop[i] < 0.1 {
y[i] = 10
for i in 0..<deviceCount {
for j in 0..<batchSize {
// 10% chance of dropping the label.
if labelDrop[i, j] < 0.1 {
y[i][j] = 10
}
}
}
let yG = y.toGPU(0)
let t = graph.variable(.CPU, .C(batchSize), of: Float.self)
let yG = DynamicGraph.Group(y.enumerated().map { $1.toGPU($0) })
let t = DynamicGraph.Group(
(0..<deviceCount).map { _ in graph.variable(.CPU, .C(batchSize), of: Float.self) })
t.randn()
t.sigmoid()
let tG = t.toGPU(0).reshaped(.NCHW(batchSize, 1, 1, 1))
let tE = graph.variable(timeEmbedding(timesteps:(0..<batchSize).map({ t[$0] }), embeddingSize: 256, maxPeriod: 10_000))
let tEG = tE.toGPU(0)
let tG = DynamicGraph.Group(t.enumerated().map { $1.toGPU($0) }).reshaped(
.NCHW(batchSize, 1, 1, 1))
let tE = graph.variable(
(0..<deviceCount).map { i in
timeEmbedding(
timesteps: (0..<batchSize).map({ t[i][$0] }), embeddingSize: 256, maxPeriod: 10_000)
})
let tEG = DynamicGraph.Group(tE.enumerated().map { $1.toGPU($0) })
let z1 = graph.variable(like: x)
z1.randn()
let zt = (1 - tG) .* x + tG .* z1
@@ -235,17 +268,19 @@ for epoch in 0..<10000 {
let vtheta = dit(inputs: zt, rotG, tEG, yG)[0].as(of: Float.self)
let diff = z1 - x - vtheta
let loss = (diff .* diff).reduced(.mean, axis: [1, 2, 3])
loss.backward(to: [zt, yG])
loss.backward(to: [zt])
optimizer.step()
let lossC = loss.rawValue.toCPU()
let lossC = loss.map { $0.rawValue.toCPU() }
var batchLoss: Double = 0
for i in 0..<batchSize {
let singleLoss = Double(lossC[i, 0, 0, 0])
lossBins[min(Int((t[i] * 10).rounded(.down)), 9)] += singleLoss
lossCount[min(Int((t[i] * 10).rounded(.down)), 9)] += 1
batchLoss += singleLoss
for i in 0..<deviceCount {
for j in 0..<batchSize {
let singleLoss = Double(lossC[i][j, 0, 0, 0])
lossBins[min(Int((t[i][j] * 10).rounded(.down)), 9)] += singleLoss
lossCount[min(Int((t[i][j] * 10).rounded(.down)), 9)] += 1
batchLoss += singleLoss
}
}
batchLoss = batchLoss / Double(batchSize)
batchLoss = batchLoss / Double(batchSize * deviceCount)
overallLoss += batchLoss
overallSamples += 1
summaryWriter.addScalar("loss", batchLoss, step: optimizer.step)
@@ -262,32 +297,41 @@ for epoch in 0..<10000 {
// Run denoising.
let samplingSteps = 50
graph.withNoGrad {
var z = graph.variable(.GPU(0), .NCHW(batchSize, 3, 32, 32), of: Float.self)
var z = DynamicGraph.Group(
(0..<deviceCount).map {
graph.variable(.GPU($0), .NCHW(batchSize, 3, 32, 32), of: Float.self)
})
z.randn()
let y = graph.variable(.CPU, .C(batchSize), of: Int32.self)
for i in 0..<batchSize {
y[i] = Int32(i) % 10
}
let yG = y.toGPU(0)
let yG = DynamicGraph.Group((0..<deviceCount).map { y.toGPU($0) })
let u = graph.variable(.CPU, .C(batchSize), of: Int32.self)
for i in 0..<batchSize {
u[i] = 10
}
let uG = u.toGPU(0)
let uG = DynamicGraph.Group((0..<deviceCount).map { u.toGPU($0) })
for i in (1...samplingSteps).reversed() {
let t = Float(i) / Float(samplingSteps)
let tE = graph.variable(timeEmbedding(timesteps:(0..<batchSize).map({ _ in t }), embeddingSize: 256, maxPeriod: 10_000))
let tEG = tE.toGPU(0)
let tE = graph.variable(
timeEmbedding(
timesteps: (0..<batchSize).map({ _ in t }), embeddingSize: 256, maxPeriod: 10_000))
let tEG = DynamicGraph.Group((0..<deviceCount).map { tE.toGPU($0) })
let vc = dit(inputs: z, rotG, tEG, yG)[0].as(of: Float.self)
let vu = dit(inputs: z, rotG, tEG, uG)[0].as(of: Float.self)
// cfg = 2
let v = vu + 2 * (vc - vu)
z = z - (1 / Float(samplingSteps)) * v
}
let zCPU = z.toCPU()
for i in 0..<batchSize {
// Write each image as ppm format.
summaryWriter.addImage("sample \(i)", (zCPU[i..<(i + 1), 0..<3, 0..<32, 0..<32] + 1) * 0.5, step: epoch)
for i in 0..<deviceCount {
for j in 0..<batchSize {
// Write each image as ppm format.
summaryWriter.addImage(
"sample \(i * batchSize + j)", (zCPU[i][j..<(j + 1), 0..<3, 0..<32, 0..<32] + 1) * 0.5,
step: epoch)
}
}
}
}
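The sampling loop at the end of the file applies classifier-free guidance with a fixed scale of 2 (`cfg = 2` in the comment): the conditional and unconditional velocity predictions are blended as `v = vu + 2 * (vc - vu)`, then a single Euler step of size `1 / samplingSteps` is taken. The arithmetic in isolation, as a hedged sketch with scalars standing in for the per-device velocity tensors (`guidanceScale` is a name introduced here; the diff hard-codes the value 2):

```swift
// Classifier-free guidance plus one reverse Euler step, with scalars
// standing in for the velocity tensors.
let samplingSteps = 50
let guidanceScale: Float = 2

func guidedEulerStep(z: Float, vConditional: Float, vUnconditional: Float) -> Float {
  // Push the prediction away from the unconditional branch.
  let v = vUnconditional + guidanceScale * (vConditional - vUnconditional)
  // One Euler step of size 1 / samplingSteps toward the data.
  return z - (1 / Float(samplingSteps)) * v
}
```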
8 changes: 4 additions & 4 deletions nnc/DynamicGraph.swift
@@ -663,9 +663,9 @@ extension DynamicGraph {
as! T
case is DynamicGraph.AnyGroup:
return DynamicGraph.Group(
(0..<like.untyped.count).map { _ in
like.untyped.map {
graph.variable(
like.kind, format: like.format, shape: like.shape, of: T.ElementNumeric.self)
$0.kind, format: $0.format, shape: $0.shape, of: T.ElementNumeric.self)
}) as! T
default:
fatalError("Cannot support the given type")
@@ -680,9 +680,9 @@ extension DynamicGraph {
as! T
case is DynamicGraph.AnyGroup:
return DynamicGraph.Group(
(0..<like.untyped.count).map { _ in
like.untyped.map {
graph.constant(
like.kind, format: like.format, shape: like.shape, of: T.ElementNumeric.self)
$0.kind, format: $0.format, shape: $0.shape, of: T.ElementNumeric.self)
}) as! T
default:
fatalError("Cannot support the given type")
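The DynamicGraph.swift change makes `variable(like:)` and `constant(like:)` honor each member of a group individually: new members are now created from the kind, format, and shape of the corresponding member of `like`, rather than one group-level value applied to all of them. That matters for data-parallel runs, where members of a group live on different GPUs. A sketch of the case it fixes, assuming a multi-GPU build (the `variable(like:)` call is the one exercised by main.swift above):

```swift
import NNC

let graph = DynamicGraph()

// A group whose members live on different GPUs (the data-parallel case).
let x = DynamicGraph.Group(
  (0..<DeviceKind.GPUs.count).map {
    graph.variable(.GPU($0), .NCHW(32, 3, 32, 32), of: Float.self)
  })

// With this commit, each member of z1 takes the kind/format/shape of the
// matching member of x, so z1's members land on the same GPUs as x's.
let z1 = graph.variable(like: x)
z1.randn()
```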
38 changes: 38 additions & 0 deletions nnc/FunctionalAddons.swift
@@ -1100,6 +1100,44 @@ extension DynamicGraph.Tensor {
}
}

extension DynamicGraph.Group
where Element: DynamicGraph.TensorGroup, Element: DynamicGraph.AnyTensor {
/// Copy the given tensor to GPU.
public func toGPU(_ ordinal: Int, streamContext: StreamContext?)
-> DynamicGraph.Group<Element>
{
fatalError(
"toGPU() cannot be reasonably implemented for Group as Group would be most effective to resides on different GPUs"
)
}

/// Copy the given tensor to CPU.
public func toCPU(streamContext: StreamContext?) -> DynamicGraph.Group<Element> {
guard underlyingArray.count > 0 else { return self }
var params = CmdParamsFactory.factory.newParams()
params.size.dim = (1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0)
let cmd = ccv_nnc_cmd(CCV_NNC_DATA_TRANSFER_FORWARD, nil, params, 0)
let _inputs: [ccv_nnc_tensor_variable_t?] = underlyingArray.map { $0._tensor }
let graph = underlyingArray[0].graph
let _graph = graph.cGraph
let _streamContext = (streamContext ?? graph.streamContext)?._stream
let outputSize = Int32(underlyingArray.count)
let outputs: [DynamicGraph.Tensor<ElementNumeric>] = untyped.map {
graph.variable(.CPU, format: $0.format, shape: $0.shape)
}
let _outputs = UnsafeMutablePointer<ccv_nnc_tensor_variable_t?>.allocate(
capacity: Int(outputSize))
for (i, variable) in outputs.enumerated() {
(_outputs + i).initialize(to: variable._tensor)
}
ccv_nnc_dynamic_graph_exec(
_graph, cmd, ccv_nnc_no_hint, 0, _inputs, outputSize, _outputs, outputSize, outputSize,
_streamContext)
_outputs.deallocate()
return DynamicGraph.Group(outputs) as! DynamicGraph.Group<Element>
}
}

extension DynamicGraph.Tensor {
/// Fill the given tensor with a value.
public func full(_ value: Float, streamContext: StreamContext?) {
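FunctionalAddons.swift gains Group-level transfer helpers: `toGPU(_:streamContext:)` deliberately traps, since a group is meant to span devices, while `toCPU(streamContext:)` copies every member back to host memory with a single `DATA_TRANSFER_FORWARD` dispatch. A usage sketch, assuming the usual no-argument convenience overload exists (main.swift above calls `z.toCPU()` with no arguments):

```swift
import NNC

let graph = DynamicGraph()

// Per-GPU results from a data-parallel step.
let z = DynamicGraph.Group(
  (0..<DeviceKind.GPUs.count).map {
    graph.variable(.GPU($0), .NCHW(4, 3, 32, 32), of: Float.self)
  })
z.randn()

// Gather all members to the CPU; the result is a Group of CPU tensors indexed
// the same way, so zCPU[i] holds the tensor that was on GPU i.
let zCPU = z.toCPU()
// Calling z.toGPU(0) instead would hit the fatalError added in this commit.
```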
42 changes: 33 additions & 9 deletions nnc/Model.swift
@@ -121,10 +121,15 @@ public class Model {
public func first(where block: @escaping (String) -> Bool) -> Parameters? {
let wrapped = Wrapped(block)
let unmanaged = Unmanaged.passRetained(wrapped)
guard let io = ccv_cnnp_model_parameter_first(model!.cModel, { _, name, context in
let block = Unmanaged<Wrapped<(String) -> Bool>>.fromOpaque(context!).takeUnretainedValue()
return block.value(name.flatMap({ String(cString: $0) }) ?? "") ? 1 : 0
}, unmanaged.toOpaque()) else {
guard
let io = ccv_cnnp_model_parameter_first(
model!.cModel,
{ _, name, context in
let block = Unmanaged<Wrapped<(String) -> Bool>>.fromOpaque(context!)
.takeUnretainedValue()
return block.value(name.flatMap({ String(cString: $0) }) ?? "") ? 1 : 0
}, unmanaged.toOpaque())
else {
unmanaged.release()
return nil
}
@@ -138,10 +143,13 @@ public class Model {
public func filter(where block: @escaping (String) -> Bool) -> [Parameters] {
let wrapped = Wrapped(block)
let unmanaged = Unmanaged.passRetained(wrapped)
let array = ccv_cnnp_model_parameters_filter(model!.cModel, { _, name, context in
let block = Unmanaged<Wrapped<(String) -> Bool>>.fromOpaque(context!).takeUnretainedValue()
return block.value(name.flatMap({ String(cString: $0) }) ?? "") ? 1 : 0
}, unmanaged.toOpaque())!
let array = ccv_cnnp_model_parameters_filter(
model!.cModel,
{ _, name, context in
let block = Unmanaged<Wrapped<(String) -> Bool>>.fromOpaque(context!)
.takeUnretainedValue()
return block.value(name.flatMap({ String(cString: $0) }) ?? "") ? 1 : 0
}, unmanaged.toOpaque())!
unmanaged.release()
guard array.pointee.rnum > 0 else {
ccv_array_free(array)
@@ -303,7 +311,23 @@ extension Model {
ccv_cnnp_model_compile(cModel, inputParams, Int32(inputParams.count), noop, noop)
if isEager {
let graph = inputs[0].graph
let _inputs: [ccv_nnc_tensor_variable_t?] = inputs.map { $0.untyped[0]._tensor }
let parallel = inputs[0].untyped.count
let inputSize = inputs.count
var _inputs = [ccv_nnc_tensor_variable_t?](repeating: nil, count: parallel * inputSize)
for (i, input) in inputs.enumerated() {
assert(input.untyped.count == parallel)
for (j, tensor) in input.untyped.enumerated() {
assert(tensor.graph === graph)
_inputs[j * inputSize + i] = tensor._tensor
}
}
if let dataParallel = dataParallel {
// You cannot run a model previously parallel and then not.
assert(dataParallel == parallel)
} else {
ccv_cnnp_model_set_data_parallel(cModel, Int32(parallel))
dataParallel = parallel
}
let _streamContext = graph.streamContext?._stream
ccv_nnc_dynamic_graph_dry_run(
graph.cGraph, cModel, testing ? 1 : 0, _inputs, Int32(_inputs.count), _streamContext)
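The Model.swift change teaches the eager-compile path about grouped inputs: per-device tensors are flattened device-major (slot `j * inputSize + i` holds model input `i` on device `j`), `ccv_cnnp_model_set_data_parallel` is called once with the group size, and re-compiling the same model with a different parallelism is asserted against. A toy illustration of that flattening order, with strings standing in for tensor handles (the input/gpu labels are purely illustrative):

```swift
// Device-major layout used when feeding grouped inputs to the dry run:
// slot j * inputSize + i == model input i on device j.
let inputSize = 4  // e.g. x, rot, t, y for the minrf DiT
let parallel = 2   // number of GPUs in each input group
var slots = [String](repeating: "", count: parallel * inputSize)
for i in 0..<inputSize {
  for j in 0..<parallel {
    slots[j * inputSize + i] = "input\(i)@gpu\(j)"
  }
}
// slots == ["input0@gpu0", "input1@gpu0", "input2@gpu0", "input3@gpu0",
//           "input0@gpu1", "input1@gpu1", "input2@gpu1", "input3@gpu1"]
print(slots)
```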
