[Enhancement] Add tracing util for FE (#7220)
The BE already has a tracing framework; this PR adds a tracing utility for the FE and adds some tracing for txn commit/publish.
This commit is contained in:
parent
ae20b1f529
commit
b1aaf2a8fe
|
|
@ -19,7 +19,7 @@ Tracer::~Tracer() {
|
|||
Tracer& Tracer::Instance() {
|
||||
static Tracer global_tracer;
|
||||
static std::once_flag oc;
|
||||
std::call_once(oc, [&]() { global_tracer.init("STARROCKS-BE"); });
|
||||
std::call_once(oc, [&]() { global_tracer.init("starrocks"); });
|
||||
return global_tracer;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -24,6 +24,9 @@ webserver_port = 8040
|
|||
heartbeat_service_port = 9050
|
||||
brpc_port = 8060
|
||||
|
||||
# Enable jaeger tracing by setting jaeger_endpoint
|
||||
# jaeger_endpoint = localhost:6831
|
||||
|
||||
# Choose one if there are more than one ip except loopback address.
|
||||
# Note that at most one ip should match this list.
|
||||
# If no ip matches this rule, one will be chosen randomly.
|
||||
|
|
|
|||
|
|
@ -46,6 +46,9 @@ query_port = 9030
|
|||
edit_log_port = 9010
|
||||
mysql_service_nio_enabled = true
|
||||
|
||||
# Enable jaeger tracing by setting jaeger_grpc_endpoint
|
||||
# jaeger_grpc_endpoint = http://localhost:14250
|
||||
|
||||
# Choose one if there are more than one ip except loopback address.
|
||||
# Note that at most one ip should match this list.
|
||||
# If no ip matches this rule, one will be chosen randomly.
|
||||
|
|
|
|||
|
|
@ -566,6 +566,25 @@ under the License.
|
|||
<artifactId>starclient</artifactId>
|
||||
<version>0.1.1</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/io.opentelemetry/opentelemetry-api -->
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/io.opentelemetry/opentelemetry-sdk -->
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-sdk</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/io.opentelemetry/opentelemetry-exporter-jaeger -->
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-exporter-jaeger</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
@ -717,7 +736,8 @@ under the License.
|
|||
<!-- Do not reuse forked JVMs, so that each unit test runs in a separate JVM and singleton conflicts are avoided. -->
|
||||
<reuseForks>false</reuseForks>
|
||||
<argLine>
|
||||
-javaagent:${settings.localRepository}/org/jmockit/jmockit/1.48/jmockit-1.48.jar -Xmx2048m -Duser.timezone=Asia/Shanghai @{jacocoArgLine}
|
||||
-javaagent:${settings.localRepository}/org/jmockit/jmockit/1.48/jmockit-1.48.jar -Xmx2048m
|
||||
-Duser.timezone=Asia/Shanghai @{jacocoArgLine}
|
||||
</argLine>
|
||||
<!-- Set maven to use independent class loading to avoid unit test failure due to the early shutdown of the JVM -->
|
||||
<useSystemClassLoader>false</useSystemClassLoader>
|
||||
|
|
|
|||
|
|
@ -186,13 +186,13 @@ public class Config extends ConfigBase {
|
|||
public static int label_clean_interval_second = 4 * 3600; // 4 hours
|
||||
|
||||
/**
|
||||
* for task set expire time
|
||||
* for task set expire time
|
||||
*/
|
||||
@ConfField(mutable = true)
|
||||
public static int task_ttl_second = 3 * 24 * 3600; // 3 day
|
||||
|
||||
/**
|
||||
* for task run set expire time
|
||||
* for task run set expire time
|
||||
*/
|
||||
@ConfField(mutable = true)
|
||||
public static int task_runs_ttl_second = 3 * 24 * 3600; // 3 day
|
||||
|
|
@ -1485,7 +1485,7 @@ public class Config extends ConfigBase {
|
|||
|
||||
@ConfField(mutable = true)
|
||||
public static boolean enable_experimental_mv = false;
|
||||
|
||||
|
||||
@ConfField
|
||||
public static boolean enable_dict_optimize_routine_load = false;
|
||||
|
||||
|
|
@ -1520,9 +1520,15 @@ public class Config extends ConfigBase {
|
|||
public static int quorom_publish_wait_time_ms = 5000;
|
||||
|
||||
/**
|
||||
* Fqdn function switch,
|
||||
* Fqdn function switch,
|
||||
* this switch will be deleted after release the fqdn func
|
||||
*/
|
||||
@ConfField(mutable = true)
|
||||
public static boolean enable_fqdn_func = false;
|
||||
|
||||
/**
|
||||
* Jaeger tracing gRPC endpoint; an empty value disables tracing
|
||||
*/
|
||||
@ConfField
|
||||
public static String jaeger_grpc_endpoint = "";
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,52 @@
|
|||
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
|
||||
package com.starrocks.common;
|
||||
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.Tracer;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter;
|
||||
import io.opentelemetry.sdk.OpenTelemetrySdk;
|
||||
import io.opentelemetry.sdk.OpenTelemetrySdkBuilder;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import io.opentelemetry.sdk.trace.SdkTracerProvider;
|
||||
import io.opentelemetry.sdk.trace.SpanProcessor;
|
||||
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
|
||||
|
||||
public class TraceManager {
|
||||
private static final String SERVICE_NAME = "starrocks";
|
||||
private static Tracer instance = null;
|
||||
|
||||
public static Tracer getTracer() {
|
||||
if (instance == null) {
|
||||
synchronized (TraceManager.class) {
|
||||
if (instance == null) {
|
||||
OpenTelemetrySdkBuilder builder = OpenTelemetrySdk.builder();
|
||||
if (!Config.jaeger_grpc_endpoint.isEmpty()) {
|
||||
SpanProcessor processor = BatchSpanProcessor.builder(
|
||||
JaegerGrpcSpanExporter.builder().setEndpoint(Config.jaeger_grpc_endpoint)
|
||||
.build()).build();
|
||||
Resource resource = Resource.builder().put("service.name", SERVICE_NAME).build();
|
||||
SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
|
||||
.addSpanProcessor(processor)
|
||||
.setResource(resource)
|
||||
.build();
|
||||
builder.setTracerProvider(sdkTracerProvider);
|
||||
}
|
||||
OpenTelemetry openTelemetry = builder.buildAndRegisterGlobal();
|
||||
instance = openTelemetry.getTracer(SERVICE_NAME);
|
||||
}
|
||||
}
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public static Span startSpan(String name, Span parent) {
|
||||
return getTracer().spanBuilder(name)
|
||||
.setParent(Context.current().with(parent)).startSpan();
|
||||
}
|
||||
|
||||
public static Span startSpan(String name) {
|
||||
return getTracer().spanBuilder(name).startSpan();
|
||||
}
|
||||
}
|
||||
|
|
@ -49,6 +49,7 @@ import com.starrocks.common.LabelAlreadyUsedException;
|
|||
import com.starrocks.common.LoadException;
|
||||
import com.starrocks.common.MetaNotFoundException;
|
||||
import com.starrocks.common.Pair;
|
||||
import com.starrocks.common.TraceManager;
|
||||
import com.starrocks.common.UserException;
|
||||
import com.starrocks.common.util.DebugUtil;
|
||||
import com.starrocks.common.util.TimeUtils;
|
||||
|
|
@ -66,6 +67,7 @@ import com.starrocks.task.ClearTransactionTask;
|
|||
import com.starrocks.task.PublishVersionTask;
|
||||
import com.starrocks.thrift.TTaskType;
|
||||
import com.starrocks.thrift.TUniqueId;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
|
@ -378,6 +380,8 @@ public class DatabaseTransactionMgr {
|
|||
throw new TransactionCommitFailedException(
|
||||
transactionState == null ? "transaction not found" : transactionState.getReason());
|
||||
}
|
||||
Span txnSpan = transactionState.getTxnSpan();
|
||||
txnSpan.addEvent("commit_start");
|
||||
|
||||
if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
|
||||
LOG.debug("transaction is already visible: {}", transactionId);
|
||||
|
|
@ -468,7 +472,6 @@ public class DatabaseTransactionMgr {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
Set<Long> errorReplicaIds = Sets.newHashSet();
|
||||
Set<Long> totalInvolvedBackends = Sets.newHashSet();
|
||||
for (long tableId : tableToPartition.keySet()) {
|
||||
|
|
@ -552,6 +555,8 @@ public class DatabaseTransactionMgr {
|
|||
TxnStateChangeCallback callback = transactionState.beforeStateTransform(TransactionStatus.COMMITTED);
|
||||
// transaction state transform
|
||||
boolean txnOperated = false;
|
||||
|
||||
Span unprotectedCommitSpan = TraceManager.startSpan("unprotectedCommitTransaction", txnSpan);
|
||||
writeLock();
|
||||
try {
|
||||
unprotectedCommitTransaction(transactionState, errorReplicaIds, tableToPartition,
|
||||
|
|
@ -560,12 +565,18 @@ public class DatabaseTransactionMgr {
|
|||
txnOperated = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
unprotectedCommitSpan.end();
|
||||
// after state transform
|
||||
transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated, callback, null);
|
||||
}
|
||||
|
||||
// 6. update nextVersion because of the failure of persistent transaction resulting in error version
|
||||
updateCatalogAfterCommitted(transactionState, db);
|
||||
Span updateCatalogAfterCommittedSpan = TraceManager.startSpan("updateCatalogAfterCommitted", txnSpan);
|
||||
try {
|
||||
updateCatalogAfterCommitted(transactionState, db);
|
||||
} finally {
|
||||
updateCatalogAfterCommittedSpan.end();
|
||||
}
|
||||
LOG.info("transaction:[{}] successfully committed", transactionState);
|
||||
}
|
||||
|
||||
|
|
@ -702,8 +713,8 @@ public class DatabaseTransactionMgr {
|
|||
&& (unfinishedBackends == null
|
||||
|| !unfinishedBackends.contains(replica.getBackendId()))) {
|
||||
++successHealthyReplicaNum;
|
||||
// replica report version has greater cur transaction commit version
|
||||
// This can happen when the BE publish succeeds but fails to send a response to FE
|
||||
// replica report version has greater cur transaction commit version
|
||||
// This can happen when the BE publish succeeds but fails to send a response to FE
|
||||
} else if (replica.getVersion() >= partitionCommitInfo.getVersion()) {
|
||||
++successHealthyReplicaNum;
|
||||
} else if (unfinishedBackends != null
|
||||
|
|
@ -728,7 +739,7 @@ public class DatabaseTransactionMgr {
|
|||
if (successHealthyReplicaNum != replicaNum
|
||||
&& !unfinishedBackends.isEmpty()
|
||||
&& currentTs
|
||||
- txn.getCommitTime() < Config.quorom_publish_wait_time_ms) {
|
||||
- txn.getCommitTime() < Config.quorom_publish_wait_time_ms) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
@ -771,6 +782,7 @@ public class DatabaseTransactionMgr {
|
|||
writeUnlock();
|
||||
}
|
||||
}
|
||||
Span finishSpan = TraceManager.startSpan("finishTransaction", transactionState.getTxnSpan());
|
||||
db.writeLock();
|
||||
try {
|
||||
boolean hasError = false;
|
||||
|
|
@ -908,9 +920,15 @@ public class DatabaseTransactionMgr {
|
|||
writeUnlock();
|
||||
transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated);
|
||||
}
|
||||
updateCatalogAfterVisible(transactionState, db);
|
||||
Span updateCatalogSpan = TraceManager.startSpan("updateCatalogAfterVisible", finishSpan);
|
||||
try {
|
||||
updateCatalogAfterVisible(transactionState, db);
|
||||
} finally {
|
||||
updateCatalogSpan.end();
|
||||
}
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
finishSpan.end();
|
||||
}
|
||||
LOG.info("finish transaction {} successfully", transactionState);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import com.starrocks.catalog.OlapTable;
|
|||
import com.starrocks.catalog.Partition;
|
||||
import com.starrocks.common.Config;
|
||||
import com.starrocks.common.FeMetaVersion;
|
||||
import com.starrocks.common.TraceManager;
|
||||
import com.starrocks.common.UserException;
|
||||
import com.starrocks.common.io.Text;
|
||||
import com.starrocks.common.io.Writable;
|
||||
|
|
@ -39,6 +40,7 @@ import com.starrocks.server.GlobalStateMgr;
|
|||
import com.starrocks.task.PublishVersionTask;
|
||||
import com.starrocks.thrift.TPartitionVersionInfo;
|
||||
import com.starrocks.thrift.TUniqueId;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
|
@ -228,6 +230,8 @@ public class TransactionState implements Writable {
|
|||
|
||||
private long lastErrTimeMs = 0;
|
||||
|
||||
private Span txnSpan = null;
|
||||
|
||||
public TransactionState() {
|
||||
this.dbId = -1;
|
||||
this.tableIdList = Lists.newArrayList();
|
||||
|
|
@ -245,6 +249,7 @@ public class TransactionState implements Writable {
|
|||
this.publishVersionTasks = Maps.newHashMap();
|
||||
this.hasSendTask = false;
|
||||
this.latch = new CountDownLatch(1);
|
||||
this.txnSpan = TraceManager.startSpan("txn");
|
||||
}
|
||||
|
||||
public TransactionState(long dbId, List<Long> tableIdList, long transactionId, String label, TUniqueId requestId,
|
||||
|
|
@ -269,6 +274,9 @@ public class TransactionState implements Writable {
|
|||
this.latch = new CountDownLatch(1);
|
||||
this.callbackId = callbackId;
|
||||
this.timeoutMs = timeoutMs;
|
||||
this.txnSpan = TraceManager.startSpan("txn");
|
||||
txnSpan.setAttribute("txn_id", transactionId);
|
||||
txnSpan.setAttribute("label", label);
|
||||
}
|
||||
|
||||
public void setErrorReplicas(Set<Long> newErrorReplicas) {
|
||||
|
|
@ -372,10 +380,16 @@ public class TransactionState implements Writable {
|
|||
if (MetricRepo.isInit) {
|
||||
MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
|
||||
}
|
||||
txnSpan.addEvent("set_visible");
|
||||
txnSpan.end();
|
||||
} else if (transactionStatus == TransactionStatus.ABORTED) {
|
||||
if (MetricRepo.isInit) {
|
||||
MetricRepo.COUNTER_TXN_FAILED.increase(1L);
|
||||
}
|
||||
txnSpan.setAttribute("state", "aborted");
|
||||
txnSpan.end();
|
||||
} else if (transactionStatus == TransactionStatus.COMMITTED) {
|
||||
txnSpan.addEvent("set_committed");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -747,4 +761,8 @@ public class TransactionState implements Writable {
|
|||
}
|
||||
return tasks;
|
||||
}
|
||||
|
||||
public Span getTxnSpan() {
|
||||
return txnSpan;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -715,6 +715,14 @@ under the License.
|
|||
<artifactId>hudi-hadoop-mr</artifactId>
|
||||
<version>${hudi.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.opentelemetry</groupId>
|
||||
<artifactId>opentelemetry-bom</artifactId>
|
||||
<version>1.14.0</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue