dstoredist
Dstoredist can be used to distribute and/or store protobuf messages generated by dnsdist & recursor. It also has the ability to apply filtering, conversion and sampling to the received protobuf messages.
Destinations to which dstoredist can distribute and/or store messages:
- dstoredist
- Kafka
- S3 object storage
Note: dstoredist inside a Userplane deployment cannot send data directly to a TopN Reporter deployed as part of Controlplane. You will need to have a dstoredist in Userplane configured to send messages to a dstoredist in Controlplane, which can then feed into TopN Reporter. For example:
Concepts
There are 2 crucial concepts to how dstoredist functions:
- Destination: Location where output messages should be sent or stored
- Routes: Mapping of input filtering rules and destinations
The relationship between these is as follows:
- Routes can specify multiple destinations
- A destination can be specified for multiple routes
- A destination which is not part of any route will not receive any data
A diagram of a potential Userplane dstoredist configuration:
Configuration Reference
Instance Sets
Instances of dstoredist can be defined under the root node dstoredists
. Each instance set should be a key-value pair, with:
- key: Name of the instance set
- value: Dictionary holding the configuration of the instance set
In a minimal configuration you will need to have at least 1 destination and 1 route defined. destinations
and routes
both also require a similar key-value pair, with:
- key: name of the destination/route
- value: Dictionary holding the configuration of the destination/route
A minimal example:
dstoredists:
mydstoredist:
< dstoredist configuration >
destinations:
mydestination:
< destination configuration >
routes:
myroute:
destinations:
- mydestination
< route configuration >
Parameters which can be used to configure dstoredist instances for a specific instance set are shown in the below table.
Parameter | Type | Default | Description |
---|---|---|---|
affinity |
k8s: Affinity |
pod affinity (Kubernetes docs: Affinity and anti-affinity). If unset, a default anti-affinity is applied using antiAffinityPreset to spread pods across nodes |
|
antiAffinityPreset |
string |
"preferred" |
pod anti affinity preset. Available options: "preferred" "required" |
batchBufferSize |
integer |
1048576 |
Size in bytes of the buffer used to handle message batches internally |
connTimeout |
go: DurationString |
Idle timeout for connections to dstoredist. Only applies if a value is configured explicitly | |
containerSecurityContext |
k8s: SecurityContext |
|
SecurityContext applied to each container |
destinations |
Map of Destination | {} |
Destinations configuration |
hostNetwork |
boolean |
false |
Use host networking for pods |
ipsets |
Map of IPSet | {} |
Sets of IP prefixes which can be used for filtering events |
logFormat |
string |
"text" |
Format of logging. Available options: "text" "json" |
logLevel |
string |
"info" |
Level of logging. Available options: "debug" "info" "warn" "error" |
nodeSelector |
k8s: NodeSelector |
{} |
Kubernetes pod nodeSelector |
podAnnotations |
k8s: Annotations |
{} |
Annotations to be added to each pod |
podDisruptionBudget |
k8s: PodDisruptionBudgetSpec |
{} |
Spec of PodDisruptionBudget to be applied to deployment |
podLabels |
k8s: Labels |
{} |
Labels to be added to each pod |
podSecurityContext |
k8s: PodSecurityContext |
|
SecurityContext applied to each pod |
resources |
k8s: Resources |
|
Resources allocated to the dstoredist container if resourceDefaults (global) is true |
routes |
Map of Route | {} |
Routes configuration |
service |
Service |
|
Service configuration |
serviceLabels |
k8s: Labels |
{} |
Labels to be added to each service |
tlsconfig |
TLSConfig | {} |
TLS configuration options |
tolerations |
List of k8s: Tolerations |
[] |
Kubernetes pod Tolerations |
use_tls |
boolean |
false |
If true , enable TLS for inbound listener. Must be configured using tlsconfig |
Service Configuration
Parameters to configure the service object for this deployment. For example:
dstoredists:
mydstoredist:
service:
type: LoadBalancer
annotations:
metallb.universe.tf/address-pool: name_of_pool
Parameter | Type | Default | Description |
---|---|---|---|
allocateLoadBalancerNodePorts |
boolean |
true |
If true, services with type LoadBalancer automatically assign NodePorts. Can be set to false if the LoadBalancer provider does not rely on NodePorts |
annotations |
k8s: Annotations |
{} |
Annotations for the service |
clusterIP |
string |
Static cluster IP, must be in the cluster's range of cluster IPs and not in use. Randomly assigned when not specified. | |
clusterIPs |
List of string |
List of static cluster IPs, must be in the cluster's range of cluster IPs and not in use. | |
externalIPs |
List of string |
List of IP addresses for which nodes in the cluster will also accept traffic for this service. These IPs are not managed by Kubernetes and must be user-defined on the cluster's nodes | |
externalTrafficPolicy |
string |
Cluster |
Can be set to Local to let nodes distribute traffic received on one of the externally-facing addresses (NodePort and LoadBalancer ) solely to endpoints on the node itself |
healthCheckNodePort |
integer |
For services with type LoadBalancer and externalTrafficPolicy Local you can configure this value to choose a static port for the NodePort which external systems (LoadBalancer provider mainly) can use to determine which node holds endpoints for this service |
|
internalTrafficPolicy |
string |
Cluster |
Can be set to Local to let nodes distribute traffic received on the ClusterIP solely to endpoints on the node itself |
loadBalancerIP |
string |
Deprecated Kubernetes feature, available for backwards compatibility: IP address to attempt to claim for use by this LoadBalancer. Replaced by annotations specific to each LoadBalancer provider |
|
loadBalancerSourceRanges |
List of string |
If supported by the LoadBalancer provider, restrict traffic to this LoadBalancer to these ranges | |
loadBalancerClass |
string |
Used to select a non-default type of LoadBalancer class to ensure the appropriate LoadBalancer provisioner attempt to manage this LoadBalancer service | |
publishNotReadyAddresses |
boolean |
false |
Service is populated with endpoints regardless of readiness state |
sessionAffinity |
string |
None |
Can be set to ClientIP to attempt to maintain session affinity. |
sessionAffinityConfig |
k8s: SessionAffinityConfig |
{} |
Configuration of session affinity |
type |
string |
ClusterIP |
Type of service. Available options: "ClusterIP" "LoadBalancer" "NodePort" |
Listener TLS Config
Parameters to configure TLS options if use_tls
is set to true
, these should be child attributes of the tlsconfig
node. For example:
dstoredists:
mydstoredist:
use_tls: true
tlsconfig:
certSecret: myTLSSecret
destinations:
< destinations >
routes:
< routes >
Parameter | Type | Default | Description |
---|---|---|---|
cert |
string |
Certificate in PEM format. Use of certSecret is recommended for security reasons |
|
certSecret |
string |
Name of a pre-existing Kubernetes TLS Secret containing the tls.crt and tls.key data items |
|
key |
string |
Key in PEM format. Use of certSecret is recommended for security reasons |
Note: Either certSecret
or cert
+ key
is required when use_tls
is true
IP Set
Parameters to configure sets of IP prefixes which can be used for filtering. For example:
dstoredists:
mydstoredist:
ipsets:
myset:
configMap: my-config-map
configMapKey: ipset.dat
destinations:
< destinations >
routes:
< routes >
Example of what ConfigMap my-config-map
could look like:
apiVersion: v1
kind: ConfigMap
metadata:
name: my-config-map
data:
ipset.dat: |
127.0.0.1/16
128.243.0.0/16
# Comments beginning with # are allowed
fe80::1cc0:3e8c:119f:c2e1/18
The above example will expose an ipset named myset
, containing the prefixes stored in ipset.dat
inside the configmap named my-config-map
.
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
configMap |
string |
yes |
Name of a pre-existing Kubernetes configmap holding the IP prefixes | |
configMapKey |
string |
yes |
Name of the item inside configMap which holds the IP prefixes |
|
pollInterval |
go: DurationString |
5s |
Interval between checks performed by dstoredist to see if the IP prefixes need to be reloaded from the mounted configmap |
Destination
Parameters which can be used to configure a dstoredist destination:
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
blackhole |
boolean |
false |
If true, all messages to this destination will be discarded | |
burst |
integer |
If a rate is configured, allow bursts of messages up to this size |
||
rate |
integer |
If configured, maximum number of messages per second to send to the destination. Any messages exceeding this will be discarded. By default all messages will be sent to the destination. |
||
sample |
integer |
1 |
Apply sampling to the protobuf messages. If a higher value X is configured, only 1 out of every X messages will be sent to the destination. |
|
type |
string |
"pdns" |
Type of destination. Available options: "pdns" "kafka" "storage" See below for more configuration options specific to each destination type. |
Destination: pdns
Additional parameters are available on dstoredist destinations with type: pdns
(or no type
). These should be attributes of the destination item itself. For example:
dstoredists:
mydstoredist:
destinations:
mydestination:
type: pdns
addresses:
- my.dstoredist.endpoint.local:1234
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
addresses |
List of string |
yes |
List of addresses of dstoredist deployments. Should be either IP:port or host:port |
|
connect_timeout |
go: DurationString |
"5s" |
How long to wait before timing out connection attempts | |
distribute |
string |
roundrobin |
Distribution algorithm when multiple addresses are configured. Available options: "roundrobin" "sharded" |
|
framing |
string |
"16bit" |
How to frame protobuf messages. Available options: "16bit" "32bit" "repeated" |
|
tlsconfig |
TLSConfig | {} |
TLS configuration options | |
use_tls |
boolean |
false |
If true , attempt to connect to the addresses using TLS |
|
write_timeout |
go: DurationString |
"5s" |
How long to wait before timing out write attempts |
Destination: kafka
Additional parameters are available on dstoredist destinations with type: kafka
. These should be nested attributes inside a kafka:
item of the destination item itself. For example:
dstoredists:
mydstoredist:
destinations:
mydestination:
type: kafka
kafka:
addresses:
- some.kafka.endpoint:9092
topic: mytopic
The nested kafka:
attribute takes the following parameters:
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
addresses |
List of string |
yes |
List of addresses of kafka endpoints. Should be either IP:port or host:port |
|
async |
boolean |
false |
If true , dstoredist writes to Kafka never block and all responses from Kafka are ignored |
|
balancer |
string |
"roundrobin" |
Balancer used to distribute Kafka messages amongst partitions. Available options: "roundrobin" "leastbytes" "fnv-1a" "crc32" "murmur2" |
|
batch_size |
integer |
10000 |
Number of messages which will constitute a batch. dstoredist will wait for new messages until either the batch size is reached, or the batch_timeout is exceeded |
|
batch_timeout |
go: DurationString |
1ms |
Timeout before an incomplete batch is written to Kafka | |
compression |
string |
"" |
Compression codec to use. Available options: "gzip" "snappy" "lz4" "zstd" No compression is performed when empty |
|
instance_name |
string |
If configured, a header will be added to each Kafka message with this value | ||
json_encode |
boolean |
false |
If true , JSON encode the data before sending to Kafka |
|
max_attempts |
integer |
2 |
Number of times a message will be attempted to send to kafka | |
max_msg_size |
integer |
900000 |
Maximum size of a kafka message (in bytes). Cannot be lower than 65536. |
|
num_workers |
integer |
2 |
Number of concurrent workers that will process protobuf messages and send them to kafka | |
read_timeout |
go: DurationString |
10s |
Timeout for reads from Kafka | |
required_acks |
string |
"one" |
How many acks are required from kafka. Available options: "one" "all" "none" |
|
single_msgs |
boolean |
false |
If true , each Kafka message will only contain a single protobuf message |
|
tlsconfig |
TLSConfig | {} |
TLS configuration options | |
topic |
string |
yes |
Name of Kafka topic to send messages to | |
use_tls |
boolean |
false |
If true , attempt to connect to the addresses using TLS |
|
write_timeout |
go: DurationString |
10s |
Timeout for writes to Kafka |
Destination: storage
Additional parameters are available on dstoredist destinations with type: storage
. These should be nested attributes inside a storage:
item of the destination item itself. For example:
dstoredists:
mydstoredist:
destinations:
mydestination:
type: storage
storage:
type: s3
encoding: json
options:
secretName: myS3secret
endpoint_url: https://my.s3.endpoint
bucket: myBucket
region: myRegion
The nested storage:
attribute takes the following parameters:
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
encoding |
string |
"protobuf" |
Encoding used for the files. Available options: "protobuf" "json" "bind" |
|
flush_interval |
go: DurationString |
300s |
Time between consecutive flushes to storage (only if max_size is not reached before this interval) |
|
max_size |
integer |
Maximum size in bytes of the file (before compression). Defaults to dstoredist's toplevel configuration item batchBufferSize . If batchBufferSize is not set the value will be 1048576 |
||
num_workers |
integer |
2 |
Number of concurrent workers that will process protobuf messages and attempt to store them | |
options |
StorageOptions | yes |
Configuration of the storage backend | |
type |
string |
"s3" |
Type of storage backend. Available options: "s3" |
|
use_compression |
boolean |
false |
If true , compress files using gzip compression |
Storage Options
For storage with type: s3
the following can be configured under options:
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
access_key |
string |
S3 access key. Use of secretName is recommended for security reasons |
||
bucket |
string |
yes |
Name of the S3 bucket | |
endpoint_url |
string |
yes |
Endpoint of the S3 service to connect to | |
region |
string |
"us-east-1" |
Region used when connecting to the S3 endpoint | |
secret_key |
string |
S3 secret key. Use of secretName is recommended for security reasons |
||
secretAccessKey |
string |
"access_key" |
If secretName is specified: name of the item inside the Secret which holds the access key |
|
secretName |
string |
Name of a pre-existing Kubernetes Secret containing a username & password for authentication with Elastic | ||
secretSecretKey |
string |
"secret_key" |
If secretName is specified: name of the item inside the Secret which holds the secret key |
|
tlsconfig |
TLSConfig | {} |
TLS configuration options |
Destination TLS Config
Parameters to configure TLS options if use_tls
is set to true
, these should be child attributes of the tlsconfig
node. For example with type pdns
:
dstoredists:
mydstoredist:
destinations:
mydestination:
type: pdns
addresses:
- my.dstoredist.endpoint.local:1234
use_tls: true
tlsconfig:
insecure_skip_verify: true
With type kafka
:
dstoredists:
mydstoredist:
destinations:
mydestination:
type: kafka
kafka:
addresses:
- my.dstoredist.endpoint.local:1234
topic: myTopic
use_tls: true
tlsconfig:
insecure_skip_verify: true
With type storage
:
dstoredists:
mydstoredist:
destinations:
mydestination:
type: storage
storage:
type: s3
options:
endpoint_url: https://my.s3.endpoint
bucket: myBucket
tlsconfig:
insecure_skip_verify: true
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
ca |
string |
CA in PEM format to use for validation. Use of caSecret is recommended for security reasons |
||
caSecret |
string |
Name of a pre-existing Kubernetes Secret with a data item named ca.crt containing the CA in PEM format to use for validation |
||
cert |
string |
Certificate in PEM format to use for client certificate validation. Use of clientSecret is recommended for security reasons |
||
clientSecret |
string |
Name of a pre-existing Kubernetes TLS Secret containing the tls.crt and tls.key data items to use if the destination requires client certificate validation |
||
insecure_skip_verify |
boolean |
false |
Skip validation of the storage backend's certificate chain and hostname | |
key |
string |
Key in PEM format to use for client certificate validation. Use of clientSecret is recommended for security reasons |
Route
Parameters which can be used to configure a dstoredist route:
dstoredists:
mydstoredist:
destinations:
mydestination:
type: pdns
addresses:
- another.dstoredist.endpoint.local:1234
routes:
myroute:
destinations:
- mydestination
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
append_tags |
List of string |
[] |
List of tags which will be appended each message for this route | |
destinations |
List of string |
yes |
List of names of destinations, these must have been configured on this dstoredist instance | |
filters |
Map of filters | {} |
Filters as defined by the format described in the dstore documentation for the filters parameter under routes |
Routes with filters can have several levels of nesting in the filters, for example:
dstoredists:
mydstoredist:
destinations:
mydestination:
type: pdns
addresses:
- another.dstoredist.endpoint.local:1234
routes:
myroute:
destinations:
- mydestination
filters:
- is_response: true
- has_deviceid: true
- tag: REQUIRED_TAG
- or:
- tag: GAMBLING
- tag: FASHION
- not:
and:
- tag: FOO
- tag: BAR
Refer to the dstore documentation for more insight into the filter configuration.