Fixes #3426: add apache arrow import procedure to extended #3978

Merged: 1 commit, Mar 14, 2024
118 changes: 118 additions & 0 deletions docs/asciidoc/modules/ROOT/pages/overview/apoc.import/apoc.import.arrow.adoc
@@ -0,0 +1,118 @@
= apoc.import.arrow
:description: This section contains reference documentation for the apoc.import.arrow procedure.

label:procedure[] label:apoc-extended[]

[.emphasis]
apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array

== Signature

[source]
----
apoc.import.arrow(urlOrBinaryFile :: ANY?, config = {} :: MAP?) :: (file :: STRING?, source :: STRING?, format :: STRING?, nodes :: INTEGER?, relationships :: INTEGER?, properties :: INTEGER?, time :: INTEGER?, rows :: INTEGER?, batchSize :: INTEGER?, batches :: INTEGER?, done :: BOOLEAN?, data :: STRING?)
----

== Input parameters
[.procedures, opts=header]
|===
| Name | Type | Default
|urlOrBinaryFile|ANY?|null
|config|MAP?|{}
|===

== Config parameters
This procedure supports the following config parameters:

.Config parameters
[opts=header, cols='1a,1a,1a,3a']
|===
| name | type |default | description
| batchSize | Integer | `2000` | the maximum number of imported rows per transaction batch
| mapping | Map | `{}` | see `Mapping config` example below
|===
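
For example, a minimal sketch passing both config keys (the file name and the `durationProp` property are placeholders):

[source,cypher]
----
CALL apoc.import.arrow("fileCreatedViaExportProcedures.arrow", {batchSize: 5000, mapping: {durationProp: 'Duration'}})
----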

== Output parameters
[.procedures, opts=header]
|===
| Name | Type
|file|STRING?
|source|STRING?
|format|STRING?
|nodes|INTEGER?
|relationships|INTEGER?
|properties|INTEGER?
|time|INTEGER?
|rows|INTEGER?
|batchSize|INTEGER?
|batches|INTEGER?
|done|BOOLEAN?
|data|STRING?
|===

[[usage-apoc.import.arrow]]
== Usage Examples

The `apoc.import.arrow` procedure can be used to import Arrow files created by the `apoc.export.arrow.*` procedures.


[source,cypher]
----
CALL apoc.import.arrow("fileCreatedViaExportProcedures.arrow")
----

.Results
[opts=header]
|===
| file | source | format | nodes | relationships | properties | time | rows | batchSize | batches | done | data
| "fileCreatedViaExportProcedures.arrow" | "file" | "arrow" | 3 | 1 | 15 | 105 | 4 | -1 | 0 | TRUE | NULL
|===


We can also import from a binary `byte[]` created by the `apoc.export.arrow.stream.*` procedures.

[source,cypher]
----
CALL apoc.import.arrow(`<binaryArrow>`)
----
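
For instance, a hedged sketch that chains the export stream procedure into the import; this assumes APOC Core's `apoc.export.arrow.stream.all` is available and yields the binary in a column named `value`:

[source,cypher]
----
CALL apoc.export.arrow.stream.all({})
YIELD value AS binaryArrow
CALL apoc.import.arrow(binaryArrow, {})
YIELD nodes, relationships
RETURN nodes, relationships
----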

=== Mapping config

To import complex types that are not natively supported by the Arrow format, such as Point, Duration, or a list of Durations,
we can use the `mapping` config to convert them to the desired data type.
For example, if we have a node `(:MyLabel {durationProp: duration('P5M1.5D')})` and we export it to an Arrow file/binary,
we can import it back by providing a map whose keys are the property names and whose values are the property types.

That is, in this example, using the load procedure:
[source,cypher]
----
CALL apoc.load.arrow(fileOrBinary, {mapping: {durationProp: 'Duration'}})
----

Or with the import procedure:
[source,cypher]
----
CALL apoc.import.arrow(fileOrBinary, {mapping: {durationProp: 'Duration'}})
----

The mapping value types can be one of the following:

* `Point`
* `LocalDateTime`
* `LocalTime`
* `DateTime`
* `Time`
* `Date`
* `Duration`
* `Char`
* `Byte`
* `Double`
* `Float`
* `Short`
* `Int`
* `Long`
* `Node`
* `Relationship`
* `BaseType` followed by `Array`, to map a list of values, where `BaseType` can be one of the previous types, for example `DurationArray` (see the sketch after this list)
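
For example, a minimal sketch mapping a list-of-durations property (the `durationListProp` name is only an illustration):

[source,cypher]
----
CALL apoc.import.arrow(fileOrBinary, {mapping: {durationListProp: 'DurationArray'}})
----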


12 changes: 12 additions & 0 deletions docs/asciidoc/modules/ROOT/pages/overview/apoc.import/index.adoc
@@ -0,0 +1,12 @@
= apoc.import
:description: This section contains reference documentation for the apoc.import procedures.

[.procedures, opts=header, cols='5a,1a']
|===
| Qualified Name | Type
|xref::overview/apoc.import/apoc.import.arrow.adoc[apoc.import.arrow icon:book[]]

apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array
|label:procedure[]
|===

@@ -340,6 +340,17 @@ apoc.get.rels(rel\|id\|[ids]) - quickly returns all relationships with these id's
|label:procedure[]
|===

== xref::overview/apoc.import/index.adoc[]

[.procedures, opts=header, cols='5a,1a']
|===
| Qualified Name | Type
|xref::overview/apoc.import/apoc.import.arrow.adoc[apoc.import.arrow icon:book[]]

apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array
|label:procedure[]
|===

== xref::overview/apoc.load/index.adoc[]

[.procedures, opts=header, cols='5a,1a']
@@ -64,6 +64,8 @@ This file is generated by DocsTest, so don't change it!
** xref::overview/apoc.get/index.adoc[]
*** xref::overview/apoc.get/apoc.get.nodes.adoc[]
*** xref::overview/apoc.get/apoc.get.rels.adoc[]
** xref::overview/apoc.import/index.adoc[]
*** xref::overview/apoc.import/apoc.import.arrow.adoc[]
** xref::overview/apoc.load/index.adoc[]
*** xref::overview/apoc.load/apoc.load.csv.adoc[]
*** xref::overview/apoc.load/apoc.load.csvParams.adoc[]
6 changes: 6 additions & 0 deletions extended/build.gradle
@@ -112,6 +112,12 @@ dependencies {
compileOnly group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.5', {
exclude group: 'com.amazonaws'
}

compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'

testImplementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.5', {
exclude group: 'com.amazonaws'
}
225 changes: 225 additions & 0 deletions extended/src/main/java/apoc/export/arrow/ImportArrow.java
@@ -0,0 +1,225 @@
package apoc.export.arrow;

import apoc.Extended;
import apoc.Pools;
import apoc.export.util.BatchTransaction;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.FileUtils;
import apoc.util.Util;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.neo4j.graphdb.Entity;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.security.URLAccessChecker;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import static apoc.util.ExtendedUtil.toValidValue;

@Extended
public class ImportArrow {

// TODO: field similar to the one placed in ArrowUtils (placed in core)
// when the Arrow procedures are moved to extended, remove these lines
// and replace FIELD_ID with FIELD_ID.getName(), FIELD_LABELS with FIELD_LABELS.getName(), etc..
public static String FIELD_ID = "<id>";
public static String FIELD_LABELS = "labels";
public static String FIELD_SOURCE_ID = "<source.id>";
public static String FIELD_TARGET_ID = "<target.id>";
public static String FIELD_TYPE = "<type>";
// -- end ArrowUtils fields

@Context
public Pools pools;

@Context
public GraphDatabaseService db;

@Context
public URLAccessChecker urlAccessChecker;


@Procedure(name = "apoc.import.arrow", mode = Mode.WRITE)
@Description("Imports arrow from the provided arrow file or byte array")
public Stream<ProgressInfo> importFile(@Name("input") Object input, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws Exception {

ProgressInfo result =
Util.inThread(pools, () -> {
String file = null;
String sourceInfo = "binary";
if (input instanceof String) {
file = (String) input;
sourceInfo = "file";
}

final ArrowConfig conf = new ArrowConfig(config);

// maps the node ids stored in the Arrow data to the ids of the newly created nodes,
// so that relationships can be re-linked to the correct endpoints
final Map<Long, Long> idMapping = new HashMap<>();

AtomicInteger counter = new AtomicInteger();
try (ArrowReader reader = getReader(input);
VectorSchemaRoot schemaRoot = reader.getVectorSchemaRoot()) {

final ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, sourceInfo, "arrow"));
BatchTransaction btx = new BatchTransaction(db, conf.getBatchSize(), reporter);
try {
while (hasElements(counter, reader, schemaRoot)) {
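// materialise the current row of the record batch as a map of column name -> value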

final Map<String, Object> row = schemaRoot.getFieldVectors()
.stream()
.collect(
HashMap::new,
(map, fieldVector) -> {
Object read = read(fieldVector, counter.get(), conf);
if (read == null) {
return;
}
map.put(fieldVector.getName(), read);
},
HashMap::putAll);

String relType = (String) row.remove(FIELD_TYPE);
if (relType == null) {
// is node
String[] stringLabels = (String[]) row.remove(FIELD_LABELS);
Label[] labels = Optional.ofNullable(stringLabels)
.map(l -> Arrays.stream(l).map(Label::label).toArray(Label[]::new))
.orElse(new Label[]{});
final Node node = btx.getTransaction().createNode(labels);

long id = (long) row.remove(FIELD_ID);
idMapping.put(id, node.getId());

addProps(row, node);
reporter.update(1, 0, row.size());
} else {
// is relationship
long sourceId = (long) row.remove(FIELD_SOURCE_ID);
Long idSource = idMapping.get(sourceId);
final Node source = btx.getTransaction().getNodeById(idSource);

long targetId = (long) row.remove(FIELD_TARGET_ID);
Long idTarget = idMapping.get(targetId);
final Node target = btx.getTransaction().getNodeById(idTarget);

final Relationship rel = source.createRelationshipTo(target, RelationshipType.withName(relType));
addProps(row, rel);
reporter.update(0, 1, row.size());
}

counter.incrementAndGet();
btx.increment();
}

btx.doCommit();
} catch (RuntimeException e) {
btx.rollback();
throw e;
} finally {
btx.close();
}

return reporter.getTotal();
}
});

return Stream.of(result);
}


// A file path/URL is opened through FileUtils as a seekable channel and read with ArrowFileReader;
// a byte[] (as produced by the export stream procedures) is wrapped in an ArrowStreamReader.
private ArrowReader getReader(Object input) throws IOException {
RootAllocator allocator = new RootAllocator();
if (input instanceof String) {
final SeekableByteChannel channel = FileUtils.inputStreamFor(input, null, null, null, urlAccessChecker)
.asChannel();
return new ArrowFileReader(channel, allocator);
}
ByteArrayInputStream inputStream = new ByteArrayInputStream((byte[]) input);
return new ArrowStreamReader(inputStream, allocator);
}


// Returns true while there are still rows to read: when the current record batch is exhausted,
// it tries to load the next batch and resets the row counter.
private static boolean hasElements(AtomicInteger counter, ArrowReader reader, VectorSchemaRoot schemaRoot) throws IOException {
if (counter.get() >= schemaRoot.getRowCount()) {
if (reader.loadNextBatch()) {
counter.set(0);
} else {
return false;
}
}

return true;
}

// Reads the value at the given row index from a single column vector: nulls stay null,
// BitVector values become booleans, empty collections are dropped, and everything else
// is converted through toValidValue using the mapping config.
private static Object read(FieldVector fieldVector, int index, ArrowConfig conf) {

if (fieldVector.isNull(index)) {
return null;
} else if (fieldVector instanceof BitVector) {
BitVector fe = (BitVector) fieldVector;
return fe.get(index) == 1;
} else {
Object object = fieldVector.getObject(index);
if (object instanceof Collection coll && coll.isEmpty()) {
return null;
}
return toValidValue(object, fieldVector.getName(), conf.getMapping());
}
}

private void addProps(Map<String, Object> row, Entity rel) {
row.forEach(rel::setProperty);
}

/**
* Analogous to the ArrowConfig present in APOC Core.
* TODO Merge these 2 classes when the arrow procedures are moved to APOC Extended
*/
public static class ArrowConfig {

private final int batchSize;
private final Map<String, Object> mapping;

public ArrowConfig(Map<String, Object> config) {
if (config == null) {
config = Collections.emptyMap();
}
this.mapping = (Map<String, Object>) config.getOrDefault("mapping", Map.of());
this.batchSize = Util.toInteger(config.getOrDefault("batchSize", 2000));
}

public int getBatchSize() {
return batchSize;
}

public Map<String, Object> getMapping() {
return mapping;
}
}

}