Implementing custom rules


This feature is still in Beta 🧪. As such, you should not expect it to be 100% stable or free of bugs. Any public CLI or Python interfaces may change without prior notice.

If you find any bugs or feel like something is not behaving as it should, feel free to open an issue on the MetricFlow GitHub repo.

By default, MetricFlow will use sensible default rules for running data source inference. However, your warehouse might have specific cases that are not covered by those defaults, and you might want to implement custom rules to enhance your inference pipeline. Let's take a deeper look into how to do that.

Understanding signals

Before going into how to implement custom rules, let's understand what a rule outputs: signals.

As already mentioned in the overview, a signal is a piece of evidence about a column that is produced by an inference rule. It is always associated with a column type (what the rule believes the column might be), a confidence score and a reason. The following is an example of a signal that indicates column db.schema.table.my_col is a primary identifier with medium confidence:

```python
from metricflow.dataflow.sql_column import SqlColumn
from metricflow.inference.models import (
    InferenceSignal,
    InferenceSignalConfidence,
    InferenceSignalType,
)

# NOTE: this reconstructs the truncated example from the surrounding text;
# the exact constructor arguments may differ slightly between versions.
signal = InferenceSignal(
    column=SqlColumn.from_string("db.schema.table.my_col"),
    type_node=InferenceSignalType.ID.PRIMARY,
    reason="It's sunny outside so I think this is a primary ID!",
    confidence=InferenceSignalConfidence.MEDIUM,
    only_applies_to_parent=False,
)
```

You might notice that InferenceSignal also has an only_applies_to_parent boolean attribute. Signals with this attribute set to True are called complementary: they indicate to the solver that they can't be taken into account on their own, without support from other signals. In that sense, making a signal complementary makes it "weaker".

Here are two real examples of only_applies_to_parent usage in the default MetricFlow rules:

  1. PrimaryIdentifierByNameRule matches columns named `id`. It produces non-complementary signals because a column being named `id` is, in itself, pretty strong evidence that it is an identifier, without the need for further corroboration from other rules.
  2. UniqueIdentifierByDistinctCountRule checks for columns with unique values. It produces complementary signals because being unique does not necessarily indicate a column is an identifier. However, if there is other evidence it is an ID, then this information can be used by a solver to further specify the type of ID.

In practice, this means a column named `id` whose values are unique will be resolved to a unique identifier, while other columns with unique values that are not identifiers, such as categorical dimensions, are left unaffected.
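To make this behavior concrete, here is a toy, self-contained sketch of the idea. Note that `Signal` and `resolve` below are hypothetical stand-ins written for illustration only, not MetricFlow's actual solver classes:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Signal:
    type_node: str
    confidence: int  # higher wins
    only_applies_to_parent: bool  # True means "complementary"


def resolve(signals: List[Signal]) -> Optional[str]:
    """Pick a type for a column from its signals, or None if unresolvable."""
    # Complementary signals only count when at least one
    # non-complementary signal is present to support them.
    supporting = [s for s in signals if not s.only_applies_to_parent]
    if not supporting:
        return None
    # Once supported, the most confident signal (complementary or not) wins.
    return max(signals, key=lambda s: s.confidence).type_node


# A column named `id` with unique values: the complementary uniqueness
# signal refines the result because the name signal supports it.
named_id = Signal("ID", confidence=2, only_applies_to_parent=False)
is_unique = Signal("ID.UNIQUE", confidence=3, only_applies_to_parent=True)
assert resolve([named_id, is_unique]) == "ID.UNIQUE"

# A unique categorical dimension produces only the complementary
# uniqueness signal, so it is not resolved to an identifier.
assert resolve([is_unique]) is None
```

The real solver reasons over a hierarchy of signal type nodes, but the core idea is the same: complementary signals refine a result only when a non-complementary signal supports it.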


You might ask yourself why we've abstracted rule outputs as signals instead of just making a single piece of logic that produces definitive column types.

While a single, monolithic piece of logic might seem like a good starting point, it can quickly become complex and unmanageable. For this reason, we wanted to keep rules as decoupled and simple as possible. We chose to delegate the task of reasoning about the final type of a column to the inference solver, and to allow it to be "confused" if there are contradicting signals for the same column.

Learn more about the default inference solver here.

Implementing a rule

An inference rule is simply a subclass of InferenceRule that implements the process() interface. Below is an example rule that matches nullable columns ending with measure and produces non-complementary MEASURE signals with VERY_HIGH confidence.

```python
from typing import List

from metricflow.inference.models import InferenceSignal, InferenceSignalConfidence, InferenceSignalType
from metricflow.inference.context.data_warehouse import DataWarehouseInferenceContext
from metricflow.inference.rule.base import InferenceRule


class MyCustomInferenceRule(InferenceRule):
    def process(self, warehouse: DataWarehouseInferenceContext) -> List[InferenceSignal]:
        return [
            InferenceSignal(
                column=column,
                type_node=InferenceSignalType.MEASURE.UNKNOWN,
                reason="This column is nullable and its name ends with `measure`",
                confidence=InferenceSignalConfidence.VERY_HIGH,
                only_applies_to_parent=False,
            )
            for column, props in warehouse.columns.items()
            if column.column_name.endswith("measure")
            and (props.is_nullable or props.null_count == 0)
        ]
```

For rules that do not do any cross-column checking, that is, only look at one column at a time, you can simplify the implementation by using ColumnMatcherRule. Here's an example of a rule with the same behavior as above, using ColumnMatcherRule:

```python
from metricflow.inference.context.data_warehouse import ColumnProperties
from metricflow.inference.models import InferenceSignalConfidence, InferenceSignalType
from metricflow.inference.rule.rules import ColumnMatcherRule


# This will produce a signal with the configured attributes for a column
# if `MyCustomInferenceRule.match_column` returns True for it.
class MyCustomInferenceRule(ColumnMatcherRule):
    type_node = InferenceSignalType.MEASURE.UNKNOWN
    confidence = InferenceSignalConfidence.VERY_HIGH
    only_applies_to_parent_signal = False
    match_reason = "This column is nullable and its name ends with `measure`"

    def match_column(self, props: ColumnProperties) -> bool:
        return props.column.column_name.endswith("measure") and (
            props.is_nullable or props.null_count == 0
        )
```

We encourage you to explore MetricFlow's source code to see for yourself how the default inference rules are implemented!

Adding a rule to the inference pipeline

To add your newly created rule to the inference pipeline, just add it to the ruleset when instantiating the InferenceRunner:

```python
# ...

runner = InferenceRunner(
    # ...
    ruleset=[*DEFAULT_RULESET, MyCustomInferenceRule()],
    # ...
)

# ...
```