@@ -2,8 +2,10 @@ package distributor
 
 import (
     "context"
+    "encoding/binary"
     "flag"
     "fmt"
+    "hash/fnv"
     "math"
     "net/http"
     "runtime/pprof"
@@ -44,9 +46,11 @@ import (
     "github.com/grafana/loki/v3/pkg/distributor/shardstreams"
     "github.com/grafana/loki/v3/pkg/distributor/writefailures"
     "github.com/grafana/loki/v3/pkg/ingester"
-    "github.com/grafana/loki/v3/pkg/ingester/client"
+    ingester_client "github.com/grafana/loki/v3/pkg/ingester/client"
     "github.com/grafana/loki/v3/pkg/kafka"
     kafka_client "github.com/grafana/loki/v3/pkg/kafka/client"
+    limits_frontend "github.com/grafana/loki/v3/pkg/limits/frontend"
+    limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
     "github.com/grafana/loki/v3/pkg/loghttp/push"
     "github.com/grafana/loki/v3/pkg/logproto"
     "github.com/grafana/loki/v3/pkg/logql/syntax"
@@ -96,9 +100,11 @@ type Config struct {
 
     OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`
 
-    KafkaEnabled    bool         `yaml:"kafka_writes_enabled"`
-    IngesterEnabled bool         `yaml:"ingester_writes_enabled"`
-    KafkaConfig     kafka.Config `yaml:"-"`
+    KafkaEnabled        bool `yaml:"kafka_writes_enabled"`
+    IngesterEnabled     bool `yaml:"ingester_writes_enabled"`
+    IngestLimitsEnabled bool `yaml:"ingest_limits_enabled"`
+
+    KafkaConfig kafka.Config `yaml:"-"`
 
     // TODO: cleanup config
     TenantTopic TenantTopicConfig `yaml:"tenant_topic" category:"experimental"`
@@ -114,6 +120,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
     fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.")
     fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
     fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
+    fs.BoolVar(&cfg.IngestLimitsEnabled, "distributor.ingest-limits-enabled", false, "Enable checking limits against the ingest-limits service. Defaults to false.")
 }
 
 func (cfg *Config) Validate() error {
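For illustration, a minimal standalone sketch of how the new -distributor.ingest-limits-enabled flag is expected to behave alongside the existing write toggles. The miniConfig type here is a hypothetical stand-in, not the real distributor.Config; only the flag names and defaults come from the hunk above.

package main

import (
    "flag"
    "fmt"
)

// miniConfig is a stripped-down stand-in for distributor.Config, used only to
// illustrate the new flag; the real type carries many more fields.
type miniConfig struct {
    KafkaEnabled        bool
    IngesterEnabled     bool
    IngestLimitsEnabled bool
}

func main() {
    var cfg miniConfig
    fs := flag.NewFlagSet("distributor", flag.ContinueOnError)
    fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
    fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests.")
    fs.BoolVar(&cfg.IngestLimitsEnabled, "distributor.ingest-limits-enabled", false, "Enable checking limits against the ingest-limits service.")

    // Off by default; limit checks only run when the operator opts in.
    _ = fs.Parse([]string{"-distributor.ingest-limits-enabled=true"})
    fmt.Println(cfg.IngestLimitsEnabled) // true
}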
@@ -143,12 +150,12 @@ type Distributor struct {
     cfg              Config
     ingesterCfg      ingester.Config
     logger           log.Logger
-    clientCfg        client.Config
+    clientCfg        ingester_client.Config
     tenantConfigs    *runtime.TenantConfigs
     tenantsRetention *retention.TenantsRetention
     ingestersRing    ring.ReadRing
     validator        *Validator
-    pool             *ring_client.Pool
+    ingesterClients  *ring_client.Pool
     tee              Tee
 
     rateStore RateStore
@@ -184,10 +191,20 @@ type Distributor struct {
     ingesterTasks  chan pushIngesterTask
     ingesterTaskWg sync.WaitGroup
 
+    // Will eventually replace the usage tracker.
+    limitsFrontendRing ring.ReadRing
+    limitsFrontends    *ring_client.Pool
+
     // kafka
     kafkaWriter   KafkaProducer
     partitionRing ring.PartitionRingReader
 
+    // The number of partitions for the stream metadata topic. Unlike stream
+    // records, where entries are sharded over just the active partitions,
+    // stream metadata is sharded over all partitions, and all partitions
+    // are consumed.
+    numMetadataPartitions int
+
     // kafka metrics
     kafkaAppends         *prometheus.CounterVec
     kafkaWriteBytesTotal prometheus.Counter
@@ -199,7 +216,7 @@ type Distributor struct {
 func New(
     cfg Config,
     ingesterCfg ingester.Config,
-    clientCfg client.Config,
+    clientCfg ingester_client.Config,
     configs *runtime.TenantConfigs,
     ingestersRing ring.ReadRing,
     partitionRing ring.PartitionRingReader,
@@ -208,26 +225,31 @@ func New(
     metricsNamespace string,
     tee Tee,
     usageTracker push.UsageTracker,
+    limitsFrontendCfg limits_frontend_client.Config,
+    limitsFrontendRing ring.ReadRing,
+    numMetadataPartitions int,
     logger log.Logger,
 ) (*Distributor, error) {
-    factory := cfg.factory
-    if factory == nil {
-        factory = ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) {
-            return client.New(clientCfg, addr)
+    ingesterClientFactory := cfg.factory
+    if ingesterClientFactory == nil {
+        ingesterClientFactory = ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) {
+            return ingester_client.New(clientCfg, addr)
         })
     }
 
-    internalFactory := func(addr string) (ring_client.PoolClient, error) {
+    internalIngesterClientFactory := func(addr string) (ring_client.PoolClient, error) {
         internalCfg := clientCfg
         internalCfg.Internal = true
-        return client.New(internalCfg, addr)
+        return ingester_client.New(internalCfg, addr)
     }
 
     validator, err := NewValidator(overrides, usageTracker)
     if err != nil {
         return nil, err
     }
 
+    limitsFrontendClientFactory := limits_frontend_client.NewPoolFactory(limitsFrontendCfg)
+
     // Create the configured ingestion rate limit strategy (local or global).
     var ingestionRateStrategy limiter.RateLimiterStrategy
     var distributorsLifecycler *ring.BasicLifecycler
@@ -274,7 +296,7 @@ func New(
         tenantsRetention:      retention.NewTenantsRetention(overrides),
         ingestersRing:         ingestersRing,
         validator:             validator,
-        pool:                  clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, factory, logger, metricsNamespace),
+        ingesterClients:       clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, ingesterClientFactory, logger, metricsNamespace),
         labelCache:            labelCache,
         shardTracker:          NewShardTracker(),
         healthyInstancesCount: atomic.NewUint32(0),
@@ -335,6 +357,15 @@ func New(
         writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
         kafkaWriter:          kafkaWriter,
         partitionRing:        partitionRing,
+        limitsFrontendRing:   limitsFrontendRing,
+        limitsFrontends: limits_frontend_client.NewPool(
+            limits_frontend.RingName,
+            limitsFrontendCfg.PoolConfig,
+            limitsFrontendRing,
+            limitsFrontendClientFactory,
+            logger,
+        ),
+        numMetadataPartitions: numMetadataPartitions,
     }
 
     if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
@@ -366,7 +397,7 @@ func New(
             "rate-store",
             clientCfg.PoolConfig,
             ingestersRing,
-            ring_client.PoolAddrFunc(internalFactory),
+            ring_client.PoolAddrFunc(internalIngesterClientFactory),
             logger,
             metricsNamespace,
         ),
@@ -375,7 +406,7 @@ func New(
     )
     d.rateStore = rs
 
-    servs = append(servs, d.pool, rs)
+    servs = append(servs, d.ingesterClients, rs)
     d.subservices, err = services.NewManager(servs...)
     if err != nil {
         return nil, errors.Wrap(err, "services manager")
@@ -417,8 +448,9 @@ func (d *Distributor) stopping(_ error) error {
 }
 
 type KeyedStream struct {
-    HashKey uint32
-    Stream  logproto.Stream
+    HashKey        uint32
+    HashKeyNoShard uint64
+    Stream         logproto.Stream
 }
 
 // TODO taken from Cortex, see if we can refactor out an usable interface.
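For illustration, a standalone sketch of the intent behind the two keys: each shard derived from a stream gets its own HashKey, because the shard carries extra labels and therefore a different ring token, while HashKeyNoShard stays equal to the parent stream's hash. The tokenFor helper, the keyedStream type, and the shard label values here are hypothetical stand-ins for lokiring.TokenFor, KeyedStream, and Loki's real shard labels.

package main

import (
    "fmt"
    "hash/fnv"
)

// tokenFor is a hypothetical stand-in for lokiring.TokenFor: a 32-bit token
// derived from the tenant and the (possibly shard-decorated) label string.
func tokenFor(tenantID, labels string) uint32 {
    h := fnv.New32()
    _, _ = h.Write([]byte(tenantID))
    _, _ = h.Write([]byte(labels))
    return h.Sum32()
}

type keyedStream struct {
    HashKey        uint32 // shard-aware: routes entries to an active partition
    HashKeyNoShard uint64 // shard-independent: identifies the parent stream for metadata
}

func main() {
    const tenant = "tenant-a"
    const parentHash = uint64(0xDEADBEEF) // stands in for stream.Hash of the unsharded stream

    // Two shards of the same stream: different label sets, hence different
    // ring tokens, but the same shard-independent hash.
    s1 := keyedStream{HashKey: tokenFor(tenant, `{app="api", shard="1"}`), HashKeyNoShard: parentHash}
    s2 := keyedStream{HashKey: tokenFor(tenant, `{app="api", shard="2"}`), HashKeyNoShard: parentHash}

    fmt.Println(s1.HashKey == s2.HashKey)               // false
    fmt.Println(s1.HashKeyNoShard == s2.HashKeyNoShard) // true
}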
@@ -474,6 +506,17 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
         return &logproto.PushResponse{}, httpgrpc.Errorf(http.StatusUnprocessableEntity, validation.MissingStreamsErrorMsg)
     }
 
+    if d.cfg.IngestLimitsEnabled {
+        exceedsLimits, err := d.exceedsLimits(ctx, tenantID, req.Streams)
+        if err != nil {
+            level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
+        } else if len(exceedsLimits.RejectedStreams) > 0 {
+            level.Error(d.logger).Log("msg", "request exceeded limits", "tenant", tenantID)
+        } else {
+            level.Debug(d.logger).Log("msg", "request accepted", "tenant", tenantID)
+        }
+    }
+
     // First we flatten out the request into a list of samples.
     // We use the heuristic of 1 sample per TS to size the array.
     // We also work out the hash value at the same time.
@@ -494,8 +537,9 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
             return
         }
         streams = append(streams, KeyedStream{
-            HashKey: lokiring.TokenFor(tenantID, stream.Labels),
-            Stream:  stream,
+            HashKey:        lokiring.TokenFor(tenantID, stream.Labels),
+            HashKeyNoShard: stream.Hash,
+            Stream:         stream,
         })
     }
 
@@ -932,7 +976,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID
     shardCount := d.shardCountFor(logger, &stream, pushSize, tenantID, shardStreamsCfg)
 
     if shardCount <= 1 {
-        return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), Stream: stream}}
+        return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), HashKeyNoShard: stream.Hash, Stream: stream}}
     }
 
     d.streamShardCount.Inc()
@@ -976,8 +1020,9 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena
         shard := d.createShard(streamLabels, streamPattern, shardNum, entriesPerShard)
 
         derivedStreams = append(derivedStreams, KeyedStream{
-            HashKey: lokiring.TokenFor(tenantID, shard.Labels),
-            Stream:  shard,
+            HashKey:        lokiring.TokenFor(tenantID, shard.Labels),
+            HashKeyNoShard: stream.Hash,
+            Stream:         shard,
         })
 
         if shardStreamsCfg.LoggingEnabled {
@@ -1107,9 +1152,65 @@ func (d *Distributor) sendStreams(task pushIngesterTask) {
     }
 }
 
+func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, streams []logproto.Stream) (*logproto.ExceedsLimitsResponse, error) {
+    // We use an FNV-1 hash of all stream hashes in the request to load
+    // balance requests across limits-frontend instances.
+    h := fnv.New32()
+
+    // The distributor sends the hashes of all streams in the request to the
+    // limits-frontend. The limits-frontend is responsible for deciding if
+    // the request would exceed the tenant's limits, and if so, which streams
+    // from the request caused it to exceed its limits.
+    streamHashes := make([]*logproto.StreamMetadata, 0, len(streams))
+    for _, stream := range streams {
+        // Fold the stream hash into the FNV-1 hash.
+        buf := make([]byte, binary.MaxVarintLen64)
+        binary.PutUvarint(buf, stream.Hash)
+        _, _ = h.Write(buf)
+        // Add the stream hash to the request. This is sent to the limits-frontend.
+        streamHashes = append(streamHashes, &logproto.StreamMetadata{
+            StreamHash: stream.Hash,
+        })
+    }
+
+    req := logproto.ExceedsLimitsRequest{
+        Tenant:  tenantID,
+        Streams: streamHashes,
+    }
+
+    // Get the limits-frontend instances from the ring.
+    var descs [5]ring.InstanceDesc
+    rs, err := d.limitsFrontendRing.Get(h.Sum32(), limits_frontend_client.LimitsRead, descs[0:], nil, nil)
+    if err != nil {
+        return nil, fmt.Errorf("failed to get limits-frontend instances from ring: %w", err)
+    }
+
+    var lastErr error
+    // Send the request to the limits-frontend to see if it exceeds the tenant's
+    // limits. If the RPC fails, fail over to the next instance in the ring.
+    for _, instance := range rs.Instances {
+        c, err := d.limitsFrontends.GetClientFor(instance.Addr)
+        if err != nil {
+            lastErr = err
+            continue
+        }
+
+        client := c.(logproto.IngestLimitsFrontendClient)
+        resp, err := client.ExceedsLimits(ctx, &req)
+        if err != nil {
+            lastErr = err
+            continue
+        }
+
+        return resp, nil
+    }
+
+    return nil, lastErr
+}
+
 // TODO taken from Cortex, see if we can refactor out an usable interface.
 func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error {
-    c, err := d.pool.GetClientFor(ingester.Addr)
+    c, err := d.ingesterClients.GetClientFor(ingester.Addr)
     if err != nil {
         return err
     }
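For illustration, a standalone sketch of the load-balancing key computed in exceedsLimits above: each stream hash is varint-encoded and folded into a single FNV-1 sum, so the same set of streams always maps to the same ring key and therefore tends to reach the same limits-frontend. Only the standard library is used; the ring lookup itself is omitted, and balancingKey is a hypothetical helper name.

package main

import (
    "encoding/binary"
    "fmt"
    "hash/fnv"
)

// balancingKey mirrors the hashing in exceedsLimits: each stream hash is
// written into a fixed-size varint buffer and folded into one FNV-1 sum,
// which is then used as the key for the limits-frontend ring lookup.
func balancingKey(streamHashes []uint64) uint32 {
    h := fnv.New32()
    for _, sh := range streamHashes {
        buf := make([]byte, binary.MaxVarintLen64)
        binary.PutUvarint(buf, sh)
        _, _ = h.Write(buf)
    }
    return h.Sum32()
}

func main() {
    hashes := []uint64{11, 22, 33}
    // Deterministic: the same request body maps to the same frontend key.
    fmt.Println(balancingKey(hashes) == balancingKey(hashes)) // true
    // Order matters for FNV-1, so a reordered stream list may pick a
    // different frontend; that only affects load balancing, not correctness.
    fmt.Println(balancingKey([]uint64{33, 22, 11}))
}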
@@ -1150,20 +1251,48 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
     if len(stream.Stream.Entries) == 0 {
         return nil
     }
-    partitionID, err := subring.ActivePartitionForKey(stream.HashKey)
+
+    // The distributor writes stream records to one of the active partitions
+    // in the partition ring. The number of active partitions is equal to the
+    // number of ingesters.
+    streamPartitionID, err := subring.ActivePartitionForKey(stream.HashKey)
     if err != nil {
         d.kafkaAppends.WithLabelValues("kafka", "fail").Inc()
         return fmt.Errorf("failed to find active partition for stream: %w", err)
     }
-
     startTime := time.Now()
-
-    records, err := kafka.Encode(partitionID, tenant, stream.Stream, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes)
+    records, err := kafka.Encode(
+        streamPartitionID,
+        tenant,
+        stream.Stream,
+        d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes,
+    )
     if err != nil {
-        d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
+        d.kafkaAppends.WithLabelValues(
+            fmt.Sprintf("partition_%d", streamPartitionID),
+            "fail",
+        ).Inc()
         return fmt.Errorf("failed to marshal write request to records: %w", err)
     }
 
+    // However, unlike stream records, the distributor writes stream metadata
+    // records to one of a fixed number of partitions, determined ahead of
+    // time. It does not use a ring. The reason for this is that we want to be
+    // able to scale components that consume metadata records independently of
+    // the ingesters.
+    metadataPartitionID := int32(stream.HashKeyNoShard % uint64(d.numMetadataPartitions))
+    metadata, err := kafka.EncodeStreamMetadata(
+        metadataPartitionID,
+        d.cfg.KafkaConfig.Topic,
+        tenant,
+        stream.HashKeyNoShard,
+    )
+    if err != nil {
+        return fmt.Errorf("failed to marshal metadata: %w", err)
+    }
+
+    records = append(records, metadata)
+
     d.kafkaRecordsPerRequest.Observe(float64(len(records)))
 
     produceResults := d.kafkaWriter.ProduceSync(ctx, records)
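For illustration, a standalone sketch of the metadata partitioning described in the comments above: stream records follow the active-partition ring, while metadata records are assigned by a simple modulo over the fixed partition count, so every shard of a stream (same HashKeyNoShard) lands on the same metadata partition. The function name and the numbers are made up for the example.

package main

import "fmt"

// metadataPartition mirrors the assignment above: the shard-independent
// stream hash modulo the fixed number of metadata partitions.
func metadataPartition(hashKeyNoShard uint64, numMetadataPartitions int) int32 {
    return int32(hashKeyNoShard % uint64(numMetadataPartitions))
}

func main() {
    const numPartitions = 64
    const streamHash = uint64(1234567890) // hypothetical stream.Hash

    // 1234567890 % 64 = 18, regardless of how many shards the stream was
    // split into or how many ingesters (active partitions) exist.
    fmt.Println(metadataPartition(streamHash, numPartitions)) // 18
}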
@@ -1176,10 +1305,10 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
     var finalErr error
     for _, result := range produceResults {
         if result.Err != nil {
-            d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
+            d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", streamPartitionID), "fail").Inc()
             finalErr = result.Err
         } else {
-            d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc()
+            d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", streamPartitionID), "success").Inc()
         }
     }
 