Datasources are versatile : they expose various data in various ways, using different protocols.
A few examples of how datasources can be exposed :
IoT sensors : WiFi IP-based or using lightweight protocols such as Bluetooth, Zigbee, LoRa, 5G, ...
HTTP REST API :
files :
It's up to you to use the whole framework or pick up some parts.
For example it's possible to build NGSI entities with the framework and deal with Orion on your own.
Using the pyngsi framework provides several benefits :
developers can focus on the data logic
clean code structure separates custom logic from boilerplate
streaming-oriented : stream incoming data vs store the whole dataset in memory
well unit-tested
all agents have the same structure : client and server modes
get processing statistics
get agent status : when possible i.e. server agent, long-live agent
benefit from the Python ecosystem especially the availability of many scientific libraries
In this tutorial we are going to explore the main features of pyngsi :
As of December 2020 the latest Python stable release is v3.9.1.
Python 3.8+ should be available on most platforms.
Depending on your OS it may already be installed.
If not you'll have to install it.
You can check the version either from the terminal :
$ python --version
Python 3.8.6
or in Python code :
import sys
print(sys.version)
3.8.6 (default, Oct 6 2020, 03:22:36)
[GCC 7.5.0]
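A script can also refuse to start on an interpreter that is too old. The following is a minimal sketch, assuming the 3.8 minimum mentioned above; `MIN_VERSION` and `python_is_supported` are hypothetical names, not part of pyngsi.

```python
import sys

# Minimum version taken from the text above (Python 3.8+)
MIN_VERSION = (3, 8)


def python_is_supported(version_info=sys.version_info) -> bool:
    """Return True when the interpreter is at least MIN_VERSION."""
    return tuple(version_info[:2]) >= MIN_VERSION


if not python_is_supported():
    raise SystemExit(f"Python {MIN_VERSION[0]}.{MIN_VERSION[1]}+ is required")
```

Comparing tuples avoids parsing the human-readable `sys.version` string.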
import pyngsi
print(pyngsi.__version__)
Let's assume you have collected incoming data.
For the sake of this tutorial, let's say data are collected from temperature/pressure sensors located in some rooms.
At some point these data are stored in memory and handled as Python variables or objects.
Now you are ready to translate your data to NGSI-compliant entities in order to further write them to Orion, hence completing the Fiware-based data acquisition pipeline.
Please use existing Fiware datamodels or extend them.
If none of them fits your needs, consider creating your own datamodel and submitting it to the Fiware community for harmonization.
This tutorial is based on the datamodel used in the NGSI Walkthrough tutorial.
from pyngsi.ngsi import DataModel
# create a DataModel instance with its mandatory id and type properties
m = DataModel(id="Room1", type="Room")
# add the pressure attribute as an integer
m.add("pressure", 720)
# add the temperature attribute as a float
# don't forget the trailing dot zero
# you could also convert explicitly : float(23)
m.add("temperature", 23.0)
# print your model
print(m)
# use the pprint() method for better readability
m.pprint()
# you probably want to output this JSON format to Fiware Orion
m.json()
# dir() and help() functions are your friends
help(DataModel.add)
Each NGSI type is mapped to a Python type.
NGSI type | Python type | Alt. Python type |
---|---|---|
Text | str | |
Integer | int | |
Float | float | |
Boolean | bool | |
DateTime | datetime | str : add_date() |
geo:json | tuple | geojson.Point |
URL | str : add_url() | |
STRING_URL_ENCODED | str (urlencode=True) | |
json array | Sequence | |
json dictionary | dict | |
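To make the mapping concrete, here is a small standalone sketch that returns the table's NGSI label for a Python value. It does not use pyngsi: `ngsi_type_of` is a hypothetical helper, and the way pyngsi performs the dispatch internally is not shown in this tutorial.

```python
from collections.abc import Sequence
from datetime import datetime


def ngsi_type_of(value) -> str:
    """Return the label the table above associates with a Python value."""
    # bool must be tested before int because bool is a subclass of int
    if isinstance(value, bool):
        return "Boolean"
    if isinstance(value, int):
        return "Integer"
    if isinstance(value, float):
        return "Float"
    # str must be tested before Sequence because a str is also a Sequence
    if isinstance(value, str):
        return "Text"
    if isinstance(value, datetime):
        return "DateTime"
    if isinstance(value, tuple) and len(value) == 2:
        return "geo:json"
    if isinstance(value, dict):
        return "json dictionary"
    if isinstance(value, Sequence):
        return "json array"
    raise TypeError(f"no NGSI mapping for {type(value).__name__}")


for sample in (720, 23.0, True, "Room1", [1, 2], {"k": "v"}):
    print(repr(sample), "->", ngsi_type_of(sample))
```

The order of the tests matters: checking `int` first would classify `True` as an Integer.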
# create a dateObserved attribute
from datetime import datetime
now = datetime.utcnow() # Orion expects UTC dates
m.add("dateObserved", now)
m.pprint()
# a one-liner for dealing with the current date (as above)
m.add_now("dateObserved") # overwrites the dateObserved attribute
m.pprint()
# if you already have a datetime as a well-formatted string, you can use it directly
m.add_date("dateObserved", "2020-02-27T04:52:53.000Z")
# note that adding an already existing attribute replaces the value
m.pprint()
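If your dates arrive as datetime objects but you need the string form, an ISO 8601 UTC string with milliseconds can be produced with the standard library alone. This is a sketch: `to_ngsi_date` is a hypothetical helper, and it assumes that naive datetimes already hold UTC values.

```python
from datetime import datetime, timezone


def to_ngsi_date(dt: datetime) -> str:
    """Format a datetime as an ISO 8601 UTC string with millisecond precision."""
    if dt.tzinfo is None:
        # Assumption: naive datetimes already hold UTC values
        dt = dt.replace(tzinfo=timezone.utc)
    dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"


print(to_ngsi_date(datetime(2020, 2, 27, 4, 52, 53)))  # 2020-02-27T04:52:53.000Z
```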
# create a location attribute for Bordeaux
m.add("location", (44.837789, -0.57918))  # tuple assumed to be (latitude, longitude) : check help(DataModel.add)
m.pprint()
# create a location attribute given a GeoJson Point
from geojson import Point
loc = Point((-0.57918, 44.837789))  # GeoJSON order is (longitude, latitude)
m.add("location", loc)
m.pprint()
# create a dataProvider attribute
m.add_url("dataProvider", "https://app.sencrop.com")
m.pprint()
# create a relationShip attribute
# the "ref" prefix is mandatory
m.add_relationship("refStore", "Shelf", "001")
m.pprint()
# create an attribute containing raw data
m.add("rawData", "Zm9yYmlkZGVuIGNoYXJhY3RlcnM=", urlencode=True)
# NGSIv2 forbidden characters : https://fiware-orion.readthedocs.io/en/1.14.0/user/forbidden_characters/index.html
# note that the equal sign '=' is part of these characters
# as '=' belongs to the base64 alphabet you MUST use urlencode=True when carrying base64 payloads
m.pprint()
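The effect of URL-encoding on a base64 payload can be checked with the standard library. This sketch only illustrates percent-encoding in general; that pyngsi encodes exactly this way when `urlencode=True` is an assumption.

```python
import base64
from urllib.parse import quote, unquote

# Build the payload used above from its raw bytes
payload = base64.b64encode(b"forbidden characters").decode("ascii")
print(payload)  # Zm9yYmlkZGVuIGNoYXJhY3RlcnM=

# Percent-encode every reserved character, including the '=' padding
encoded = quote(payload, safe="")
print(encoded)  # Zm9yYmlkZGVuIGNoYXJhY3RlcnM%3D

# A consumer reverses both steps to recover the raw bytes
decoded = base64.b64decode(unquote(encoded))
print(decoded)  # b'forbidden characters'
```

After encoding, the value no longer contains `=`, which is what makes it acceptable as an attribute value.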
# create an attribute containing a basic json array
# just provide a Python Sequence type, here a list
m.add("additionalData", [{"key1": "value1"}, {"key2": "value2"}])
m.pprint()
# create an attribute containing a basic json dictionary
# just provide a Python dict
m.add("people", {"firstname": "Tom", "lastname": "Cruise", "occupation": "Actor"})
m.pprint()
A postal address is a dedicated dictionary with property and type already set
addr = {"addressLocality": "London",
"postalCode": "EC4N 8AF",
"streetAddress": "25 Walbrook"}
m.add_address(addr)
m.pprint()
# provide a dictionary
unit = {"unitCode": "Pa"}
m = DataModel(id="Room2", type="Room")
m.add("pressure", 720, metadata=unit)
m.pprint()
# attributes are ordered in the creation order
# so if ordering is important for you just take care at the creation time
m.add("temperature", 23.0)
m.pprint()
This example is used by the Sencrop NGSI Agent that collects weather data from the Sencrop API.
Sencrop is a French company that sells connected weather stations.
The datamodel used here extends the Fiware WeatherObserved datamodel.
name = "GPMB" # Grand Port Maritime de Bordeaux
date = "2020-01-21T23:51:14.000Z" # observation date
provider = 20430 # sensor id
model = DataModel(id=f"{name}-WeatherObserved-Sencrop-{provider}-{date}", type="WeatherObserved")
model.add_url("dataProvider", "https://app.sencrop.com")
model.add("address", "Port of Bassens")
model.add("refDevice", f"device-sencrop-{name}-{provider}")
model.add_date("dateObserved", date)
model.add("windSpeed", 3.75)
model.add("windGust", 7.08)
model.add("windDirection", 59.0)
model.pprint()
Let's assume you have an NGSI entity ready to be sent to the Orion Context Broker.
Let's continue with the tutorial of the previous chapter.
# our basic NGSI entity from the previous chapter
from datetime import datetime, timedelta, timezone
from pyngsi.ngsi import DataModel
m = DataModel(id="Room1", type="Room")
m.add_now("dateObserved")
m.add("pressure", 720)
m.add("temperature", 23.0)
m.pprint()
At this point you should have an Orion server up and running.
If not you could run your own local Docker-based server, as described in the Appendix.
dir() and help() are always useful.
# there are many other sinks mainly to help develop your agent
# use help() for info
# for the moment just focus on Orion
from pyngsi.sink import SinkOrion
# have a look at the init args
help(SinkOrion.__init__)
# use our local Orion server
sink = SinkOrion()
# by default it is a silent operation
# it's ok unless an exception is raised
sink.write(m.json())
Congratulations ! You have written your first entity to Orion !
$ curl http://127.0.0.1:1026/v2/entities
[{"id":"Room1","type":"Room","dateObserved":{"type":"DateTime","value":"2020-03-27T07:18:39.00Z","metadata":{}},"pressure":{"type":"Integer","value":720,"metadata":{}},"temperature":{"type":"Float","value":23,"metadata":{}}}]
# update our entity : one degree more !
m.add_now("dateObserved")
m.add("temperature", 24.0)
# write again
sink.write(m.json())
$ curl http://127.0.0.1:1026/v2/entities
[{"id":"Room1","type":"Room","dateObserved":{"type":"DateTime","value":"2020-03-27T07:27:47.00Z","metadata":{}},"pressure":{"type":"Integer","value":720,"metadata":{}},"temperature":{"type":"Float","value":24,"metadata":{}}}]
Transient entities carry an expiration timestamp so that Orion deletes them once that time has passed.
expiration_timestamp = datetime.utcnow() + timedelta(days=10)
m.add_transient(expire=expiration_timestamp)
m.pprint()
# Use a TTL instead of an expire timestamp
m.add_transient(10*86400) # TTL in seconds
m.pprint()
DataModel.set_transient(timeout=10*86400) # TTL = 10 days
m = DataModel("id", "type") # automatically add the dateExpires attribute at entity creation
m.pprint()
DataModel.unset_transient() # back to default (no TTL)
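A TTL and an expiration timestamp are two views of the same thing : the timestamp is the creation time plus the TTL. The sketch below shows that arithmetic with the standard library only; `expiry_from_ttl` is a hypothetical helper, not a pyngsi function.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def expiry_from_ttl(ttl_seconds: int, now: Optional[datetime] = None) -> datetime:
    """Return the UTC instant at which an entity created at `now` would expire."""
    if ttl_seconds <= 0:
        raise ValueError("TTL must be a positive number of seconds")
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=ttl_seconds)


created = datetime(2020, 3, 27, 7, 18, 39, tzinfo=timezone.utc)
print(expiry_from_ttl(10 * 86400, now=created))  # 2020-04-06 07:18:39+00:00
```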
The default init args target a local installation.
If needed set the hostname and port according to your Orion server.
baseurl SHOULD not be modified.
service and servicepath have to do with Fiware multi-tenancy.
When provided, Fiware-Service and Fiware-ServicePath HTTP Headers are set accordingly.
useragent is the User-Agent HTTP Header sent to Orion.
It is set by pyngsi with its own version. You can override it.
proxy allows you to configure a proxy between the agent and Orion, for debugging purposes.
token allows you to provide an OAuth2 token for Orion authentication.
Default is no authentication.
Setting a token programmatically is NOT recommended in production mode - for security reasons.
You can safely use one of the two above methods :
from pyngsi.sink import SinkOrion
sink = SinkOrion()
# we can ask Orion for its status
status = sink.status()
print(status)
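The service, servicepath and useragent arguments described earlier end up as HTTP headers. As a rough illustration of what a request to Orion might carry, here is a standalone sketch; `fiware_headers`, the default User-Agent value and the sample tenant names are hypothetical, and the exact set of headers sent by SinkOrion is not shown in this tutorial.

```python
from typing import Dict, Optional


def fiware_headers(service: Optional[str] = None,
                   servicepath: Optional[str] = None,
                   useragent: str = "my-agent/0.1") -> Dict[str, str]:
    """Build request headers, adding the multi-tenancy ones only when provided."""
    headers = {"Content-Type": "application/json", "User-Agent": useragent}
    if service:
        headers["Fiware-Service"] = service
    if servicepath:
        headers["Fiware-ServicePath"] = servicepath
    return headers


print(fiware_headers(service="openiot", servicepath="/rooms"))
```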
Regardless of the type of data, the framework will always consume a Source the same way.
Many generic sources are provided by the framework and it's easy to create a new one.
Because a Source streams rows instead of loading the whole dataset, memory usage should stay low whatever the size of the dataset.
Let's continue with our room sensors.
# this source provides us with a CSV line each second
from pyngsi.sources.more_sources import SourceSampleOrion
# init the source
src = SourceSampleOrion()
# iterate over the source
for row in src:
    print(row)
Here we can see that a row is an instance of a Row class.
For the vast majority of the Sources, the provider keeps the same value during the datasource lifetime.
We'll go into details in next chapters.
In practice you won't iterate the Source by hand. The framework will do it for you.
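To see why streaming keeps memory low, consider this standalone sketch of a generator that hands out one record at a time. It is not the pyngsi implementation : `stream_records` is a hypothetical function and `io.StringIO` stands in for an open file or a network stream.

```python
import io
from typing import Iterator, TextIO


def stream_records(stream: TextIO) -> Iterator[str]:
    """Yield one stripped, non-empty line at a time instead of reading everything."""
    for line in stream:
        line = line.strip()
        if line:
            yield line


# io.StringIO stands in for an open file or a network stream
data = io.StringIO("Room1;23;720\nRoom2;21;711\n\nRoom3;18;708\n")
for record in stream_records(data):
    print(record)
```

Only the current line is held in memory, so the same loop works for a file of any size.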
Here comes the power of the framework.
By using an Agent you will delegate the processing of the Source to the framework.
Basically an Agent needs a Source for input and a Sink for output.
It also needs a function in order to convert incoming rows to NGSI entities.
Once the Agent is initialized, you can run it !
Let's continue with our rooms.
from pyngsi.sources.more_sources import SourceSampleOrion
from pyngsi.sink import SinkStdout
from pyngsi.agent import NgsiAgent
# init the source
src = SourceSampleOrion()
# for the convenience of the demo, the sink is the standard output
sink = SinkStdout()
# init the agent
agent = NgsiAgent.create_agent(src, sink)
# run the agent
agent.run()
Here you can see that incoming rows are outputted 'as is'.
It's possible because SinkStdout outputs whatever it receives on its input.
But SinkOrion expects valid NGSI entities on its input.
So let's define a conversion function.
from pyngsi.sources.source import Row
from pyngsi.ngsi import DataModel
def build_entity(row: Row) -> DataModel:
    id, temperature, pressure = row.record.split(';')
    m = DataModel(id=id, type="Room")
    m.add("dataProvider", row.provider)
    m.add("temperature", float(temperature))
    m.add("pressure", int(pressure))
    return m
And use it in the Agent.
# init the Agent with the conversion function
agent = NgsiAgent.create_agent(src, sink, process=build_entity)
# run the agent
agent.run()
# obtain the statistics
print(agent.stats)
Congratulations ! You have developed your first pyngsi Agent !
Feel free to try SinkOrion instead of SinkStdout.
Note that you get statistics for free.
Inside your conversion function you can filter input rows just by returning None.
For example, if you're not interested in Room3 you could write this function.
def build_entity(row: Row) -> DataModel:
    id, temperature, pressure = row.record.split(';')
    if id == "Room3":
        return None
    m = DataModel(id=id, type="Room")
    m.add("dataProvider", row.provider)
    m.add("temperature", float(temperature))
    m.add("pressure", int(pressure))
    return m
agent = NgsiAgent.create_agent(src, sink, process=build_entity)
agent.run()
print(agent.stats)
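Conceptually, the Agent is a loop that reads rows, converts them, skips the filtered ones and writes the rest while counting everything. The standalone sketch below mimics that loop with plain Python. `run_agent` and `to_room` are hypothetical names, entities are plain dicts, and the exact counter semantics of pyngsi (for instance whether filtered rows count as processed) are an assumption.

```python
from typing import Callable, Iterable, Optional


def run_agent(source: Iterable, process: Callable[[object], Optional[dict]],
              write: Callable[[dict], None]) -> dict:
    """Consume the source, convert each row and write the resulting entities."""
    stats = {"input": 0, "processed": 0, "filtered": 0, "output": 0, "error": 0}
    for row in source:
        stats["input"] += 1
        try:
            entity = process(row)
        except Exception:
            # A faulty row is counted and skipped, the run goes on
            stats["error"] += 1
            continue
        stats["processed"] += 1
        if entity is None:
            # Returning None from the conversion function filters the row
            stats["filtered"] += 1
            continue
        write(entity)
        stats["output"] += 1
    return stats


def to_room(record: str) -> Optional[dict]:
    room_id, temperature, pressure = record.split(";")
    if room_id == "Room3":
        return None
    return {"id": room_id, "type": "Room",
            "temperature": float(temperature), "pressure": int(pressure)}


written = []
stats = run_agent(["Room1;23;720", "Room3;18;708", "bad"], to_room, written.append)
print(stats)
```

Here the list `written` plays the role of the Sink; swapping it for another callable changes the output without touching the loop.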
As of v1.2.5 the Agent takes a side_effect() function as an optional argument.
That function lets you create entities alongside the main flow. Only a few use cases need it.
def side_effect(row, sink, model) -> int:
    # we can use as an input the current row or the model returned by our process() function
    m = DataModel(id=f"Building:MainBuilding:Room:{model['id']}", type="Room")
    sink.write(m.json())
    return 1 # number of entities created in the function
agent = NgsiAgent.create_agent(src, sink, process=build_entity, side_effect=side_effect)
agent.run()
print(agent.stats)
The NGSI Agent developed in this chapter is a very basic one.
Principles remain the same when things become more complex.
Dealing with datasources is the most complex part because datasources are versatile :
The Source class aims at providing a common interface for all datasources.
As datasources have very little in common, the only assumption made by the framework is :
Every datasource is iterable
In practical terms, a Source uses a Python generator.
The pyngsi.sources package offers many generic sources, and it's easy to create your custom Source by extending the Source class.
A Source iterates over rows.
Rows are composed of two parts :
These two points - the iterable sources and the row definition - are the foundation of the framework.
This common interface will allow us to create agents that will use our Sources, as we have seen in the previous chapter.
from pyngsi.sources.source import Row
help(Row.__init__)
Sources are provided in the pyngsi.sources package.
The Source base class as well as the Row class are defined in pyngsi.sources.source.
Source is the Source base class
SourceStream takes incoming plain text data from any iterable input : an open file object or any Python sequence such as a list or a tuple
SourceStdin is a shortcut for SourceStream(sys.stdin) and takes data from the standard input
SourceSingle takes incoming data from a single (non-iterable) element
SourceMany combines many sources
SourceJson takes incoming JSON data from a JSON string
If the incoming JSON is a JSON Array then SourceJson iterates over the array
The jsonpath argument points to the array to be iterated in the JSON structure (e.g. path = ["cars","models"])
SourceSampleOrion is the Source dedicated to the tutorial found in the Orion walkthrough
SourceMicrosoftExcel is a Source that can read xlsx files
It is given as an example of how to write your own custom Source classes
SourceFtp retrieves incoming data from FTP servers
Server is a base class for Sources acting as a server
This kind of datasource is not requested by the Agent
ServerHttpUpload is an HTTP server
Each time it receives content on its upload endpoint, it acts as a Source that feeds data to the Agent
ServerUdp is a UDP server
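The jsonpath idea mentioned for SourceJson can be pictured as a list of keys followed from the root of the document down to the array to iterate. The following standalone sketch shows that walk with the standard library; `iter_json` is a hypothetical function and does not reproduce the SourceJson implementation.

```python
import json
from typing import Any, Iterator, Sequence, Union


def iter_json(text: str, path: Sequence[Union[str, int]] = ()) -> Iterator[Any]:
    """Parse text, follow path key by key, then iterate over what is found there."""
    node = json.loads(text)
    for key in path:
        node = node[key]
    if isinstance(node, list):
        # A JSON array yields one record per element
        yield from node
    else:
        # Anything else is treated as a single record
        yield node


doc = '{"cars": {"models": [{"name": "A"}, {"name": "B"}]}}'
for record in iter_json(doc, ["cars", "models"]):
    print(record)
```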
Here the Source takes incoming data from a compressed JSON file.
As the JSON is an array, the Source iterates over each row of the JSON Array.
The provider is filled with the name of the file.
from pyngsi.sources.source import Source
# returns a SourceJson guessed from the extension
src = Source.from_file("files/colors.json.gz")
for row in src:
    print(row)
To process local files in batch mode, you may also use from_files(), from_glob() and from_globs().
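Guessing the right Source from a file name mostly comes down to reading the extension and noticing an optional compression suffix. Here is a minimal sketch of that idea; `guess_format` is a hypothetical helper and the rules actually applied by Source.from_file() may differ.

```python
from pathlib import Path
from typing import Tuple


def guess_format(filename: str) -> Tuple[str, bool]:
    """Return (format, is_gzipped) from the file name alone."""
    suffixes = [s.lower().lstrip(".") for s in Path(filename).suffixes]
    gzipped = bool(suffixes) and suffixes[-1] == "gz"
    if gzipped:
        # Drop the compression suffix to reach the real format
        suffixes = suffixes[:-1]
    return (suffixes[-1] if suffixes else "", gzipped)


print(guess_format("files/colors.json.gz"))  # ('json', True)
print(guess_format("files/room.csv"))        # ('csv', False)
```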
This example runs only on a local Jupyter notebook (needs network access)
A quite complex example made easy.
The FTP server used in the example serves RFC files.
Each RFC comes with a brief description in a JSON file.
Have a look at rfc959.json in the files/ folder.
We will output NGSI entities to Orion with a great datamodel exposing the title, the date of publication and the number of pages.
We will focus only on RFC959 and RFC2228, which both deal with the FTP protocol.
from datetime import datetime
from pyngsi.sources.source import Row
from pyngsi.ngsi import DataModel
def build_entity(row: Row) -> DataModel:
    rfc = row.record
    m = DataModel(id=rfc["doc_id"], type="RFC")
    m.add("dataProvider", row.provider)
    m.add("title", rfc["title"])
    m.add("publicationDate", datetime.strptime(rfc["pub_date"], "%B %Y"))
    m.add("pageCount", int(rfc["page_count"]))
    return m
from pyngsi.sources.source_ftp import SourceFtp
from pyngsi.sink import SinkStdout
from pyngsi.agent import NgsiAgent
# help(SourceFtp) for more info
src = SourceFtp("ftp.ps.pl", paths=["/pub/rfc"],
                f_match=lambda x: x.endswith("rfc959.json") or x.endswith("rfc2228.json"))
# if you have an Orion server available, just replace SinkStdout() with SinkOrion()
sink = SinkStdout()
# the source has auto-detected that we deal with JSON files, hence has parsed json for us
agent = NgsiAgent.create_agent(src, sink, process=build_entity)
agent.run()
# resources are freed
# here the agent removes the temporary directory (where files were downloaded).
agent.close()
# get statistics
print(agent.stats)
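The publicationDate conversion in build_entity relies on the "%B %Y" format, that is a full month name followed by a four-digit year. The sketch below isolates that step. The sample value "October 1985" is an assumption about what pub_date looks like, and `parse_pub_date` is a hypothetical helper. Note that %B is locale-dependent : English month names are expected under the default C locale.

```python
from datetime import datetime


def parse_pub_date(text: str) -> datetime:
    """Parse a 'Month YYYY' string; the day defaults to the first of the month."""
    return datetime.strptime(text.strip(), "%B %Y")


print(parse_pub_date("October 1985"))  # 1985-10-01 00:00:00
```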
In this example (as we run a server) some code cells are not executable.
You can reproduce it locally on your computer.
This time the Agent doesn't get incoming data from the datasource.
In server mode, the Agent is requested by the datasource.
From the datasource point of view we could also call that push mode.
In this example our temperature/pressure sensors store measures locally and a JSON file is sent periodically to the Agent.
from pyngsi.sources.source import Row
from pyngsi.ngsi import DataModel
def build_entity(row: Row) -> DataModel:
    r = row.record
    m = DataModel(id=r["room"], type="Room")
    m.add("temperature", r["temperature"])
    m.add("pressure", r["pressure"])
    return m
from pyngsi.sources.server import ServerHttpUpload
from pyngsi.sink import SinkOrion
from pyngsi.agent import NgsiAgent
# help(ServerHttpUpload) for more info
src = ServerHttpUpload()
# init the sink
sink = SinkOrion()
# the agent processes JSON content received from the client
agent = NgsiAgent.create_agent(src, sink, process=build_entity)
# run the server
agent.run()
$ curl -X POST -H "Content-Type: application/json" -d '{"room":"Room1","temperature":23.0,"pressure":710}' http://127.0.0.1:8880/upload
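A sensor written in Python could push the same measure without curl. This sketch builds the equivalent request with the standard library; the URL is the one used in the curl command above, while `build_upload_request` and `push` are hypothetical names. Only `push` touches the network, so the request can be inspected without a running agent.

```python
import json
import urllib.request


def build_upload_request(measure: dict,
                         url: str = "http://127.0.0.1:8880/upload") -> urllib.request.Request:
    """Prepare a JSON POST request carrying one measure."""
    body = json.dumps(measure).encode("utf-8")
    return urllib.request.Request(url, data=body, method="POST",
                                  headers={"Content-Type": "application/json"})


def push(measure: dict) -> int:
    """Send the measure and return the HTTP status code (needs a running agent)."""
    with urllib.request.urlopen(build_upload_request(measure), timeout=5) as resp:
        return resp.status


req = build_upload_request({"room": "Room1", "temperature": 23.0, "pressure": 710})
print(req.get_method(), req.full_url)
```

Calling `push(...)` with the same dictionary sends the request once the agent is listening.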
{
"name": "pyngsi",
"version": "2.1.0"
}
{
"ngsi_stats": {
"error": 0,
"filtered": 0,
"input": 6,
"output": 6,
"processed": 6
},
"server_status": {
"calls": 6,
"calls_error": 0,
"calls_success": 6,
"lastcalltime": "Fri, 27 Mar 2020 14:17:12 GMT",
"starttime": "Fri, 27 Mar 2020 14:17:03 GMT"
}
}
A source is essentially an iterable.
In practice it is based on a Python generator.
from pyngsi.sources.source import Source, Row
class CustomSource(Source):
    def __init__(self, rooms):
        self.rooms = rooms

    def __iter__(self):
        for record in self.rooms:
            yield Row("custom", record)
Let's use it
# our CSV lines
rooms = ["Room1;23;720", "Room2;21;711"]
# init the source
src = CustomSource(rooms)
# consume the source and print rows
for row in src:
    print(row)
A Source offers some facilities to deal with generators.
# extend our rooms
rooms *= 3
print(rooms)
for row in src.limit(3):
    print(row)
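Limiting a generator does not require reading it entirely. The standalone sketch below shows one way to get that behaviour with itertools.islice; whether Source.limit() is implemented this way is an assumption, and `limit` and `numbered_rooms` are hypothetical names.

```python
from itertools import islice
from typing import Iterable, Iterator


def limit(rows: Iterable, n: int) -> Iterator:
    """Yield at most n rows, stopping the underlying iteration early."""
    return islice(rows, n)


def numbered_rooms() -> Iterator[str]:
    # An endless generator : only a lazy limit makes it safe to consume
    i = 1
    while True:
        yield f"Room{i};20;700"
        i += 1


print(list(limit(numbered_rooms(), 3)))
```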
The Scheduler class lets you run an agent periodically.
Of course you could write an agent then schedule executions using an external scheduler such as cron.
The benefits of scheduling with the framework are :
from pyngsi.ngsi import DataModel
from pyngsi.sources.source import SourceSingle, Row
from pyngsi.sink import SinkStdout
from pyngsi.agent import NgsiAgent
dummy = DataModel(id="dummy", type="dummy")
src = SourceSingle("dummy")
sink = SinkStdout()
# init the dummy agent
agent = NgsiAgent.create_agent(src, sink, lambda x: dummy)
# run once
agent.run()
from pyngsi.scheduler import Scheduler, UNIT
scheduler = Scheduler(agent, interval=5, unit=UNIT.seconds)
scheduler.run()
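For intuition, periodic execution boils down to running a job, then sleeping until the next slot. The following standalone sketch shows a fixed-rate loop with the standard library. It is not the pyngsi Scheduler : `run_every` and its `max_runs` parameter are hypothetical and exist only to keep the example finite.

```python
import time
from typing import Callable, Optional


def run_every(job: Callable[[], None], interval: float,
              max_runs: Optional[int] = None) -> int:
    """Call job every interval seconds; stop after max_runs calls when given."""
    runs = 0
    next_run = time.monotonic()
    while max_runs is None or runs < max_runs:
        job()
        runs += 1
        next_run += interval
        # Sleep only for the time left so that slow jobs do not shift the schedule
        time.sleep(max(0.0, next_run - time.monotonic()))
    return runs


ticks = []
run_every(lambda: ticks.append(time.monotonic()), interval=0.05, max_runs=3)
print(len(ticks))  # 3
```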
{
"name": "pyngsi",
"version": "2.1.0"
}
{
"ngsi_stats": {
"error": 0,
"filtered": 0,
"input": 6,
"output": 6,
"processed": 6
},
"server_status": {
"calls": 6,
"calls_error": 0,
"calls_success": 6,
"lastcalltime": "Fri, 27 Mar 2020 14:17:12 GMT",
"starttime": "Fri, 27 Mar 2020 14:17:03 GMT"
}
}
We need tools to debug, display sink output, troubleshoot communication with Orion.
pyngsi relies on loguru for logging.
You can set the logging level :
by using the LOGURU_LEVEL environment variable
programmatically :
import sys
from loguru import logger
logger.remove()
logger.add(sys.stderr, level="DEBUG")
Sometimes you need to display the output of your agent.
You can replace Orion with other Sinks. You already know SinkStdout.
SinkFile and SinkFileGzipped can also be useful.
$ curl -X PUT -s -S "http://127.0.0.1:1026/admin/log?level=DEBUG"
$ curl http://127.0.0.1:1026/v2/entities | jq .
sink = SinkOrion(proxy="http://127.0.0.1:8080")
Then use an HTTP intercepting proxy such as Burp Suite to monitor traffic towards Orion.
The recommended way is to use a Python virtual environment to manage your dependencies.
$ python3.8 -m venv venv
$ source venv/bin/activate
Sometimes while building your NGSI entity you'll find out that you need a context.
Meaning you cannot build your entity just from incoming data.
A clean solution is to use a simple class to hold your context.
In this basic example we want to populate the address in the NGSI entity.
from pyngsi.sources.source import Row
from pyngsi.ngsi import DataModel

class DummyProcessor:
    def __init__(self, address: str):
        # the context that incoming rows do not carry
        self.address = address

    def build_entity(self, row: Row) -> DataModel:
        model = DataModel(id="dummy", type="dummy")
        model.add_url("dataProvider", row.provider)
        model.add("address", self.address)
        return model
Then in the code of the Agent you can use the processor like this :
processor = DummyProcessor("Port of Bassens")
agent = NgsiAgent.create_agent(source, sink, process=processor.build_entity)
You'll probably want to have unit tests.
Thanks to pyngsi you can quickly create a SourceTest to mock your real Source.
this tutorial
dir() and help()
run the examples in the examples/ folder of pyngsi
read the code of NGSI agents developed for PIXEL
read unit tests
read the code of pyngsi on github
readthedocs : TODO
Retrieve the docker-compose file on the github repo.
$ docker-compose up -d
The first time, it will pull the images from the Docker repository.
Orion uses mongodb as a backing database.
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e994bef626d0 fiware/orion "/usr/bin/contextBro…" 10 hours ago Up 10 hours 0.0.0.0:1026->1026/tcp orion
7c7a6cf3a078 mongo "docker-entrypoint.s…" 10 hours ago Up 10 hours 27017/tcp oriondb
$ docker logs -f orion
Refer to Chapter 6 to modify the log level in Orion.
$ docker-compose down
One can add Sources and Sinks to the framework.
Just extend the corresponding classes.
You can add a Source that is dedicated to your custom data and your custom agent.
SourceMicrosoftExcel is given as an example.
Let's say you've just created your own Source capable of parsing xlsx files and yielding CSV-like rows from it.
from pyngsi.sources.more_sources import SourceMicrosoftExcel
src = SourceMicrosoftExcel("files/room.xlsx")
for row in src:
    print(row)
Now you'd like to use it in all situations, especially if the xlsx files you're interested in are stored remotely.
You'll have to register the source.
from pyngsi.sources.source import Source
from pyngsi.sources.more_sources import SourceMicrosoftExcel
# global registering
Source.register(SourceMicrosoftExcel)
# for the purpose of the tutorial the file room.xlsx is stored locally
src = Source.from_glob("files/*.xlsx")
for row in src:
    print(row)
Source.unregister()
As an alternative you might want to register only the xlsx extension, which lets you handle multiple formats at the same time.
from pyngsi.sources.source import Source
from pyngsi.sources.more_sources import SourceMicrosoftExcel
# register extension
Source.register_extension("xlsx", SourceMicrosoftExcel)
# for the purpose of the tutorial the files room.xlsx and room.csv are stored locally
src = Source.from_glob("files/room.*")
for row in src:
    print(row)
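Registering by extension can be pictured as a lookup table from file extension to the code able to read it. The standalone sketch below illustrates that idea only. `_READERS`, `register_extension` and `reader_for` are hypothetical names, readers are simple callables, and nothing here reflects how pyngsi stores its registrations.

```python
from pathlib import Path
from typing import Callable, Dict

# Hypothetical registry : extension (without dot) -> reader callable
_READERS: Dict[str, Callable[[str], str]] = {}


def register_extension(ext: str, reader: Callable[[str], str]) -> None:
    """Associate a file extension with a reader."""
    _READERS[ext.lower().lstrip(".")] = reader


def reader_for(filename: str) -> Callable[[str], str]:
    """Return the reader registered for the extension of filename."""
    ext = Path(filename).suffix.lower().lstrip(".")
    try:
        return _READERS[ext]
    except KeyError:
        raise ValueError(f"no reader registered for '.{ext}' files") from None


register_extension("csv", lambda name: f"csv reader for {name}")
register_extension("xlsx", lambda name: f"xlsx reader for {name}")
print(reader_for("files/room.xlsx")("files/room.xlsx"))
```

With such a table, a glob like files/room.* can mix formats because each file is dispatched on its own extension.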
If you wish to add a generic Source that could be reused in other agents, please consider contributing to the project.