diff --git a/be/src/common/tracer.cpp b/be/src/common/tracer.cpp index a7c91be493f..f8263b43734 100644 --- a/be/src/common/tracer.cpp +++ b/be/src/common/tracer.cpp @@ -19,7 +19,7 @@ Tracer::~Tracer() { Tracer& Tracer::Instance() { static Tracer global_tracer; static std::once_flag oc; - std::call_once(oc, [&]() { global_tracer.init("STARROCKS-BE"); }); + std::call_once(oc, [&]() { global_tracer.init("starrocks"); }); return global_tracer; } diff --git a/conf/be.conf b/conf/be.conf index 1c0b37567d3..1cdd8054301 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -24,6 +24,9 @@ webserver_port = 8040 heartbeat_service_port = 9050 brpc_port = 8060 +# Enable jaeger tracing by setting jaeger_endpoint +# jaeger_endpoint = localhost:6831 + # Choose one if there are more than one ip except loopback address. # Note that there should at most one ip match this list. # If no ip match this rule, will choose one randomly. diff --git a/conf/fe.conf b/conf/fe.conf index 7b62b92fc63..2f485d51a06 100644 --- a/conf/fe.conf +++ b/conf/fe.conf @@ -46,6 +46,9 @@ query_port = 9030 edit_log_port = 9010 mysql_service_nio_enabled = true +# Enable jaeger tracing by setting jaeger_grpc_endpoint +# jaeger_grpc_endpoint = http://localhost:14250 + # Choose one if there are more than one ip except loopback address. # Note that there should at most one ip match this list. # If no ip match this rule, will choose one randomly. diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index ef32336d05c..8e737ad2509 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -566,6 +566,25 @@ under the License. starclient 0.1.1 + + + + io.opentelemetry + opentelemetry-api + + + + + io.opentelemetry + opentelemetry-sdk + + + + + io.opentelemetry + opentelemetry-exporter-jaeger + + @@ -717,7 +736,8 @@ under the License. not reuse forked jvm, so that each unit test will run in separate jvm. to avoid singleton confict<--> false - -javaagent:${settings.localRepository}/org/jmockit/jmockit/1.48/jmockit-1.48.jar -Xmx2048m -Duser.timezone=Asia/Shanghai @{jacocoArgLine} + -javaagent:${settings.localRepository}/org/jmockit/jmockit/1.48/jmockit-1.48.jar -Xmx2048m + -Duser.timezone=Asia/Shanghai @{jacocoArgLine} false diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index f7a01cfd1f7..fea364c39c6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -186,13 +186,13 @@ public class Config extends ConfigBase { public static int label_clean_interval_second = 4 * 3600; // 4 hours /** - * for task set expire time + * for task set expire time */ @ConfField(mutable = true) public static int task_ttl_second = 3 * 24 * 3600; // 3 day /** - * for task run set expire time + * for task run set expire time */ @ConfField(mutable = true) public static int task_runs_ttl_second = 3 * 24 * 3600; // 3 day @@ -1485,7 +1485,7 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static boolean enable_experimental_mv = false; - + @ConfField public static boolean enable_dict_optimize_routine_load = false; @@ -1520,9 +1520,15 @@ public class Config extends ConfigBase { public static int quorom_publish_wait_time_ms = 5000; /** - * Fqdn function switch, + * Fqdn function switch, * this switch will be deleted after release the fqdn func */ @ConfField(mutable = true) public static boolean enable_fqdn_func = false; + + /** + * jaeger tracing endpoint, empty thing disables tracing + */ + @ConfField + public static String jaeger_grpc_endpoint = ""; } diff --git a/fe/fe-core/src/main/java/com/starrocks/common/TraceManager.java b/fe/fe-core/src/main/java/com/starrocks/common/TraceManager.java new file mode 100644 index 00000000000..197a0162ed8 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/common/TraceManager.java @@ -0,0 +1,52 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited. +package com.starrocks.common; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.OpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; + +public class TraceManager { + private static final String SERVICE_NAME = "starrocks"; + private static Tracer instance = null; + + public static Tracer getTracer() { + if (instance == null) { + synchronized (TraceManager.class) { + if (instance == null) { + OpenTelemetrySdkBuilder builder = OpenTelemetrySdk.builder(); + if (!Config.jaeger_grpc_endpoint.isEmpty()) { + SpanProcessor processor = BatchSpanProcessor.builder( + JaegerGrpcSpanExporter.builder().setEndpoint(Config.jaeger_grpc_endpoint) + .build()).build(); + Resource resource = Resource.builder().put("service.name", SERVICE_NAME).build(); + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(processor) + .setResource(resource) + .build(); + builder.setTracerProvider(sdkTracerProvider); + } + OpenTelemetry openTelemetry = builder.buildAndRegisterGlobal(); + instance = openTelemetry.getTracer(SERVICE_NAME); + } + } + } + return instance; + } + + public static Span startSpan(String name, Span parent) { + return getTracer().spanBuilder(name) + .setParent(Context.current().with(parent)).startSpan(); + } + + public static Span startSpan(String name) { + return getTracer().spanBuilder(name).startSpan(); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java index 1e73694c94d..7db773356ca 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java @@ -49,6 +49,7 @@ import com.starrocks.common.LabelAlreadyUsedException; import com.starrocks.common.LoadException; import com.starrocks.common.MetaNotFoundException; import com.starrocks.common.Pair; +import com.starrocks.common.TraceManager; import com.starrocks.common.UserException; import com.starrocks.common.util.DebugUtil; import com.starrocks.common.util.TimeUtils; @@ -66,6 +67,7 @@ import com.starrocks.task.ClearTransactionTask; import com.starrocks.task.PublishVersionTask; import com.starrocks.thrift.TTaskType; import com.starrocks.thrift.TUniqueId; +import io.opentelemetry.api.trace.Span; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -378,6 +380,8 @@ public class DatabaseTransactionMgr { throw new TransactionCommitFailedException( transactionState == null ? "transaction not found" : transactionState.getReason()); } + Span txnSpan = transactionState.getTxnSpan(); + txnSpan.addEvent("commit_start"); if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { LOG.debug("transaction is already visible: {}", transactionId); @@ -468,7 +472,6 @@ public class DatabaseTransactionMgr { } } - Set errorReplicaIds = Sets.newHashSet(); Set totalInvolvedBackends = Sets.newHashSet(); for (long tableId : tableToPartition.keySet()) { @@ -552,6 +555,8 @@ public class DatabaseTransactionMgr { TxnStateChangeCallback callback = transactionState.beforeStateTransform(TransactionStatus.COMMITTED); // transaction state transform boolean txnOperated = false; + + Span unprotectedCommitSpan = TraceManager.startSpan("unprotectedCommitTransaction", txnSpan); writeLock(); try { unprotectedCommitTransaction(transactionState, errorReplicaIds, tableToPartition, @@ -560,12 +565,18 @@ public class DatabaseTransactionMgr { txnOperated = true; } finally { writeUnlock(); + unprotectedCommitSpan.end(); // after state transform transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated, callback, null); } // 6. update nextVersion because of the failure of persistent transaction resulting in error version - updateCatalogAfterCommitted(transactionState, db); + Span updateCatalogAfterCommittedSpan = TraceManager.startSpan("updateCatalogAfterCommitted", txnSpan); + try { + updateCatalogAfterCommitted(transactionState, db); + } finally { + updateCatalogAfterCommittedSpan.end(); + } LOG.info("transaction:[{}] successfully committed", transactionState); } @@ -702,8 +713,8 @@ public class DatabaseTransactionMgr { && (unfinishedBackends == null || !unfinishedBackends.contains(replica.getBackendId()))) { ++successHealthyReplicaNum; - // replica report version has greater cur transaction commit version - // This can happen when the BE publish succeeds but fails to send a response to FE + // replica report version has greater cur transaction commit version + // This can happen when the BE publish succeeds but fails to send a response to FE } else if (replica.getVersion() >= partitionCommitInfo.getVersion()) { ++successHealthyReplicaNum; } else if (unfinishedBackends != null @@ -728,7 +739,7 @@ public class DatabaseTransactionMgr { if (successHealthyReplicaNum != replicaNum && !unfinishedBackends.isEmpty() && currentTs - - txn.getCommitTime() < Config.quorom_publish_wait_time_ms) { + - txn.getCommitTime() < Config.quorom_publish_wait_time_ms) { return false; } } @@ -771,6 +782,7 @@ public class DatabaseTransactionMgr { writeUnlock(); } } + Span finishSpan = TraceManager.startSpan("finishTransaction", transactionState.getTxnSpan()); db.writeLock(); try { boolean hasError = false; @@ -908,9 +920,15 @@ public class DatabaseTransactionMgr { writeUnlock(); transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated); } - updateCatalogAfterVisible(transactionState, db); + Span updateCatalogSpan = TraceManager.startSpan("updateCatalogAfterVisible", finishSpan); + try { + updateCatalogAfterVisible(transactionState, db); + } finally { + updateCatalogSpan.end(); + } } finally { db.writeUnlock(); + finishSpan.end(); } LOG.info("finish transaction {} successfully", transactionState); } diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java b/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java index 64620dedf91..61698ef293d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java @@ -31,6 +31,7 @@ import com.starrocks.catalog.OlapTable; import com.starrocks.catalog.Partition; import com.starrocks.common.Config; import com.starrocks.common.FeMetaVersion; +import com.starrocks.common.TraceManager; import com.starrocks.common.UserException; import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; @@ -39,6 +40,7 @@ import com.starrocks.server.GlobalStateMgr; import com.starrocks.task.PublishVersionTask; import com.starrocks.thrift.TPartitionVersionInfo; import com.starrocks.thrift.TUniqueId; +import io.opentelemetry.api.trace.Span; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -228,6 +230,8 @@ public class TransactionState implements Writable { private long lastErrTimeMs = 0; + private Span txnSpan = null; + public TransactionState() { this.dbId = -1; this.tableIdList = Lists.newArrayList(); @@ -245,6 +249,7 @@ public class TransactionState implements Writable { this.publishVersionTasks = Maps.newHashMap(); this.hasSendTask = false; this.latch = new CountDownLatch(1); + this.txnSpan = TraceManager.startSpan("txn"); } public TransactionState(long dbId, List tableIdList, long transactionId, String label, TUniqueId requestId, @@ -269,6 +274,9 @@ public class TransactionState implements Writable { this.latch = new CountDownLatch(1); this.callbackId = callbackId; this.timeoutMs = timeoutMs; + this.txnSpan = TraceManager.startSpan("txn"); + txnSpan.setAttribute("txn_id", transactionId); + txnSpan.setAttribute("label", label); } public void setErrorReplicas(Set newErrorReplicas) { @@ -372,10 +380,16 @@ public class TransactionState implements Writable { if (MetricRepo.isInit) { MetricRepo.COUNTER_TXN_SUCCESS.increase(1L); } + txnSpan.addEvent("set_visible"); + txnSpan.end(); } else if (transactionStatus == TransactionStatus.ABORTED) { if (MetricRepo.isInit) { MetricRepo.COUNTER_TXN_FAILED.increase(1L); } + txnSpan.setAttribute("state", "aborted"); + txnSpan.end(); + } else if (transactionStatus == TransactionStatus.COMMITTED) { + txnSpan.addEvent("set_committed"); } } @@ -747,4 +761,8 @@ public class TransactionState implements Writable { } return tasks; } + + public Span getTxnSpan() { + return txnSpan; + } } diff --git a/fe/pom.xml b/fe/pom.xml index 82e1e8a79d4..ee5ea6589e5 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -715,6 +715,14 @@ under the License. hudi-hadoop-mr ${hudi.version} + + + io.opentelemetry + opentelemetry-bom + 1.14.0 + pom + import + diff --git a/licenses-binary/LICENSE-opentelemetry-cpp.txt b/licenses-binary/LICENSE-opentelemetry.txt similarity index 100% rename from licenses-binary/LICENSE-opentelemetry-cpp.txt rename to licenses-binary/LICENSE-opentelemetry.txt