1 of 58

Local Weather Visualisation with IoT, Kafka and CrateDB

Mika Naylor

mika@crate.io

@autophagyDev

2 of 58

INTRODUCTION

3 of 58

WHY BUILD A WEATHER STATION?

4 of 58

5 of 58

Temperature/Humidity
Barometric Pressure

Luminosity

Microcontroller

DHT22

BMP180

TSL2561
ESP8266

6 of 58

7 of 58

8 of 58

ACT 1

A SIMPLE PROTOTYPE

9 of 58

// Libraries

#include <DHT.h>

#include <Adafruit_Sensor.h>

#include <Adafruit_TSL2561_U.h>

#include <Adafruit_BMP085_U.h>

#include <ESP8266WiFi.h>

#include <ArduinoJson.h>

// Constants

// Data pin and sensor model handed to the DHT driver below.
#define DHTPIN 12 // DHT22 connected to pin 12
#define DHTTYPE DHT22

// One global driver object per sensor, direct-initialised.
DHT dht(DHTPIN, DHTTYPE);
// NOTE(review): 12345 and 10085 look like the arbitrary sensor IDs the
// Adafruit unified-sensor constructors accept -- confirm against the library.
Adafruit_TSL2561_Unified tsl(TSL2561_ADDR_LOW, 12345);
Adafruit_BMP085_Unified bmp(10085);

Setup

10 of 58

dht.begin();

tsl.enableAutoRange(true);

tsl.setIntegrationTime(TSL2561_INTEGRATIONTIME_13MS);

bmp.begin();�

WiFi.begin(ssid, password);�while (WiFi.status() != WL_CONNECTED) {

delay(500);

}

Setup

11 of 58

// Temperature and humidity come straight from the DHT driver.
temperature = dht.readTemperature();
humidity = dht.readHumidity();

// Pressure is read through a sensor event filled in by the BMP driver.
sensors_event_t pressure_reading;
bmp.getEvent(&pressure_reading);
pressure = pressure_reading.pressure;

// Light level is read the same way from the TSL driver.
sensors_event_t light_reading;
tsl.getEvent(&light_reading);
lux = light_reading.light;

Loop

12 of 58

StaticJsonBuffer<256> JSONbuffer;

JsonObject& JSONencoder = JSONbuffer.createObject();

JSONencoder["temperature"] = temperature;

JSONencoder["humidity"] = humidity;

JSONencoder["pressure"] = pressure;

JSONencoder["luminosity"] = lux;

char JSONmessageBuffer[256];

JSONencoder.prettyPrintTo(JSONmessageBuffer, sizeof(JSONmessageBuffer));

Loop

13 of 58

// POST the serialised reading and keep retrying until the server answers 201.
// NOTE(review): the retry is unbounded -- if the endpoint never returns 201
// this never reaches the sensor_interval delay below; confirm that is intended.
const int kHttpCreated = 201;
const unsigned long kRetryDelayMs = 1000;

int code = -1;
HTTPClient http;
while (true) {
  http.begin(host, fingerprint);
  http.addHeader("Content-Type", "application/json");
  code = http.POST(JSONmessageBuffer);
  http.end();

  if (code == kHttpCreated) {
    break;
  }
  // Back off only after a failed attempt; a successful POST should not cost
  // an extra second on top of the regular sampling interval.
  delay(kRetryDelayMs);
}

// Wait for the next sampling cycle.
delay(sensor_interval);

Loop

14 of 58

15 of 58

16 of 58

17 of 58

18 of 58

-- Sensor readings table, distributed over four shards.
CREATE TABLE IF NOT EXISTS "doc"."sensordata" (
  "timestamp" TIMESTAMP,
  "value" FLOAT
)
CLUSTERED INTO 4 SHARDS

node-0

shard-0

node-1

shard-1

node-3

shard-3

node-2

shard-2

19 of 58

-- Sensor readings table: four shards, one replica configured.
CREATE TABLE IF NOT EXISTS "doc"."sensordata" (
  "timestamp" TIMESTAMP,
  "value" FLOAT
)
CLUSTERED INTO 4 SHARDS
WITH (number_of_replicas = 1)

node-0

shard-0

node-1

shard-1

replica-1

replica-3

node-3

shard-3

replica-2

node-2

shard-2

replica-0

20 of 58

-- Sensor readings table: four shards, one replica configured.
CREATE TABLE IF NOT EXISTS "doc"."sensordata" (
  "timestamp" TIMESTAMP,
  "value" FLOAT
)
CLUSTERED INTO 4 SHARDS
WITH (number_of_replicas = 1)

node-0

node-1

shard-1

replica-3

node-3

shard-3

replica-2

node-2

shard-2

replica-0

replica-1

shard-0

shard-1

replica-3

21 of 58

value

timestamp

12.4

1532080800000

68.2

