I am trying out the cookbook Java example here. The only change is that I am trying to write multiple batches; see the "Batch" comments in the code.
Upon running this example I am seeing unexpected overlapping results! This gets weirder with multi-threading.
Please suggest the correct way of sending multiple batches.
S1: Server (Location): Listening on port 33333
C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.arrow.memory.util.MemoryUtil (file:/Users/rentsher/.m2/repository/org/apache/arrow/arrow-memory-core/8.0.0/arrow-memory-core-8.0.0.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.arrow.memory.util.MemoryUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
C2: Client (Populate Data): Wrote 2 batches with 3 rows each
C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Int(64, true) not null>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
C4: Client (Get Stream):
Client Received batch #1, Data:
vector size: 10
30
31
32
33
34
35
36
37
38
39
Client Received batch #2, Data:
vector size: 10
40
41
42
43
44
45
46
47
48
49
Client Received batch #3, Data:
vector size: 10
50
51
52
53
54
55
56
57
58
59
Client Received batch #4, Data:
vector size: 10
30
31
32
33
34
35
36
37
38
39
Client Received batch #5, Data:
vector size: 10
40
41
42
43
44
45
46
47
48
49
Client Received batch #6, Data:
vector size: 10
50
51
52
53
54
55
56
57
58
59
C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Int(64, true) not null>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
C6: Client (Do Delete Action): Delete completed
C7: Client (List Flights Info): After delete - No records
C8: Server shut down successfully
Process finished with exit code 0
package com.iamsmkr.arrowflight;
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.AsyncPutListener;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.holders.NullableVarCharHolder;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
/**
 * Demo that starts an in-process Arrow Flight server and drives it with a client:
 * uploads several record batches, reads them back, lists the flights, deletes the
 * dataset and shuts the server down.
 *
 * <p>NOTE(review): {@code ArrowFlightProducer} is defined outside this file. This client
 * writes six distinct value ranges (0..59); if reads return repeated batches, check how
 * the producer retains the batches it receives — confirm against its source.
 */
public class CookbookApp {

    /** Name of the single-element descriptor path used for every request. */
    private static final String DATASET = "profiles";

    /** Number of record batches uploaded by the client. */
    private static final int BATCH_COUNT = 6;

    /** Number of rows written into each uploaded batch. */
    private static final int ROWS_PER_BATCH = 10;

    /**
     * Runs the whole server/client round trip on port 33333.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
        try (BufferAllocator allocator = new RootAllocator()) {
            // Server
            try (FlightServer flightServer = FlightServer.builder(
                    allocator, location, new ArrowFlightProducer(allocator, location)).build()) {
                try {
                    flightServer.start();
                    System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
                } catch (IOException e) {
                    // Report why start-up failed instead of exiting silently.
                    System.err.println("S1: Server failed to start: " + e);
                    System.exit(1);
                }

                // Client
                try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
                    System.out.println("C1: Client (Location): Connected to " + location.getUri());

                    // Populate data
                    putBatches(flightClient, allocator);

                    // Get metadata information
                    FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path(DATASET));
                    System.out.println("C3: Client (Get Metadata): " + flightInfo);

                    // Get data information
                    printStream(flightClient);

                    // Get all metadata information
                    Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
                    System.out.print("C5: Client (List Flights Info): ");
                    flightInfosBefore.forEach(System.out::println);

                    // Do delete action
                    Iterator<Result> deleteActionResult = flightClient.doAction(
                            new Action("DELETE", DATASET.getBytes(StandardCharsets.UTF_8)));
                    while (deleteActionResult.hasNext()) {
                        Result result = deleteActionResult.next();
                        System.out.println("C6: Client (Do Delete Action): "
                                + new String(result.getBody(), StandardCharsets.UTF_8));
                    }

                    // Get all metadata information (to validate the delete action)
                    Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
                    flightInfos.forEach(System.out::println);
                    System.out.println("C7: Client (List Flights Info): After delete - No records");

                    // Server shut down
                    flightServer.shutdown();
                    System.out.println("C8: Server shut down successfully");
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Uploads {@link #BATCH_COUNT} batches of {@link #ROWS_PER_BATCH} consecutive longs
     * (0, 1, 2, ...) to the {@link #DATASET} descriptor and waits for the server's result.
     *
     * <p>Every batch is written into the same vector, overwriting the previous values.
     * NOTE(review): this relies on {@code putNext()} no longer needing the old buffer
     * contents once it returns — confirm against the Flight documentation, in particular
     * if zero-copy writes are ever enabled.
     *
     * @param flightClient connected client used to open the put stream
     * @param allocator allocator backing the uploaded vectors
     * @throws InterruptedException if the thread is interrupted while waiting for the stream
     */
    private static void putBatches(FlightClient flightClient, BufferAllocator allocator)
            throws InterruptedException {
        Schema schema = new Schema(Arrays.asList(
                new Field("name", new FieldType(false, new ArrowType.Int(64, true), null), null)));
        // Closing the root also closes the vectors it owns, so only the root is a resource.
        try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator)) {
            BigIntVector names = (BigIntVector) vectorSchemaRoot.getVector("name");
            FlightClient.ClientStreamListener listener = flightClient.startPut(
                    FlightDescriptor.path(DATASET),
                    vectorSchemaRoot,
                    new AsyncPutListener());

            for (int batch = 0; batch < BATCH_COUNT; batch++) {
                long firstValue = (long) batch * ROWS_PER_BATCH;
                for (int row = 0; row < ROWS_PER_BATCH; row++) {
                    names.setSafe(row, firstValue + row);
                }
                vectorSchemaRoot.setRowCount(ROWS_PER_BATCH);
                awaitReady(listener);
                listener.putNext();
            }

            listener.completed();
            listener.getResult();
            System.out.println("C2: Client (Populate Data): Wrote " + BATCH_COUNT
                    + " batches with " + ROWS_PER_BATCH + " rows each");
        }
    }

    /**
     * Polls the listener once per millisecond until it reports that it is ready.
     *
     * @param listener put-stream listener to wait on
     * @throws InterruptedException if the thread is interrupted while sleeping; propagated
     *     rather than swallowed so an interrupted thread does not keep polling
     */
    private static void awaitReady(FlightClient.ClientStreamListener listener)
            throws InterruptedException {
        while (!listener.isReady()) {
            Thread.sleep(1);
        }
    }

    /**
     * Fetches the {@link #DATASET} stream and prints every value of every received batch.
     * Any failure while reading or closing the stream is printed and otherwise ignored,
     * so the remaining demo steps still run.
     *
     * @param flightClient connected client used to open the get stream
     */
    private static void printStream(FlightClient flightClient) {
        try (FlightStream flightStream = flightClient.getStream(
                new Ticket(DATASET.getBytes(StandardCharsets.UTF_8)))) {
            int batch = 0;
            // The root is closed here as in the original code; its vectors close with it.
            try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) {
                BigIntVector names = (BigIntVector) vectorSchemaRootReceived.getVector("name");
                System.out.println("C4: Client (Get Stream):");
                while (flightStream.next()) {
                    batch++;
                    System.out.println("Client Received batch #" + batch + ", Data:");
                    int rowCount = vectorSchemaRootReceived.getRowCount();
                    System.out.println("vector size: " + rowCount);
                    for (int row = 0; row < rowCount; row++) {
                        System.out.println(names.get(row));
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}