Chapter 1 : What is pyngsi ?

pyngsi is a Python framework for writing data-acquisition pipelines for Fiware.

Documentation for old 1.x versions can be found here

Introduction

Datasources are versatile : they expose various data in various ways, using different protocols.

A few examples showing how datasources can be exposed :

  • IoT sensors : WiFi IP-based or using lightweight protocols such as Bluetooth, Zigbee, LoRa, 5G, ...

  • HTTP REST API :

    • exposed on the datasource side : the agent requests the API to retrieve data (client mode)
    • exposed on the agent side : data are posted from the datasource to the agent (server mode)
  • files :

    • plain text, json, xml, maybe compressed (gz, tgz, zip, ...)
    • carried over HTTP, FTP, ...

The framework enables developers to build custom NGSI Agents that :

  • process data using a common interface regardless of the type of the datasource
  • convert custom data to NGSI entities based on Fiware datamodels
  • write NGSI entities to the Orion Context Broker

It's up to you to use the whole framework or to pick only the parts you need.
For example, it's possible to build NGSI entities with the framework and deal with Orion on your own.
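
To make the target format concrete, here is a minimal sketch of an NGSI v2 entity built by hand with the standard library only. It mirrors the JSON returned by Orion in the "Write to Orion" chapter. The helper name `make_attribute` is hypothetical and is not part of pyngsi.

```python
import json

def make_attribute(ngsi_type, value):
    # Hypothetical helper: wraps a value in the NGSI v2 attribute structure
    return {"type": ngsi_type, "value": value, "metadata": {}}

entity = {
    "id": "Room1",
    "type": "Room",
    "pressure": make_attribute("Integer", 720),
    "temperature": make_attribute("Float", 23.0),
}

# the serialized payload is what would eventually be sent to the broker
payload = json.dumps(entity)
print(payload)
```

The DataModel class introduced in the next chapter builds this kind of structure for you, so you rarely need to write it by hand.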

Using the pyngsi framework provides several benefits :

  • developers can focus on the data logic

  • clean code structure separates custom logic from boilerplate

  • streaming-oriented : stream incoming data vs store the whole dataset in memory

  • well unit-tested

  • all agents have the same structure : client and server modes

  • get processing statistics

  • get agent status : when possible i.e. server agent, long-live agent

  • benefit from the Python ecosystem especially the availability of many scientific libraries

In this tutorial we are going to explore the main features of pyngsi :

  • Chapter 2 : How to use the DataModel class to build NGSI entities for your custom data
  • Chapter 3 : How to use the SinkOrion class to write to the Orion Context Broker
  • Chapter 4 : Write your first Agent
  • Chapter 5 : More on datasources
  • Chapter 6 : How to schedule the execution of an agent
  • Chapter 7 : How to debug, display, troubleshoot
  • Chapter 8 : Tips for real use-case agents
  • Chapter 9 : How to extend the framework
  • Appendix : How to run a local Docker-based Orion instance

Pre-requisite

Python 3.8+

As of December 2020 the latest Python stable release is v3.9.1.
Python 3.8+ should be available on most platforms.
Depending on your OS it may already be installed.
If not you'll have to install it.

You can check the version either from the terminal :

$ python --version
Python 3.8.6

or in Python code :

import sys

print(sys.version)
3.8.6 (default, Oct  6 2020, 03:22:36) 
[GCC 7.5.0]
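
If you prefer an explicit guard at the top of a script, the check can be sketched as follows. The function name `python_is_supported` is an illustrative assumption; only `sys.version_info` comes from the standard library.

```python
import sys

MINIMUM = (3, 8)  # minimum version stated in this tutorial

def python_is_supported(version_info=sys.version_info):
    # Compare only (major, minor) against the minimum
    return tuple(version_info[:2]) >= MINIMUM

if not python_is_supported():
    found = sys.version.split()[0]
    raise SystemExit(f"Python {MINIMUM[0]}.{MINIMUM[1]}+ is required, found {found}")
```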

Check your pyngsi version

In [ ]:
import pyngsi

print(pyngsi.__version__)
\pagebreak

Chapter 2 : Build NGSI datamodels

Let's assume you have collected incoming data.
For the sake of this tutorial, let's say data are collected from temperature/pressure sensors located in some rooms.
At some point these data are stored in memory and handled as Python variables or objects.
Now you are ready to translate your data to NGSI-compliant entities in order to further write them to Orion, hence completing the Fiware-based data acquisition pipeline.
Please use existing Fiware datamodels or extend them.
If none of them fits your needs then consider creating your own datamodel and submitting it to the Fiware community for harmonization.

Your very first DataModel

This tutorial is based on the datamodel used in the NGSI Walkthrough tutorial.

Import the DataModel class

In [ ]:
from pyngsi.ngsi import DataModel

Create a basic datamodel with two attributes

In [ ]:
# create a DataModel instance with its mandatory id and type properties
m = DataModel(id="Room1", type="Room")

# add the pressure attribute as an integer
m.add("pressure", 720)

# add the temperature attribute as a float
# don't forget the trailing dot zero
# you could also convert explicitly : float(23)
m.add("temperature", 23.0)

Display result

In [ ]:
# print your model
print(m)
In [ ]:
# use the pprint() method for better readability
m.pprint()
In [ ]:
# you probably want to output this JSON format to Fiware Orion
m.json()

Help

In [ ]:
# dir() and help() functions are your friends
help(DataModel.add)

Adding attributes

Type mapping

Each NGSI type is mapped to a Python type.

NGSI type            Python type             Alt. Python type
Text                 str
Integer              int
Float                float
Boolean              bool
DateTime             datetime                str : add_date()
geo:json             tuple                   geojson.Point
URL                  str : add_url()
STRING_URL_ENCODED   str (urlencode=True)
json array           Sequence
json dictionary      dict
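
The table above can be read as a dispatch on the Python type of the value. The sketch below illustrates that idea with a hypothetical helper, `ngsi_type_of`; it is not pyngsi's implementation, and the returned labels are simply those of the first column of the table.

```python
from collections.abc import Sequence
from datetime import datetime

def ngsi_type_of(value):
    # Hypothetical helper illustrating the mapping table; not pyngsi's code.
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "Boolean"
    if isinstance(value, int):
        return "Integer"
    if isinstance(value, float):
        return "Float"
    if isinstance(value, str):
        return "Text"
    if isinstance(value, datetime):
        return "DateTime"
    if isinstance(value, tuple) and len(value) == 2:
        return "geo:json"
    if isinstance(value, dict):
        return "json dictionary"
    if isinstance(value, Sequence):
        return "json array"
    raise TypeError(f"no NGSI mapping for {type(value).__name__}")

print(ngsi_type_of(720), ngsi_type_of(23.0), ngsi_type_of(True))
```

The ordering of the tests matters : without the early bool check, True would be reported as an Integer.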

DateTime attribute

In [ ]:
# create a dateObserved attribute
from datetime import datetime

now = datetime.utcnow() # Orion expects UTC dates
m.add("dateObserved", now)

m.pprint()
In [ ]:
# a one-liner for dealing with the current date (as above)

m.add_now("dateObserved") # overwrites the dateObserved attribute

m.pprint()
In [ ]:
# if you already have a datetime as a well-formatted string, you can use it directly
m.add_date("dateObserved", "2020-02-27T04:52:53.000Z")

# note that adding an already existing attribute replaces the value
m.pprint()
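
If your dates arrive as datetime objects and you need such a string yourself, the following sketch renders an ISO 8601 UTC timestamp with millisecond precision and a Z suffix. The helper name `to_ngsi_date` is hypothetical, and whether add_date() requires exactly this layout is an assumption.

```python
from datetime import datetime, timezone

def to_ngsi_date(dt):
    # Aware datetimes are converted to UTC; naive ones are assumed to be UTC already
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc)
    # strftime has no millisecond directive, so milliseconds are appended by hand
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"

print(to_ngsi_date(datetime(2020, 2, 27, 4, 52, 53)))
# 2020-02-27T04:52:53.000Z
```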

Location attribute

In [ ]:
# create a location attribute for Bordeaux
m.add("location", (-0.57918, 44.837789))

m.pprint()
In [ ]:
# create a location attribute given a GeoJson Point
# GeoJSON positions are ordered (longitude, latitude)
from geojson import Point

loc = Point((-0.57918, 44.837789))
m.add("location", loc)

m.pprint()
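
Coordinate order is a classic source of bugs : GeoJSON (RFC 7946) stores a position as longitude first, then latitude. The sketch below builds the Point structure by hand with keyword arguments so the two values cannot be swapped silently. The helper name `geojson_point` is hypothetical.

```python
import json

def geojson_point(latitude, longitude):
    # GeoJSON (RFC 7946) orders a position as [longitude, latitude]
    return {"type": "Point", "coordinates": [longitude, latitude]}

# Bordeaux lies at about 44.84 degrees north, 0.58 degrees west
bordeaux = geojson_point(latitude=44.837789, longitude=-0.57918)
print(json.dumps(bordeaux))
```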

URL attribute

In [ ]:
# create a dataProvider attribute
m.add_url("dataProvider", "https://app.sencrop.com")

m.pprint()

Relationship attribute

In [ ]:
# create a relationship attribute
# the "ref" prefix is mandatory
m.add_relationship("refStore", "Shelf", "001")

m.pprint()

Advanced usage

Adding raw data

In [ ]:
# create an attribute containing raw data
m.add("rawData", "Zm9yYmlkZGVuIGNoYXJhY3RlcnM=", urlencode=True)

# NGSIv2 forbidden characters : https://fiware-orion.readthedocs.io/en/1.14.0/user/forbidden_characters/index.html
# note that the equal sign '=' is part of these characters
# as '=' is the base64 padding character you MUST use urlencode=True when carrying base64 payloads

m.pprint()
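
To see why encoding is needed, the sketch below reproduces the base64 payload used above and percent-encodes it with the standard library. That pyngsi applies this exact encoding is an assumption; the point is only that the '=' padding disappears from the stored value and can be restored on reading.

```python
import base64
from urllib.parse import quote, unquote

raw = base64.b64encode(b"forbidden characters").decode("ascii")
encoded = quote(raw, safe="")   # percent-encode every reserved character

print(raw)      # Zm9yYmlkZGVuIGNoYXJhY3RlcnM=
print(encoded)  # Zm9yYmlkZGVuIGNoYXJhY3RlcnM%3D

# decoding restores the original base64 string
decoded = unquote(encoded)
print(decoded == raw)
```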

Adding a JSON Array

In [ ]:
# create an attribute containing a basic json array
# just provide a Python Sequence type, here a list
m.add("additionalData", [{"key1": "value1"}, {"key2": "value2"}])

m.pprint()

Adding a JSON dictionary

In [ ]:
# create an attribute containing a basic json dictionary
# just provide a Python dict
m.add("people",  {"firstname": "Tom", "lastname": "Cruise",  "occupation": "Actor"})

m.pprint()

Adding a postal address

A postal address is a dedicated dictionary with property and type already set

In [ ]:
addr = {"addressLocality": "London",
        "postalCode": "EC4N 8AF",
        "streetAddress": "25 Walbrook"}
m.add_address(addr)

m.pprint()

Adding metadata

In [ ]:
# provide a dictionary
unit = {"unitCode": "Pa"}

m = DataModel(id="Room2", type="Room")
m.add("pressure", 720, metadata=unit)

m.pprint()

Ordering

In [ ]:
# attributes are ordered in the creation order
# so if ordering is important for you just take care at the creation time
m.add("temperature", 23.0)

m.pprint()

Full example

This example is used by the Sencrop NGSI Agent that collects weather data from the Sencrop API.
Sencrop is a French company that sells connected weather stations.

The datamodel used here extends the Fiware WeatherObserved datamodel.

In [ ]:
name = "GPMB" # Grand Port Maritime de Bordeaux
date = "2020-01-21T23:51:14.000Z" # observation date
provider = 20430 # sensor id

model = DataModel(id=f"{name}-WeatherObserved-Sencrop-{provider}-{date}", type="WeatherObserved")
model.add_url("dataProvider", "https://app.sencrop.com")
model.add("address", "Port of Bassens")
model.add("refDevice", f"device-sencrop-{name}-{provider}")
model.add_date("dateObserved", date)
model.add("windSpeed", 3.75)
model.add("windGust", 7.08)
model.add("windDirection", 59.0)

model.pprint()
\pagebreak

Chapter 3 : Write to Orion

Let's assume you have an NGSI entity ready to be sent to the Orion Context Broker.
Let's continue with the tutorial of the previous chapter.

In [ ]:
# our basic NGSI entity from the previous chapter

from datetime import datetime, timedelta, timezone
from pyngsi.ngsi import DataModel

m = DataModel(id="Room1", type="Room")
m.add_now("dateObserved")
m.add("pressure", 720)
m.add("temperature", 23.0)

m.pprint()

The SinkOrion class

At this point you should have an Orion server up and running.
If not you could run your own local Docker-based server, as described in the Appendix.

dir() and help() are always useful.

In [ ]:
# there are many other sinks mainly to help develop your agent
# use help() for info
# for the moment just focus on Orion
from pyngsi.sink import SinkOrion

# have a look at the init args
help(SinkOrion.__init__)

Init the Sink

In [ ]:
# use our local Orion server
sink = SinkOrion()

Write entity

In [ ]:
# by default it is a silent operation
# it's ok unless an exception is raised
sink.write(m.json())

Congratulations ! You have written your first entity to Orion !

Visualize result

$ curl http://127.0.0.1:1026/v2/entities

[{"id":"Room1","type":"Room","dateObserved":{"type":"DateTime","value":"2020-03-27T07:18:39.00Z","metadata":{}},"pressure":{"type":"Integer","value":720,"metadata":{}},"temperature":{"type":"Float","value":23,"metadata":{}}}]

Update entity

In [ ]:
# update our entity : one degree more !
m.add_now("dateObserved")
m.add("temperature", 24.0)

# write again
sink.write(m.json())
$ curl http://127.0.0.1:1026/v2/entities

[{"id":"Room1","type":"Room","dateObserved":{"type":"DateTime","value":"2020-03-27T07:27:47.00Z","metadata":{}},"pressure":{"type":"Integer","value":720,"metadata":{}},"temperature":{"type":"Float","value":24,"metadata":{}}}]

Transient entities

Transient entities carry an expiration timestamp so that Orion deletes the entity once that time has passed.

In [ ]:
expiration_timestamp = datetime.utcnow() + timedelta(days=10)
m.add_transient(expire=expiration_timestamp)

m.pprint()
In [ ]:
# Use a TTL instead of an expire timestamp
m.add_transient(10*86400) # TTL in seconds

m.pprint()
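
The two forms are related by simple date arithmetic : a TTL is an offset added to the current UTC time. The sketch below shows that computation with the standard library; the helper name `expiry_from_ttl` is hypothetical and says nothing about how pyngsi computes the value internally.

```python
from datetime import datetime, timedelta, timezone

def expiry_from_ttl(ttl_seconds, now=None):
    # 'now' is injectable so the computation can be checked deterministically
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=ttl_seconds)

start = datetime(2020, 3, 27, 7, 18, 39, tzinfo=timezone.utc)
print(expiry_from_ttl(10 * 86400, now=start).isoformat())
# 2020-04-06T07:18:39+00:00
```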

Automatically apply the TTL mechanism to all entities

In [ ]:
DataModel.set_transient(timeout=10*86400) # TTL = 10 days
m = DataModel("id", "type") # automatically add the dateExpires attribute at entity creation

m.pprint()

DataModel.unset_transient() # back to default (no TTL)

Constructor args in detail

The default init args target a local installation.
If needed set the hostname and port according to your Orion server.

baseurl SHOULD NOT be modified.

service and servicepath have to do with Fiware multi-tenancy.
When provided, Fiware-Service and Fiware-ServicePath HTTP Headers are set accordingly.
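
As an illustration of what "set accordingly" means, the sketch below builds the two headers from optional values. The function name `fiware_headers` and the sample tenant values are hypothetical; only the header names come from the text above.

```python
def fiware_headers(service=None, servicepath=None):
    # Build only the headers for which a value was provided
    headers = {}
    if service:
        headers["Fiware-Service"] = service
    if servicepath:
        headers["Fiware-ServicePath"] = servicepath
    return headers

print(fiware_headers(service="mytenant", servicepath="/rooms"))
print(fiware_headers())  # no multi-tenancy : no extra header
```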

useragent is the User-Agent HTTP Header sent to Orion.
It is set by pyngsi with its own version. You can override it.

proxy allows you to configure a proxy between the agent and Orion, for debugging purposes.

token allows you to provide an OAuth2 token for Orion authentication.
Default is no authentication.

Setting a token programmatically is NOT recommended in production mode - for security reasons.

You can safely use one of the two following methods :

  • set the environment variable ORION_TOKEN
  • if using Docker, set a docker secret named orion_token
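
Both methods keep the token out of the source code. The sketch below shows how such a lookup could work; the function name `find_orion_token` and the lookup order are assumptions, not pyngsi's actual code. Docker mounts secrets as files under /run/secrets/ inside the container.

```python
import os
from pathlib import Path

def find_orion_token(env=os.environ, secret_path="/run/secrets/orion_token"):
    # 1. environment variable
    token = env.get("ORION_TOKEN")
    if token:
        return token
    # 2. Docker secret, mounted as a file inside the container
    secret = Path(secret_path)
    if secret.is_file():
        return secret.read_text().strip()
    return None  # no token found : no authentication

print(find_orion_token(env={"ORION_TOKEN": "example-token"}))
```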

Status

In [ ]:
from pyngsi.sink import SinkOrion

sink = SinkOrion()

# we can ask Orion for its status
status = sink.status()
print(status)
\pagebreak

Chapter 4 : Write your first Agent

The Source class

The Source is the place where you implement the acquisition of your data

Regardless of the type of data, the framework will always consume a Source the same way.
Many generic sources are provided by the framework and it's easy to create a new one.

The Source strongly encourages you to stream incoming data

Because rows are consumed one at a time, memory usage stays bounded whatever the size of the dataset.
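
The streaming idea can be sketched with a plain Python generator, independently of pyngsi. Here `stream_rows` is a hypothetical function and io.StringIO stands in for an open file or a network stream.

```python
import io

def stream_rows(lines):
    # Generator : each line is parsed only when the consumer asks for it
    for line in lines:
        line = line.strip()
        if line:
            yield line.split(";")

data = io.StringIO("Room1;23;720\nRoom2;21;711\nRoom3;24;715\n")
rows = stream_rows(data)

first = next(rows)   # only the first line has been consumed so far
print(first)         # ['Room1', '23', '720']

for row in rows:     # the remaining lines are read one by one
    print(row)
```

At no point does the whole dataset sit in a list, which is what keeps memory usage flat.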

Example

Let's continue with our room sensors.

In [ ]:
# this source provides us with a CSV line each second
from pyngsi.sources.more_sources import SourceSampleOrion

# init the source
src = SourceSampleOrion()

# iterate over the source
for row in src:
    print(row)

Here we can see that a row is an instance of a Row class.

For the vast majority of the Sources, the provider keeps the same value during the datasource lifetime.
We'll go into details in the next chapters.

In practice you won't iterate the Source by hand. The framework will do it for you.

The Agent class

Here comes the power of the framework.
By using an Agent you will delegate the processing of the Source to the framework.

Basically an Agent needs a Source for input and a Sink for output.
It also needs a function in order to convert incoming rows to NGSI entities.

Once the Agent is initialized, you can run it !

Let's continue with our rooms.

In [ ]:
from pyngsi.sources.more_sources import SourceSampleOrion
from pyngsi.sink import SinkStdout
from pyngsi.agent import NgsiAgent

# init the source
src = SourceSampleOrion()

# for the convenience of the demo, the sink is the standard output
sink = SinkStdout()

# init the agent
agent = NgsiAgent.create_agent(src, sink)

# run the agent
agent.run()

Here you can see that incoming rows are output 'as is'.
This is possible because SinkStdout outputs whatever it receives on its input.
But SinkOrion expects valid NGSI entities on its input.

So let's define a conversion function.

In [ ]:
from pyngsi.sources.source import Row
from pyngsi.ngsi import DataModel

def build_entity(row: Row) -> DataModel:
    id, temperature, pressure = row.record.split(';')
    m = DataModel(id=id, type="Room")
    m.add("dataProvider", row.provider)
    m.add("temperature", float(temperature))
    m.add("pressure", int(pressure))
    return m

And use it in the Agent.

In [ ]:
# init the Agent with the conversion function
agent = NgsiAgent.create_agent(src, sink, process=build_entity)

# run the agent
agent.run()

# obtain the statistics
print(agent.stats)

Congratulations ! You have developed your first pyngsi Agent !

Feel free to try SinkOrion instead of SinkStdout.
Note that you get statistics for free.

Inside your conversion function you can filter input rows just by returning None.
For example, if you're not interested in Room3 you could write this function.

In [ ]:
from typing import Optional

def build_entity(row: Row) -> Optional[DataModel]:
    id, temperature, pressure = row.record.split(';')
    if id == "Room3":
        return None
    m = DataModel(id=id, type="Room")
    m.add("dataProvider", row.provider)
    m.add("temperature", float(temperature))
    m.add("pressure", int(pressure))
    return m

agent = NgsiAgent.create_agent(src, sink, process=build_entity)
agent.run()
print(agent.stats)
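
To picture what the Agent does with a conversion function that may return None, here is a simplified stand-in for its loop, written with the standard library only. The function `run_pipeline` and its counter names are illustrative assumptions; pyngsi's real implementation and statistics may differ.

```python
def run_pipeline(source, process, write):
    # Hypothetical stand-in for an agent loop
    stats = {"input": 0, "filtered": 0, "output": 0, "error": 0}
    for row in source:
        stats["input"] += 1
        try:
            entity = process(row)
            if entity is None:          # the conversion function filtered the row
                stats["filtered"] += 1
                continue
            write(entity)
            stats["output"] += 1
        except Exception:               # a bad row must not stop the whole run
            stats["error"] += 1
    return stats

def build(row):
    room, temperature, pressure = row.split(";")
    if room == "Room3":
        return None
    return {"id": room, "temperature": float(temperature), "pressure": int(pressure)}

written = []
stats = run_pipeline(["Room1;23;720", "Room3;24;715", "bad-row"], build, written.append)
print(stats)
# {'input': 3, 'filtered': 1, 'output': 1, 'error': 1}
```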

The side_effect() function

As of v1.2.5 the Agent takes the side_effect() function as an optional argument.
This function lets you create entities outside the main flow. Only a few use cases need it.

In [ ]:
def side_effect(row, sink, model) -> int:
    # we can use as an input the current row or the model returned by our process() function
    m = DataModel(id=f"Building:MainBuilding:Room:{model['id']}", type="Room")
    sink.write(m.json())
    return 1 # number of entities created in the function

agent = NgsiAgent.create_agent(src, sink, process=build_entity, side_effect=side_effect)
agent.run()
print(agent.stats)

Conclusion

The NGSI Agent developed in this chapter is a very basic one.

Principles remain the same when things become more complex.

\pagebreak

Chapter 5 : Datasources

Dealing with datasources is the most complex part because datasources are versatile :

  • Different content formats : plain text, json, ...
  • Different protocols : UDP, FTP, HTTP, ...
  • Different modes : the agent requests the datasource (client mode) or is requested by it (server mode).

Dive into the Source class

The Source class aims at providing a common interface for all datasources.

As datasources have very little in common, the only assumption made by the framework is :

Every datasource is iterable

In practical terms, a Source uses a Python generator.
The pyngsi.sources package offers many generic sources, and it's easy to create your custom Source by extending the Source class.

A Source iterates over rows.
Rows are composed of two parts :

  • the record : the incoming content itself (the payload)
  • the content provider : just a string that recalls the origin of the row

These two points - the iterable sources and the row definition - are the foundation of the framework.
This common interface will allow us to create agents that will use our Sources, as we have seen in the previous chapter.

The Row

In [ ]:
from pyngsi.sources.source import Row

help(Row.__init__)

Sources provided by the framework

Sources are provided in the pyngsi.sources package.

The Source base class as well as the Row class are defined in pyngsi.sources.source.

  • Source is the Source base class

  • SourceStream takes incoming plain text data from any iterable input : may be an open file object or any Python sequence such as a list or a tuple

  • SourceStdin is a shortcut for SourceStream(sys.stdin) and takes data from the standard input

  • SourceSingle takes incoming data from a single (non-iterable) element

  • SourceMany combines many sources

  • SourceJson takes incoming JSON data from a JSON string
    If the incoming JSON is a JSON Array then SourceJson iterates over the array
    The jsonpath argument points to the array to be iterated in the JSON structure (e.g. path = ["cars","models"])

  • SourceSampleOrion is the Source dedicated to the tutorial found in the Orion walkthrough

  • SourceMicrosoftExcel is a Source that can read xlsx files
    It is given as an example to write your own custom Source classes

  • SourceFtp retrieves incoming data from FTP servers

  • Server is a base class for Sources acting as a server
    This kind of datasource is not requested by the Agent

  • ServerHttpUpload is an HTTP server
    Each time it receives content on its upload endpoint, it acts as a Source that feeds data to the Agent

  • ServerUdp is a UDP server
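
The jsonpath argument of SourceJson mentioned in the list above can be pictured as a list of keys followed one after the other until the array is reached. The sketch below illustrates this with the standard library; `select_array` is a hypothetical function and the sample document is invented for the example.

```python
import json

def select_array(document, path):
    # Follow each key of 'path' in turn, then return the array found there
    node = document
    for key in path:
        node = node[key]
    if not isinstance(node, list):
        raise ValueError("path does not point to a JSON array")
    return node

doc = json.loads('{"cars": {"models": [{"name": "A"}, {"name": "B"}]}}')
for record in select_array(doc, ["cars", "models"]):
    print(record)
```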

Example 1 : Process a local file

Here the Source takes incoming data from a compressed JSON file.

As the JSON is an array, the Source iterates over each row of the JSON Array.
The provider is filled with the name of the file.

In [ ]:
from pyngsi.sources.source import Source

# returns a SourceJson guessed from the extension
src = Source.from_file("files/colors.json.gz")

for row in src:
    print(row)

To process local files in batch mode, you may also use from_files(), from_glob() and from_globs().
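
For comparison, this is roughly what reading a compressed JSON array looks like with the standard library alone. The function `iter_json_gz` and the sample content are hypothetical; the actual content of files/colors.json.gz is not shown in this tutorial.

```python
import gzip
import json
import os
import tempfile

def iter_json_gz(path):
    # gzip.open in text mode decompresses transparently while reading.
    # json.load parses the whole array first; items are then yielded one by one.
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for item in json.load(f):
            yield item

# build a small sample file so the sketch is self-contained
with tempfile.TemporaryDirectory() as tmpdir:
    sample = os.path.join(tmpdir, "colors.json.gz")
    with gzip.open(sample, "wt", encoding="utf-8") as f:
        json.dump([{"color": "red"}, {"color": "green"}], f)
    colors = list(iter_json_gz(sample))

print(colors)
```

With the framework, decompression, parsing and the provider field are handled by the Source returned by from_file().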

Example 2 : Process FTP files

This example runs only on a local Jupyter notebook (needs network access)

A quite complex example made easy.

The FTP server used in the example serves RFC files.
Each RFC comes with a brief description in a JSON file.
Have a look at rfc959.json in the files/ folder.

We will output NGSI entities to Orion with a great datamodel exposing the title, the date of publication and the number of pages.
We will focus only on RFC959 and RFC2228, which deal with the FTP protocol.

Define our datamodel

In [ ]:
from datetime import datetime

from pyngsi.sources.source import Row
from pyngsi.ngsi import DataModel

def build_entity(row: Row) -> DataModel:
    rfc = row.record
    m = DataModel(id=rfc["doc_id"], type="RFC")
    m.add("dataProvider", row.provider)
    m.add("title", rfc["title"])
    m.add("publicationDate", datetime.strptime(rfc["pub_date"], "%B %Y"))
    m.add("pageCount", int(rfc["page_count"]))
    return m

Let's use our datamodel in our Agent

In [ ]:
from pyngsi.sources.source_ftp import SourceFtp
from pyngsi.sink import SinkStdout
from pyngsi.agent import NgsiAgent

# help(SourceFtp) for more info
src = SourceFtp("ftp.ps.pl", paths=["/pub/rfc"],
                f_match=lambda x: x.endswith("rfc959.json") or x.endswith("rfc2228.json"))

# if you have an Orion server available, just replace SinkStdout() with SinkOrion()
sink = SinkStdout()

# the source has auto-detected that we deal with JSON files, hence has parsed json for us
agent = NgsiAgent.create_agent(src, sink, process=build_entity)
agent.run()

# resources are freed
# here the agent removes the temporary directory (where files were downloaded).
agent.close()

# get statistics
print(agent.stats)

Example 3 : Expose a REST API (Server mode)

In this example (as we run a server) some code cells are not executable.
You can reproduce it locally on your computer.

This time the Agent doesn't get incoming data from the datasource.

In server mode, the Agent is requested by the datasource.
From the datasource point of view we could also call that push mode.

In this example our temperature/pressure sensors store measures locally and a JSON file is sent periodically to the Agent.

Define your datamodel

In [ ]:
from pyngsi.sources.source import Row
from pyngsi.ngsi import DataModel

def build_entity(row: Row) -> DataModel:
    r = row.record
    m = DataModel(id=r["room"], type="Room")
    m.add("temperature", r["temperature"])
    m.add("pressure", r["pressure"])
    return m

Let's use our datamodel in our Agent

from pyngsi.sources.server import ServerHttpUpload
from pyngsi.sink import SinkOrion
from pyngsi.agent import NgsiAgent

# help(ServerHttpUpload) for more info
src = ServerHttpUpload()

# init the sink
sink = SinkOrion()

# the agent processes JSON content received from the client
agent = NgsiAgent.create_agent(src, sink, process=build_entity)

# run the server
agent.run()

Send JSON measures to the Agent

$ curl -X POST -H "Content-Type: application/json" -d '{"room":"Room1","temperature":23.0,"pressure":710}' http://127.0.0.1:8880/upload
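
A datasource written in Python could send the same request without curl. The sketch below uses only the standard library; the function names `build_upload_request` and `send` are hypothetical, and the URL is the one used in the curl command above.

```python
import json
from urllib.request import Request, urlopen

def build_upload_request(measure, url="http://127.0.0.1:8880/upload"):
    # Serialize the measure and declare the JSON content type, as curl does above
    body = json.dumps(measure).encode("utf-8")
    return Request(url, data=body, method="POST",
                   headers={"Content-Type": "application/json"})

def send(request):
    # Performs the HTTP call; requires the agent to be listening
    with urlopen(request) as response:
        return response.status

req = build_upload_request({"room": "Room1", "temperature": 23.0, "pressure": 710})
print(req.get_method(), req.full_url)
```

Calling send(req) only makes sense while the server agent is running.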

Additional endpoints

  • /version
{
  "name": "pyngsi",
  "version": "2.1.0"
}
  • /status : get status and consolidated statistics
{
  "ngsi_stats": {
    "error": 0,
    "filtered": 0,
    "input": 6,
    "output": 6,
    "processed": 6
  },
  "server_status": {
    "calls": 6,
    "calls_error": 0,
    "calls_success": 6,
    "lastcalltime": "Fri, 27 Mar 2020 14:17:12 GMT",
    "starttime": "Fri, 27 Mar 2020 14:17:03 GMT"
  }
}

Our first custom Source

A source is essentially an iterable.
In practice it is based on a Python generator.

In [ ]:
from pyngsi.sources.source import Source, Row

class CustomSource(Source): 
    def __init__(self, rooms): 
        self.rooms = rooms 
        
    def __iter__(self): 
        for record in self.rooms: 
            yield Row("custom", record)

Let's use it

In [ ]:
# our CSV lines
rooms = ["Room1;23;720", "Room2;21;711"]

# init the source
src = CustomSource(rooms)

# consume the source and print rows
for row in src:
    print(row)

A Source offers some facilities to deal with generators.

In [ ]:
# extend our rooms
rooms *= 3

print(rooms)
for row in src.limit(3):
    print(row)
\pagebreak

Chapter 6 : Schedule

The Scheduler class lets you run an agent periodically.

Of course you could write an agent then schedule executions using an external scheduler such as cron.

The benefits of scheduling with the framework are :

  • easy and immediate
  • keep status and consolidated statistics during the agent lifetime

Example

Let's start by creating a dummy NGSI agent

In [ ]:
from pyngsi.ngsi import DataModel
from pyngsi.sources.source import SourceSingle, Row
from pyngsi.sink import SinkStdout
from pyngsi.agent import NgsiAgent

dummy = DataModel(id="dummy", type="dummy")
src = SourceSingle("dummy")
sink = SinkStdout()

# init the dummy agent
agent = NgsiAgent.create_agent(src, sink, lambda x: dummy)

# run once
agent.run()

Schedule the dummy agent every 5 seconds

In [ ]:
from pyngsi.scheduler import Scheduler, UNIT

scheduler = Scheduler(agent, interval=5, unit=UNIT.seconds)
scheduler.run()
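
Conceptually, periodic execution is a loop that runs a job and then waits. The sketch below shows only that idea with the standard library; `run_every` is a hypothetical function, it stops after a fixed number of iterations, and it offers none of the status or statistics features of the Scheduler class.

```python
import time

def run_every(job, interval, iterations, sleep=time.sleep):
    # Run 'job' a fixed number of times, pausing 'interval' seconds in between.
    # 'sleep' is injectable so the loop can be exercised without waiting.
    results = []
    for i in range(iterations):
        results.append(job())
        if i < iterations - 1:
            sleep(interval)
    return results

pauses = []
runs = run_every(lambda: "done", interval=5, iterations=3, sleep=pauses.append)
print(runs, pauses)   # ['done', 'done', 'done'] [5, 5]
```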

Additional endpoints

  • /version
{
  "name": "pyngsi",
  "version": "2.1.0"
}
  • /status : get status and consolidated statistics
{
  "ngsi_stats": {
    "error": 0,
    "filtered": 0,
    "input": 6,
    "output": 6,
    "processed": 6
  },
  "server_status": {
    "calls": 6,
    "calls_error": 0,
    "calls_success": 6,
    "lastcalltime": "Fri, 27 Mar 2020 14:17:12 GMT",
    "starttime": "Fri, 27 Mar 2020 14:17:03 GMT"
  }
}
\pagebreak

Chapter 7 : Debugging

We need tools to debug, display sink output, troubleshoot communication with Orion.

Debug the agent

pyngsi relies on loguru for logging.

You can set the logging level :

  • by using the LOGURU_LEVEL environment variable

  • programmatically :

    import sys

    from loguru import logger

    logger.remove()
    logger.add(sys.stderr, level="DEBUG")
    

Display the agent output

Sometimes you need to display the output of your agent.

You can replace Orion with other Sinks. You already know SinkStdout.
SinkFile and SinkFileGzipped can also be useful.

Troubleshoot Orion

Change Orion log level

$ curl -X PUT -s -S "http://127.0.0.1:1026/admin/log?level=DEBUG"

Request Orion entities

$ curl http://127.0.0.1:1026/v2/entities | jq .

Set a proxy in SinkOrion

sink = SinkOrion(proxy="http://127.0.0.1:8080")

Then use an HTTP intercepting proxy such as Burp Suite to monitor traffic towards Orion.

\pagebreak

Chapter 8 : Tips to develop a full size NGSI Agent

Tips

Virtual environment

The recommended way is to use a Python virtual environment to manage your dependencies.

$ python3.8 -m venv venv
$ source venv/bin/activate

Need context ?

Sometimes while building your NGSI entity you'll find out that you need some context, meaning that you cannot build your entity from incoming data alone.
A clean solution is to use a simple class to hold your context.

Example

In this basic example we want to populate the address in the NGSI entity.

from pyngsi.sources.source import Row
from pyngsi.ngsi import DataModel

class DummyProcessor:

    def __init__(self, address: str):
        # context that incoming rows do not carry
        self.address = address

    def build_entity(self, row: Row) -> DataModel:
        model = DataModel(id="dummy", type="dummy")
        model.add_url("dataProvider", row.provider)
        model.add("address", self.address)
        return model

Then in the code of the Agent you can use the processor like this :

processor = DummyProcessor("Port of Bassens")
agent = NgsiAgent.create_agent(source, sink, process=processor.build_entity)
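
When the context is a single value, functools.partial is a lighter alternative to a class. The sketch below uses plain dictionaries and a SimpleNamespace as a stand-in for a Row so that it runs without pyngsi; the field names are illustrative.

```python
from functools import partial
from types import SimpleNamespace

def build_entity(row, address):
    # 'address' is context that the incoming row does not carry
    return {"id": row.record, "type": "dummy", "address": address}

# bind the context once; the result takes a single 'row' argument
process = partial(build_entity, address="Port of Bassens")

row = SimpleNamespace(provider="demo", record="dummy")  # stand-in for a Row
print(process(row))
```

A class remains the better choice as soon as the context has several fields or needs to change over time.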

Command Line Interface

You'll probably need a CLI for you or your users to operate the NGSI Agent.
There are many options. I use click.

Logging

If you want logs, you've got them !
I recommend using loguru which is already used by pyngsi.

Unit tests

You'll probably want to have unit tests.
Thanks to pyngsi you can quickly create a SourceTest to mock your real Source.
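
Conversion functions are plain functions, so they are easy to test in isolation. The sketch below uses unittest with a SimpleNamespace standing in for a Row and a plain dictionary instead of a DataModel, so that it runs without pyngsi; it does not use SourceTest.

```python
import unittest
from types import SimpleNamespace

def build_entity(row):
    # Conversion function under test (plain dict instead of a DataModel)
    room, temperature, pressure = row.record.split(";")
    return {"id": room, "temperature": float(temperature), "pressure": int(pressure)}

class BuildEntityTest(unittest.TestCase):

    def test_csv_row_is_converted(self):
        row = SimpleNamespace(provider="test", record="Room1;23;720")
        self.assertEqual(build_entity(row),
                         {"id": "Room1", "temperature": 23.0, "pressure": 720})

    def test_malformed_row_raises(self):
        row = SimpleNamespace(provider="test", record="Room1;23")
        with self.assertRaises(ValueError):
            build_entity(row)

suite = unittest.defaultTestLoader.loadTestsFromTestCase(BuildEntityTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```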

Get info

  • this tutorial

  • dir() and help()

  • run the examples in the examples/ folder of pyngsi

  • read the code of NGSI agents developed for PIXEL

  • read unit tests

  • read the code of pyngsi on github

  • readthedocs : TODO

\pagebreak

Appendix

Run a Docker-based Orion instance locally

Start Orion

Retrieve the docker-compose file on the github repo.

$ docker-compose up -d

The first time, it will pull the images from the Docker registry.

Orion uses mongodb as a backing database.

$ docker ps

CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                    NAMES
e994bef626d0        fiware/orion        "/usr/bin/contextBro…"   10 hours ago        Up 10 hours         0.0.0.0:1026->1026/tcp   orion
7c7a6cf3a078        mongo               "docker-entrypoint.s…"   10 hours ago        Up 10 hours         27017/tcp                oriondb

Access logs

$ docker logs -f orion

Refer to Chapter 7 to modify the log level in Orion.

Stop Orion

$ docker-compose down
\pagebreak

Chapter 9 : Extend the framework

One can add Sources and Sinks to the framework.
Just extend the corresponding classes.

You can add a Source that is dedicated to your custom data and your custom agent.

SourceMicrosoftExcel is given as an example.
Let's say you've just created your own Source capable of parsing xlsx files and yielding CSV-like rows from it.

In [ ]:
from pyngsi.sources.more_sources import SourceMicrosoftExcel

src = SourceMicrosoftExcel("files/room.xlsx")

for row in src:
    print(row)

Now you'd like to use it in all situations, especially if the xlsx files you're interested in are stored remotely.
You'll have to register the source.

In [ ]:
from pyngsi.sources.source import Source
from pyngsi.sources.more_sources import SourceMicrosoftExcel

# global registering
Source.register(SourceMicrosoftExcel)

# for the purpose of the tutorial the file room.xlsx is stored locally
src = Source.from_glob("files/*.xlsx")

for row in src:
    print(row)

Source.unregister()

As an alternative you might want to register only the xlsx extension, which lets you handle multiple formats at the same time.

In [ ]:
from pyngsi.sources.source import Source
from pyngsi.sources.more_sources import SourceMicrosoftExcel

# register extension
Source.register_extension("xlsx", SourceMicrosoftExcel)

# for the purpose of the tutorial the files room.xlsx and room.csv are stored locally
src = Source.from_glob("files/room.*")

for row in src:
    print(row)
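
Registering by extension can be pictured as a lookup table from file extension to reader. The sketch below illustrates that idea with the standard library only; `REGISTRY`, `register_extension` and `reader_for` are hypothetical names and do not describe how pyngsi stores its registrations.

```python
import os

REGISTRY = {}  # hypothetical : maps a file extension to a reader function

def register_extension(ext, reader):
    REGISTRY[ext.lower()] = reader

def reader_for(filename):
    # splitext returns the extension with its leading dot, e.g. '.xlsx'
    ext = os.path.splitext(filename)[1].lstrip(".").lower()
    try:
        return REGISTRY[ext]
    except KeyError:
        raise ValueError(f"no reader registered for '.{ext}'") from None

register_extension("csv", lambda name: f"csv reader for {name}")
register_extension("xlsx", lambda name: f"xlsx reader for {name}")

for name in ["files/room.csv", "files/room.xlsx"]:
    print(reader_for(name)(name))
```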

If you wish to add a generic Source that could be reused in other agents, please consider contributing to the project.

\pagebreak