Kinesis In A Nutshell
Background
Kinesis is a high-capacity, real-time data pipe.
What does that mean?
Let's look at a simple scenario. Most of the time we log data hoping to analyse it at some point. Our log files keep growing and have to be stored somewhere. What if we could analyse them in real time and archive them once they have been analysed? This would reduce our storage costs, because analysed logs can be moved to cheaper storage. We also need to monitor our logs in real time for anomalies, performance problems, threats and so on, so that we are proactive rather than reactive to changes and vulnerabilities.

Kinesis Example
Think of another scenario, where a weather balloon sends real-time data through an IoT device and the data needs to be analysed in real time.
This data can be sent to a data pipe and analysed in real time, for instance by a Lambda function that is triggered whenever new data arrives.
Basically, a data pipe stores a sequence of events, and the end point of the pipe is a data analytics engine or similar.
Welcome to Kinesis. Kinesis is a high-capacity, real-time data pipe, or event streaming service. It is similar in principle to an Apache Kafka cluster. https://kafka.apache.org/
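To make the Lambda idea concrete, here is a minimal sketch of a handler that receives a batch of Kinesis records. It assumes the event layout Lambda uses for Kinesis triggers (Records, then kinesis, then a Base64-encoded data field). The handler name, the sample reading and the "balloon-1" partition key are made up for illustration.

```python
import base64


def handler(event, context):
    """Decode each record delivered by a Kinesis trigger."""
    readings = []
    for record in event["Records"]:
        # The payload arrives Base64-encoded under kinesis.data
        payload = base64.b64decode(record["kinesis"]["data"])
        readings.append(payload.decode("utf-8"))
    # Real analysis (anomaly checks, aggregation, ...) would go here
    return {"processed": len(readings), "readings": readings}


if __name__ == "__main__":
    # Hypothetical event carrying one weather balloon reading
    sample_event = {
        "Records": [
            {
                "kinesis": {
                    "partitionKey": "balloon-1",
                    "data": base64.b64encode(b"temp=-40.5,alt=18000").decode("ascii"),
                }
            }
        ]
    }
    print(handler(sample_event, None))
```

Running the file locally feeds the handler a hand-built event, so the decoding logic can be tried out without deploying anything.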

Streams Firehose Analytics
AWS offers variants of Kinesis known as Streams, Firehose and Analytics.
Kinesis Streams
Kinesis Streams can be used, as in our previous examples, to stream data to our own analytics engines or programs.
Kinesis Firehose
Kinesis Firehose can be used to deliver streams of data to S3 (an AWS object storage service), AWS Redshift (an AWS data warehouse service that is horizontally scalable) or AWS Elasticsearch (an AWS search service used to search, analyse and visualize data).
Kinesis Data Analytics
Kinesis Data Analytics lets you query and analyse streaming data using SQL queries or Java-based applications.

Shards
Shards are the storage partitions of a stream. Each incoming data record carries a partition key, and Kinesis uses that key to decide which shard stores the record, so records with the same partition key are grouped on the same shard. You can think of shards as horizontally scalable data partitions: spreading records across several shards gives better throughput and faster access to the data. You can also configure a pipe with a single shard if that is all you need.
Throughput scales in proportion to the number of shards in the stream.
Each shard supports up to 5 read transactions per second, up to a maximum total read rate of 2 MB per second, and up to 1,000 records per second for writes, up to a maximum total write rate of 1 MB per second (including partition keys). These numbers may change, because AWS continuously enhances its backend services.
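As a rough sizing aid, the per-shard limits quoted above can be turned into a small calculation. This is only a sketch: the function name and the sample workload are made up, and the constants are simply the figures from this section, which AWS may change.

```python
import math

# Per-shard limits as quoted in this section (subject to change by AWS)
WRITE_MB_PER_SEC = 1.0
WRITE_RECORDS_PER_SEC = 1000
READ_MB_PER_SEC = 2.0


def shards_needed(write_mb_per_sec, records_per_sec, read_mb_per_sec):
    """Return the smallest shard count that satisfies all three limits."""
    by_write_bytes = math.ceil(write_mb_per_sec / WRITE_MB_PER_SEC)
    by_write_records = math.ceil(records_per_sec / WRITE_RECORDS_PER_SEC)
    by_read_bytes = math.ceil(read_mb_per_sec / READ_MB_PER_SEC)
    # A stream always has at least one shard
    return max(1, by_write_bytes, by_write_records, by_read_bytes)


if __name__ == "__main__":
    # Hypothetical workload: 3.5 MB/s written, 2,500 records/s, 7 MB/s read
    print(shards_needed(3.5, 2500, 7.0))  # 4
```

The tightest of the three limits decides the shard count. In the sample workload both the write volume and the read volume call for 4 shards.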

Streams And Data Records
Streams
A stream is a uniquely named sequence of data records. Each stream has a unique name and carries a stream of related events, where each event is a data record.
Data is stored within the pipeline for 24 hours by default; this can be extended to 7 days if necessary. A consumer can read the stream from the oldest stored record, or only from the newest data, known as the latest. You can think of a Kinesis pipeline as a multitrack recorder that stores data on each track (shard): the song can be replayed from the start, or picked up at the point where new material is being recorded. Records are deleted once the retention period (24 hours by default) has passed, whether or not a listener has consumed them.
The oldest stored record is called the trim horizon. A read can start from the trim horizon, or from the latest position so that only new data is returned.
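The difference between the two starting positions can be shown with a toy in-memory model. This is not the Kinesis API, just a sketch of the behaviour described above. The ToyShard class and its methods are invented for illustration, and time is passed in by hand as seconds.

```python
class ToyShard:
    """In-memory stand-in for one shard; not the Kinesis API."""

    def __init__(self, retention_hours=24):
        self.retention_seconds = retention_hours * 3600
        self.records = []  # (sequence_number, arrival_seconds, data)
        self.next_sequence = 0

    def put(self, now, data):
        self.records.append((self.next_sequence, now, data))
        self.next_sequence += 1

    def trim(self, now):
        # Records older than the retention period are dropped,
        # whether or not anyone has read them.
        cutoff = now - self.retention_seconds
        self.records = [r for r in self.records if r[1] >= cutoff]

    def iterator(self, kind):
        if kind == "TRIM_HORIZON":
            # Start at the oldest record that is still retained
            return self.records[0][0] if self.records else self.next_sequence
        if kind == "LATEST":
            # Start just after the newest record
            return self.next_sequence
        raise ValueError(f"unknown iterator type: {kind}")

    def read_from(self, sequence):
        return [data for (seq, _, data) in self.records if seq >= sequence]


if __name__ == "__main__":
    shard = ToyShard(retention_hours=24)
    shard.put(now=0, data="verse 1")
    shard.put(now=3600, data="verse 2")
    latest = shard.iterator("LATEST")  # taken before "verse 3" arrives
    shard.put(now=7200, data="verse 3")
    print(shard.read_from(shard.iterator("TRIM_HORIZON")))  # all three verses
    print(shard.read_from(latest))                          # only "verse 3"
    shard.trim(now=25 * 3600)  # "verse 1" is now older than 24 hours
    print(shard.read_from(shard.iterator("TRIM_HORIZON")))  # verses 2 and 3
```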
Data Record
A data record is the unit of data stored in a data stream. A data record consists of the following:
Partition Key (the shard key): provided by the source along with the blob of data, it determines which shard the data is placed on.
For example:
Suppose we have three blobs of data with the partition keys "A", "B" and "C".
If we have a single shard, all three blobs are stored on that shard.
If we have 2 shards, "A" and "C" may be stored on shard 1 and "B" on shard 2. Through the partition key, Kinesis determines the placement and retrieval of data.
Sequence Number: assigned by Kinesis to each data event, based on the sequence of the event stream.
Data Blob: the actual data.
Note: Partition keys are Unicode strings with a maximum length of 256 characters. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards. When an application puts data into a stream, it must specify a partition key.
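The mapping described in the note can be sketched in a few lines of Python. Two things here are assumptions rather than documented behaviour: the MD5 digest is read as a big-endian integer, and the shards split the hash space into equal contiguous ranges. The helper names are invented for the example.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 yields 128-bit values: 0 .. 2**128 - 1


def hash_key(partition_key):
    """Map a partition key to a 128-bit integer via MD5 (big-endian assumed)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def even_ranges(shard_count):
    """Split the hash space into contiguous, equal-sized ranges (assumed layout)."""
    size = HASH_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        # The last range absorbs any remainder so the space is fully covered
        end = HASH_SPACE - 1 if i == shard_count - 1 else start + size - 1
        ranges.append((start, end))
    return ranges


def shard_for(partition_key, ranges):
    """Return the index of the range that contains the key's hash."""
    value = hash_key(partition_key)
    for index, (start, end) in enumerate(ranges):
        if start <= value <= end:
            return index
    raise ValueError("hash value outside every range")


if __name__ == "__main__":
    for count in (1, 2):
        ranges = even_ranges(count)
        placement = {key: shard_for(key, ranges) for key in ("A", "B", "C")}
        print(count, "shard(s):", placement)
```

With a single shard every key lands on index 0, since that shard owns the whole range. With two shards, the placement depends only on each key's hash, so the same key always lands on the same shard.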
AWS CLI Examples:
List Existing Kinesis Streams
aws kinesis list-streams
{
    "StreamNames": []
}
Returns an empty JSON array since there aren't any streams at present on the AWS account I am currently working on.
Create Kinesis Stream Named Test With One Shard.
aws kinesis create-stream --stream-name test --shard-count 1
List Created Kinesis Stream
aws kinesis list-streams
{
    "StreamNames": [
        "test"
    ]
}
The describe stream command shows the configuration of the stream.
aws kinesis describe-stream --stream-name test
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49592235610908080812900514258860256679432992322846982146"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:eu-west-2:aws-account-number:stream/test",
        "StreamName": "test",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ]
    }
}
Note: The hash key can range from 0 through
340282366920938463463374607431768211455
And the starting sequence number assigned by Kinesis is
49592235610908080812900514258860256679432992322846982146
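The upper end of the hash key range is not an arbitrary number. A quick check in Python shows that it is the largest 128-bit value, which matches the 128-bit MD5 mapping mentioned earlier. The variable name is just for illustration.

```python
# EndingHashKey reported by describe-stream for the single shard
ENDING_HASH_KEY = int("340282366920938463463374607431768211455")

# A single shard owns the whole 128-bit hash space: 0 .. 2**128 - 1
print(ENDING_HASH_KEY == 2 ** 128 - 1)  # True
print(ENDING_HASH_KEY.bit_length())     # 128
```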
Now We Can Put A Data Record With A Partition Key Of Our Choice.
aws kinesis put-record --stream-name test --data 'happy streaming' --partition-key 'A'
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49592257543780186548562496460832369797253790639069331458"
}
Note: In AWS CLI version 2 the --data value is treated as Base64 by default, so add --cli-binary-format raw-in-base64-out to the command to pass raw text as shown above.
Note: To read records we first need to obtain a shard iterator for the shard.
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name test
{
    "ShardIterator": "AAAAAAAAAAFIXgF12s3AQvjl6Xy5ure+4U5IeHjDXZZrolyrIUdFCkjzmfNcqplpikcXxMuGgYqDmMr0zRiwvSB/bN5C8hUULOVeZFVIQAeF5QcD5m4lLX+nHDpTRR9tKRqqcdENNkvMabOT1gj241sxjwLusK+N4RPos0FPRVJfzgJPwo6RyvR74omdczuCGRTp7eBc33iJPHeCKWYH6U+VbWUqyCKc"
}
Now to get the records
aws kinesis get-records --shard-iterator AAAAAAAAAAFIXgF12s3AQvjl6Xy5ure+4U5IeHjDXZZrolyrIUdFCkjzmfNcqplpikcXxMuGgYqDmMr0zRiwvSB/bN5C8hUULOVeZFVIQAeF5QcD5m4lLX+nHDpTRR9tKRqqcdENNkvMabOT1gj241sxjwLusK+N4RPos0FPRVJfzgJPwo6RyvR74omdczuCGRTp7eBc33iJPHeCKWYH6U+VbWUqyCKc
{
    "Records": [
        {
            "SequenceNumber": "49592257543780186548562496460832369797253790639069331458",
            "ApproximateArrivalTimestamp": 1548155258.219,
            "Data": "aGFwcHkgc3RyZWFtaW5n",
            "PartitionKey": "A"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAFqJg5A62FHHlTdjkZ6M0mAtseRrg4PLjYiRW8IrSnyAowKM0YVzG9u2ilK6ZbQnliTd8LqtDUK6jNHGNaRtHi/xGNjpZ0a999oiwp/UGBxX3i69VNbx937On00bKsiRzoidyE8oaIDyjQGEtbTygH1UJSpv98zJDIyLvfL+wB7zUqrWjLcKwPd1xEJp2YvdqEAeyvcnpSTVsEd1PV5/ftA",
    "MillisBehindLatest": 0
}
Please note that the records get Base64 encoded so we need to decode the above value.
"Data": "aGFwcHkgc3RyZWFtaW5n"
We can run the following Python script to decode the above Base64 back to text
import base64
a = 'aGFwcHkgc3RyZWFtaW5n'
# b64decode returns bytes; decode them as UTF-8 to recover the original text
b = base64.b64decode(a).decode('utf-8')
print(b)
happy streaming
Please note that the iterator expires after 5 minutes.
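Because an iterator is short-lived, a consumer normally keeps reading with the NextShardIterator returned by each call instead of reusing the first one. The sketch below shows that loop. It is written against any client object that offers get_shard_iterator and get_records with the same keyword arguments as the boto3 Kinesis client, where record data arrives as bytes that are already decoded. The FakeClient class, the read_all helper and the max_calls cap are made up so the example runs without AWS access.

```python
def read_all(client, stream_name, shard_id, max_calls=10):
    """Read a shard from its trim horizon until the consumer has caught up."""
    response = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )
    iterator = response["ShardIterator"]
    payloads = []
    for _ in range(max_calls):
        if iterator is None:
            break  # no further iterator: the shard was closed and fully read
        page = client.get_records(ShardIterator=iterator)
        payloads.extend(record["Data"] for record in page["Records"])
        # Always continue with the fresh iterator; the old one will expire
        iterator = page.get("NextShardIterator")
        if page.get("MillisBehindLatest", 0) == 0:
            break  # caught up with the tip of the shard
    return payloads


class FakeClient:
    """Hypothetical stand-in so the sketch runs without AWS access."""

    def __init__(self, pages):
        self.pages = pages

    def get_shard_iterator(self, StreamName, ShardId, ShardIteratorType):
        return {"ShardIterator": "0"}

    def get_records(self, ShardIterator):
        index = int(ShardIterator)
        is_last = index == len(self.pages) - 1
        return {
            "Records": [{"Data": data} for data in self.pages[index]],
            "NextShardIterator": str(index + 1),
            "MillisBehindLatest": 0 if is_last else 1000,
        }


if __name__ == "__main__":
    client = FakeClient([[b"happy"], [b"streaming"]])
    print(read_all(client, "test", "shardId-000000000000"))
```

A real consumer would keep polling instead of stopping once it has caught up. The early exit here only keeps the demonstration finite.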
Now let's delete the stream
aws kinesis delete-stream --stream-name test