1532080850000

107

1532080900000

12.2

1526810400000

69.0

1526810423000

110

1526810434000

11.9

1524219010000

69.1

1524219140000

109

1524219230000

-- Sensor readings table: four shards, one replica configured.
CREATE TABLE IF NOT EXISTS "doc"."sensordata" (
  "timestamp" TIMESTAMP,
  "value" FLOAT
)
CLUSTERED INTO 4 SHARDS
WITH (number_of_replicas = 1)

22 of 58

value

timestamp

month

12.4

1532080800000

1532080800000

68.2

1532080850000

1532080800000

107

1532080900000

1532080800000

12.2

1526810400000

1526810400000

69.0

1526810423000

1526810400000

110

1526810434000

1526810400000

11.9

1524219010000

1524218400000

69.1

1524219140000

1524218400000

109

1524219230000

1524218400000

-- Sensor readings table with a generated "month" column derived from
-- "timestamp" via date_trunc.
CREATE TABLE IF NOT EXISTS "doc"."sensordata" (
  "timestamp" TIMESTAMP,
  "value" FLOAT,
  "month" TIMESTAMP GENERATED ALWAYS AS date_trunc('month', "timestamp")
)
CLUSTERED INTO 4 SHARDS
WITH (number_of_replicas = 1)

23 of 58

value

timestamp

month

12.4

1532080800000

1532080800000

68.2

1532080850000

1532080800000

107

1532080900000

1532080800000

12.2

1526810400000

1526810400000

69.0

1526810423000

1526810400000

110

1526810434000

1526810400000

11.9

1524219010000

1524218400000

69.1

1524219140000

1524218400000

109

1524219230000

1524218400000

-- Sensor readings table, partitioned on the generated "month" column.
-- NOTE(review): the column is named "month" but is truncated to 'hour';
-- confirm which granularity the partitioning is meant to use.
CREATE TABLE IF NOT EXISTS "doc"."sensordata" (
  "timestamp" TIMESTAMP,
  "value" FLOAT,
  "month" TIMESTAMP GENERATED ALWAYS AS date_trunc('hour', "timestamp")
)
CLUSTERED INTO 4 SHARDS
PARTITIONED BY ("month")
WITH (number_of_replicas = 1)

24 of 58

-- Sensor readings table, partitioned on the generated "month" column.
-- NOTE(review): the column is named "month" but is truncated to 'hour';
-- confirm which granularity the partitioning is meant to use.
CREATE TABLE IF NOT EXISTS "doc"."sensordata" (
  "timestamp" TIMESTAMP,
  "value" FLOAT,
  "month" TIMESTAMP GENERATED ALWAYS AS date_trunc('hour', "timestamp")
)
CLUSTERED INTO 4 SHARDS
PARTITIONED BY ("month")
WITH (number_of_replicas = 1)

node-0

node-1

node-3

node-2

25 of 58

26 of 58

27 of 58

@main.route("/api/insert", methods=["POST"])
@validate_schema(insert_sensor_schema)
def insert():
    """Store one sensor reading taken from the POSTed JSON body.

    The new row is stamped with the current UTC time and carries the
    temperature, humidity, pressure and luminosity values from the request.

    Returns:
        An empty body with HTTP status 201.
    """
    payload = json.loads(request.data)
    reading_fields = ("temperature", "humidity", "pressure", "luminosity")
    record = SensorData(
        timestamp=datetime.utcnow(),
        **{field: payload[field] for field in reading_fields},
    )
    db.session.add(record)
    db.session.commit()
    return "", 201

28 of 58

@main.route("/api/latest", methods=["GET"])
def latest():
    """Return COORDINATES merged with the most recent sensor reading as JSON.

    If no reading is stored yet, only the COORDINATES entries are returned.
    """
    # Work on a copy: updating COORDINATES itself would write one request's
    # readings into module-level state shared by every later request.
    latest_results = dict(COORDINATES)
    newest = db.session.query(SensorData).order_by(desc("timestamp")).first()
    if newest is not None:
        latest_results.update(newest.dict())
    return jsonify(latest_results)

29 of 58

mika@autophagy > curl localhost:8080/api/latest

{

"latitude": 52.489,

"longitude": 13.354,

"temperature": 11.2,

"humidity": 91,

"pressure": 1012,

"lux": 107

}

30 of 58

@main.route("/api/<string:sensor>/since/<int:ts>", methods=["GET"])
def sensorReadingsSinceTimestamp(sensor, ts):
    """Return the readings of one sensor recorded after a given time, as JSON.

    Args:
        sensor: Name of the SensorData column to read; anything not listed
            in VALID_SENSOR_TYPES aborts with 404.
        ts: Lower bound (exclusive) as a Unix timestamp in milliseconds.

    Returns:
        A JSON response with the (value, timestamp) rows, oldest first.
    """
    if sensor not in VALID_SENSOR_TYPES:
        abort(404)

    # The URL carries milliseconds; utcfromtimestamp expects seconds.
    since = datetime.utcfromtimestamp(ts / 1000)

    query = db.session.query(getattr(SensorData, sensor), SensorData.timestamp)
    query = query.filter(SensorData.timestamp > since)
    query = query.order_by(asc("timestamp"))
    return jsonify(query.all())

