Streams
Stream commands for append-only log data structures with consumer groups.
Commands
Basic Operations
| Command | Syntax | Description |
|---|---|---|
| XADD | XADD key [NOMKSTREAM] [MAXLEN\|MINID [=\|~] threshold] *\|ID field value [field value ...] | Add entry |
| XLEN | XLEN key | Get stream length |
| XRANGE | XRANGE key start end [COUNT count] | Get entries by ID range |
| XREVRANGE | XREVRANGE key end start [COUNT count] | Get entries in reverse |
| XREAD | XREAD [COUNT count] [BLOCK ms] STREAMS key [key ...] ID [ID ...] | Read from streams |
| XTRIM | XTRIM key MAXLEN\|MINID [=\|~] threshold | Trim stream |
| XDEL | XDEL key ID [ID ...] | Delete entries |
| XINFO STREAM | XINFO STREAM key | Get stream info |
Consumer Groups
| Command | Syntax | Description |
|---|---|---|
| XGROUP CREATE | XGROUP CREATE key groupname ID [MKSTREAM] | Create consumer group |
| XGROUP DESTROY | XGROUP DESTROY key groupname | Delete consumer group |
| XGROUP SETID | XGROUP SETID key groupname ID | Set group's last ID |
| XREADGROUP | XREADGROUP GROUP group consumer [COUNT count] [BLOCK ms] [NOACK] STREAMS key [key ...] ID [ID ...] | Read as consumer |
| XACK | XACK key group ID [ID ...] | Acknowledge entries |
| XPENDING | XPENDING key group [start end count [consumer]] | Get pending entries |
| XCLAIM | XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms] [RETRYCOUNT count] [FORCE] | Claim pending entries |
| XINFO GROUPS | XINFO GROUPS key | List consumer groups |
| XINFO CONSUMERS | XINFO CONSUMERS key groupname | List consumers |
Examples
Basic Operations

    # Add entry (auto-generated ID)
    127.0.0.1:6379> XADD mystream * sensor_id "123" temperature "25.5"
    "1704067200000-0"

    # Add entry with specific ID
    127.0.0.1:6379> XADD mystream 1704067201000-0 sensor_id "123" temperature "26.0"
    "1704067201000-0"

    # Get stream length
    127.0.0.1:6379> XLEN mystream
    (integer) 2

    # Read all entries
    127.0.0.1:6379> XRANGE mystream - +
    1) 1) "1704067200000-0"
       2) 1) "sensor_id"
          2) "123"
          3) "temperature"
          4) "25.5"
    2) 1) "1704067201000-0"
       2) 1) "sensor_id"
          2) "123"
          3) "temperature"
          4) "26.0"

Range Queries
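Range bounds are compared as entry IDs, not as strings. Each ID has the form `<milliseconds>-<sequence>` and is ordered by the millisecond part first, then by the sequence part. The following is a minimal sketch of that ordering in plain Rust. `parse_id` and `in_range` are hypothetical helpers, not part of redlite, and the inclusive bounds assume Redis-style `XRANGE` semantics. The special IDs `-`, `+`, `$` and `>` are not handled.

```rust
/// Parse a stream entry ID of the form "<milliseconds>-<sequence>".
/// Returns None if either part is missing or is not an unsigned integer.
fn parse_id(id: &str) -> Option<(u64, u64)> {
    let (ms, seq) = id.split_once('-')?;
    Some((ms.parse().ok()?, seq.parse().ok()?))
}

/// True if `id` lies in the inclusive range [start, end].
/// Tuples compare by milliseconds first, then by sequence number.
/// (Inclusive bounds are an assumption based on Redis semantics.)
fn in_range(id: &str, start: &str, end: &str) -> Option<bool> {
    let (id, start, end) = (parse_id(id)?, parse_id(start)?, parse_id(end)?);
    Some(start <= id && id <= end)
}
```

Comparing the raw strings would sort `"9-0"` after `"10-0"`, which is why the sketch parses both parts into integers before comparing.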

    # Get last 5 entries (newest first)
    127.0.0.1:6379> XREVRANGE mystream + - COUNT 5

    # Get entries from a specific ID onward
    127.0.0.1:6379> XRANGE mystream 1704067200000-0 +

    # Get entries up to a specific ID
    127.0.0.1:6379> XRANGE mystream - 1704067201000-0

Reading Streams
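`XREAD` returns entries whose ID is greater than the ID supplied for each stream, so a client that wants only new data passes the last ID it has already seen. A small sketch of that selection over an in-memory, sorted list of IDs follows. `read_after` is a hypothetical helper for illustration and says nothing about how redlite stores entries.

```rust
/// Return the IDs that are strictly greater than `last_seen`.
/// `ids` must be sorted ascending, which is how stream entries are ordered.
fn read_after(ids: &[(u64, u64)], last_seen: (u64, u64)) -> &[(u64, u64)] {
    // Index of the first ID that is greater than `last_seen`.
    let first_new = ids.partition_point(|id| *id <= last_seen);
    &ids[first_new..]
}
```

Passing `(0, 0)` returns every entry, which matches reading from ID `0`. Passing the ID of the newest entry returns an empty slice.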

    # Read entries with IDs greater than the given ID (0 reads from the beginning)
    127.0.0.1:6379> XREAD STREAMS mystream 0
    1) 1) "mystream"
       2) 1) 1) "1704067200000-0"
             2) 1) "sensor_id"
                2) "123"
                3) "temperature"
                4) "25.5"

    # Read from multiple streams
    127.0.0.1:6379> XREAD STREAMS stream1 stream2 0 0

    # Blocking read (server mode only)
    127.0.0.1:6379> XREAD BLOCK 5000 STREAMS mystream $
    # Waits up to 5 seconds for new entries

Stream Management
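Exact `MAXLEN` trimming removes the oldest entries until the stream is no longer than the threshold and reports how many were removed. A minimal model of that behavior, using a `VecDeque` as a stand-in for the stream, might look like this. `trim_maxlen` is a hypothetical name, and the approximate `~` variant is not modeled.

```rust
use std::collections::VecDeque;

/// Drop the oldest entries until at most `maxlen` remain.
/// Returns the number of entries removed, like the integer reply of XTRIM.
fn trim_maxlen<T>(entries: &mut VecDeque<T>, maxlen: usize) -> usize {
    let excess = entries.len().saturating_sub(maxlen);
    // Entries at the front are the oldest ones.
    entries.drain(..excess);
    excess
}
```

With 1500 entries and a threshold of 1000, the function removes 500 entries, and a second call with the same threshold removes none.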

    # Trim to max 1000 entries
    127.0.0.1:6379> XTRIM mystream MAXLEN 1000
    (integer) 500  # Entries removed

    # Approximate trim (faster)
    127.0.0.1:6379> XTRIM mystream MAXLEN ~ 1000

    # Delete specific entry
    127.0.0.1:6379> XDEL mystream 1704067200000-0
    (integer) 1

    # Get stream info
    127.0.0.1:6379> XINFO STREAM mystream

Consumer Groups
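A consumer group tracks every entry it delivers as pending until the entry is acknowledged. The sketch below models that bookkeeping with a map from entry ID to owning consumer. `PendingList`, `deliver` and `ack` are hypothetical names chosen for illustration and do not describe redlite internals.

```rust
use std::collections::HashMap;

/// Toy model of one consumer group's pending entries.
#[derive(Default)]
struct PendingList {
    // entry ID -> consumer that currently owns the entry
    owners: HashMap<String, String>,
}

impl PendingList {
    /// An entry delivered through a group read becomes pending.
    fn deliver(&mut self, id: &str, consumer: &str) {
        self.owners.insert(id.to_string(), consumer.to_string());
    }

    /// Acknowledge IDs; returns how many of them were actually pending,
    /// mirroring the integer reply of XACK.
    fn ack(&mut self, ids: &[&str]) -> usize {
        let mut acked = 0;
        for id in ids {
            if self.owners.remove(*id).is_some() {
                acked += 1;
            }
        }
        acked
    }

    /// Number of entries still waiting for acknowledgment.
    fn len(&self) -> usize {
        self.owners.len()
    }
}
```

Acknowledging an ID that is not pending does not count toward the result, which is why an acknowledgment can return fewer than the number of IDs passed.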

    # Create consumer group (start from beginning)
    127.0.0.1:6379> XGROUP CREATE mystream mygroup 0
    OK

    # Alternatively, create the group starting from new entries only
    # (MKSTREAM creates the stream if it does not exist)
    127.0.0.1:6379> XGROUP CREATE mystream mygroup $ MKSTREAM
    OK

    # Read as consumer (">" requests entries not yet delivered to the group)
    127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
    1) 1) "mystream"
       2) 1) 1) "1704067200000-0"
             2) 1) "sensor_id"
                2) "123"
                3) "temperature"
                4) "25.5"

    # Acknowledge processed entries
    127.0.0.1:6379> XACK mystream mygroup 1704067200000-0
    (integer) 1

    # Check pending entries
    127.0.0.1:6379> XPENDING mystream mygroup
    1) (integer) 5          # Total pending
    2) "1704067200000-0"    # Smallest ID
    3) "1704067205000-0"    # Largest ID
    4) 1) 1) "consumer1"
          2) "5"

Claiming Messages
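Claiming transfers ownership of pending entries that have sat idle for too long, so another consumer can retry them. The following sketch models that rule on an in-memory list. `Pending` and `claim` are hypothetical, the reset of the idle time and the increment of the delivery count follow Redis `XCLAIM` behavior, and treating the idle threshold as inclusive is an assumption of this sketch.

```rust
/// One pending entry in a consumer group (toy model).
#[derive(Debug, Clone, PartialEq)]
struct Pending {
    id: String,
    consumer: String,
    idle_ms: u64,
    deliveries: u32,
}

/// Reassign the listed IDs to `new_owner` when they have been idle for at
/// least `min_idle_ms`. Returns the IDs that were actually claimed.
fn claim(pending: &mut [Pending], ids: &[&str], new_owner: &str, min_idle_ms: u64) -> Vec<String> {
    let mut claimed = Vec::new();
    for p in pending.iter_mut() {
        if ids.contains(&p.id.as_str()) && p.idle_ms >= min_idle_ms {
            p.consumer = new_owner.to_string();
            p.idle_ms = 0; // a claim restarts the idle clock
            p.deliveries += 1; // a claim counts as another delivery attempt
            claimed.push(p.id.clone());
        }
    }
    claimed
}
```

Entries that are requested but not yet idle long enough stay with their current consumer and are left out of the result.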

    # Claim entries idle for more than 1 minute
    127.0.0.1:6379> XCLAIM mystream mygroup consumer2 60000 1704067200000-0
    1) 1) "1704067200000-0"
       2) 1) "sensor_id"
          2) "123"

Library Mode (Rust)

    use redlite::Db;

    let db = Db::open("mydata.db")?;

    // Add entry
    let id = db.xadd("mystream", "*", &[("field1", b"value1")])?;

    // Read entries
    let entries = db.xrange("mystream", "-", "+", None)?;

    // Read from one or more streams (one (key, ID) pair per stream)
    let results = db.xread(&[("mystream", "0")])?;

    // Stream info
    let info = db.xinfo_stream("mystream")?;

    // Consumer groups (require mutable reference)
    db.xgroup_create("mystream", "mygroup", "0", false)?;
    let entries = db.xreadgroup("mygroup", "consumer1", &[("mystream", ">")], None)?;
    db.xack("mystream", "mygroup", &["1704067200000-0"])?;

Use Cases
Event Sourcing
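In event sourcing, current state is not stored directly. It is rebuilt by folding over the recorded events in ID order. A minimal sketch of such a fold follows, assuming each entry has already been read into a list of field/value pairs. `field` and `replay` are hypothetical helpers, and tracking only the latest event type per `order_id` is a deliberately simple notion of state.

```rust
use std::collections::HashMap;

/// Look up a field by name in one entry's field/value pairs.
fn field<'a>(fields: &[(&'a str, &'a str)], name: &str) -> Option<&'a str> {
    fields.iter().find(|(k, _)| *k == name).map(|(_, v)| *v)
}

/// Fold events, oldest first, into the latest event type per order_id.
/// Entries missing either field are ignored.
fn replay(events: &[Vec<(&str, &str)>]) -> HashMap<String, String> {
    let mut latest = HashMap::new();
    for fields in events {
        if let (Some(order), Some(kind)) = (field(fields, "order_id"), field(fields, "type")) {
            latest.insert(order.to_string(), kind.to_string());
        }
    }
    latest
}
```

Because later events overwrite earlier ones for the same order, replaying the full stream always ends in the same state regardless of when the replay runs.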

    # Record events
    XADD events * type "order_created" order_id "123" user_id "456"
    XADD events * type "payment_received" order_id "123" amount "99.99"
    XADD events * type "order_shipped" order_id "123" tracking "ABC123"

    # Replay events to rebuild state
    XRANGE events - +

Real-Time Analytics
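For analytics, the entries read back from a stream are usually aggregated on the client. The sketch below counts views per URL, assuming each entry is available as a list of field/value pairs with a `url` field. `views_per_url` is a hypothetical helper, not a redlite function.

```rust
use std::collections::HashMap;

/// Count page views per URL from entries given as field/value pairs.
/// Entries without a "url" field are skipped.
fn views_per_url(entries: &[Vec<(&str, &str)>]) -> HashMap<String, usize> {
    let mut counts = HashMap::new();
    for fields in entries {
        if let Some((_, url)) = fields.iter().find(|(k, _)| *k == "url") {
            *counts.entry(url.to_string()).or_insert(0) += 1;
        }
    }
    counts
}
```

The same pattern works for any field, for example counting entries per `user_id` instead of per `url`.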

    # Log page views
    XADD pageviews * url "/products" user_id "u:100" timestamp "2024-01-01T12:00:00Z"

    # Read recent activity
    XREVRANGE pageviews + - COUNT 100

Message Queues with Acknowledgment

    # Producer adds messages
    XADD tasks * type "email" to "user@example.com" subject "Hello"

    # Consumer group for reliable processing
    XGROUP CREATE tasks workers 0 MKSTREAM

    # Worker reads and processes
    XREADGROUP GROUP workers worker1 COUNT 1 STREAMS tasks >
    # ... process message ...
    XACK tasks workers 1704067200000-0

IoT Sensor Data

    # Sensors push data
    XADD sensor:temp:room1 * value "23.5" unit "celsius"
    XADD sensor:humidity:room1 * value "45" unit "percent"

    # Keep last 24 hours of data
    # (now-86400000) is a placeholder: substitute the current Unix time in milliseconds minus 86400000
    XTRIM sensor:temp:room1 MINID (now-86400000)
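The `MINID` threshold has to be computed by the client, since the command takes a concrete ID rather than an expression. A minimal sketch of that computation follows. `now_ms` and `trim_command` are hypothetical helpers, and passing only the millisecond part as the threshold assumes Redis-style handling of incomplete IDs.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// 24 hours in milliseconds (86_400_000).
const DAY_MS: u64 = 24 * 60 * 60 * 1000;

/// Current Unix time in milliseconds.
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_millis() as u64
}

/// Build an XTRIM command that drops entries older than `retention_ms`,
/// measured back from `now` (both in milliseconds).
fn trim_command(key: &str, now: u64, retention_ms: u64) -> String {
    let min_id = now.saturating_sub(retention_ms);
    format!("XTRIM {} MINID {}", key, min_id)
}
```

Calling `trim_command("sensor:temp:room1", now_ms(), DAY_MS)` produces the command text for a 24-hour retention window. Taking `now` as a parameter keeps the function deterministic and easy to test.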