diff --git a/addons/falcon-bridge/src/test/resources/atlas-application.properties b/addons/falcon-bridge/src/test/resources/atlas-application.properties
index 3b12e5fb3..0ce0f46c9 100644
--- a/addons/falcon-bridge/src/test/resources/atlas-application.properties
+++ b/addons/falcon-bridge/src/test/resources/atlas-application.properties
@@ -37,6 +37,7 @@ atlas.graph.index.search.backend=solr
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
+atlas.graph.storage.transactions=true
#hbase
#For standalone mode , specify localhost
diff --git a/addons/hbase-bridge/src/test/resources/atlas-application.properties b/addons/hbase-bridge/src/test/resources/atlas-application.properties
index 3b12e5fb3..0ce0f46c9 100644
--- a/addons/hbase-bridge/src/test/resources/atlas-application.properties
+++ b/addons/hbase-bridge/src/test/resources/atlas-application.properties
@@ -37,6 +37,7 @@ atlas.graph.index.search.backend=solr
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
+atlas.graph.storage.transactions=true
#hbase
#For standalone mode , specify localhost
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index c25755139..a787dc7fb 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -611,7 +611,7 @@ public class HiveHookIT extends HiveITBase {
}
private String createTestDFSFile(String path) throws Exception {
- return "pfile://" + file(path);
+ return "file://" + file(path);
}
@Test
@@ -1213,7 +1213,7 @@ public class HiveHookIT extends HiveITBase {
Assert.assertNotNull(ddlQueries);
Assert.assertEquals(ddlQueries.size(), 1);
- String filename = "pfile://" + mkdir("export");
+ String filename = "file://" + mkdir("export");
query = "export table " + tableName + " to \"" + filename + "\"";
@@ -1272,7 +1272,7 @@ public class HiveHookIT extends HiveITBase {
Assert.assertNotEquals(processEntity1.getGuid(), processEntity2.getGuid());
//Export should update same process
- filename = "pfile://" + mkdir("export2");
+ filename = "file://" + mkdir("export2");
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
diff --git a/addons/hive-bridge/src/test/resources/atlas-application.properties b/addons/hive-bridge/src/test/resources/atlas-application.properties
index 5d24a3014..61b3c8f24 100644
--- a/addons/hive-bridge/src/test/resources/atlas-application.properties
+++ b/addons/hive-bridge/src/test/resources/atlas-application.properties
@@ -36,6 +36,7 @@ atlas.graph.index.search.backend=solr
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
+atlas.graph.storage.transactions=true
#hbase
#For standalone mode , specify localhost
@@ -122,4 +123,4 @@ atlas.authentication.method.file=true
atlas.authentication.method.ldap.type=none
atlas.authentication.method.kerberos=false
# atlas.authentication.method.file.filename=users-credentials.properties
-atlas.hook.hive.hs2.ignore.ddl.operations=false
\ No newline at end of file
+atlas.hook.hive.hs2.ignore.ddl.operations=false
diff --git a/addons/impala-bridge/src/test/resources/atlas-application.properties b/addons/impala-bridge/src/test/resources/atlas-application.properties
index 898b69c99..aeb28ed67 100644
--- a/addons/impala-bridge/src/test/resources/atlas-application.properties
+++ b/addons/impala-bridge/src/test/resources/atlas-application.properties
@@ -36,6 +36,7 @@ atlas.graph.index.search.backend=solr
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
+atlas.graph.storage.transactions=true
#hbase
#For standalone mode , specify localhost
diff --git a/addons/kafka-bridge/src/test/resources/atlas-application.properties b/addons/kafka-bridge/src/test/resources/atlas-application.properties
index 4a12cf6c8..631c52347 100644
--- a/addons/kafka-bridge/src/test/resources/atlas-application.properties
+++ b/addons/kafka-bridge/src/test/resources/atlas-application.properties
@@ -37,6 +37,7 @@ atlas.graph.index.search.backend=solr
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
+atlas.graph.storage.transactions=true
#hbase
#For standalone mode , specify localhost
diff --git a/addons/sqoop-bridge/src/test/resources/atlas-application.properties b/addons/sqoop-bridge/src/test/resources/atlas-application.properties
index 898b69c99..aeb28ed67 100644
--- a/addons/sqoop-bridge/src/test/resources/atlas-application.properties
+++ b/addons/sqoop-bridge/src/test/resources/atlas-application.properties
@@ -36,6 +36,7 @@ atlas.graph.index.search.backend=solr
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
+atlas.graph.storage.transactions=true
#hbase
#For standalone mode , specify localhost
diff --git a/addons/storm-bridge/src/test/resources/atlas-application.properties b/addons/storm-bridge/src/test/resources/atlas-application.properties
index b82257894..1338d1972 100644
--- a/addons/storm-bridge/src/test/resources/atlas-application.properties
+++ b/addons/storm-bridge/src/test/resources/atlas-application.properties
@@ -38,6 +38,7 @@ atlas.graph.index.search.backend=solr
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
+atlas.graph.storage.transactions=true
#hbase
#For standalone mode , specify localhost
diff --git a/authorization/src/test/resources/atlas-application.properties b/authorization/src/test/resources/atlas-application.properties
index d735900f5..e908895b2 100644
--- a/authorization/src/test/resources/atlas-application.properties
+++ b/authorization/src/test/resources/atlas-application.properties
@@ -53,6 +53,7 @@ atlas.graph.index.search.backend=${graph.index.backend}
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkley
+atlas.graph.storage.transactions=true
#hbase
#For standalone mode , specify localhost
diff --git a/client/common/pom.xml b/client/common/pom.xml
index c1bd024b1..9bc38c20f 100644
--- a/client/common/pom.xml
+++ b/client/common/pom.xml
@@ -44,5 +44,10 @@
         <dependency>
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-intg</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+        </dependency>
     </dependencies>
diff --git a/dev-support/atlas-docker/.env b/dev-support/atlas-docker/.env
index 0e924b27a..cdcb7b113 100644
--- a/dev-support/atlas-docker/.env
+++ b/dev-support/atlas-docker/.env
@@ -19,8 +19,8 @@ ATLAS_SERVER_JAVA_VERSION=8
ATLAS_VERSION=3.0.0-SNAPSHOT
UBUNTU_VERSION=20.04
-HADOOP_VERSION=3.3.0
-HBASE_VERSION=2.3.3
-KAFKA_VERSION=2.8.1
-HIVE_VERSION=3.1.2
+HADOOP_VERSION=3.3.6
+HBASE_VERSION=2.6.0
+KAFKA_VERSION=2.8.2
+HIVE_VERSION=3.1.3
HIVE_HADOOP_VERSION=3.1.1
diff --git a/dev-support/atlas-docker/Dockerfile.atlas-zk b/dev-support/atlas-docker/Dockerfile.atlas-zk
index 2855b5014..94b228cc3 100644
--- a/dev-support/atlas-docker/Dockerfile.atlas-zk
+++ b/dev-support/atlas-docker/Dockerfile.atlas-zk
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-FROM zookeeper:3.5.9
+FROM zookeeper:3.9.2
diff --git a/graphdb/api/pom.xml b/graphdb/api/pom.xml
index 4ba89b20f..84fc54499 100644
--- a/graphdb/api/pom.xml
+++ b/graphdb/api/pom.xml
@@ -61,6 +61,11 @@
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>gremlin-util</artifactId>
+            <version>${tinkerpop.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-text</artifactId>
         </dependency>
diff --git a/graphdb/janus-hbase2/pom.xml b/graphdb/janus-hbase2/pom.xml
deleted file mode 100644
index 9ec087dbb..000000000
--- a/graphdb/janus-hbase2/pom.xml
+++ /dev/null
@@ -1,132 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>atlas-graphdb</artifactId>
-        <groupId>org.apache.atlas</groupId>
-        <version>3.0.0-SNAPSHOT</version>
-    </parent>
-    <artifactId>atlas-janusgraph-hbase2</artifactId>
-    <name>Apache Atlas JanusGraph-HBase2 Module</name>
-    <description>Apache Atlas JanusGraph-HBase2 Module</description>
-    <packaging>jar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.janusgraph</groupId>
-            <artifactId>janusgraph-core</artifactId>
-            <version>${janusgraph.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.codahale.metrics</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.noggit</groupId>
-                    <artifactId>noggit</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.tinkerpop</groupId>
-                    <artifactId>gremlin-shaded</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.tinkerpop</groupId>
-                    <artifactId>gremlin-server</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.tinkerpop</groupId>
-                    <artifactId>gremlin-groovy</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.tinkerpop</groupId>
-                    <artifactId>gremlin-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.tinkerpop</groupId>
-                    <artifactId>gremlin-driver</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.tinkerpop</groupId>
-                    <artifactId>tinkergraph-gremlin</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-configuration2</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-text</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.rabbitmq</groupId>
-                    <artifactId>amqp-client</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-configuration2</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-text</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-shaded-client</artifactId>
-            <version>${hbase.version}</version>
-            <optional>true</optional>
-            <exclusions>
-                <exclusion>
-                    <artifactId>avro</artifactId>
-                    <groupId>org.apache.avro</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>jruby-complete</artifactId>
-                    <groupId>org.jruby</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>asm</artifactId>
-                    <groupId>asm</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-text</artifactId>
-            <version>${commons-text.version}</version>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/AdminMask.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/AdminMask.java
deleted file mode 100644
index 548860bcc..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/AdminMask.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-/**
- * Copyright DataStax, Inc.
- *
- * Please see the included license file for details.
- */
-package org.janusgraph.diskstorage.hbase2;
-
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course
- * of development from 0.94 to 1.0 and beyond.
- */
-public interface AdminMask extends Closeable
-{
-
- void clearTable(String tableName, long timestamp) throws IOException;
-
- /**
- * Drop given table. Table can be either enabled or disabled.
- * @param tableName Name of the table to delete
- * @throws IOException
- */
- void dropTable(String tableName) throws IOException;
-
- TableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException;
-
- boolean tableExists(String tableName) throws IOException;
-
- void createTable(TableDescriptor desc) throws IOException;
-
- void createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException;
-
- /**
- * Estimate the number of regionservers in the HBase cluster.
- *
- * This is usually implemented by calling
- * {@link HBaseAdmin#getClusterStatus()} and then
- * {@link ClusterStatus#getServers()} and finally {@code size()} on the
- * returned server list.
- *
- * @return the number of servers in the cluster or -1 if it could not be determined
- */
- int getEstimatedRegionServerCount();
-
- void disableTable(String tableName) throws IOException;
-
- void enableTable(String tableName) throws IOException;
-
- boolean isTableDisabled(String tableName) throws IOException;
-
- void addColumn(String tableName, ColumnFamilyDescriptor columnDescriptor) throws IOException;
-}
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/ConnectionMask.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/ConnectionMask.java
deleted file mode 100644
index 05ecd532f..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/ConnectionMask.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-/**
- * Copyright DataStax, Inc.
- *
- * Please see the included license file for details.
- */
-package org.janusgraph.diskstorage.hbase2;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course
- * of development from 0.94 to 1.0 and beyond.
- */
-public interface ConnectionMask extends Closeable
-{
-
- /**
- * Retrieve the TableMask compatibility layer object for the supplied table name.
- * @return The TableMask for the specified table.
- * @throws IOException in the case of backend exceptions.
- */
- TableMask getTable(String name) throws IOException;
-
- /**
- * Retrieve the AdminMask compatibility layer object for this Connection.
- * @return The AdminMask for this Connection
- * @throws IOException in the case of backend exceptions.
- */
- AdminMask getAdmin() throws IOException;
-
- /**
- * Retrieve the RegionLocations for the supplied table name.
- * @return A map of HRegionInfo to ServerName that describes the storage regions for the named table.
- * @throws IOException in the case of backend exceptions.
- */
- List<HRegionLocation> getRegionLocations(String tablename) throws IOException;
-}
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseAdmin2_0.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseAdmin2_0.java
deleted file mode 100644
index f93481e92..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseAdmin2_0.java
+++ /dev/null
@@ -1,167 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.janusgraph.diskstorage.hbase2;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class HBaseAdmin2_0 implements AdminMask
-{
-
- private static final Logger log = LoggerFactory.getLogger(HBaseAdmin2_0.class);
-
- private final Admin adm;
-
- public HBaseAdmin2_0(Admin adm)
- {
- this.adm = adm;
- }
-
- /**
- * Delete all rows from the given table. This method is intended only for development and testing use.
- * @param tableString
- * @param timestamp
- * @throws IOException
- */
- @Override
- public void clearTable(String tableString, long timestamp) throws IOException
- {
- TableName tableName = TableName.valueOf(tableString);
-
- if (!adm.tableExists(tableName)) {
- log.debug("Attempted to clear table {} before it exists (noop)", tableString);
- return;
- }
-
- // Unfortunately, linear scanning and deleting rows is faster in HBase when running integration tests than
- // disabling and deleting/truncating tables.
- final Scan scan = new Scan();
- scan.setCacheBlocks(false);
- scan.setCaching(2000);
- scan.setTimeRange(0, Long.MAX_VALUE);
- scan.readVersions(1);
-
- try (final Table table = adm.getConnection().getTable(tableName);
- final ResultScanner scanner = table.getScanner(scan)) {
- final Iterator<Result> iterator = scanner.iterator();
- final int batchSize = 1000;
- final List<Delete> deleteList = new ArrayList<>();
- while (iterator.hasNext()) {
- deleteList.add(new Delete(iterator.next().getRow(), timestamp));
- if (!iterator.hasNext() || deleteList.size() == batchSize) {
- table.delete(deleteList);
- deleteList.clear();
- }
- }
- }
- }
-
- @Override
- public void dropTable(String tableString) throws IOException {
- final TableName tableName = TableName.valueOf(tableString);
-
- if (!adm.tableExists(tableName)) {
- log.debug("Attempted to drop table {} before it exists (noop)", tableString);
- return;
- }
-
- if (adm.isTableEnabled(tableName)) {
- adm.disableTable(tableName);
- }
- adm.deleteTable(tableName);
- }
-
- @Override
- public TableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException
- {
- return adm.getDescriptor(TableName.valueOf(tableString));
- }
-
- @Override
- public boolean tableExists(String tableString) throws IOException
- {
- return adm.tableExists(TableName.valueOf(tableString));
- }
-
- @Override
- public void createTable(TableDescriptor desc) throws IOException
- {
- adm.createTable(desc);
- }
-
- @Override
- public void createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException
- {
- adm.createTable(desc, startKey, endKey, numRegions);
- }
-
- @Override
- public int getEstimatedRegionServerCount()
- {
- int serverCount = -1;
- try {
- serverCount = adm.getClusterStatus().getServers().size();
- log.debug("Read {} servers from HBase ClusterStatus", serverCount);
- } catch (IOException e) {
- log.debug("Unable to retrieve HBase cluster status", e);
- }
- return serverCount;
- }
-
- @Override
- public void disableTable(String tableString) throws IOException
- {
- adm.disableTable(TableName.valueOf(tableString));
- }
-
- @Override
- public void enableTable(String tableString) throws IOException
- {
- adm.enableTable(TableName.valueOf(tableString));
- }
-
- @Override
- public boolean isTableDisabled(String tableString) throws IOException
- {
- return adm.isTableDisabled(TableName.valueOf(tableString));
- }
-
- @Override
- public void addColumn(String tableString, ColumnFamilyDescriptor columnDescriptor) throws IOException
- {
- adm.addColumnFamily(TableName.valueOf(tableString), columnDescriptor);
- }
-
- @Override
- public void close() throws IOException
- {
- adm.close();
- }
-}
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseCompat.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseCompat.java
deleted file mode 100644
index 553ad4606..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseCompat.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.janusgraph.diskstorage.hbase2;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-
-import java.io.IOException;
-
-public interface HBaseCompat {
-
- /**
- * Configure the compression scheme {@code algo} on a column family
- * descriptor {@code cd}. The {@code algo} parameter is a string value
- * corresponding to one of the values of HBase's Compression enum. The
- * Compression enum has moved between packages as HBase has evolved, which
- * is why this method has a String argument in the signature instead of the
- * enum itself.
- * @param cd
- * column family to configure
- * @param algo
- */
- public ColumnFamilyDescriptor setCompression(ColumnFamilyDescriptor cd, String algo);
-
- /**
- * Create and return a HTableDescriptor instance with the given name. The
- * constructors on this method have remained stable over HBase development
- * so far, but the old HTableDescriptor(String) constructor & byte[] friends
- * are now marked deprecated and may eventually be removed in favor of the
- * HTableDescriptor(TableName) constructor. That constructor (and the
- * TableName type) only exists in newer HBase versions. Hence this method.
- *
- * @param tableName
- * HBase table name
- * @return a new table descriptor instance
- */
- public TableDescriptor newTableDescriptor(String tableName);
-
- ConnectionMask createConnection(Configuration conf) throws IOException;
-
- TableDescriptor addColumnFamilyToTableDescriptor(TableDescriptor tdesc, ColumnFamilyDescriptor cdesc);
-
- void setTimestamp(Delete d, long timestamp);
-}
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseCompat2_0.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseCompat2_0.java
deleted file mode 100644
index fdba24a3b..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseCompat2_0.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.janusgraph.diskstorage.hbase2;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.io.compress.Compression;
-
-import java.io.IOException;
-
-public class HBaseCompat2_0 implements HBaseCompat {
-
- @Override
- public ColumnFamilyDescriptor setCompression(ColumnFamilyDescriptor cd, String algo) {
- return ColumnFamilyDescriptorBuilder.newBuilder(cd).setCompressionType(Compression.Algorithm.valueOf(algo)).build();
- }
-
- @Override
- public TableDescriptor newTableDescriptor(String tableName) {
- TableName tn = TableName.valueOf(tableName);
-
- return TableDescriptorBuilder.newBuilder(tn).build();
- }
-
- @Override
- public ConnectionMask createConnection(Configuration conf) throws IOException
- {
- return new HConnection2_0(ConnectionFactory.createConnection(conf));
- }
-
- @Override
- public TableDescriptor addColumnFamilyToTableDescriptor(TableDescriptor tdesc, ColumnFamilyDescriptor cdesc)
- {
- return TableDescriptorBuilder.newBuilder(tdesc).addColumnFamily(cdesc).build();
- }
-
- @Override
- public void setTimestamp(Delete d, long timestamp)
- {
- d.setTimestamp(timestamp);
- }
-
-}
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseCompatLoader.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseCompatLoader.java
deleted file mode 100644
index d746b3db0..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseCompatLoader.java
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.janusgraph.diskstorage.hbase2;
-
-import org.apache.hadoop.hbase.util.VersionInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HBaseCompatLoader {
-
- private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class);
-
- private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.2";
-
- private static final String HBASE_VERSION_2_STRING = "2.";
-
- private static final String DEFAULT_HBASE_COMPAT_CLASS_NAME =
- "org.janusgraph.diskstorage.hbase2.HBaseCompat2_0";
-
- private static final String[] HBASE_SUPPORTED_VERSIONS =
- new String[] { "0.98", "1.0", "1.1", "1.2", "1.3", "2.0" };
-
- private static HBaseCompat cachedCompat;
-
- public synchronized static HBaseCompat getCompat(String classOverride) {
-
- if (null != cachedCompat) {
- log.debug("Returning cached HBase compatibility layer: {}", cachedCompat);
- return cachedCompat;
- }
-
- HBaseCompat compat;
- String className = null;
- String classNameSource = null;
-
- if (null != classOverride) {
- className = classOverride;
- classNameSource = "from explicit configuration";
- } else {
- String hbaseVersion = VersionInfo.getVersion();
- for (String supportedVersion : HBASE_SUPPORTED_VERSIONS) {
- if (hbaseVersion.startsWith(supportedVersion + ".")) {
- if (hbaseVersion.startsWith(HBASE_VERSION_2_STRING)) {
- // All HBase 2.x maps to HBaseCompat2_0.
- className = DEFAULT_HBASE_COMPAT_CLASS_NAME;
- }
- else {
- className = "org.janusgraph.diskstorage.hbase2.HBaseCompat" + supportedVersion.replaceAll("\\.", "_");
- }
- classNameSource = "supporting runtime HBase version " + hbaseVersion;
- break;
- }
- }
- if (null == className) {
- log.info("The HBase version {} is not explicitly supported by JanusGraph. " +
- "Loading JanusGraph's compatibility layer for its most recent supported HBase version ({})",
- hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION);
- className = DEFAULT_HBASE_COMPAT_CLASS_NAME;
- classNameSource = " by default";
- }
- }
-
- final String errTemplate = " when instantiating HBase compatibility class " + className;
-
- try {
- compat = (HBaseCompat)Class.forName(className).newInstance();
- log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName());
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
- } catch (InstantiationException e) {
- throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
- }
-
- return cachedCompat = compat;
- }
-}
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseKeyColumnValueStore.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseKeyColumnValueStore.java
deleted file mode 100644
index ffafc8c4d..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseKeyColumnValueStore.java
+++ /dev/null
@@ -1,391 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.janusgraph.diskstorage.hbase2;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
-import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.janusgraph.diskstorage.BackendException;
-import org.janusgraph.diskstorage.Entry;
-import org.janusgraph.diskstorage.EntryList;
-import org.janusgraph.diskstorage.EntryMetaData;
-import org.janusgraph.diskstorage.PermanentBackendException;
-import org.janusgraph.diskstorage.StaticBuffer;
-import org.janusgraph.diskstorage.TemporaryBackendException;
-import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
-import org.janusgraph.diskstorage.keycolumnvalue.KCVSUtil;
-import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
-import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator;
-import org.janusgraph.diskstorage.keycolumnvalue.KeyRangeQuery;
-import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery;
-import org.janusgraph.diskstorage.keycolumnvalue.KeySlicesIterator;
-import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
-import org.janusgraph.diskstorage.keycolumnvalue.MultiSlicesQuery;
-import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
-import org.janusgraph.diskstorage.util.RecordIterator;
-import org.janusgraph.diskstorage.util.StaticArrayBuffer;
-import org.janusgraph.diskstorage.util.StaticArrayEntry;
-import org.janusgraph.diskstorage.util.StaticArrayEntryList;
-import org.janusgraph.util.system.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-
-/**
- * Here are some areas that might need work:
- *
- * - batching? (consider HTable#batch, HTable#setAutoFlush(false)
- * - tuning HTable#setWriteBufferSize (?)
- * - writing a server-side filter to replace ColumnCountGetFilter, which drops
- * all columns on the row where it reaches its limit. This requires getSlice,
- * currently, to impose its limit on the client side. That obviously won't
- * scale.
- * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this)
- * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache
- *
- * There may be other problem areas. These are just the ones of which I'm aware.
- */
-public class HBaseKeyColumnValueStore implements KeyColumnValueStore {
-
- private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class);
-
- private final String tableName;
- private final HBaseStoreManager storeManager;
-
- // When using shortened CF names, columnFamily is the shortname and storeName is the longname
- // When not using shortened CF names, they are the same
- //private final String columnFamily;
- private final String storeName;
- // This is columnFamily.getBytes()
- private final byte[] columnFamilyBytes;
- private final HBaseGetter entryGetter;
-
- private final ConnectionMask cnx;
-
- HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName) {
- this.storeManager = storeManager;
- this.cnx = cnx;
- this.tableName = tableName;
- //this.columnFamily = columnFamily;
- this.storeName = storeName;
- this.columnFamilyBytes = Bytes.toBytes(columnFamily);
- this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName));
- }
-
- @Override
- public void close() throws BackendException {
- }
-
- @Override
- public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
- Map<StaticBuffer, EntryList> result = getHelper(Arrays.asList(query.getKey()), getFilter(query));
- return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST);
- }
-
- @Override
- public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
- return getHelper(keys, getFilter(query));
- }
-
- @Override
- public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
- Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions));
- mutateMany(mutations, txh);
- }
-
- @Override
- public void acquireLock(StaticBuffer key,
- StaticBuffer column,
- StaticBuffer expectedValue,
- StoreTransaction txh) throws BackendException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
- return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY),
- query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY),
- new FilterList(FilterList.Operator.MUST_PASS_ALL),
- query);
- }
-
- @Override
- public String getName() {
- return storeName;
- }
-
- @Override
- public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
- return executeKeySliceQuery(new FilterList(FilterList.Operator.MUST_PASS_ALL), query);
- }
-
- @Override
- public KeySlicesIterator getKeys(MultiSlicesQuery queries, StoreTransaction txh) throws BackendException {
- throw new UnsupportedOperationException();
- }
-
- public static Filter getFilter(SliceQuery query) {
- byte[] colStartBytes = query.getSliceStart().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null;
- byte[] colEndBytes = query.getSliceEnd().length() > 0 ? query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null;
-
- Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false);
-
- if (query.hasLimit()) {
- filter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
- filter,
- new ColumnPaginationFilter(query.getLimit(), 0));
- }
-
- logger.debug("Generated HBase Filter {}", filter);
-
- return filter;
- }
-
- private Map<StaticBuffer, EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException {
- List<Get> requests = new ArrayList<>(keys.size());
- {
- for (StaticBuffer key : keys) {
- Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter);
- try {
- g.setTimeRange(0, Long.MAX_VALUE);
- } catch (IOException e) {
- throw new PermanentBackendException(e);
- }
- requests.add(g);
- }
- }
-
- Map<StaticBuffer, EntryList> resultMap = new HashMap<>(keys.size());
-
- try {
- TableMask table = null;
- Result[] results = null;
-
- try {
- table = cnx.getTable(tableName);
- results = table.get(requests);
- } finally {
- IOUtils.closeQuietly(table);
- }
-
- if (results == null)
- return KCVSUtil.emptyResults(keys);
-
- assert results.length==keys.size();
-
- for (int i = 0; i < results.length; i++) {
- Result result = results[i];
- NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap();
-
- if (f == null) { // no result for this key
- resultMap.put(keys.get(i), EntryList.EMPTY_LIST);
- continue;
- }
-
- // actual key with
- NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes);
- resultMap.put(keys.get(i), (r == null)
- ? EntryList.EMPTY_LIST
- : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter));
- }
-
- return resultMap;
- } catch (InterruptedIOException e) {
- // added to support traversal interruption
- Thread.currentThread().interrupt();
- throw new PermanentBackendException(e);
- } catch (IOException e) {
- throw new TemporaryBackendException(e);
- }
- }
-
- private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
- storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh);
- }
-
- private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException {
- return executeKeySliceQuery(null, null, filters, columnSlice);
- }
-
- private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey,
- @Nullable byte[] endKey,
- FilterList filters,
- @Nullable SliceQuery columnSlice) throws BackendException {
- Scan scan = new Scan().addFamily(columnFamilyBytes);
-
- try {
- scan.setTimeRange(0, Long.MAX_VALUE);
- } catch (IOException e) {
- throw new PermanentBackendException(e);
- }
-
- if (startKey != null)
- scan.withStartRow(startKey);
-
- if (endKey != null)
- scan.withStopRow(endKey);
-
- if (columnSlice != null) {
- filters.addFilter(getFilter(columnSlice));
- }
-
- TableMask table = null;
-
- try {
- table = cnx.getTable(tableName);
- return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes);
- } catch (IOException e) {
- IOUtils.closeQuietly(table);
- throw new PermanentBackendException(e);
- }
- }
-
- private class RowIterator implements KeyIterator {
- private final Closeable table;
- private final Iterator<Result> rows;
- private final byte[] columnFamilyBytes;
-
- private Result currentRow;
- private boolean isClosed;
-
- public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) {
- this.table = table;
- this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length);
- this.rows = Iterators.filter(rows.iterator(), result -> null != result && null != result.getRow());
- }
-
- @Override
- public RecordIterator<Entry> getEntries() {
- ensureOpen();
-
- return new RecordIterator<Entry>() {
- private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv;
- {
- final Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = currentRow.getMap();
- Preconditions.checkNotNull(map);
- kv = map.get(columnFamilyBytes).entrySet().iterator();
- }
-
- @Override
- public boolean hasNext() {
- ensureOpen();
- return kv.hasNext();
- }
-
- @Override
- public Entry next() {
- ensureOpen();
- return StaticArrayEntry.ofBytes(kv.next(), entryGetter);
- }
-
- @Override
- public void close() {
- isClosed = true;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override
- public boolean hasNext() {
- ensureOpen();
- return rows.hasNext();
- }
-
- @Override
- public StaticBuffer next() {
- ensureOpen();
-
- currentRow = rows.next();
- return StaticArrayBuffer.of(currentRow.getRow());
- }
-
- @Override
- public void close() {
- IOUtils.closeQuietly(table);
- isClosed = true;
- logger.debug("RowIterator closed table {}", table);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- private void ensureOpen() {
- if (isClosed)
- throw new IllegalStateException("Iterator has been closed.");
- }
- }
-
- private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> {
-
- private final EntryMetaData[] schema;
-
- private HBaseGetter(EntryMetaData[] schema) {
- this.schema = schema;
- }
-
- @Override
- public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
- return element.getKey();
- }
-
- @Override
- public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
- return element.getValue().lastEntry().getValue();
- }
-
- @Override
- public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) {
- return schema;
- }
-
- @Override
- public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) {
- switch(meta) {
- case TIMESTAMP:
- return element.getValue().lastEntry().getKey();
- default:
- throw new UnsupportedOperationException("Unsupported meta data: " + meta);
- }
- }
- }
-}
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseStoreManager.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseStoreManager.java
deleted file mode 100644
index 5f8e31021..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseStoreManager.java
+++ /dev/null
@@ -1,988 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.janusgraph.diskstorage.hbase2;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.ImmutableBiMap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.VersionInfo;
-import org.janusgraph.core.JanusGraphException;
-import org.janusgraph.diskstorage.BackendException;
-import org.janusgraph.diskstorage.BaseTransactionConfig;
-import org.janusgraph.diskstorage.Entry;
-import org.janusgraph.diskstorage.EntryMetaData;
-import org.janusgraph.diskstorage.PermanentBackendException;
-import org.janusgraph.diskstorage.StaticBuffer;
-import org.janusgraph.diskstorage.StoreMetaData;
-import org.janusgraph.diskstorage.TemporaryBackendException;
-import org.janusgraph.diskstorage.common.DistributedStoreManager;
-import org.janusgraph.diskstorage.configuration.ConfigElement;
-import org.janusgraph.diskstorage.configuration.ConfigNamespace;
-import org.janusgraph.diskstorage.configuration.ConfigOption;
-import org.janusgraph.diskstorage.configuration.Configuration;
-import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
-import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
-import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
-import org.janusgraph.diskstorage.keycolumnvalue.KeyRange;
-import org.janusgraph.diskstorage.keycolumnvalue.StandardStoreFeatures;
-import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures;
-import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
-import org.janusgraph.diskstorage.util.BufferUtil;
-import org.janusgraph.diskstorage.util.StaticArrayBuffer;
-import org.janusgraph.diskstorage.util.time.TimestampProviders;
-import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
-import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions;
-import org.janusgraph.util.system.IOUtils;
-import org.janusgraph.util.system.NetworkUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static org.janusgraph.diskstorage.Backend.EDGESTORE_NAME;
-import static org.janusgraph.diskstorage.Backend.INDEXSTORE_NAME;
-import static org.janusgraph.diskstorage.Backend.LOCK_STORE_SUFFIX;
-import static org.janusgraph.diskstorage.Backend.SYSTEM_MGMT_LOG_NAME;
-import static org.janusgraph.diskstorage.Backend.SYSTEM_TX_LOG_NAME;
-import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.DROP_ON_CLEAR;
-import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.GRAPH_NAME;
-import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.IDS_STORE_NAME;
-import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME;
-
-/**
- * Storage Manager for HBase
- */
-@PreInitializeConfigOptions
-public class HBaseStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager {
-
- private static final Logger logger = LoggerFactory.getLogger(HBaseStoreManager.class);
-
- public static final ConfigNamespace HBASE_NS =
- new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "hbase", "HBase storage options");
-
- public static final ConfigOption<Boolean> SHORT_CF_NAMES =
- new ConfigOption<>(HBASE_NS, "short-cf-names",
- "Whether to shorten the names of JanusGraph's column families to one-character mnemonics " +
- "to conserve storage space", ConfigOption.Type.FIXED, true);
-
- public static final String COMPRESSION_DEFAULT = "-DEFAULT-";
-
- public static final ConfigOption<String> COMPRESSION =
- new ConfigOption<>(HBASE_NS, "compression-algorithm",
- "An HBase Compression.Algorithm enum string which will be applied to newly created column families. " +
- "The compression algorithm must be installed and available on the HBase cluster. JanusGraph cannot install " +
- "and configure new compression algorithms on the HBase cluster by itself.",
- ConfigOption.Type.MASKABLE, "SNAPPY");
-
- public static final ConfigOption<Boolean> SKIP_SCHEMA_CHECK =
- new ConfigOption<>(HBASE_NS, "skip-schema-check",
- "Assume that JanusGraph's HBase table and column families already exist. " +
- "When this is true, JanusGraph will not check for the existence of its table/CFs, " +
- "nor will it attempt to create them under any circumstances. This is useful " +
- "when running JanusGraph without HBase admin privileges.",
- ConfigOption.Type.MASKABLE, false);
-
- public static final ConfigOption<String> HBASE_TABLE =
- new ConfigOption<>(HBASE_NS, "table",
- "The name of the table JanusGraph will use. When " + ConfigElement.getPath(SKIP_SCHEMA_CHECK) +
- " is false, JanusGraph will automatically create this table if it does not already exist." +
- " If this configuration option is not provided but graph.graphname is, the table will be set" +
- " to that value.",
- ConfigOption.Type.LOCAL, "janusgraph");
-
- /**
- * Related bug fixed in 0.98.0, 0.94.7, 0.95.0:
- *
- * https://issues.apache.org/jira/browse/HBASE-8170
- */
- public static final int MIN_REGION_COUNT = 3;
-
- /**
- * The total number of HBase regions to create with JanusGraph's table. This
- * setting only effects table creation; this normally happens just once when
- * JanusGraph connects to an HBase backend for the first time.
- */
- public static final ConfigOption<Integer> REGION_COUNT =
- new ConfigOption<Integer>(HBASE_NS, "region-count",
- "The number of initial regions set when creating JanusGraph's HBase table",
- ConfigOption.Type.MASKABLE, Integer.class, input -> null != input && MIN_REGION_COUNT <= input);
-
- /**
- * This setting is used only when {@link #REGION_COUNT} is unset.
- *
- * If JanusGraph's HBase table does not exist, then it will be created with total
- * region count = (number of servers reported by ClusterStatus) * (this
- * value).
- *
- * The Apache HBase manual suggests an order-of-magnitude range of potential
- * values for this setting:
- *
- *
- * What's the optimal number of pre-split regions to create? Mileage will
- * vary depending upon your application. You could start low with 10
- * pre-split regions / server and watch as data grows over time. It's
- * better to err on the side of too little regions and rolling split later.
- *
- * In general, HBase is designed to run with a small (20-200) number of
- * relatively large (5-20Gb) regions per server... Typically you want to
- * keep your region count low on HBase for numerous reasons. Usually
- * right around 100 regions per RegionServer has yielded the best results.
- *
- *
- *
- *
- * These considerations may differ for other HBase implementations (e.g. MapR).
- */
- public static final ConfigOption<Integer> REGIONS_PER_SERVER =
- new ConfigOption<>(HBASE_NS, "regions-per-server",
- "The number of regions per regionserver to set when creating JanusGraph's HBase table",
- ConfigOption.Type.MASKABLE, Integer.class);
-
- /**
- * If this key is present in either the JVM system properties or the process
- * environment (checked in the listed order, first hit wins), then its value
- * must be the full package and class name of an implementation of
- * {@link HBaseCompat} that has a no-arg public constructor.
- *
- * When this is not set, JanusGraph attempts to automatically detect the
- * HBase runtime version by calling {@link VersionInfo#getVersion()}. JanusGraph
- * then checks the returned version string against a hard-coded list of
- * supported version prefixes and instantiates the associated compat layer
- * if a match is found.
- *
- * When this is set, JanusGraph will not call
- * {@code VersionInfo.getVersion()} or read its hard-coded list of supported
- * version prefixes. JanusGraph will instead attempt to instantiate the class
- * specified (via the no-arg constructor which must exist) and then attempt
- * to cast it to HBaseCompat and use it as such. JanusGraph will assume the
- * supplied implementation is compatible with the runtime HBase version and
- * make no attempt to verify that assumption.
- *
- * Setting this key incorrectly could cause runtime exceptions at best or
- * silent data corruption at worst. This setting is intended for users
- * running exotic HBase implementations that don't support VersionInfo or
- * implementations which return values from {@code VersionInfo.getVersion()}
- * that are inconsistent with Apache's versioning convention. It may also be
- * useful to users who want to run against a new release of HBase that JanusGraph
- * doesn't yet officially support.
- *
- */
- public static final ConfigOption<String> COMPAT_CLASS =
- new ConfigOption<>(HBASE_NS, "compat-class",
- "The package and class name of the HBaseCompat implementation. HBaseCompat masks version-specific HBase API differences. " +
- "When this option is unset, JanusGraph calls HBase's VersionInfo.getVersion() and loads the matching compat class " +
- "at runtime. Setting this option forces JanusGraph to instead reflectively load and instantiate the specified class.",
- ConfigOption.Type.MASKABLE, String.class);
-
- public static final int PORT_DEFAULT = 9160;
-
- public static final TimestampProviders PREFERRED_TIMESTAMPS = TimestampProviders.MICRO;
-
- public static final ConfigNamespace HBASE_CONFIGURATION_NAMESPACE =
- new ConfigNamespace(HBASE_NS, "ext", "Overrides for hbase-{site,default}.xml options", true);
-
- private static final StaticBuffer FOUR_ZERO_BYTES = BufferUtil.zeroBuffer(4);
-
- // Immutable instance fields
- private final BiMap<String, String> shortCfNameMap;
- private final String tableName;
- private final String compression;
- private final int regionCount;
- private final int regionsPerServer;
- private final ConnectionMask cnx;
- private final org.apache.hadoop.conf.Configuration hconf;
- private final boolean shortCfNames;
- private final boolean skipSchemaCheck;
- private final String compatClass;
- private final HBaseCompat compat;
- // Cached return value of getDeployment() as requesting it can be expensive.
- private Deployment deployment = null;
-
- private static final ConcurrentHashMap<HBaseStoreManager, Throwable> openManagers = new ConcurrentHashMap<>();
-
- // Mutable instance state
- private final ConcurrentMap<String, HBaseKeyColumnValueStore> openStores;
-
- public HBaseStoreManager(org.janusgraph.diskstorage.configuration.Configuration config) throws BackendException {
- super(config, PORT_DEFAULT);
-
- shortCfNameMap = createShortCfMap(config);
-
- Preconditions.checkArgument(null != shortCfNameMap);
- Collection<String> shorts = shortCfNameMap.values();
- Preconditions.checkArgument(Sets.newHashSet(shorts).size() == shorts.size());
-
- checkConfigDeprecation(config);
-
- this.tableName = determineTableName(config);
- this.compression = config.get(COMPRESSION);
- this.regionCount = config.has(REGION_COUNT) ? config.get(REGION_COUNT) : -1;
- this.regionsPerServer = config.has(REGIONS_PER_SERVER) ? config.get(REGIONS_PER_SERVER) : -1;
- this.skipSchemaCheck = config.get(SKIP_SCHEMA_CHECK);
- this.compatClass = config.has(COMPAT_CLASS) ? config.get(COMPAT_CLASS) : null;
- this.compat = HBaseCompatLoader.getCompat(compatClass);
-
- /*
- * Specifying both region count options is permitted but may be
- * indicative of a misunderstanding, so issue a warning.
- */
- if (config.has(REGIONS_PER_SERVER) && config.has(REGION_COUNT)) {
- logger.warn("Both {} and {} are set in JanusGraph's configuration, but "
- + "the former takes precedence and the latter will be ignored.",
- REGION_COUNT, REGIONS_PER_SERVER);
- }
-
- /* This static factory calls HBaseConfiguration.addHbaseResources(),
- * which in turn applies the contents of hbase-default.xml and then
- * applies the contents of hbase-site.xml.
- */
- this.hconf = HBaseConfiguration.create();
-
- // Copy a subset of our commons config into a Hadoop config
- int keysLoaded=0;
- Map<String, Object> configSub = config.getSubset(HBASE_CONFIGURATION_NAMESPACE);
- for (Map.Entry<String, Object> entry : configSub.entrySet()) {
- logger.info("HBase configuration: setting {}={}", entry.getKey(), entry.getValue());
- if (entry.getValue()==null) continue;
- hconf.set(entry.getKey(), entry.getValue().toString());
- keysLoaded++;
- }
-
- // Special case for STORAGE_HOSTS
- if (config.has(GraphDatabaseConfiguration.STORAGE_HOSTS)) {
- String zkQuorumKey = "hbase.zookeeper.quorum";
- String csHostList = Joiner.on(",").join(config.get(GraphDatabaseConfiguration.STORAGE_HOSTS));
- hconf.set(zkQuorumKey, csHostList);
- logger.info("Copied host list from {} to {}: {}", GraphDatabaseConfiguration.STORAGE_HOSTS, zkQuorumKey, csHostList);
- }
-
- logger.debug("HBase configuration: set a total of {} configuration values", keysLoaded);
-
- this.shortCfNames = config.get(SHORT_CF_NAMES);
-
- try {
- //this.cnx = HConnectionManager.createConnection(hconf);
- this.cnx = compat.createConnection(hconf);
- } catch (IOException e) {
- throw new PermanentBackendException(e);
- }
-
- if (logger.isTraceEnabled()) {
- openManagers.put(this, new Throwable("Manager Opened"));
- dumpOpenManagers();
- }
-
- logger.debug("Dumping HBase config key=value pairs");
- for (Map.Entry<String, String> entry : hconf) {
- logger.debug("[HBaseConfig] " + entry.getKey() + "=" + entry.getValue());
- }
- logger.debug("End of HBase config key=value pairs");
-
- openStores = new ConcurrentHashMap<>();
- }
-
- public static BiMap<String, String> createShortCfMap(Configuration config) {
- return ImmutableBiMap.<String, String>builder()
- .put(INDEXSTORE_NAME, "g")
- .put(INDEXSTORE_NAME + LOCK_STORE_SUFFIX, "h")
- .put(config.get(IDS_STORE_NAME), "i")
- .put(EDGESTORE_NAME, "e")
- .put(EDGESTORE_NAME + LOCK_STORE_SUFFIX, "f")
- .put(SYSTEM_PROPERTIES_STORE_NAME, "s")
- .put(SYSTEM_PROPERTIES_STORE_NAME + LOCK_STORE_SUFFIX, "t")
- .put(SYSTEM_MGMT_LOG_NAME, "m")
- .put(SYSTEM_TX_LOG_NAME, "l")
- .build();
- }
-
- @Override
- public Deployment getDeployment() {
- if (null != deployment) {
- return deployment;
- }
-
- List<KeyRange> local;
- try {
- local = getLocalKeyPartition();
- deployment = null != local && !local.isEmpty() ? Deployment.LOCAL : Deployment.REMOTE;
- } catch (BackendException e) {
- throw new RuntimeException(e);
- }
- return deployment;
- }
-
- @Override
- public String toString() {
- return "hbase[" + tableName + "@" + super.toString() + "]";
- }
-
- public void dumpOpenManagers() {
- int estimatedSize = openManagers.size();
- logger.trace("---- Begin open HBase store manager list ({} managers) ----", estimatedSize);
- for (HBaseStoreManager m : openManagers.keySet()) {
- logger.trace("Manager {} opened at:", m, openManagers.get(m));
- }
- logger.trace("---- End open HBase store manager list ({} managers) ----", estimatedSize);
- }
-
- @Override
- public void close() {
- openStores.clear();
- if (logger.isTraceEnabled())
- openManagers.remove(this);
- IOUtils.closeQuietly(cnx);
- }
-
- @Override
- public StoreFeatures getFeatures() {
-
- Configuration c = GraphDatabaseConfiguration.buildGraphConfiguration();
-
- StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder()
- .orderedScan(true).unorderedScan(true).batchMutation(true)
- .multiQuery(true).distributed(true).keyOrdered(true).storeTTL(true)
- .cellTTL(true).timestamps(true).preferredTimestamps(PREFERRED_TIMESTAMPS)
- .optimisticLocking(true).keyConsistent(c);
-
- try {
- fb.localKeyPartition(getDeployment() == Deployment.LOCAL);
- } catch (Exception e) {
- logger.warn("Unexpected exception during getDeployment()", e);
- }
-
- return fb.build();
- }
-
- @Override
- public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
- final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
- // In case of an addition and deletion with identical timestamps, the
- // deletion tombstone wins.
- // http://hbase.apache.org/book/versions.html#d244e4250
- final Map<StaticBuffer, Pair<List<Put>, Delete>> commandsPerKey =
- convertToCommands(
- mutations,
- commitTime.getAdditionTime(times),
- commitTime.getDeletionTime(times));
-
- final List<Row> batch = new ArrayList<>(commandsPerKey.size()); // actual batch operation
-
- // convert sorted commands into representation required for 'batch' operation
- for (Pair<List<Put>, Delete> commands : commandsPerKey.values()) {
- if (commands.getFirst() != null && !commands.getFirst().isEmpty())
- batch.addAll(commands.getFirst());
-
- if (commands.getSecond() != null)
- batch.add(commands.getSecond());
- }
-
- try {
- TableMask table = null;
-
- try {
- table = cnx.getTable(tableName);
- table.batch(batch, new Object[batch.size()]);
- } finally {
- IOUtils.closeQuietly(table);
- }
- } catch (IOException e) {
- throw new TemporaryBackendException(e);
- } catch (InterruptedException e) {
- throw new TemporaryBackendException(e);
- }
-
- this.sleepAfterWrite(commitTime);
- }
-
- @Override
- public KeyColumnValueStore openDatabase(String longName, StoreMetaData.Container metaData) throws BackendException {
- // HBase does not support retrieving cell-level TTL by the client.
- Preconditions.checkArgument(!storageConfig.has(GraphDatabaseConfiguration.STORE_META_TTL, longName)
- || !storageConfig.get(GraphDatabaseConfiguration.STORE_META_TTL, longName));
-
- HBaseKeyColumnValueStore store = openStores.get(longName);
-
- if (store == null) {
- final String cfName = getCfNameForStoreName(longName);
-
- HBaseKeyColumnValueStore newStore = new HBaseKeyColumnValueStore(this, cnx, tableName, cfName, longName);
-
- store = openStores.putIfAbsent(longName, newStore); // nothing bad happens if we lose to another thread
-
- if (store == null) {
- if (!skipSchemaCheck) {
- int cfTTLInSeconds = -1;
- if (metaData.contains(StoreMetaData.TTL)) {
- cfTTLInSeconds = metaData.get(StoreMetaData.TTL);
- }
- ensureColumnFamilyExists(tableName, cfName, cfTTLInSeconds);
- }
-
- store = newStore;
- }
- }
-
- return store;
- }
-
- @Override
- public StoreTransaction beginTransaction(final BaseTransactionConfig config) throws BackendException {
- return new HBaseTransaction(config);
- }
-
- @Override
- public String getName() {
- return tableName;
- }
-
- /**
- * Deletes the specified table with all its columns.
- * ATTENTION: Invoking this method will delete the table if it exists and therefore causes data loss.
- */
- @Override
- public void clearStorage() throws BackendException {
- try (AdminMask adm = getAdminInterface()) {
- if (this.storageConfig.get(DROP_ON_CLEAR)) {
- adm.dropTable(tableName);
- } else {
- adm.clearTable(tableName, times.getTime(times.getTime()));
- }
- } catch (IOException e) {
- throw new TemporaryBackendException(e);
- }
- }
-
- @Override
- public boolean exists() throws BackendException {
- try (final AdminMask adm = getAdminInterface()) {
- return adm.tableExists(tableName);
- } catch (IOException e) {
- throw new TemporaryBackendException(e);
- }
- }
-
- @Override
- public List<KeyRange> getLocalKeyPartition() throws BackendException {
- List<KeyRange> result = new LinkedList<>();
- try {
- ensureTableExists(
- tableName, getCfNameForStoreName(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME), 0);
- Map<KeyRange, ServerName> normed = normalizeKeyBounds(cnx.getRegionLocations(tableName));
-
- for (Map.Entry<KeyRange, ServerName> e : normed.entrySet()) {
- if (NetworkUtil.isLocalConnection(e.getValue().getHostname())) {
- result.add(e.getKey());
- logger.debug("Found local key/row partition {} on host {}", e.getKey(), e.getValue());
- } else {
- logger.debug("Discarding remote {}", e.getValue());
- }
- }
- } catch (MasterNotRunningException e) {
- logger.warn("Unexpected MasterNotRunningException", e);
- } catch (ZooKeeperConnectionException e) {
- logger.warn("Unexpected ZooKeeperConnectionException", e);
- } catch (IOException e) {
- logger.warn("Unexpected IOException", e);
- }
- return result;
- }
-
- /**
- * Transforms each key from an {@link HRegionInfo} to a {@link KeyRange} expressing the
- * region's start and end key bounds using JanusGraph-partitioning-friendly
- * conventions (start inclusive, end exclusive, zero bytes appended where
- * necessary to make all keys at least 4 bytes long).
- *
- * This method iterates over the entries in its map parameter and performs
- * the following conditional conversions on its keys. "Require" below means
- * either a {@link Preconditions} invocation or an assertion. HRegionInfo
- * sometimes returns start and end keys of zero length; this method replaces
- * zero length keys with null before doing any of the checks described
- * below. The parameter map and the values it contains are only read and
- * never modified.
- *
- * <ul>
- * <li>If an entry's HRegionInfo has null start and end keys, then first
- * require that the parameter map is a singleton, and then return a
- * single-entry map whose {@code KeyRange} has start and end buffers that
- * are both four bytes of zeros.</li>
- * <li>If the entry has a null end key (but non-null start key), put an
- * equivalent entry in the result map with a start key identical to the
- * input, except that zeros are appended to values less than 4 bytes long,
- * and an end key that is four bytes of zeros.</li>
- * <li>If the entry has a null start key (but non-null end key), put an
- * equivalent entry in the result map where the start key is four bytes of
- * zeros, and the end key has zeros appended, if necessary, to make it at
- * least 4 bytes long, after which one is added to the padded value in
- * unsigned 32-bit arithmetic with overflow allowed.</li>
- * <li>Any entry which matches none of the above criteria results in an
- * equivalent entry in the returned map, except that zeros are appended to
- * both keys to make each at least 4 bytes long, and the end key is then
- * incremented as described in the previous bullet point.</li>
- * </ul>
- *
- * After iterating over the parameter map, this method checks that it either
- * saw no entries with null keys, one entry with a null start key and a
- * different entry with a null end key, or one entry with both start and end
- * keys null. If any null keys are observed besides these three cases, the
- * method will die with a precondition failure.
- *
- * @param locations A list of HRegionLocation
- * @return JanusGraph-friendly expression of each region's rowkey boundaries
- */
- private Map<KeyRange, ServerName> normalizeKeyBounds(List<HRegionLocation> locations) {
-
- HRegionLocation nullStart = null;
- HRegionLocation nullEnd = null;
-
- ImmutableMap.Builder<KeyRange, ServerName> b = ImmutableMap.builder();
-
- for (HRegionLocation location : locations) {
- HRegionInfo regionInfo = location.getRegionInfo();
- ServerName serverName = location.getServerName();
- byte startKey[] = regionInfo.getStartKey();
- byte endKey[] = regionInfo.getEndKey();
-
- if (0 == startKey.length) {
- startKey = null;
- logger.trace("Converted zero-length HBase startKey byte array to null");
- }
-
- if (0 == endKey.length) {
- endKey = null;
- logger.trace("Converted zero-length HBase endKey byte array to null");
- }
-
- if (null == startKey && null == endKey) {
- Preconditions.checkState(1 == locations.size());
- logger.debug("HBase table {} has a single region {}", tableName, regionInfo);
- // Choose arbitrary shared value = startKey = endKey
- return b.put(new KeyRange(FOUR_ZERO_BYTES, FOUR_ZERO_BYTES), serverName).build();
- } else if (null == startKey) {
- logger.debug("Found HRegionInfo with null startKey on server {}: {}", serverName, regionInfo);
- Preconditions.checkState(null == nullStart);
- nullStart = location;
- // I thought endBuf would be inclusive from the HBase javadoc, but in practice it is exclusive
- StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey));
- // Replace null start key with zeroes
- b.put(new KeyRange(FOUR_ZERO_BYTES, endBuf), serverName);
- } else if (null == endKey) {
- logger.debug("Found HRegionInfo with null endKey on server {}: {}", serverName, regionInfo);
- Preconditions.checkState(null == nullEnd);
- nullEnd = location;
- // Replace null end key with zeroes
- b.put(new KeyRange(StaticArrayBuffer.of(zeroExtend(startKey)), FOUR_ZERO_BYTES), serverName);
- } else {
- Preconditions.checkState(null != startKey);
- Preconditions.checkState(null != endKey);
-
- // Convert HBase's inclusive end keys into exclusive JanusGraph end keys
- StaticBuffer startBuf = StaticArrayBuffer.of(zeroExtend(startKey));
- StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey));
-
- KeyRange kr = new KeyRange(startBuf, endBuf);
- b.put(kr, serverName);
- logger.debug("Found HRegionInfo with non-null end and start keys on server {}: {}", serverName, regionInfo);
- }
- }
-
- // Require either no null key bounds or a pair of them
- Preconditions.checkState(!(null == nullStart ^ null == nullEnd));
-
- // Check that every key in the result is at least 4 bytes long
- Map<KeyRange, ServerName> result = b.build();
- for (KeyRange kr : result.keySet()) {
- Preconditions.checkState(4 <= kr.getStart().length());
- Preconditions.checkState(4 <= kr.getEnd().length());
- }
-
- return result;
- }
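A worked example of the normalization above, using illustrative keys (not taken from the source): a two-region table where region A spans (null, 0x10] and region B spans (0x10, null) normalizes to

    // Region A: startKey = null, endKey = {0x10} -> KeyRange(0x00000000, 0x10000000)
    //   (null start replaced by FOUR_ZERO_BYTES; end key zero-extended to 4 bytes)
    // Region B: startKey = {0x10}, endKey = null -> KeyRange(0x10000000, 0x00000000)
    //   (start key zero-extended; null end replaced by FOUR_ZERO_BYTES)
    // Exactly one null-start and one null-end entry were seen, so the final
    // precondition (!(nullStart == null ^ nullEnd == null)) passes.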
-
- /**
- * If the parameter is shorter than 4 bytes, then create and return a new 4
- * byte array with the input array's bytes followed by zero bytes. Otherwise
- * return the parameter.
- *
- * @param dataToPad non-null but possibly zero-length byte array
- * @return either the parameter or a new array
- */
- private final byte[] zeroExtend(byte[] dataToPad) {
- assert null != dataToPad;
-
- final int targetLength = 4;
-
- if (targetLength <= dataToPad.length)
- return dataToPad;
-
- byte[] padded = new byte[targetLength]; // new arrays are zero-initialized, so the tail is already padded
-
- System.arraycopy(dataToPad, 0, padded, 0, dataToPad.length);
-
- return padded;
- }
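The padding contract is easiest to see with a concrete call (a sketch; assumes java.util.Arrays is imported):

    byte[] padded = zeroExtend(new byte[] { 0x0A, 0x0B });
    assert Arrays.equals(padded, new byte[] { 0x0A, 0x0B, 0x00, 0x00 }); // short input padded to 4 bytes
    assert zeroExtend(new byte[] { 1, 2, 3, 4, 5 }).length == 5;         // already long enough: returned as-is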
-
- public static String shortenCfName(BiMap<String, String> shortCfNameMap, String longName) throws PermanentBackendException {
- final String s;
- if (shortCfNameMap.containsKey(longName)) {
- s = shortCfNameMap.get(longName);
- Preconditions.checkNotNull(s);
- logger.debug("Substituted default CF name \"{}\" with short form \"{}\" to reduce HBase KeyValue size", longName, s);
- } else {
- if (shortCfNameMap.containsValue(longName)) {
- String fmt = "Must use CF long-form name \"%s\" instead of the short-form name \"%s\" when configured with %s=true";
- String msg = String.format(fmt, shortCfNameMap.inverse().get(longName), longName, SHORT_CF_NAMES.getName());
- throw new PermanentBackendException(msg);
- }
- s = longName;
- logger.debug("Kept default CF name \"{}\" because it has no associated short form", s);
- }
- return s;
- }
-
- private TableDescriptor ensureTableExists(String tableName, String initialCFName, int ttlInSeconds) throws BackendException {
- AdminMask adm = null;
-
- TableDescriptor desc;
-
- try { // Create our table, if necessary
- adm = getAdminInterface();
- /*
- * Some HBase versions/impls respond badly to attempts to create a
- * table without at least one CF. See #661. Creating a CF along with
- * the table avoids HBase carping.
- */
- if (adm.tableExists(tableName)) {
- desc = adm.getTableDescriptor(tableName);
- // Check and warn if long and short cf names are mixed for the same table.
- if (shortCfNames && initialCFName.equals(shortCfNameMap.get(SYSTEM_PROPERTIES_STORE_NAME))) {
- String longCFName = shortCfNameMap.inverse().get(initialCFName);
- if (desc.getColumnFamily(Bytes.toBytes(longCFName)) != null) {
- logger.warn("Configuration {}=true, but the table \"{}\" already has column family with long name \"{}\".",
- SHORT_CF_NAMES.getName(), tableName, longCFName);
- logger.warn("Check {} configuration.", SHORT_CF_NAMES.getName());
- }
- }
- else if (!shortCfNames && initialCFName.equals(SYSTEM_PROPERTIES_STORE_NAME)) {
- String shortCFName = shortCfNameMap.get(initialCFName);
- if (desc.getColumnFamily(Bytes.toBytes(shortCFName)) != null) {
- logger.warn("Configuration {}=false, but the table \"{}\" already has column family with short name \"{}\".",
- SHORT_CF_NAMES.getName(), tableName, shortCFName);
- logger.warn("Check {} configuration.", SHORT_CF_NAMES.getName());
- }
- }
- } else {
- desc = createTable(tableName, initialCFName, ttlInSeconds, adm);
- }
- } catch (IOException e) {
- throw new TemporaryBackendException(e);
- } finally {
- IOUtils.closeQuietly(adm);
- }
-
- return desc;
- }
-
- private TableDescriptor createTable(String tableName, String cfName, int ttlInSeconds, AdminMask adm) throws IOException {
- TableDescriptor desc = compat.newTableDescriptor(tableName);
-
- ColumnFamilyDescriptor cdesc = ColumnFamilyDescriptorBuilder.of(cfName);
- cdesc = setCFOptions(cdesc, ttlInSeconds);
-
- desc = compat.addColumnFamilyToTableDescriptor(desc, cdesc);
-
- int count; // total regions to create
- String src;
-
- if (MIN_REGION_COUNT <= (count = regionCount)) {
- src = "region count configuration";
- } else if (0 < regionsPerServer &&
- MIN_REGION_COUNT <= (count = regionsPerServer * adm.getEstimatedRegionServerCount())) {
- src = "ClusterStatus server count";
- } else {
- count = -1;
- src = "default";
- }
-
- if (MIN_REGION_COUNT < count) {
- adm.createTable(desc, getStartKey(count), getEndKey(count), count);
- logger.debug("Created table {} with region count {} from {}", tableName, count, src);
- } else {
- adm.createTable(desc);
- logger.debug("Created table {} with default start key, end key, and region count", tableName);
- }
-
- return desc;
- }
-
- /**
- *
- * From the {@code createTable} javadoc:
- * "The start key specified will become the end key of the first region of
- * the table, and the end key specified will become the start key of the
- * last region of the table (the first region has a null start key and
- * the last region has a null end key)"
- *
- * To summarize, the {@code createTable} argument called "startKey" is
- * actually the end key of the first region.
- */
- private byte[] getStartKey(int regionCount) {
- ByteBuffer regionWidth = ByteBuffer.allocate(4);
- regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount)).flip();
- return StaticArrayBuffer.of(regionWidth).getBytes(0, 4);
- }
-
- /**
- * Companion to {@link #getStartKey(int)}. See its javadoc for details.
- */
- private byte[] getEndKey(int regionCount) {
- ByteBuffer regionWidth = ByteBuffer.allocate(4);
- regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount * (regionCount - 1))).flip();
- return StaticArrayBuffer.of(regionWidth).getBytes(0, 4);
- }
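To make the split-key arithmetic concrete, a worked example (illustration only): for regionCount = 4,

    long width = ((1L << 32) - 1L) / 4; // 1073741823 = 0x3FFFFFFF -> getStartKey()
    long end   = width * (4 - 1);       // 3221225469 = 0xBFFFFFFD -> getEndKey()

so createTable(desc, startKey, endKey, 4) pre-splits the 4-byte key space into four regions with boundaries at approximately 0x3FFFFFFF, 0x7FFFFFFE, and 0xBFFFFFFD.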
-
- private void ensureColumnFamilyExists(String tableName, String columnFamily, int ttlInSeconds) throws BackendException {
- AdminMask adm = null;
- try {
- adm = getAdminInterface();
- TableDescriptor desc = ensureTableExists(tableName, columnFamily, ttlInSeconds);
-
- Preconditions.checkNotNull(desc);
-
- ColumnFamilyDescriptor cf = desc.getColumnFamily(Bytes.toBytes(columnFamily));
-
- // Create our column family, if necessary
- if (cf == null) {
- try {
- if (!adm.isTableDisabled(tableName)) {
- adm.disableTable(tableName);
- }
- } catch (TableNotEnabledException e) {
- logger.debug("Table {} already disabled", tableName);
- } catch (IOException e) {
- throw new TemporaryBackendException(e);
- }
-
- try {
- ColumnFamilyDescriptor cdesc = ColumnFamilyDescriptorBuilder.of(columnFamily);
-
- cdesc = setCFOptions(cdesc, ttlInSeconds);
-
- adm.addColumn(tableName, cdesc);
-
- try {
- logger.debug("Added HBase ColumnFamily {}, waiting for 1 sec. to propogate.", columnFamily);
- Thread.sleep(1000L);
- } catch (InterruptedException ie) {
- throw new TemporaryBackendException(ie);
- }
-
- adm.enableTable(tableName);
- } catch (TableNotFoundException ee) {
- logger.error("TableNotFoundException", ee);
- throw new PermanentBackendException(ee);
- } catch (org.apache.hadoop.hbase.TableExistsException ee) {
- logger.debug("Swallowing exception {}", ee);
- } catch (IOException ee) {
- throw new TemporaryBackendException(ee);
- }
- }
- } finally {
- IOUtils.closeQuietly(adm);
- }
- }
-
- private ColumnFamilyDescriptor setCFOptions(ColumnFamilyDescriptor cdesc, int ttlInSeconds) {
- // Apply compression and TTL independently, so both (or neither) can take
- // effect; the descriptor is returned unchanged when no option applies.
- ColumnFamilyDescriptor ret = cdesc;
-
- if (null != compression && !compression.equals(COMPRESSION_DEFAULT)) {
- ret = ColumnFamilyDescriptorBuilder.newBuilder(ret).setDataBlockEncoding(DataBlockEncoding.FAST_DIFF).build();
- ret = compat.setCompression(ret, compression);
- }
-
- if (ttlInSeconds > 0) {
- ret = ColumnFamilyDescriptorBuilder.newBuilder(ret).setTimeToLive(ttlInSeconds).build();
- }
-
- return ret;
- }
-
- /**
- * Convert JanusGraph internal Mutation representation into HBase native commands.
- *
- * @param mutations Mutations to convert into HBase commands.
- * @param putTimestamp The timestamp to use for Put commands.
- * @param delTimestamp The timestamp to use for Delete commands.
- * @return Commands sorted by key converted from JanusGraph internal representation.
- * @throws org.janusgraph.diskstorage.PermanentBackendException
- */
- @VisibleForTesting
- Map<StaticBuffer, Pair<List<Put>, Delete>> convertToCommands(Map<String, Map<StaticBuffer, KCVMutation>> mutations,
- final long putTimestamp,
- final long delTimestamp) throws PermanentBackendException {
- // A map of rowkey to commands (list of Puts, Delete)
- final Map<StaticBuffer, Pair<List<Put>, Delete>> commandsPerKey = new HashMap<>();
-
- for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> entry : mutations.entrySet()) {
-
- String cfString = getCfNameForStoreName(entry.getKey());
- byte[] cfName = Bytes.toBytes(cfString);
-
- for (Map.Entry<StaticBuffer, KCVMutation> m : entry.getValue().entrySet()) {
- final byte[] key = m.getKey().as(StaticBuffer.ARRAY_FACTORY);
- KCVMutation mutation = m.getValue();
-
- Pair<List<Put>, Delete> commands = commandsPerKey.get(m.getKey());
-
- // The first time we encounter a given rowkey in the input,
- // create the holder for that rowkey
- if (commands == null) {
- commands = new Pair<>();
- // List of all the Puts for this rowkey, including the ones without TTL and with TTL.
- final List<Put> putList = new ArrayList<>();
- commands.setFirst(putList);
- commandsPerKey.put(m.getKey(), commands);
- }
-
- if (mutation.hasDeletions()) {
- if (commands.getSecond() == null) {
- Delete d = new Delete(key);
- compat.setTimestamp(d, delTimestamp);
- commands.setSecond(d);
- }
-
- for (StaticBuffer b : mutation.getDeletions()) {
- // commands.getSecond() is a Delete for this rowkey.
- commands.getSecond().addColumns(cfName, b.as(StaticBuffer.ARRAY_FACTORY), delTimestamp);
- }
- }
-
- if (mutation.hasAdditions()) {
- // All the entries (column cells) with the rowkey use this one Put, except the ones with TTL.
- final Put putColumnsWithoutTtl = new Put(key, putTimestamp);
- // At the end of this loop, there will be one Put entry in the commands.getFirst() list that
- // contains all additions without TTL set, and possibly multiple Put entries for columns
- // that have TTL set.
- for (Entry e : mutation.getAdditions()) {
-
- // Deal with TTL within the entry (column cell) first
- // HBase cell level TTL is actually set at the Mutation/Put level.
- // Therefore we need to construct a new Put for each entry (column cell) with TTL.
- // We can not combine them because column cells within the same rowkey may:
- // 1. have no TTL
- // 2. have TTL
- // 3. have different TTL
- final Integer ttl = (Integer) e.getMetaData().get(EntryMetaData.TTL);
- if (null != ttl && ttl > 0) {
- // Create a new Put
- Put putColumnWithTtl = new Put(key, putTimestamp);
- addColumnToPut(putColumnWithTtl, cfName, putTimestamp, e);
- // Convert ttl from second (JanusGraph TTL) to millisec (HBase TTL)
- // @see JanusGraphManagement#setTTL(JanusGraphSchemaType, Duration)
- // Cast Put to Mutation for backward compatibility with HBase 0.98.x
- // HBase supports cell-level TTL for versions 0.98.6 and above.
- ((Mutation) putColumnWithTtl).setTTL(ttl * 1000);
- // commands.getFirst() is the list of Puts for this rowkey. Add this
- // Put column with TTL to the list.
- commands.getFirst().add(putColumnWithTtl);
- } else {
- addColumnToPut(putColumnsWithoutTtl, cfName, putTimestamp, e);
- }
- }
- // If there were any mutations without TTL set, add them to commands.getFirst()
- if (!putColumnsWithoutTtl.isEmpty()) {
- commands.getFirst().add(putColumnsWithoutTtl);
- }
- }
- }
- }
-
- return commandsPerKey;
- }
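A usage sketch for the conversion above, mirroring what mutateMany() does with the result (mutations, putTs, and delTs stand for the method's inputs):

    Map<StaticBuffer, Pair<List<Put>, Delete>> cmds = convertToCommands(mutations, putTs, delTs);
    List<Row> batch = new ArrayList<>();
    for (Pair<List<Put>, Delete> p : cmds.values()) {
        batch.addAll(p.getFirst());   // one Put for the non-TTL columns, plus one Put per TTL'd column
        if (p.getSecond() != null) {
            batch.add(p.getSecond()); // a single Delete carrying all tombstones for the rowkey
        }
    }
    // submitted via TableMask.batch(batch, new Object[batch.size()])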
-
- private void addColumnToPut(Put p, byte[] cfName, long putTimestamp, Entry e) {
- p.addColumn(cfName, e.getColumnAs(StaticBuffer.ARRAY_FACTORY), putTimestamp,
- e.getValueAs(StaticBuffer.ARRAY_FACTORY));
- }
-
- private String getCfNameForStoreName(String storeName) throws PermanentBackendException {
- return shortCfNames ? shortenCfName(shortCfNameMap, storeName) : storeName;
- }
-
- private void checkConfigDeprecation(org.janusgraph.diskstorage.configuration.Configuration config) {
- if (config.has(GraphDatabaseConfiguration.STORAGE_PORT)) {
- logger.warn("The configuration property {} is ignored for HBase. Set hbase.zookeeper.property.clientPort in hbase-site.xml or {}.hbase.zookeeper.property.clientPort in JanusGraph's configuration file.",
- ConfigElement.getPath(GraphDatabaseConfiguration.STORAGE_PORT), ConfigElement.getPath(HBASE_CONFIGURATION_NAMESPACE));
- }
- }
-
- private AdminMask getAdminInterface() {
- try {
- return cnx.getAdmin();
- } catch (IOException e) {
- throw new JanusGraphException(e);
- }
- }
-
- private String determineTableName(org.janusgraph.diskstorage.configuration.Configuration config) {
- if ((!config.has(HBASE_TABLE)) && (config.has(GRAPH_NAME))) {
- return config.get(GRAPH_NAME);
- }
- return config.get(HBASE_TABLE);
- }
-}
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseTransaction.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseTransaction.java
deleted file mode 100644
index 3b0d271bb..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HBaseTransaction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.janusgraph.diskstorage.hbase2;
-
-import org.janusgraph.diskstorage.BaseTransactionConfig;
-import org.janusgraph.diskstorage.common.AbstractStoreTransaction;
-
-/**
- * This class overrides and adds nothing compared with
- * {@link org.janusgraph.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction}; however, it creates a transaction type specific
- * to HBase, which lets us check for user errors like passing a Cassandra
- * transaction into a HBase method.
- */
-public class HBaseTransaction extends AbstractStoreTransaction {
-
- public HBaseTransaction(final BaseTransactionConfig config) {
- super(config);
- }
-}
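A minimal sketch of the user-error check this marker type enables (hypothetical helper, not part of the deleted code; assumes Guava's Preconditions and JanusGraph's StoreTransaction are imported):

    static HBaseTransaction requireHBaseTransaction(StoreTransaction txh) {
        // Reject transactions created by a different backend before casting.
        Preconditions.checkArgument(txh instanceof HBaseTransaction,
                "Unexpected transaction type %s", txh == null ? "null" : txh.getClass().getName());
        return (HBaseTransaction) txh;
    }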
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HConnection2_0.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HConnection2_0.java
deleted file mode 100644
index 66b8642dc..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HConnection2_0.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.janusgraph.diskstorage.hbase2;
-
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-
-import java.io.IOException;
-import java.util.List;
-
-public class HConnection2_0 implements ConnectionMask
-{
-
- private final Connection cnx;
-
- public HConnection2_0(Connection cnx)
- {
- this.cnx = cnx;
- }
-
- @Override
- public TableMask getTable(String name) throws IOException
- {
- return new HTable2_0(cnx.getTable(TableName.valueOf(name)));
- }
-
- @Override
- public AdminMask getAdmin() throws IOException
- {
- return new HBaseAdmin2_0(cnx.getAdmin());
- }
-
- @Override
- public void close() throws IOException
- {
- cnx.close();
- }
-
- @Override
- public List getRegionLocations(String tableName)
- throws IOException
- {
- return this.cnx.getRegionLocator(TableName.valueOf(tableName)).getAllRegionLocations();
- }
-}
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HTable2_0.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HTable2_0.java
deleted file mode 100644
index 0b4643a4e..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/HTable2_0.java
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.janusgraph.diskstorage.hbase2;
-
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-
-import java.io.IOException;
-import java.util.List;
-
-public class HTable2_0 implements TableMask
-{
- private final Table table;
-
- public HTable2_0(Table table)
- {
- this.table = table;
- }
-
- @Override
- public ResultScanner getScanner(Scan filter) throws IOException
- {
- return table.getScanner(filter);
- }
-
- @Override
- public Result[] get(List gets) throws IOException
- {
- return table.get(gets);
- }
-
- @Override
- public void batch(List writes, Object[] results) throws IOException, InterruptedException
- {
- table.batch(writes, results);
- /* table.flushCommits(); not needed anymore */
- }
-
- @Override
- public void close() throws IOException
- {
- table.close();
- }
-}
diff --git a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/TableMask.java b/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/TableMask.java
deleted file mode 100644
index 0309c39b0..000000000
--- a/graphdb/janus-hbase2/src/main/java/org/janusgraph/diskstorage/hbase2/TableMask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-/**
- * Copyright DataStax, Inc.
- *
- * Please see the included license file for details.
- */
-package org.janusgraph.diskstorage.hbase2;
-
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.Scan;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * This interface hides ABI/API breaking changes that HBase has made to its Table/HTableInterface over the course
- * of development from 0.94 to 1.0 and beyond.
- */
-public interface TableMask extends Closeable
-{
-
- ResultScanner getScanner(Scan filter) throws IOException;
-
- Result[] get(List gets) throws IOException;
-
- void batch(List writes, Object[] results) throws IOException, InterruptedException;
-
-}
diff --git a/graphdb/janus/pom.xml b/graphdb/janus/pom.xml
index 80fe82bfd..1e1501346 100644
--- a/graphdb/janus/pom.xml
+++ b/graphdb/janus/pom.xml
@@ -51,12 +51,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.atlas</groupId>
- <artifactId>atlas-janusgraph-hbase2</artifactId>
- <version>${project.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-testtools</artifactId>
@@ -106,6 +100,10 @@
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-driver</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.noggit</groupId>
+ <artifactId>noggit</artifactId>
+ </dependency>
@@ -220,6 +218,10 @@
<groupId>org.codehaus.woodstox</groupId>
<artifactId>woodstox-core-asl</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper-jute</artifactId>
+ </dependency>
@@ -262,6 +264,12 @@
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>gremlin-util</artifactId>
+ <version>${tinkerpop.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-groovy</artifactId>
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index 115b681cc..aea2445a8 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -39,6 +39,7 @@ import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.diskstorage.StandardIndexProvider;
import org.janusgraph.diskstorage.StandardStoreManager;
import org.janusgraph.diskstorage.es.ElasticSearch7Index;
+import org.janusgraph.diskstorage.hbase.HBaseStoreManager;
import org.janusgraph.diskstorage.solr.Solr6Index;
import org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer;
import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry;
@@ -85,7 +86,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> {
Map<String, String> customMap = new HashMap<>(StandardStoreManager.getAllManagerClasses());
- customMap.put("hbase2", org.janusgraph.diskstorage.hbase2.HBaseStoreManager.class.getName());
+ customMap.put("hbase2", HBaseStoreManager.class.getName());
ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap);
field.set(null, immap);
- LOG.debug("Injected HBase2 support - {}", org.janusgraph.diskstorage.hbase2.HBaseStoreManager.class.getName());
+ LOG.debug("Injected HBase2 support - {}", HBaseStoreManager.class.getName());
} catch (Exception e) {
throw new RuntimeException(e);
}
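For reference, a self-contained sketch of the reflection pattern used in the hunk above, written against a hypothetical Registry class (the real JanusGraph class and field names may differ):

    // Swap a private static ImmutableMap field for an extended copy.
    Field field = Registry.class.getDeclaredField("MANAGERS"); // hypothetical class and field
    field.setAccessible(true);                                 // bypass private access
    @SuppressWarnings("unchecked")
    Map<String, String> current = (Map<String, String>) field.get(null); // static field: null receiver
    Map<String, String> customMap = new HashMap<>(current);
    customMap.put("hbase2", HBaseStoreManager.class.getName());
    field.set(null, ImmutableMap.copyOf(customMap)); // would throw if the field were declared final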
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusIndexQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusIndexQuery.java
index c5b642a71..17c7a842e 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusIndexQuery.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusIndexQuery.java
@@ -45,7 +45,7 @@ public class AtlasJanusIndexQuery implements AtlasIndexQuery<AtlasJanusVertex, AtlasJanusEdge> {
public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> vertices() {
- Iterator<JanusGraphIndexQuery.Result<JanusGraphVertex>> results = query.vertices().iterator();
+ Iterator<JanusGraphIndexQuery.Result<JanusGraphVertex>> results = query.vertexStream().iterator();
Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
@@ -66,7 +66,7 @@ public class AtlasJanusIndexQuery implements AtlasIndexQuery<AtlasJanusVertex, AtlasJanusEdge> {
Iterator<JanusGraphIndexQuery.Result<JanusGraphVertex>> results = query
.offset(offset)
.limit(limit)
- .vertices().iterator();
+ .vertexStream().iterator();
Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
@@ -89,7 +89,7 @@ public class AtlasJanusIndexQuery implements AtlasIndexQuery<AtlasJanusVertex, AtlasJanusEdge> {
Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphVertex>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
@@ -115,7 +115,7 @@ public class AtlasJanusIndexQuery implements AtlasIndexQuery<AtlasJanusVertex, AtlasJanusEdge> {
public Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> edges() {
- Iterator<JanusGraphIndexQuery.Result<JanusGraphEdge>> results = query.edges().iterator();
+ Iterator<JanusGraphIndexQuery.Result<JanusGraphEdge>> results = query.edgeStream().iterator();
Function<JanusGraphIndexQuery.Result<JanusGraphEdge>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphEdge>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
@@ -136,7 +136,7 @@ public class AtlasJanusIndexQuery implements AtlasIndexQuery<AtlasJanusVertex, AtlasJanusEdge> {
Iterator<JanusGraphIndexQuery.Result<JanusGraphEdge>> results = query
.offset(offset)
.limit(limit)
- .edges().iterator();
+ .edgeStream().iterator();
Function<JanusGraphIndexQuery.Result<JanusGraphEdge>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphEdge>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
@@ -159,7 +159,7 @@ public class AtlasJanusIndexQuery implements AtlasIndexQuery<AtlasJanusVertex, AtlasJanusEdge> {
Function<JanusGraphIndexQuery.Result<JanusGraphEdge>, Result<AtlasJanusVertex, AtlasJanusEdge>> function =
new Function<JanusGraphIndexQuery.Result<JanusGraphEdge>, Result<AtlasJanusVertex, AtlasJanusEdge>>() {
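The vertices()-to-vertexStream() substitutions above track a JanusGraph API change: the index-query methods now return a java.util.stream.Stream of scored results instead of an Iterable. A minimal sketch, assuming the usual org.janusgraph.core imports and an illustrative index name:

    JanusGraph graph = JanusGraphFactory.open("inmemory");
    JanusGraphIndexQuery query = graph.indexQuery("vertex_index", "v.name:atlas");
    Stream<JanusGraphIndexQuery.Result<JanusGraphVertex>> results = query.vertexStream();
    results.forEach(r -> System.out.println(r.getElement() + " score=" + r.getScore()));
    graph.close();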
diff --git a/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java b/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java
index 51a87f50a..90a2a9fe7 100644
--- a/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java
+++ b/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java
@@ -17,1283 +17,49 @@
*/
package org.janusgraph.diskstorage.es;
-import static org.janusgraph.diskstorage.es.ElasticSearchIndex.*;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import org.janusgraph.core.Cardinality;
-import org.janusgraph.core.JanusGraphException;
-import org.janusgraph.core.attribute.Cmp;
-import org.janusgraph.core.attribute.Geo;
-import org.janusgraph.core.attribute.Geoshape;
-import org.janusgraph.core.attribute.Text;
-import org.janusgraph.core.schema.Mapping;
-import org.janusgraph.core.schema.Parameter;
import org.janusgraph.diskstorage.BackendException;
-import org.janusgraph.diskstorage.BaseTransaction;
-import org.janusgraph.diskstorage.BaseTransactionConfig;
-import org.janusgraph.diskstorage.BaseTransactionConfigurable;
-import org.janusgraph.diskstorage.PermanentBackendException;
-import org.janusgraph.diskstorage.TemporaryBackendException;
-import org.janusgraph.diskstorage.configuration.ConfigOption;
import org.janusgraph.diskstorage.configuration.Configuration;
-import org.janusgraph.diskstorage.es.compat.AbstractESCompat;
-import org.janusgraph.diskstorage.es.compat.ESCompatUtils;
-import org.janusgraph.diskstorage.es.mapping.IndexMapping;
-import org.janusgraph.diskstorage.es.script.ESScriptResponse;
-import org.janusgraph.diskstorage.indexing.IndexEntry;
-import org.janusgraph.diskstorage.indexing.IndexFeatures;
-import org.janusgraph.diskstorage.indexing.IndexMutation;
-import org.janusgraph.diskstorage.indexing.IndexProvider;
-import org.janusgraph.diskstorage.indexing.IndexQuery;
-import org.janusgraph.diskstorage.indexing.KeyInformation;
-import org.janusgraph.diskstorage.indexing.RawQuery;
-import org.janusgraph.diskstorage.util.DefaultTransaction;
import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions;
-import org.janusgraph.graphdb.database.serialize.AttributeUtils;
-import org.janusgraph.graphdb.query.JanusGraphPredicate;
-import org.janusgraph.graphdb.query.condition.And;
-import org.janusgraph.graphdb.query.condition.Condition;
-import org.janusgraph.graphdb.query.condition.Not;
-import org.janusgraph.graphdb.query.condition.Or;
-import org.janusgraph.graphdb.query.condition.PredicateCondition;
-import org.janusgraph.graphdb.types.ParameterType;
-import org.locationtech.spatial4j.shape.Rectangle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.Spliterator;
-import java.util.Spliterators;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_DOC_KEY;
-import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_GEO_COORDS_KEY;
-import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_LANG_KEY;
-import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_SCRIPT_KEY;
-import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_TYPE_KEY;
-import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE;
-import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NAME;
+import java.lang.reflect.Field;
/**
- * Do not change
- * This is a copy of ElasticSearchIndex.java from org.janusgraph.diskstorage.es
- * Added a new method to new client instance
+ * NOTE: Class to get access to ElasticSearchIndex.client
*/
@PreInitializeConfigOptions
-public class ElasticSearch7Index implements IndexProvider {
+public class ElasticSearch7Index extends ElasticSearchIndex {
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticSearch7Index.class);
- private static final Logger log = LoggerFactory.getLogger(ElasticSearchIndex.class);
+ private static ElasticSearch7Index INSTANCE;
- // add elasticsearch index instance(Singleton Pattern)
- private static ElasticSearch7Index instance = null;
-
- private static final String STRING_MAPPING_SUFFIX = "__STRING";
- // added config option: create a new client instance per request when enabled
- public static final ConfigOption<Boolean> CREATE_ELASTICSEARCH_CLIENT_PER_REQUEST = new ConfigOption<>(ELASTICSEARCH_NS, "create-client-per-request", "when false, allows the sharing of es client across other components.", org.janusgraph.diskstorage.configuration.ConfigOption.Type.LOCAL, false);
-
- private static final String PARAMETERIZED_DELETION_SCRIPT = parameterizedScriptPrepare("",
- "for (field in params.fields) {",
- " if (field.cardinality == 'SINGLE') {",
- " ctx._source.remove(field.name);",
- " } else if (ctx._source.containsKey(field.name)) {",
- " def fieldIndex = ctx._source[field.name].indexOf(field.value);",
- " if (fieldIndex >= 0 && fieldIndex < ctx._source[field.name].size()) {",
- " ctx._source[field.name].remove(fieldIndex);",
- " }",
- " }",
- "}");
-
- private static final String PARAMETERIZED_ADDITION_SCRIPT = parameterizedScriptPrepare("",
- "for (field in params.fields) {",
- " if (ctx._source[field.name] == null) {",
- " ctx._source[field.name] = [];",
- " }",
- " if (field.cardinality != 'SET' || ctx._source[field.name].indexOf(field.value) == -1) {",
- " ctx._source[field.name].add(field.value);",
- " }",
- "}");
-
- static final String INDEX_NAME_SEPARATOR = "_";
- private static final String SCRIPT_ID_SEPARATOR = "-";
-
- private static final String MAX_OPEN_SCROLL_CONTEXT_PARAMETER = "search.max_open_scroll_context";
- private static final Map<String, Object> MAX_RESULT_WINDOW = ImmutableMap.of("index.max_result_window", Integer.MAX_VALUE);
-
- private static final Parameter[] NULL_PARAMETERS = null;
-
- private static final String TRACK_TOTAL_HITS_PARAMETER = "track_total_hits";
- private static final Parameter[] TRACK_TOTAL_HITS_DISABLED_PARAMETERS = new Parameter[]{new Parameter<>(TRACK_TOTAL_HITS_PARAMETER, false)};
- private static final Map<String, Object> TRACK_TOTAL_HITS_DISABLED_REQUEST_BODY = ImmutableMap.of(TRACK_TOTAL_HITS_PARAMETER, false);
-
- private final Function<String, String> generateIndexStoreNameFunction = this::generateIndexStoreName;
- private final Map<String, String> indexStoreNamesCache = new ConcurrentHashMap<>();
- private final boolean indexStoreNameCacheEnabled;
-
- private final AbstractESCompat compat;
private final ElasticSearchClient client;
- private final Configuration configuration;
- private final String indexName;
- private final int batchSize;
- private final boolean useExternalMappings;
- private final boolean allowMappingUpdate;
- private final Map<String, Object> indexSetting;
- private final long createSleep;
- private final boolean useAllField;
- private final Map<String, Object> ingestPipelines;
- private final boolean useMappingForES7;
- private final String parameterizedAdditionScriptId;
- private final String parameterizedDeletionScriptId;
-
- private static boolean createElasticSearchClientPerRequest;
public ElasticSearch7Index(Configuration config) throws BackendException {
- // fetch configuration
- this.configuration = config;
- indexName = config.get(INDEX_NAME);
- parameterizedAdditionScriptId = generateScriptId("add");
- parameterizedDeletionScriptId = generateScriptId("del");
- useAllField = config.get(USE_ALL_FIELD);
- useExternalMappings = config.get(USE_EXTERNAL_MAPPINGS);
- allowMappingUpdate = config.get(ALLOW_MAPPING_UPDATE);
- createSleep = config.get(CREATE_SLEEP);
- ingestPipelines = config.getSubset(ES_INGEST_PIPELINES);
- useMappingForES7 = config.get(USE_MAPPING_FOR_ES7);
- indexStoreNameCacheEnabled = config.get(ENABLE_INDEX_STORE_NAMES_CACHE);
- batchSize = config.get(INDEX_MAX_RESULT_SET_SIZE);
- log.debug("Configured ES query nb result by query to {}", batchSize);
+ super(config);
- client = createElasticSearchClient();
- createElasticSearchClientPerRequest = config.get(CREATE_ELASTICSEARCH_CLIENT_PER_REQUEST);
+ ElasticSearchClient client = null;
- checkClusterHealth(config.get(HEALTH_REQUEST_TIMEOUT));
+ try {
+ Field fld = ElasticSearchIndex.class.getDeclaredField("client");
- compat = ESCompatUtils.acquireCompatForVersion(client.getMajorVersion());
+ fld.setAccessible(true);
- indexSetting = ElasticSearchSetup.getSettingsFromJanusGraphConf(config);
+ client = (ElasticSearchClient) fld.get(this);
+ } catch (Exception excp) {
+ LOG.warn("Failed to get SolrClient", excp);
+ }
- setupMaxOpenScrollContextsIfNeeded(config);
+ this.client = client;
- setupStoredScripts();
-
- //set instance
- ElasticSearch7Index.instance = this;
+ INSTANCE = this;
}
-
- //get client
public static ElasticSearchClient getElasticSearchClient() {
- ElasticSearchClient ret = null;
- ElasticSearch7Index esIndex = ElasticSearch7Index.instance;
+ ElasticSearch7Index index = INSTANCE;
- if (esIndex != null) {
- if (createElasticSearchClientPerRequest) {
- log.debug("Creating a new ElasticSearch Client.");
-
- ret = esIndex.createElasticSearchClient();
- } else {
- log.debug("Returning the elasticSearch client owned by ElasticSearchIndex.");
-
- ret = esIndex.client;
- }
- } else {
- log.debug("No ElasticSearchIndex available. Will return null");
- }
-
- return ret;
- }
-
- //release client
- public static void releaseElasticSearchClient(ElasticSearchClient elasticSearchClient) {
- if(createElasticSearchClientPerRequest) {
- if (elasticSearchClient != null) {
- try {
- elasticSearchClient.close();
-
- if(log.isDebugEnabled()) {
- log.debug("Closed the elasticSearch client successfully.");
- }
- } catch (IOException excp) {
- log.warn("Failed to close elasticSearchClient.", excp);
- }
- }
- } else {
- if(log.isDebugEnabled()) {
- log.debug("Ignoring the closing of elasticSearch client as it is owned by ElasticSearchIndex.");
- }
- }
- }
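The acquire/release pair above implies a usage contract: callers must always balance the two calls, since the client is only actually closed when create-client-per-request is enabled. A sketch:

    ElasticSearchClient esClient = ElasticSearch7Index.getElasticSearchClient();
    try {
        if (esClient != null) {
            // ... issue search/index requests through esClient ...
        }
    } finally {
        ElasticSearch7Index.releaseElasticSearchClient(esClient); // no-op for the shared client
    }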
-
- //create client
- private ElasticSearchClient createElasticSearchClient() {
- return interfaceConfiguration(configuration).getClient();
- }
-
-
-
- private void checkClusterHealth(String healthCheck) throws BackendException {
- try {
- client.clusterHealthRequest(healthCheck);
- } catch (final IOException e) {
- throw new PermanentBackendException(e.getMessage(), e);
- }
- }
-
- private void setupStoredScripts() throws PermanentBackendException {
- setupStoredScriptIfNeeded(parameterizedAdditionScriptId, PARAMETERIZED_ADDITION_SCRIPT);
- setupStoredScriptIfNeeded(parameterizedDeletionScriptId, PARAMETERIZED_DELETION_SCRIPT);
- }
-
- private void setupStoredScriptIfNeeded(String storedScriptId, String source) throws PermanentBackendException {
-
- ImmutableMap<String, Object> preparedScript = compat.prepareScript(source).build();
-
- String lang = (String) ((ImmutableMap<String, Object>) preparedScript.get(ES_SCRIPT_KEY)).get(ES_LANG_KEY);
-
- try {
- ESScriptResponse esScriptResponse = client.getStoredScript(storedScriptId);
-
- if(Boolean.FALSE.equals(esScriptResponse.getFound()) || !Objects.equals(lang, esScriptResponse.getScript().getLang()) ||
- !Objects.equals(source, esScriptResponse.getScript().getSource())){
- client.createStoredScript(storedScriptId, preparedScript);
- }
-
- } catch (final IOException e) {
- throw new PermanentBackendException(e.getMessage(), e);
- }
- }
-
- private void setupMaxOpenScrollContextsIfNeeded(Configuration config) throws PermanentBackendException {
-
- if(client.getMajorVersion().getValue() > 6){
-
- boolean setupMaxOpenScrollContexts;
-
- if(config.has(SETUP_MAX_OPEN_SCROLL_CONTEXTS)){
- setupMaxOpenScrollContexts = config.get(SETUP_MAX_OPEN_SCROLL_CONTEXTS);
- } else {
- setupMaxOpenScrollContexts = SETUP_MAX_OPEN_SCROLL_CONTEXTS.getDefaultValue();
- }
-
- if(setupMaxOpenScrollContexts){
-
- Map settings = ImmutableMap.of("persistent",
- ImmutableMap.of(MAX_OPEN_SCROLL_CONTEXT_PARAMETER, Integer.MAX_VALUE));
-
- try {
- client.updateClusterSettings(settings);
- } catch (final IOException e) {
- throw new PermanentBackendException(e.getMessage(), e);
- }
- }
- }
- }
-
- /**
- * If ES already contains this instance's target index, then do nothing.
- * Otherwise, create the index, then wait {@code CREATE_SLEEP} milliseconds for it to settle.
- *
- * The {@code client} field must point to a live, connected client.
- * The {@code indexName} field must be non-null and point to the name
- * of the index to check for existence or create.
- *
- * @param index index name
- * @throws IOException if the index status could not be checked or index could not be created
- */
- private void checkForOrCreateIndex(String index) throws IOException {
- Objects.requireNonNull(client);
- Objects.requireNonNull(index);
-
- // Create the index if external mappings are not used and the index does not already exist
- if (!useExternalMappings && !client.indexExists(index)) {
- client.createIndex(index, indexSetting);
- client.updateIndexSettings(index, MAX_RESULT_WINDOW);
- try {
- log.debug("Sleeping {} ms after {} index creation returned from actionGet()", createSleep, index);
- Thread.sleep(createSleep);
- } catch (final InterruptedException e) {
- throw new JanusGraphException("Interrupted while waiting for index to settle in", e);
- }
- }
- Preconditions.checkState(client.indexExists(index), "Could not create index: %s",index);
- client.addAlias(indexName, index);
- }
-
-
- /**
- * Configure ElasticSearchIndex's ES client. See {@link org.janusgraph.diskstorage.es.ElasticSearchSetup} for more
- * information.
- *
- * @param config a config passed to ElasticSearchIndex's constructor
- * @return a client object open and ready for use
- */
- private ElasticSearchSetup.Connection interfaceConfiguration(Configuration config) {
- final ElasticSearchSetup clientMode = ConfigOption.getEnumValue(config.get(INTERFACE), ElasticSearchSetup.class);
-
- try {
- return clientMode.connect(config);
- } catch (final IOException e) {
- throw new JanusGraphException(e);
- }
- }
-
- private BackendException convert(Exception esException) {
- if (esException instanceof InterruptedException) {
- return new TemporaryBackendException("Interrupted while waiting for response", esException);
- } else {
- return new PermanentBackendException("Unknown exception while executing index operation", esException);
- }
- }
-
- private static String getDualMappingName(String key) {
- return key + STRING_MAPPING_SUFFIX;
- }
-
- private String generateScriptId(String uniqueScriptSuffix){
- return indexName + SCRIPT_ID_SEPARATOR + uniqueScriptSuffix;
- }
-
- private String generateIndexStoreName(String store){
- return indexName + INDEX_NAME_SEPARATOR + store.toLowerCase();
- }
-
- private String getIndexStoreName(String store) {
-
- if(indexStoreNameCacheEnabled){
- return indexStoreNamesCache.computeIfAbsent(store, generateIndexStoreNameFunction);
- }
-
- return generateIndexStoreName(store);
- }
-
- @Override
- public void register(String store, String key, KeyInformation information,
- BaseTransaction tx) throws BackendException {
- final Class<?> dataType = information.getDataType();
- final Mapping map = Mapping.getMapping(information);
- Preconditions.checkArgument(map==Mapping.DEFAULT || AttributeUtils.isString(dataType) ||
- (map==Mapping.PREFIX_TREE && AttributeUtils.isGeo(dataType)),
- "Specified illegal mapping [%s] for data type [%s]",map,dataType);
- final String indexStoreName = getIndexStoreName(store);
- if (useExternalMappings) {
- try {
- //We check if the external mapping has the property 'key'
- final IndexMapping mappings = client.getMapping(indexStoreName, store);
- if (mappings == null || (!mappings.isDynamic() && !mappings.getProperties().containsKey(key))) {
- //Error if it is not dynamic and does not have the property 'key'
- throw new PermanentBackendException("The external mapping for index '"+ indexStoreName + "' and type '" + store + "' does not have property '" + key + "'");
- } else if (allowMappingUpdate && mappings.isDynamic()) {
- //If it is dynamic, we push the unknown property 'key'
- this.pushMapping(store, key, information);
- }
- } catch (final IOException e) {
- throw new PermanentBackendException(e);
- }
- } else {
- try {
- checkForOrCreateIndex(indexStoreName);
- } catch (final IOException e) {
- throw new PermanentBackendException(e);
- }
- this.pushMapping(store, key, information);
- }
- }
-
- /**
- * Push mapping to ElasticSearch
- * @param store the type in the index
- * @param key the name of the property in the index
- * @param information information of the key
- */
- private void pushMapping(String store, String key,
- KeyInformation information) throws AssertionError, BackendException {
- final Class<?> dataType = information.getDataType();
- Mapping map = Mapping.getMapping(information);
- final Map<String, Object> properties = new HashMap<>();
- if (AttributeUtils.isString(dataType)) {
- if (map==Mapping.DEFAULT) map=Mapping.TEXT;
- log.debug("Registering string type for {} with mapping {}", key, map);
- final String stringAnalyzer
- = ParameterType.STRING_ANALYZER.findParameter(information.getParameters(), null);
- final String textAnalyzer = ParameterType.TEXT_ANALYZER.findParameter(information.getParameters(), null);
- // use keyword type for string mappings unless custom string analyzer is provided
- final Map<String, Object> stringMapping
- = stringAnalyzer == null ? compat.createKeywordMapping() : compat.createTextMapping(stringAnalyzer);
- switch (map) {
- case STRING:
- properties.put(key, stringMapping);
- break;
- case TEXT:
- properties.put(key, compat.createTextMapping(textAnalyzer));
- break;
- case TEXTSTRING:
- properties.put(key, compat.createTextMapping(textAnalyzer));
- properties.put(getDualMappingName(key), stringMapping);
- break;
- default: throw new AssertionError("Unexpected mapping: "+map);
- }
- } else if (dataType == Float.class) {
- log.debug("Registering float type for {}", key);
- properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "float"));
- } else if (dataType == Double.class) {
- log.debug("Registering double type for {}", key);
- properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "double"));
- } else if (dataType == Byte.class) {
- log.debug("Registering byte type for {}", key);
- properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "byte"));
- } else if (dataType == Short.class) {
- log.debug("Registering short type for {}", key);
- properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "short"));
- } else if (dataType == Integer.class) {
- log.debug("Registering integer type for {}", key);
- properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "integer"));
- } else if (dataType == Long.class) {
- log.debug("Registering long type for {}", key);
- properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "long"));
- } else if (dataType == Boolean.class) {
- log.debug("Registering boolean type for {}", key);
- properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "boolean"));
- } else if (dataType == Geoshape.class) {
- switch (map) {
- case PREFIX_TREE:
- final int maxLevels = ParameterType.INDEX_GEO_MAX_LEVELS.findParameter(information.getParameters(),
- DEFAULT_GEO_MAX_LEVELS);
- final double distErrorPct
- = ParameterType.INDEX_GEO_DIST_ERROR_PCT.findParameter(information.getParameters(),
- DEFAULT_GEO_DIST_ERROR_PCT);
- log.debug("Registering geo_shape type for {} with tree_levels={} and distance_error_pct={}", key,
- maxLevels, distErrorPct);
- properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "geo_shape",
- "tree", "quadtree",
- "tree_levels", maxLevels,
- "distance_error_pct", distErrorPct));
- break;
- default:
- log.debug("Registering geo_point type for {}", key);
- properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "geo_point"));
- }
- } else if (dataType == Date.class || dataType == Instant.class) {
- log.debug("Registering date type for {}", key);
- properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "date"));
- } else if (dataType == UUID.class) {
- log.debug("Registering uuid type for {}", key);
- properties.put(key, compat.createKeywordMapping());
- }
-
- if (useAllField) {
- // add custom all field mapping if it doesn't exist
- properties.put(ElasticSearchConstants.CUSTOM_ALL_FIELD, compat.createTextMapping(null));
-
- // add copy_to for custom all field mapping
- if (properties.containsKey(key) && dataType != Geoshape.class) {
- final Map<String, Object> mapping = new HashMap<>(((Map<String, Object>) properties.get(key)));
- mapping.put("copy_to", ElasticSearchConstants.CUSTOM_ALL_FIELD);
- properties.put(key, mapping);
- }
- }
-
- final List<Parameter> customParameters = ParameterType.getCustomParameters(information.getParameters());
-
- if (properties.containsKey(key) && !customParameters.isEmpty()) {
- final Map<String, Object> mapping = new HashMap<>(((Map<String, Object>) properties.get(key)));
- customParameters.forEach(p -> mapping.put(p.key(), p.value()));
- properties.put(key, mapping);
- }
-
- final Map<String, Object> mapping = ImmutableMap.of("properties", properties);
-
- try {
- client.createMapping(getIndexStoreName(store), store, mapping);
- } catch (final Exception e) {
- throw convert(e);
- }
- }
-
- private static Mapping getStringMapping(KeyInformation information) {
- assert AttributeUtils.isString(information.getDataType());
- Mapping map = Mapping.getMapping(information);
- if (map==Mapping.DEFAULT) map = Mapping.TEXT;
- return map;
- }
-
- private static boolean hasDualStringMapping(KeyInformation information) {
- return AttributeUtils.isString(information.getDataType()) && getStringMapping(information)==Mapping.TEXTSTRING;
- }
-
- public Map<String, Object> getNewDocument(final List<IndexEntry> additions,
- KeyInformation.StoreRetriever information) throws BackendException {
- // JSON writes duplicate fields one after another, which forces us
- // at this stage to de-duplicate the IndexEntry list. We don't want to pay the
- // price of map storage at the Mutation level because none of the other backends need that.
-
- final Multimap<String, IndexEntry> unique = LinkedListMultimap.create();
- for (final IndexEntry e : additions) {
- unique.put(e.field, e);
- }
-
- final Map<String, Object> doc = new HashMap<>();
- for (final Map.Entry<String, Collection<IndexEntry>> add : unique.asMap().entrySet()) {
- final KeyInformation keyInformation = information.get(add.getKey());
- final Object value;
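- // SINGLE cardinality keeps only the last value written; SET and LIST emit every value as an array.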
- switch (keyInformation.getCardinality()) {
- case SINGLE:
- value = convertToEsType(Iterators.getLast(add.getValue().iterator()).value,
- Mapping.getMapping(keyInformation));
- break;
- case SET:
- case LIST:
- value = add.getValue().stream()
- .map(v -> convertToEsType(v.value, Mapping.getMapping(keyInformation)))
- .filter(v -> {
- Preconditions.checkArgument(!(v instanceof byte[]),
- "Collections not supported for %s", add.getKey());
- return true;
- }).toArray();
- break;
- default:
- value = null;
- break;
- }
-
- doc.put(add.getKey(), value);
- if (hasDualStringMapping(keyInformation) && keyInformation.getDataType() == String.class) {
- doc.put(getDualMappingName(add.getKey()), value);
- }
-
- }
-
- return doc;
- }
-
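- // Normalizes Java values into types ES accepts: whole numbers widen to long, other numbers to double, Instants become Dates, UUIDs become strings.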
- private static Object convertToEsType(Object value, Mapping mapping) {
- if (value instanceof Number) {
- if (AttributeUtils.isWholeNumber((Number) value)) {
- return ((Number) value).longValue();
- } else { //double or float
- return ((Number) value).doubleValue();
- }
- } else if (AttributeUtils.isString(value)) {
- return value;
- } else if (value instanceof Geoshape) {
- return convertGeoshape((Geoshape) value, mapping);
- } else if (value instanceof Date) {
- return value;
- } else if (value instanceof Instant) {
- return Date.from((Instant) value);
- } else if (value instanceof Boolean) {
- return value;
- } else if (value instanceof UUID) {
- return value.toString();
- } else throw new IllegalArgumentException("Unsupported type: " + value.getClass() + " (value: " + value + ")");
- }
-
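- // Points become [lon, lat] geo_point arrays unless indexed as a prefix tree; boxes become "envelope" shapes; circles keep their radius plus its units.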
- @SuppressWarnings("unchecked")
- private static Object convertGeoshape(Geoshape geoshape, Mapping mapping) {
- if (geoshape.getType() == Geoshape.Type.POINT && Mapping.PREFIX_TREE != mapping) {
- final Geoshape.Point p = geoshape.getPoint();
- return new double[]{p.getLongitude(), p.getLatitude()};
- } else if (geoshape.getType() == Geoshape.Type.BOX) {
- final Rectangle box = geoshape.getShape().getBoundingBox();
- final Map<String, Object> map = new HashMap<>();
- map.put("type", "envelope");
- map.put("coordinates", new double[][] {{box.getMinX(),box.getMaxY()},{box.getMaxX(),box.getMinY()}});
- return map;
- } else if (geoshape.getType() == Geoshape.Type.CIRCLE) {
- try {
- final Map<String, Object> map = geoshape.toMap();
- map.put("radius", map.get("radius") + ((Map<String, String>) map.remove("properties")).get("radius_units"));
- return map;
- } catch (final IOException e) {
- throw new IllegalArgumentException("Invalid geoshape: " + geoshape, e);
- }
- } else {
- try {
- return geoshape.toMap();
- } catch (final IOException e) {
- throw new IllegalArgumentException("Invalid geoshape: " + geoshape, e);
- }
- }
- }
-
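- // Translates the consolidated IndexMutations into per-store batches of bulk requests, applying deletions before additions for each document.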
- @Override
- public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever information,
- BaseTransaction tx) throws BackendException {
- final List<ElasticSearchMutation> requests = new ArrayList<>();
- try {
- for (final Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) {
- final List<ElasticSearchMutation> requestByStore = new ArrayList<>();
- final String storeName = stores.getKey();
- final String indexStoreName = getIndexStoreName(storeName);
- for (final Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) {
- final String documentId = entry.getKey();
- final IndexMutation mutation = entry.getValue();
- assert mutation.isConsolidated();
- Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted()));
- Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions());
- Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions());
- //Deletions first
- if (mutation.hasDeletions()) {
- if (mutation.isDeleted()) {
- log.trace("Deleting entire document {}", documentId);
- requestByStore.add(ElasticSearchMutation.createDeleteRequest(indexStoreName, storeName,
- documentId));
- } else {
- List