31 of 58

mika@autophagy > curl localhost:8080/api/temperature/since/1531919792197

[
  { "timestamp": 1531920179000, "value": 12.0 },
  { "timestamp": 1531920180000, "value": 12.4 },
  { "timestamp": 1531920181000, "value": 12.2 },
  ...
]

32 of 58

33 of 58

INTERLUDE

34 of 58

SCALABILITY

35 of 58

FRIENDS WANTED A REGN

STATION FOR THEIR

OWN PURPOSES

36 of 58

ATOMISED SENSORS

or

A SENSOR NETWORK

37 of 58

ACT 2

THE GANG MAKES A WEATHER

STATION NETWORK

38 of 58

Cargo Cult
Berlin

39 of 58

Credibility Problem
London

40 of 58

Zero Gravitas
Manchester

41 of 58

Experiencing A Significant Gravitas Shortfall
Barcelona

42 of 58

Funny, It Worked The Last Time...
Seattle

43 of 58

My Stuff

44 of 58

Not My Stuff

My Stuff?

Our Stuff?

45 of 58

Not My Stuff

My Stuff?

Our Stuff?

46 of 58

MQTT

MQTT

MQTT

47 of 58

48 of 58

// Libraries

// Packet-size macro set to 256 before the PubSubClient header is included.
// NOTE(review): presumably this is meant to size the library's MQTT buffer --
// confirm the define also reaches the library's own translation unit.
#define MQTT_MAX_PACKET_SIZE 256

#include <PubSubClient.h>

// Constants

// Network client that the MQTT client below is constructed on.
WiFiClient espClient;

// MQTT client used for connect/subscribe/publish.
PubSubClient client(espClient);

Setup

49 of 58

/// Prints a received MQTT message to the serial port.
///
/// Writes the "Received Message :: " prefix, then the topic, then the
/// payload bytes, and ends the line.
///
/// @param topic   Topic string of the message.
/// @param payload Message bytes; exactly `length` of them are printed.
/// @param length  Number of bytes in `payload`.
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Received Message :: ");
  Serial.print(topic);
  // The index is unsigned to match `length`, so the loop condition is not a
  // signed/unsigned comparison.
  for (unsigned int i = 0; i < length; ++i) {
    Serial.print(static_cast<char>(payload[i]));
  }
  Serial.println();
}

Setup

50 of 58

client.setServer(mqtt_server, 1883);

client.setCallback(callback);

if (client.connect("zero_gravitas")) {

client.subscribe("regn/zero_gravitas");

}

Setup

51 of 58

// Build the JSON payload: station position plus the four sensor readings.
StaticJsonBuffer<256> JSONbuffer;
JsonObject& JSONencoder = JSONbuffer.createObject();
JSONencoder["latitude"] = sensor_latitude;
JSONencoder["longitude"] = sensor_longitude;
JSONencoder["temperature"] = temperature;
JSONencoder["humidity"] = humidity;
JSONencoder["pressure"] = pressure;
JSONencoder["luminosity"] = lux;

// Serialise (pretty-printed) into a fixed 256-byte character buffer.
char JSONmessageBuffer[256];
JSONencoder.prettyPrintTo(JSONmessageBuffer, sizeof(JSONmessageBuffer));

// Publish the buffer that was just serialised; the previous argument `data`
// is not defined anywhere in this snippet.
client.publish("regn", JSONmessageBuffer);

// Wait for the next sampling cycle.
delay(sensor_interval);

Loop

52 of 58

topic :: regn

ID :: zero_gravitas

{

"latitude": 52.489,

"longitude": 13.354,

"temperature": 11.2,

"humidity": 91.0,

"pressure": 1012.0,

"luminosity": 107.0

}

topic

regn-zero_gravitas

CONSUMER

CONSUMER

CONSUMER

CONSUMER

53 of 58

CONCLUSION

LESSONS // FUTURE WORK

54 of 58

ESP8266 IS A GREAT INTRO

TO IOT PROJECTS

55 of 58

TIGHTLY COUPLED IOT DATA

PIPELINES ARE KINDA BAD

56 of 58

CRATEDB & KAFKA ARE

PRETTY AWESOME

57 of 58

FUTURE

LEVERAGE MQTT MORE

BROADEN CRATEDB INGESTION

EXPERIMENT WITH OUR DATA!

58 of 58

Twitter

Github

Email

@autophagyDev

autophagy

mika@crate.io