使用sql读取java数据

Apache Calcite框架是一个动态数据管理框架,可以使用写sql的方式查询java类型的数据。也可以处理其他形式的数据,详情看官方文档Apache Calcite

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.37</version>
</dependency>

jdk是1.8版本

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CalciteTest {
public static void main(String[] args) throws Exception {
Properties info = new Properties();
info.setProperty("lex", "JAVA");
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
String jsonArray = "[{\"id\":1,\"name\":\"Alice\",\"age\":30},{\"id\":2,\"name\":\"Bob\",\"age\":25},{\"id\":3,\"name\":\"John\",\"age\":24}]";
List<Map<String, Object>> data = JsonUtils.parseJsonArray(jsonArray);
Map<String, List<Map<String, Object>>> tables = new HashMap<>();
tables.put("DynamicTable", data);
rootSchema.add("aa", new DynamicSchema(tables));
Statement statement = calciteConnection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT * FROM aa.DynamicTable");
while (resultSet.next()) {
System.out.println(resultSet.getString("id") + "," + resultSet.getString("name") + "," + resultSet.getString("age"));
}
resultSet.close();
statement.close();
connection.close();
}
}

工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;
import java.util.Map;

public class JsonUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();

public static List<Map<String, Object>> parseJsonArray(String jsonArray) throws Exception {
return objectMapper.readValue(jsonArray, new TypeReference<List<Map<String, Object>>>() {});
}
}

动态schema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DynamicSchema extends AbstractSchema {
private final Map<String, List<Map<String, Object>>> tables;

public DynamicSchema(Map<String, List<Map<String, Object>>> tables) {
this.tables = tables;
}

@Override
protected Map<String, org.apache.calcite.schema.Table> getTableMap() {
Map<String, org.apache.calcite.schema.Table> tableMap = new HashMap<>();
for (Map.Entry<String, List<Map<String, Object>>> entry : tables.entrySet()) {
tableMap.put(entry.getKey(), new DynamicTable(entry.getValue()));
}
return tableMap;
}

public static class DynamicTable extends AbstractTable implements ScannableTable {

private final List<Map<String, Object>> data;

public DynamicTable(List<Map<String, Object>> data) {
this.data = data;
}
@Override
public Enumerable<Object[]> scan(DataContext root) {
List<Object[]> rows = new ArrayList<>();
for (Map<String, Object> row : data) {
rows.add(row.values().toArray());
}
return Linq4j.asEnumerable(rows);
}

@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
if (!data.isEmpty()) {
Map<String, Object> firstRow = data.get(0);
for (Map.Entry<String, Object> entry : firstRow.entrySet()) {
SqlTypeName sqlType = determineSqlType(entry.getValue());
builder.add(entry.getKey(), sqlType);
}
}
return builder.build();
}

private SqlTypeName determineSqlType(Object value) {
if (value instanceof Integer) {
return SqlTypeName.INTEGER;
} else if (value instanceof String) {
return SqlTypeName.VARCHAR;
} else if (value instanceof Double) {
return SqlTypeName.DOUBLE;
} else if (value instanceof Boolean) {
return SqlTypeName.BOOLEAN;
} else {
return SqlTypeName.ANY;
}
}
}
}

框架原理

在官网的示例中,使用new ReflectiveSchema()构造了一个Java结构作为schema,对象中的emps、depts作为table,这样就可以对java对象使用sql操作,标准的sql语法都能实现。

Schema schema = new ReflectiveSchema(new HrSchema());

HrSchema结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.google.common.collect.ImmutableList;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.TranslatableTable;

import java.util.Arrays;
import java.util.Collections;

public class HrSchema {
@Override public String toString() {
return "HrSchema";
}

public final Employees[] emps = {
new Employees(100, 10, "Bill", 10000, 1000),
new Employees(200, 20, "Eric", 8000, 500),
new Employees(150, 10, "Sebastian", 7000, null),
new Employees(110, 10, "Theodore", 11500, 250),
};
public final Departments[] depts = {
new Departments(10, "Sales", Arrays.asList(emps[0], emps[2]),
new Location(-122, 38)),
new Departments(30, "Marketing", ImmutableList.of(), new Location(0, 52)),
new Departments(40, "HR", Collections.singletonList(emps[1]), null),
};
public final Dependents[] Dependentss = {
new Dependents(10, "Michael"),
new Dependents(10, "Jane"),
};
public final Dependents[] locations = {
new Dependents(10, "San Francisco"),
new Dependents(20, "San Diego"),
};

public QueryableTable foo(int count) {
return Smalls.generateStrings(count);
}

public TranslatableTable view(String s) {
return Smalls.view(s);
}
}

在自己的示例中,继承了AbstractSchema,重新写了一个通用的结构。只要传入的是一个对象数组结构,都能转换为动态的表,都使用sql查询。