Support alter load description for routine load (#3018)

1. RoutineLoadDesc is hard to serialize directly because too many objects are involved. Following the approach used for creating a routine load, we persist the OriginStatement of the AlterRoutineLoadStmt in the bdb log and the checkpoint image, and rebuild the RoutineLoadDesc from it on replay.
2. Fix a modifyProperties bug: the property `strip_outer_array` could not be modified because the key stored in analyzedJobProperties was mistakenly set to the property's value instead of its name (see the example after this list).
3. Add unit tests for RoutineLoadJob.modifyJob().
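As a minimal illustration of item 2 (the job name `db.job1` is just a placeholder): before the fix, the analyzed property map used the value "true" as its key, so the statement below never updated the job's `strip_outer_array` setting; with the fix it takes effect.
```SQL
ALTER ROUTINE LOAD FOR db.job1
PROPERTIES ("strip_outer_array" = "true")
```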

Use the following command to alter the load description of a routine load job:
``` SQL
ALTER ROUTINE LOAD FOR job_name
[column_separator],
[columns_mapping],
[where_predicates],
[partitions]
```
`column_separator` specifies the column separator, for example:
``` SQL
COLUMNS TERMINATED BY ","
```
`columns_mapping` specifies the column mapping, for example:
``` SQL
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2)
```
`where_predicates` specifies the filter conditions, for example:
```SQL
WHERE k1 > 10
```
`partitions` specifies the partitions to be imported, for example:
```SQL
PARTITION (p1, p2)
```
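The clauses can be combined in a single statement; for example (with an illustrative job name, mirroring the new unit test):
```SQL
ALTER ROUTINE LOAD FOR db.job1
COLUMNS (a, b, c, d = a),
WHERE a = 1,
COLUMNS TERMINATED BY ",",
PARTITION (p1, p2, p3)
```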
gengjun-git 2022-01-26 21:33:47 +08:00 committed by GitHub
parent bc6a155d9f
commit 7798135a1f
10 changed files with 274 additions and 128 deletions


@ -772,10 +772,12 @@ alter_stmt ::=
{:
RESULT = new AlterUserStmt(user);
:}
| KW_ALTER KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel opt_properties:jobProperties
| KW_ALTER KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel
opt_load_property_list:loadPropertyList
opt_properties:jobProperties
opt_datasource_properties:datasourceProperties
{:
RESULT = new AlterRoutineLoadStmt(jobLabel, jobProperties, datasourceProperties);
RESULT = new AlterRoutineLoadStmt(jobLabel, loadPropertyList, jobProperties, datasourceProperties);
:}
;


@ -28,7 +28,9 @@ import com.starrocks.common.FeNameFormat;
import com.starrocks.common.UserException;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.Util;
import com.starrocks.load.RoutineLoadDesc;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -59,6 +61,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
.build();
private final LabelName labelName;
private final List<ParseNode> loadPropertyList;
private RoutineLoadDesc routineLoadDesc;
private final Map<String, String> jobProperties;
private final RoutineLoadDataSourceProperties dataSourceProperties;
@ -66,9 +70,11 @@ public class AlterRoutineLoadStmt extends DdlStmt {
// analyzed data source properties are saved in dataSourceProperties.
private Map<String, String> analyzedJobProperties = Maps.newHashMap();
public AlterRoutineLoadStmt(LabelName labelName, Map<String, String> jobProperties,
public AlterRoutineLoadStmt(LabelName labelName, List<ParseNode> loadPropertyList,
Map<String, String> jobProperties,
RoutineLoadDataSourceProperties dataSourceProperties) {
this.labelName = labelName;
this.loadPropertyList = loadPropertyList;
this.jobProperties = jobProperties != null ? jobProperties : Maps.newHashMap();
this.dataSourceProperties = dataSourceProperties;
}
@ -93,18 +99,27 @@ public class AlterRoutineLoadStmt extends DdlStmt {
return dataSourceProperties;
}
public RoutineLoadDesc getRoutineLoadDesc() {
return routineLoadDesc;
}
public List<ParseNode> getLoadPropertyList() {
return loadPropertyList;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
labelName.analyze(analyzer);
FeNameFormat.checkCommonName(NAME_TYPE, labelName.getLabelName());
routineLoadDesc = CreateRoutineLoadStmt.buildLoadDesc(loadPropertyList);
// check routine load job properties include desired concurrent number etc.
checkJobProperties();
// check data source properties
checkDataSourceProperties();
if (analyzedJobProperties.isEmpty() && !dataSourceProperties.hasAnalyzedProperties()) {
if (routineLoadDesc == null && analyzedJobProperties.isEmpty() && !dataSourceProperties.hasAnalyzedProperties()) {
throw new AnalysisException("No properties are specified");
}
}
@ -174,8 +189,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
if (jobProperties.containsKey(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY)) {
boolean stripOuterArray = Boolean.valueOf(jobProperties.get(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY));
analyzedJobProperties.put(jobProperties.get(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY),
String.valueOf(stripOuterArray));
analyzedJobProperties.put(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY, String.valueOf(stripOuterArray));
}
}


@ -266,6 +266,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return customKafkaProperties;
}
public List<ParseNode> getLoadPropertyList() {
return loadPropertyList;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
@ -274,7 +278,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
// check name
FeNameFormat.checkCommonName(NAME_TYPE, name);
// check load properties include column separator etc.
checkLoadProperties();
routineLoadDesc = buildLoadDesc(loadPropertyList);
// check routine load job properties include desired concurrent number etc.
checkJobProperties();
// check data source properties
@ -290,9 +294,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
}
}
public void checkLoadProperties() throws UserException {
public static RoutineLoadDesc buildLoadDesc(List<ParseNode> loadPropertyList) throws UserException {
if (loadPropertyList == null) {
return;
return null;
}
ColumnSeparator columnSeparator = null;
RowDelimiter rowDelimiter = null;
@ -335,7 +339,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
partitionNames.analyze(null);
}
}
routineLoadDesc = new RoutineLoadDesc(columnSeparator, rowDelimiter, importColumnsStmt, importWhereStmt,
return new RoutineLoadDesc(columnSeparator, rowDelimiter, importColumnsStmt, importWhereStmt,
partitionNames);
}


@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.starrocks.analysis.AlterRoutineLoadStmt;
import com.starrocks.analysis.CreateRoutineLoadStmt;
import com.starrocks.analysis.RoutineLoadDataSourceProperties;
import com.starrocks.catalog.Catalog;
@ -52,7 +51,6 @@ import com.starrocks.common.util.LogBuilder;
import com.starrocks.common.util.LogKey;
import com.starrocks.common.util.SmallFileMgr;
import com.starrocks.common.util.SmallFileMgr.SmallFile;
import com.starrocks.persist.AlterRoutineLoadJobOperationLog;
import com.starrocks.system.SystemInfoService;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionStatus;
@ -517,35 +515,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
@Override
public void modifyProperties(AlterRoutineLoadStmt stmt) throws DdlException {
Map<String, String> jobProperties = stmt.getAnalyzedJobProperties();
RoutineLoadDataSourceProperties dataSourceProperties = stmt.getDataSourceProperties();
writeLock();
try {
if (getState() != JobState.PAUSED) {
throw new DdlException("Only supports modification of PAUSED jobs");
}
modifyPropertiesInternal(jobProperties, dataSourceProperties);
AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
jobProperties, dataSourceProperties);
Catalog.getCurrentCatalog().getEditLog().logAlterRoutineLoadJob(log);
} finally {
writeUnlock();
}
}
@Override
public void updateState(JobState jobState, ErrorReason reason, boolean isReplay) throws UserException {
super.updateState(jobState, reason, isReplay);
}
private void modifyPropertiesInternal(Map<String, String> jobProperties,
RoutineLoadDataSourceProperties dataSourceProperties)
throws DdlException {
public void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourceProperties) throws DdlException {
List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
Map<String, String> customKafkaProperties = Maps.newHashMap();
@ -560,28 +530,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
}
if (!jobProperties.isEmpty()) {
Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
modifyCommonJobProperties(copiedJobProperties);
this.jobProperties.putAll(copiedJobProperties);
}
if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
}
LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
this.id, jobProperties, dataSourceProperties);
}
@Override
public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
try {
modifyPropertiesInternal(log.getJobProperties(), log.getDataSourceProperties());
} catch (DdlException e) {
// should not happen
LOG.error("failed to replay modify kafka routine load job: {}", id, e);
}
LOG.info("modify the data source properties of kafka routine load job: {}, datasource properties: {}",
this.id, dataSourceProperties);
}
}


@ -37,9 +37,11 @@ import com.starrocks.analysis.ImportColumnDesc;
import com.starrocks.analysis.ImportColumnsStmt;
import com.starrocks.analysis.LoadStmt;
import com.starrocks.analysis.PartitionNames;
import com.starrocks.analysis.RoutineLoadDataSourceProperties;
import com.starrocks.analysis.RowDelimiter;
import com.starrocks.analysis.SqlParser;
import com.starrocks.analysis.SqlScanner;
import com.starrocks.analysis.StatementBase;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
@ -84,6 +86,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -270,7 +273,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
setRoutineLoadDesc(stmt.getRoutineLoadDesc());
if (stmt.getRoutineLoadDesc() != null) {
setRoutineLoadDesc(stmt.getRoutineLoadDesc());
}
if (stmt.getDesiredConcurrentNum() != -1) {
this.desireTaskConcurrentNum = stmt.getDesiredConcurrentNum();
}
@ -314,29 +319,28 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
if (routineLoadDesc != null) {
if (routineLoadDesc.getColumnsInfo() != null) {
ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
columnDescs = Lists.newArrayList();
for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
columnDescs.add(columnDesc);
}
}
}
if (routineLoadDesc.getWherePredicate() != null) {
whereExpr = routineLoadDesc.getWherePredicate().getExpr();
}
if (routineLoadDesc.getColumnSeparator() != null) {
columnSeparator = routineLoadDesc.getColumnSeparator();
}
if (routineLoadDesc.getRowDelimiter() != null) {
rowDelimiter = routineLoadDesc.getRowDelimiter();
}
if (routineLoadDesc.getPartitionNames() != null) {
partitions = routineLoadDesc.getPartitionNames();
if (routineLoadDesc == null) {
return;
}
if (routineLoadDesc.getColumnsInfo() != null) {
ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
columnDescs = Lists.newArrayList();
columnDescs.addAll(columnsStmt.getColumns());
}
}
if (routineLoadDesc.getWherePredicate() != null) {
whereExpr = routineLoadDesc.getWherePredicate().getExpr();
}
if (routineLoadDesc.getColumnSeparator() != null) {
columnSeparator = routineLoadDesc.getColumnSeparator();
}
if (routineLoadDesc.getRowDelimiter() != null) {
rowDelimiter = routineLoadDesc.getRowDelimiter();
}
if (routineLoadDesc.getPartitionNames() != null) {
partitions = routineLoadDesc.getPartitionNames();
}
}
@Override
@ -1307,6 +1311,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return gson.toJson(result);
}
public Map<String, String> getSessionVariables() {
return sessionVariables;
}
private String jobPropertiesToJsonString() {
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put("partitions",
@ -1484,35 +1492,70 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// parse the origin stmt to get routine load desc
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt),
Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
CreateRoutineLoadStmt stmt = null;
try {
stmt = (CreateRoutineLoadStmt) SqlParserUtils.getStmt(parser, origStmt.idx);
stmt.checkLoadProperties();
setRoutineLoadDesc(stmt.getRoutineLoadDesc());
StatementBase stmt = SqlParserUtils.getStmt(parser, origStmt.idx);
if (stmt instanceof CreateRoutineLoadStmt) {
setRoutineLoadDesc(CreateRoutineLoadStmt.
buildLoadDesc(((CreateRoutineLoadStmt) stmt).getLoadPropertyList()));
} else if (stmt instanceof AlterRoutineLoadStmt) {
setRoutineLoadDesc(CreateRoutineLoadStmt.
buildLoadDesc(((AlterRoutineLoadStmt) stmt).getLoadPropertyList()));
}
} catch (Exception e) {
throw new IOException("error happens when parsing create routine load stmt: " + origStmt, e);
}
}
public abstract void modifyProperties(AlterRoutineLoadStmt stmt) throws DdlException;
public abstract void replayModifyProperties(AlterRoutineLoadJobOperationLog log);
// for ALTER ROUTINE LOAD
protected void modifyCommonJobProperties(Map<String, String> jobProperties) {
if (jobProperties.containsKey(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)) {
this.desireTaskConcurrentNum = Integer.parseInt(
jobProperties.remove(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)) {
this.maxErrorNum = Long.parseLong(
jobProperties.remove(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)) {
this.taskSchedIntervalS = Long.parseLong(
jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY));
public void modifyJob(RoutineLoadDesc routineLoadDesc,
Map<String, String> jobProperties,
RoutineLoadDataSourceProperties dataSourceProperties,
OriginStatement originStatement,
boolean isReplay) throws DdlException {
writeLock();
try {
if (routineLoadDesc != null) {
setRoutineLoadDesc(routineLoadDesc);
}
if (jobProperties != null) {
modifyCommonJobProperties(jobProperties);
}
if (dataSourceProperties != null) {
modifyDataSourceProperties(dataSourceProperties);
}
origStmt = originStatement;
if (!isReplay) {
AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(id,
jobProperties, dataSourceProperties, originStatement);
Catalog.getCurrentCatalog().getEditLog().logAlterRoutineLoadJob(log);
}
} finally {
writeUnlock();
}
}
protected abstract void modifyDataSourceProperties(RoutineLoadDataSourceProperties dataSourceProperties) throws DdlException;
// for ALTER ROUTINE LOAD
private void modifyCommonJobProperties(Map<String, String> jobProperties) {
// Some properties will be removed from the map, so we copy the jobProperties to copiedJobProperties
Map<String, String> copiedJobProperties = new HashMap<>(jobProperties);
if (copiedJobProperties.containsKey(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)) {
this.desireTaskConcurrentNum = Integer.parseInt(
copiedJobProperties.remove(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY));
}
if (copiedJobProperties.containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)) {
this.maxErrorNum = Long.parseLong(
copiedJobProperties.remove(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY));
}
if (copiedJobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)) {
this.taskSchedIntervalS = Long.parseLong(
copiedJobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY));
}
if (copiedJobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)) {
this.maxBatchRows = Long.parseLong(
copiedJobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY));
}
this.jobProperties.putAll(copiedJobProperties);
}
}


@ -29,6 +29,8 @@ import com.starrocks.analysis.AlterRoutineLoadStmt;
import com.starrocks.analysis.CreateRoutineLoadStmt;
import com.starrocks.analysis.PauseRoutineLoadStmt;
import com.starrocks.analysis.ResumeRoutineLoadStmt;
import com.starrocks.analysis.SqlParser;
import com.starrocks.analysis.SqlScanner;
import com.starrocks.analysis.StopRoutineLoadStmt;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
@ -43,10 +45,14 @@ import com.starrocks.common.UserException;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.LogBuilder;
import com.starrocks.common.util.LogKey;
import com.starrocks.common.util.SqlParserUtils;
import com.starrocks.load.RoutineLoadDesc;
import com.starrocks.mysql.privilege.PrivPredicate;
import com.starrocks.persist.AlterRoutineLoadJobOperationLog;
import com.starrocks.persist.RoutineLoadOperation;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.SqlModeHelper;
import com.starrocks.sql.optimizer.statistics.IDictManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -54,6 +60,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -546,17 +553,45 @@ public class RoutineLoadManager implements Writable {
*/
public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws UserException {
RoutineLoadJob job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
if (job.getState() != RoutineLoadJob.JobState.PAUSED) {
throw new DdlException("Only supports modification of PAUSED jobs");
}
if (stmt.hasDataSourceProperty()
&& !stmt.getDataSourceProperties().getType().equalsIgnoreCase(job.dataSourceType.name())) {
throw new DdlException("The specified job type is not: " + stmt.getDataSourceProperties().getType());
}
job.modifyProperties(stmt);
job.modifyJob(stmt.getRoutineLoadDesc(), stmt.getAnalyzedJobProperties(),
stmt.getDataSourceProperties(), stmt.getOrigStmt(), false);
}
public void replayAlterRoutineLoadJob(AlterRoutineLoadJobOperationLog log) {
public void replayAlterRoutineLoadJob(AlterRoutineLoadJobOperationLog log) throws UserException, IOException {
RoutineLoadJob job = getJob(log.getJobId());
Preconditions.checkNotNull(job, log.getJobId());
job.replayModifyProperties(log);
// NOTE: we use the origin statement to get the RoutineLoadDesc
RoutineLoadDesc routineLoadDesc = null;
if (log.getOriginStatement() != null) {
long sqlMode;
if (job.getSessionVariables() != null && job.getSessionVariables().containsKey(SessionVariable.SQL_MODE)) {
sqlMode = Long.parseLong(job.getSessionVariables().get(SessionVariable.SQL_MODE));
} else {
sqlMode = SqlModeHelper.MODE_DEFAULT;
}
try {
SqlParser parser = new SqlParser(
new SqlScanner(new StringReader(log.getOriginStatement().originStmt), sqlMode));
AlterRoutineLoadStmt stmt = (AlterRoutineLoadStmt) SqlParserUtils.getStmt(
parser, log.getOriginStatement().idx);
if (stmt.getLoadPropertyList() != null) {
routineLoadDesc = CreateRoutineLoadStmt.buildLoadDesc(stmt.getLoadPropertyList());
}
} catch (Exception e) {
throw new IOException("error happens when parsing alter routine load stmt: "
+ log.getOriginStatement().originStmt, e);
}
}
job.modifyJob(routineLoadDesc, log.getJobProperties(),
log.getDataSourceProperties(), log.getOriginStatement(), true);
}
@Override


@ -26,6 +26,7 @@ import com.starrocks.analysis.RoutineLoadDataSourceProperties;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.qe.OriginStatement;
import java.io.DataInput;
import java.io.DataOutput;
@ -40,12 +41,16 @@ public class AlterRoutineLoadJobOperationLog implements Writable {
private Map<String, String> jobProperties;
@SerializedName(value = "dataSourceProperties")
private RoutineLoadDataSourceProperties dataSourceProperties;
@SerializedName(value = "originStatement")
private OriginStatement originStatement;
public AlterRoutineLoadJobOperationLog(long jobId, Map<String, String> jobProperties,
RoutineLoadDataSourceProperties dataSourceProperties) {
RoutineLoadDataSourceProperties dataSourceProperties,
OriginStatement originStatement) {
this.jobId = jobId;
this.jobProperties = jobProperties;
this.dataSourceProperties = dataSourceProperties;
this.originStatement = originStatement;
}
public long getJobId() {
@ -60,6 +65,10 @@ public class AlterRoutineLoadJobOperationLog implements Writable {
return dataSourceProperties;
}
public OriginStatement getOriginStatement() {
return originStatement;
}
public static AlterRoutineLoadJobOperationLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, AlterRoutineLoadJobOperationLog.class);


@ -83,7 +83,7 @@ public class AlterRoutineLoadStmtTest {
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
null, jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
} catch (UserException e) {
@ -105,7 +105,7 @@ public class AlterRoutineLoadStmtTest {
@Test(expected = AnalysisException.class)
public void testNoPproperties() throws AnalysisException, UserException {
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), null,
Maps.newHashMap(), new RoutineLoadDataSourceProperties());
stmt.analyze(analyzer);
}
@ -115,7 +115,7 @@ public class AlterRoutineLoadStmtTest {
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.FORMAT, "csv");
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), null,
jobProperties, new RoutineLoadDataSourceProperties());
try {
stmt.analyze(analyzer);
@ -135,7 +135,7 @@ public class AlterRoutineLoadStmtTest {
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "new_topic");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), null,
jobProperties, routineLoadDataSourceProperties);
try {
@ -156,7 +156,7 @@ public class AlterRoutineLoadStmtTest {
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), null,
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
@ -177,7 +177,7 @@ public class AlterRoutineLoadStmtTest {
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), null,
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
@ -197,7 +197,7 @@ public class AlterRoutineLoadStmtTest {
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000, 3000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), null,
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);


@ -21,12 +21,14 @@
package com.starrocks.load.routineload;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.analysis.CreateRoutineLoadStmt;
import com.starrocks.analysis.AlterRoutineLoadStmt;
import com.starrocks.analysis.SqlParser;
import com.starrocks.analysis.SqlScanner;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
@ -36,10 +38,12 @@ import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.common.util.KafkaUtil;
import com.starrocks.persist.EditLog;
import com.starrocks.persist.RoutineLoadOperation;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.OriginStatement;
import com.starrocks.thrift.TKafkaRLTaskProgress;
import com.starrocks.transaction.TransactionException;
import com.starrocks.transaction.TransactionState;
import java_cup.runtime.Symbol;
import com.starrocks.utframe.UtFrameUtils;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
@ -49,19 +53,10 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class RoutineLoadJobTest {
@Mocked
EditLog editLog;
@Mocked
SqlParser sqlParser;
@Mocked
CreateRoutineLoadStmt createRoutineLoadStmt;
@Mocked
Symbol symbol;
@Test
public void testAfterAbortedReasonOffsetOutOfRange(@Mocked Catalog catalog,
@Injectable TransactionState transactionState,
@ -298,4 +293,94 @@ public class RoutineLoadJobTest {
Assert.assertEquals(new Long(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
}
@Test
public void testModifyJobProperties() throws Exception {
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
ConnectContext connectContext = UtFrameUtils.createDefaultCtx();
// alter job properties
String desiredConcurrentNumber = "3";
String maxBatchInterval = "60";
String maxErrorNumber = "10000";
String maxBatchRows = "200000";
String strictMode = "true";
String timeZone = "UTC";
String jsonPaths = "[\\\"$.category\\\",\\\"$.author\\\",\\\"$.price\\\",\\\"$.timestamp\\\"]";
String stripOuterArray = "true";
String jsonRoot = "$.RECORDS";
String originStmt = "alter routine load for db.job1 " +
"properties (" +
" \"desired_concurrent_number\" = \"" + desiredConcurrentNumber + "\"," +
" \"max_batch_interval\" = \"" + maxBatchInterval + "\"," +
" \"max_error_number\" = \"" + maxErrorNumber + "\"," +
" \"max_batch_rows\" = \"" + maxBatchRows + "\"," +
" \"strict_mode\" = \"" + strictMode + "\"," +
" \"timezone\" = \"" + timeZone + "\"," +
" \"jsonpaths\" = \"" + jsonPaths + "\"," +
" \"strip_outer_array\" = \"" + stripOuterArray + "\"," +
" \"json_root\" = \"" + jsonRoot + "\"" +
")";
AlterRoutineLoadStmt stmt = (AlterRoutineLoadStmt) UtFrameUtils.parseAndAnalyzeStmt(originStmt, connectContext);
routineLoadJob.modifyJob(stmt.getRoutineLoadDesc(), stmt.getAnalyzedJobProperties(),
stmt.getDataSourceProperties(), new OriginStatement(originStmt, 0), true);
Assert.assertEquals(Integer.parseInt(desiredConcurrentNumber),
(int) Deencapsulation.getField(routineLoadJob, "desireTaskConcurrentNum"));
Assert.assertEquals(Long.parseLong(maxBatchInterval),
(long) Deencapsulation.getField(routineLoadJob, "taskSchedIntervalS"));
Assert.assertEquals(Long.parseLong(maxErrorNumber),
(long) Deencapsulation.getField(routineLoadJob, "maxErrorNum"));
Assert.assertEquals(Long.parseLong(maxBatchRows),
(long) Deencapsulation.getField(routineLoadJob, "maxBatchRows"));
Assert.assertEquals(Boolean.parseBoolean(strictMode), routineLoadJob.isStrictMode());
Assert.assertEquals(timeZone, routineLoadJob.getTimezone());
Assert.assertEquals(jsonPaths.replace("\\", ""), routineLoadJob.getJsonPaths());
Assert.assertEquals(Boolean.parseBoolean(stripOuterArray), routineLoadJob.isStripOuterArray());
Assert.assertEquals(jsonRoot, routineLoadJob.getJsonRoot());
}
@Test
public void testModifyDataSourceProperties() throws Exception {
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
ConnectContext connectContext = UtFrameUtils.createDefaultCtx();
//alter data source custom properties
String groupId = "group1";
String clientId = "client1";
String defaultOffsets = "OFFSET_BEGINNING";
String originStmt = "alter routine load for db.job1 " +
"FROM KAFKA (" +
" \"property.group.id\" = \"" + groupId + "\"," +
" \"property.client.id\" = \"" + clientId + "\"," +
" \"property.kafka_default_offsets\" = \"" + defaultOffsets + "\"" +
")";
AlterRoutineLoadStmt stmt = (AlterRoutineLoadStmt) UtFrameUtils.parseAndAnalyzeStmt(originStmt, connectContext);
routineLoadJob.modifyJob(stmt.getRoutineLoadDesc(), stmt.getAnalyzedJobProperties(),
stmt.getDataSourceProperties(), new OriginStatement(originStmt, 0), true);
routineLoadJob.convertCustomProperties(true);
Map<String, String> properties = routineLoadJob.getConvertedCustomProperties();
Assert.assertEquals(groupId, properties.get("group.id"));
Assert.assertEquals(clientId, properties.get("client.id"));
Assert.assertEquals(-2L,
(long) Deencapsulation.getField(routineLoadJob, "kafkaDefaultOffSet"));
}
@Test
public void testModifyLoadDesc() throws Exception {
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
ConnectContext connectContext = UtFrameUtils.createDefaultCtx();
//alter load desc
String originStmt = "alter routine load for db.job1 " +
"COLUMNS (a, b, c, d=a), " +
"WHERE a = 1," +
"COLUMNS TERMINATED BY \",\"," +
"PARTITION(p1, p2, p3)," +
"ROWS TERMINATED BY \"A\"";
AlterRoutineLoadStmt stmt = (AlterRoutineLoadStmt) UtFrameUtils.parseAndAnalyzeStmt(originStmt, connectContext);
routineLoadJob.modifyJob(stmt.getRoutineLoadDesc(), stmt.getAnalyzedJobProperties(),
stmt.getDataSourceProperties(), new OriginStatement(originStmt, 0), true);
Assert.assertEquals("a,b,c,d=`a`", Joiner.on(",").join(routineLoadJob.getColumnDescs()));
Assert.assertEquals("`a` = 1", routineLoadJob.getWhereExpr().toSql());
Assert.assertEquals("','", routineLoadJob.getColumnSeparator().toString());
Assert.assertEquals("'A'", routineLoadJob.getRowDelimiter().toString());
Assert.assertEquals("p1,p2,p3", Joiner.on(",").join(routineLoadJob.getPartitions().getPartitionNames()));
}
}


@ -61,7 +61,7 @@ public class AlterRoutineLoadOperationLogTest {
routineLoadDataSourceProperties.analyze();
AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(jobId,
jobProperties, routineLoadDataSourceProperties);
jobProperties, routineLoadDataSourceProperties, null);
log.write(out);
out.flush();
out.close();