Building a Connector
We are going to create a demo connector which reads payments files from a directory. In this directory, we will deposit fictional payments files to be processed by the connector. Each file will contain a payment to be processed by the connector. Let’s get our hands on deck with the Connector interfaceApplyDefaults(Config)
method.
The framework provides the capability to run tasks, so each connector can start
a number of tasks. These tasks will be scheduled by the framework, and if the
service is restarted, the tasks will be restarted as well. The number of tasks
that a connector can schedule is defined by the AllowTasks()
method.
To implement the Loader
interface, you can create your own struct that
implements the required methods, or you can use utilities provided by the
framework. Here is an example of a basic loader:
Config
struct is empty. We will add logic to these structs later.
To integrate the connector into the core, we need to edit the cmd/connectors/internal/api/module.go
file and add the following code to the end of the HTTPModule()
method:
Loader
has method named Load
:
http://localhost:8080/api/payments/connectors/example
with the
POST
method, the framework calls the Install()
method. Similarly, when you
make a request to the same URL with the DELETE
method, the framework calls
the Uninstall()
method.
It is time to create a basic connector that does nothing.
internal/storage/migration_v1.x.go
: Add a new migration adding the connector enum to the postgres enum:
internal/models/connector.go
: Add the enum to theConnectorProvider
type:
Install
method takes a task.ConnectorContext
parameter, which has the
following interface:
context.Context
: If the connector performs long-running processing, it should listen on this context to abort if necessary.Scheduler
: A scheduler to run tasks
- Schedule options: can be either:
OPTIONS_RUN_NOW
: The task will be run immediately in an async taskOPTIONS_RUN_IN_DURATION
: The task will be run after a given durationOPTIONS_RUN_SCHEDULED_AT
: The task will be run at a specific timeOPTIONS_RUN_PERIODICALLY
: The task will be run every given durationOPTIONS_RUN_NOW_SYNC
: The task will be run immediately in a sync task and will return the associated error if there is one
- Duration: duration used by the
OPTIONS_RUN_IN_DURATION
orOPTIONS_RUN_PERIODICALLY
schedule options - Restart options: can be either:
OPTIONS_STOP_AND_RESTART
OPTIONS_RESTART_ALWAYS
: The task will always be restartedOPTIONS_RESTART_NEVER
: The task will never be restartedOPTIONS_RESTART_IF_NOT_ACTIVE
: the task will be restarted if it is not marked as active in the databaseOPTIONS_STOP_AND_RESTART
: the task will be stopped if it exists and restarted
Resolve(descriptor models.TaskDescriptor) task.Task
. This method is
responsible for providing a task.Task
instance given a descriptor.
Therefore, when calling ctx.Scheduler().Schedule(...)
, the framework will call
the Resolve
method with “directory” as a parameter.
Let’s implement the resolve method:
/tmp/payments
. You should
see something like this in the app logs:
/tmp/payments
directory
does not exist (yet).
You can see the tasks on the API as well:
Resolve
function does not handle
the descriptor. We will fix that in the next section.
As you can see, while the first task is still active, the second is flagged
as failed with an error message.
Let’s implement the second task. We will simply read the file and ingest its content:
Ingester
, which is the component
that is responsible for saving the payment object in the database.
In the code of the second task, you may have noticed the following part:
Lifecycle-wise, a good thing to note is that if the connector is restarted,
the task will be restarted with the previously state.