
Connector Framework

The Payments Service is designed to be extensible. You can build a connector to support a new payment processor, or to support a new payment method for an existing processor. This article describes how to build a connector.

Building a Connector

We are going to create a demo connector that reads payment files from a directory. We will deposit fictional payment files into this directory, each containing a single payment for the connector to process.

Let's get our hands dirty, starting with the Loader interface:

type Loader[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] interface {
    // Name has to return the name of the connector. It must be constant and unique.
    Name() string
    // Load is in charge of loading the connector.
    // It takes a logger and a ConnectorConfig object.
    // At this point, the config must have been validated.
    Load(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor]
    // ApplyDefaults is used to fill default values of the provided configuration object.
    ApplyDefaults(t ConnectorConfig) ConnectorConfig
    // AllowTasks defines how many tasks the connector can run.
    // If too many tasks are scheduled by the connector,
    // they will be set to a pending state and restarted later, once other tasks have terminated.
    AllowTasks() int
}

A connector has a name, which is provided by the loader through the Name() method. The connector also defines a config object using generics, which must implement the payments.ConnectorConfigObject interface. This interface has a single Validate() method, which the framework uses to check that an externally provided config is valid before loading the connector with it.
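
For reference, based on the description above, the config contract boils down to a single method (a minimal sketch; the exact definition lives in the payments package):

type ConnectorConfigObject interface {
    // Validate reports whether the configuration is usable;
    // it is called before the connector is loaded with it.
    Validate() error
}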

Since some properties of the config may be optional, the loader is also responsible for configuring default values for these properties. This is done using the ApplyDefaults(Config) method.
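
For example, a connector whose config included an optional polling period could fill it in like this (a sketch only: sketchConfig and its PollingPeriod field are hypothetical and not part of the config we build in this tutorial):

// Hypothetical config used only for this illustration.
type sketchConfig struct {
    Directory     string
    PollingPeriod time.Duration // optional
}

// applyDefaults shows the idea behind ApplyDefaults: fill in
// optional fields that the user did not provide.
func applyDefaults(cfg sketchConfig) sketchConfig {
    if cfg.PollingPeriod == 0 {
        cfg.PollingPeriod = 10 * time.Second
    }
    return cfg
}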

The framework provides the capability to run tasks, so each connector can start a number of tasks. These tasks will be scheduled by the framework, and if the service is restarted, the tasks will be restarted as well. The number of tasks that a connector can schedule is defined by the AllowTasks() method.

To implement the Loader interface, you can create your own struct that implements the required methods, or you can use utilities provided by the framework. Here is an example of a basic loader:

type (
    Config         struct{}
    TaskDescriptor struct{}
)

func (cfg Config) Validate() error {
    return nil
}

var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example").Build()

In this example, the name of the connector is "example", and the Config and TaskDescriptor structs are empty. We will add logic to these structs later.

To integrate the connector into the core, we need to edit the cmd/root.go file and add the following code to the end of the HTTPModule() method:

...
cdi.ConnectorModule[stripe.Config, stripe.TaskDescriptor](
    viper.GetBool(authBearerUseScopesFlag),
    stripe.NewLoader(),
),
...

You can add your connector below that:

...
cdi.ConnectorModule[example.Config, example.TaskDescriptor](
    viper.GetBool(authBearerUseScopesFlag),
    example.Loader,
),
...

After running the service, you should see output like this:

payments-payments-1  | time="2022-07-01T09:12:21Z" level=info msg="Restoring state" component=connector-manager provider=example
payments-payments-1 | time="2022-07-01T09:12:21Z" level=info msg="Not installed, skip" component=connector-manager provider=example

This indicates that your connector is properly integrated. You can install it using this command:

curl http://localhost:8080/connectors/example -X POST

The service should display output like this:

payments-payments-1  | time="2022-07-01T10:04:53Z" level=info msg="Install connector example" component=connector-manager config="{}" provider=example
payments-payments-1 | time="2022-07-01T10:04:53Z" level=info msg="Connector installed" component=connector-manager provider=example

To uninstall your connector, you can now use the following command:

curl http://localhost:8080/connectors/example -X DELETE

The service should display output like this:

payments-payments-1  | time="2022-07-01T10:06:16Z" level=info msg="Uninstalling connector" component=connector-manager provider=example
payments-payments-1 | time="2022-07-01T10:06:16Z" level=info msg="Stopping scheduler..." component=scheduler provider=example
payments-payments-1 | time="2022-07-01T10:06:16Z" level=info msg="Connector uninstalled" component=connector-manager provider=example

This indicates that your connector has been successfully uninstalled. You can now continue with the next steps of the tutorial.

It's time to add a bit of logic to our connector.

As you may have noticed, the Loader has a method named Load:

//...
Load(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor]
//...

The Load function takes a logger provided by the framework and a configuration, which is likely provided by the API endpoint. It returns a Connector object that provides an entry point to a payment provider. The Connector interface is as follows:

// Connector provides an entry point to a payment provider.
type Connector[TaskDescriptor payments.TaskDescriptor] interface {
    // Install is used to start the connector. The implementation is responsible for scheduling all required resources.
    Install(ctx task.ConnectorContext[TaskDescriptor]) error
    // Uninstall is used to uninstall the connector. It has to close all related resources opened by the connector.
    Uninstall(ctx context.Context) error
    // Resolve is used to recover the state of a failed or restarted task.
    Resolve(descriptor TaskDescriptor) task.Task
}

When you make a request to http://localhost:8080/connectors/example with the POST method, the framework calls the Install() method. Similarly, when you make a request to the same URL with the DELETE method, the framework calls the Uninstall() method.

It is time to add some logic to our connector. We need to modify our loader, but before that, let's add some properties to our configuration:

type (
    Config struct {
        Directory string
    }
    ...
)

func (cfg Config) Validate() error {
    if cfg.Directory == "" {
        return errors.New("missing directory to watch")
    }
    return nil
}

Here, we have defined only one property for our connector, called "Directory", which indicates the directory where JSON files will be pushed. Now, let's modify our loader:

var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example").
    WithLoad(func(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] {
        return integration.NewConnectorBuilder[TaskDescriptor]().
            WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error {
                return errors.New("not implemented")
            }).
            Build()
    }).
    Build()

Here, we create a connector using a built-in builder, but you can implement the interface if you want. We define an Install method that simply returns an error when called. You can try installing your connector again and see the error in the HTTP response.
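
For reference, a hand-written equivalent of the builder-based connector above could look like this minimal sketch (the fileConnector name is ours; only the Connector interface shown earlier is assumed):

type fileConnector struct {
    // Typically populated by the loader's Load function.
    logger sharedlogging.Logger
    config Config
}

func (c fileConnector) Install(ctx task.ConnectorContext[TaskDescriptor]) error {
    return errors.New("not implemented")
}

func (c fileConnector) Uninstall(ctx context.Context) error {
    // Nothing to clean up yet.
    return nil
}

func (c fileConnector) Resolve(descriptor TaskDescriptor) task.Task {
    // No tasks are scheduled yet, so there is nothing to resolve.
    return nil
}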

The Install method takes a task.ConnectorContext[TaskDescriptor] parameter, which has the following interface:

type ConnectorContext[TaskDescriptor payments.TaskDescriptor] interface {
    Context() context.Context
    Scheduler() Scheduler[TaskDescriptor]
}

Essentially, this context provides two things:

  • context.Context: If the connector performs long-running processing, it should listen on this context to abort if necessary.
  • Scheduler[TaskDescriptor]: A scheduler to run tasks (see the sketch below).
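
Based on how the scheduler is used in the rest of this article, its interface is roughly the following (a sketch inferred from usage; the parameter name and the exact meaning of the boolean flag are assumptions, not the framework's definition):

type Scheduler[TaskDescriptor payments.TaskDescriptor] interface {
    // Schedule registers a task identified by its descriptor.
    // The boolean flag is passed as false throughout this tutorial.
    Schedule(descriptor TaskDescriptor, restart bool) error
}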

We have made it this far without tasks. A task is like a process that the framework handles for you; it is essentially a simple function. When installed, a connector has the opportunity to schedule tasks and let the system run them. A task has a descriptor, which must be immutable and identifies a specific task in the system; it can be anything. A task also has a state, which can change over time, and the framework provides the APIs needed to update it. We will discuss that later. Like the descriptor, the state is freely defined by the connector.
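
To make the distinction concrete, here is a hypothetical descriptor/state pair for a connector that polls an external API (purely illustrative; it is not used in this tutorial):

type (
    // PollAccountDescriptor is immutable: it identifies one task.
    PollAccountDescriptor struct {
        AccountID string
    }

    // PollAccountState is mutable: the framework persists it and
    // restores it when the task is restarted.
    PollAccountState struct {
        LastSyncedAt time.Time
    }
)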

In our case, the main task is to list the target directory. Secondary tasks will be defined to read each file in the directory. We can define our task descriptor as a string: its value will be the file name for secondary tasks and the hardcoded value "directory" for the main task.

Before adding the logic, let's modify our previously introduced task descriptor:

type (
    ...
    TaskDescriptor string
    ...
)

Now, let's add some logic to our connector:

...
WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error {
    return ctx.Scheduler().Schedule("directory", false)
}).
...

Here, we instruct the framework to create a task with the descriptor "directory". This means that the framework can handle the task, restart it, log/save errors, etc. However, it does not yet know the specific logic of the task.

To provide this logic, we have to use the last method of the connector: Resolve(descriptor TaskDescriptor) task.Task. This method is responsible for providing a task.Task instance given a descriptor.

Therefore, when calling ctx.Scheduler().Schedule("directory"), the framework will call the Resolve method with "directory" as a parameter.

Let's implement the resolve method:

...
WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error {
    return ctx.Scheduler().Schedule("directory", false)
}).
WithResolve(func(descriptor TaskDescriptor) task.Task {
    if descriptor == "directory" {
        return func() {
            // TODO
        }
    }
    // Secondary tasks
    return func() {
        // TODO
    }
}).
...

Now, we have to implement the logic for each task. Let's start with the main task, which reads the directory:

...
WithResolve(func(descriptor TaskDescriptor) task.Task {
    if descriptor == "directory" {
        return func(ctx context.Context, logger sharedlogging.Logger, scheduler task.Scheduler[TaskDescriptor]) error {
            for {
                select {
                case <-ctx.Done():
                    return nil
                case <-time.After(10 * time.Second): // Could be configurable using Config object
                    logger.Infof("Opening directory '%s'...", config.Directory)
                    dir, err := os.ReadDir(config.Directory)
                    if err != nil {
                        logger.Errorf("Error opening directory '%s': %s", config.Directory, err)
                        continue
                    }

                    logger.Infof("Found %d files", len(dir))
                    for _, file := range dir {
                        err = scheduler.Schedule(TaskDescriptor(file.Name()), false)
                        if err != nil {
                            logger.Errorf("Error scheduling task '%s': %s", file.Name(), err)
                            continue
                        }
                    }
                }
            }
        }
    }
    return func() error {
        return errors.New("not implemented")
    }
}).
...

To test our implementation, we can now start the server as usual and issue a curl request to install the connector:

curl http://localhost:8080/connectors/example -X POST -d '{"directory": "/tmp/payments"}'

This instructs the connector to watch the directory /tmp/payments. You should see something like this in the app logs:

payments-payments-1  | time="2022-07-01T12:29:05Z" level=info msg="Install connector example" component=connector-manager config="{/tmp/payments}" provider=example
payments-payments-1 | time="2022-07-01T12:29:05Z" level=info msg="Starting task..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI="
payments-payments-1 | time="2022-07-01T12:29:05Z" level=info msg="Connector installed" component=connector-manager provider=example
payments-payments-1 | time="2022-07-01T13:26:51Z" level=info msg="Opening directory '/tmp/payments'..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI="
payments-payments-1 | time="2022-07-01T13:26:51Z" level=error msg="Error opening directory '/tmp/payments': open /tmp/payments: no such file or directory" component=scheduler provider=example task-id="ImRpcmVjdG9yeSI="

As expected, the task triggers an error because the /tmp/payments directory does not exist (yet).

You can see the tasks on the API as well:

curl http://localhost:8080/connectors/example/tasks | jq

[
  {
    "provider": "example",
    "descriptor": "directory",
    "createdAt": "2022-07-01T13:26:41.749Z",
    "status": "active",
    "error": "",
    "state": {},
    "id": "ImRpcmVjdG9yeSI="
  }
]

As you can see, a task has an id. This id is simply the descriptor of the task, serialized as canonical JSON and then base64-encoded.
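
For example, you can reproduce the id of the main task with a few lines of Go (assuming standard base64 encoding, which matches the id shown in the logs above):

package main

import (
    "encoding/base64"
    "encoding/json"
    "fmt"
)

func main() {
    // The descriptor "directory" is marshalled to the JSON string "directory",
    // then base64-encoded to produce the task id.
    payload, _ := json.Marshal("directory")
    fmt.Println(base64.StdEncoding.EncodeToString(payload)) // ImRpcmVjdG9yeSI=
}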

To fix the error we see in the logs, we can create the missing directory:

docker compose exec payments mkdir /tmp/payments

After a few seconds, you should see these log lines:

payments-payments-1  | time="2022-07-01T13:29:21Z" level=info msg="Opening directory '/tmp/payments'..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI="
payments-payments-1 | time="2022-07-01T13:29:21Z" level=info msg="Found 0 files" component=scheduler provider=example task-id="ImRpcmVjdG9yeSI="

Ok, now we have a directory to watch. Let's add some files to it:

docker compose cp docs/samples-payin.json payments:/tmp/payments/001.json

Now you should see something like this in the logs:

payments-payments-1  | time="2022-07-01T13:33:51Z" level=info msg="Opening directory '/tmp/payments'..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI="
payments-payments-1 | time="2022-07-01T13:33:51Z" level=info msg="Found 1 files" component=scheduler provider=example task-id="ImRpcmVjdG9yeSI="
payments-payments-1 | time="2022-07-01T13:33:52Z" level=info msg="Starting task..." component=scheduler provider=example task-id="IjAwMS5qc29uIg=="
payments-payments-1 | time="2022-07-01T13:33:52Z" level=error msg="Task terminated with error: not implemented" component=scheduler provider=example task-id="IjAwMS5qc29uIg=="

The logs show that our connector properly detected the file and triggered a new task for the file. The task terminates with an error as the Resolve function does not handle the descriptor. We will fix that in the next section.

Again, you can check the tasks on the API:

[
  {
    "provider": "example",
    "descriptor": "directory",
    "createdAt": "2022-07-01T13:26:41.749Z",
    "status": "active",
    "error": "",
    "state": "XXX",
    "id": "ImRpcmVjdG9yeSI="
  },
  {
    "provider": "example",
    "descriptor": "001.json",
    "createdAt": "2022-07-01T13:33:31.935Z",
    "status": "failed",
    "error": "not implemented",
    "state": "XXX",
    "id": "IjAwMS5qc29uIg=="
  }
]

As you can see, while the first task is still active, the second is flagged as failed with an error message.

Let's implement the second task. We will simply read the file and ingest its content:

...
file, err := os.Open(filepath.Join(config.Directory, string(descriptor)))
if err != nil {
    return err
}

type JsonPayment struct {
    payments.Data
    Reference string `json:"reference"`
    Type      string `json:"type"`
}

jsonPayment := &JsonPayment{}
err = json.NewDecoder(file).Decode(jsonPayment)
if err != nil {
    return err
}

return ingester.Ingest(ctx, ingestion.Batch{
    {
        Referenced: payments.Referenced{
            Reference: jsonPayment.Reference,
            Type:      jsonPayment.Type,
        },
        Payment: &jsonPayment.Data,
        Forward: true,
    },
}, struct{}{})
...

Now restart the service, uninstall the connector, and reinstall it.

The logs should now show something like this:

payments-payments-1  | time="2022-07-01T14:25:20Z" level=info msg="Install connector example" component=connector-manager config="{/tmp/payments}" provider=example
payments-payments-1 | time="2022-07-01T14:25:20Z" level=info msg="Starting task..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI="
payments-payments-1 | time="2022-07-01T14:25:20Z" level=info msg="Connector installed" component=connector-manager provider=example
payments-payments-1 | time="2022-07-01T14:25:30Z" level=info msg="Opening directory '/tmp/payments'..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI="
payments-payments-1 | time="2022-07-01T14:25:30Z" level=info msg="Found 1 files" component=scheduler provider=example task-id="ImRpcmVjdG9yeSI="
payments-payments-1 | time="2022-07-01T14:25:30Z" level=info msg="Starting task..." component=scheduler provider=example task-id="IjAwMS5qc29uIg=="
payments-payments-1 | time="2022-07-01T14:25:30Z" level=info msg="Task terminated with success" component=scheduler provider=example task-id="IjAwMS5qc29uIg=="

As you can see, this time the second task has been started and successfully completed.

It should have created a payment in the database. Let's check:

curl http://localhost:8080/payments | jq

{
  "data": [
    {
      "id": "eyJwcm92aWRlciI6ImV4YW1wbGUiLCJyZWZlcmVuY2UiOiIwMDEiLCJ0eXBlIjoicGF5aW4ifQ==",
      "reference": "001",
      "type": "payin",
      "provider": "example",
      "status": "succeeded",
      "initialAmount": 100,
      "scheme": "",
      "asset": "USD",
      "createdAt": "0001-01-01T00:00:00Z",
      "raw": null,
      "adjustments": [
        {
          "status": "succeeded",
          "amount": 100,
          "date": "0001-01-01T00:00:00Z",
          "raw": null,
          "absolute": false
        }
      ]
    }
  ]
}

Lovely! At this stage, we have successfully created a payment from a file.

The last important part to cover is the Ingester, which is the component that is responsible for saving the payment object in the database.

In the code of the second task, you may have noticed the following part:

return ingester.Ingest(ctx, ingestion.Batch{
    {
        Referenced: payments.Referenced{
            Reference: jsonPayment.Reference,
            Type:      jsonPayment.Type,
        },
        Payment: &jsonPayment.Data,
        Forward: true,
    },
}, struct{}{})

The ingester is in charge of accepting payments from a task, along with an optional state to be persisted. In our case, we don't alter the state of an existing payment already saved to storage, so we simply pass an empty struct.
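
From the way it is used above, the ingester's contract is roughly the following (a sketch inferred from the call site; the actual signature, in particular the type of the state argument, is defined by the framework):

type Ingester interface {
    // Ingest persists a batch of payments together with the task's new state.
    Ingest(ctx context.Context, batch ingestion.Batch, state any) error
}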

info

Lifecycle-wise, it is good to note that if the connector is restarted, the task will be restarted with its previously saved state.

Wrapping up

In this tutorial, we have seen how to create a connector from scratch: how to define a loader and a connector, how to create tasks, and how to use the ingester to save payments in the database. We have also seen how to use the scheduler to run tasks periodically, and how to use the API to manage the connector. The code below is a full recap of what we have built in this tutorial.

package example

import (
    "context"
    "encoding/json"
    "errors"
    "os"
    "path/filepath"
    "time"

    "github.com/formancehq/go-libs/sharedlogging"
    payments "github.com/formancehq/payments/pkg"
    "github.com/formancehq/payments/pkg/bridge/ingestion"
    "github.com/formancehq/payments/pkg/bridge/integration"
    "github.com/formancehq/payments/pkg/bridge/task"
)

type (
    Config struct {
        Directory string
    }
    TaskDescriptor string
)

func (cfg Config) Validate() error {
    if cfg.Directory == "" {
        return errors.New("missing directory to watch")
    }
    return nil
}

var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example").
    WithLoad(func(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] {
        return integration.NewConnectorBuilder[TaskDescriptor]().
            WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error {
                return ctx.Scheduler().Schedule("directory", false)
            }).
            WithResolve(func(descriptor TaskDescriptor) task.Task {
                if descriptor == "directory" {
                    return func(ctx context.Context, logger sharedlogging.Logger, scheduler task.Scheduler[TaskDescriptor]) error {
                        for {
                            select {
                            case <-ctx.Done():
                                return ctx.Err()
                            case <-time.After(10 * time.Second): // Could be configurable using Config object
                                logger.Infof("Opening directory '%s'...", config.Directory)
                                dir, err := os.ReadDir(config.Directory)
                                if err != nil {
                                    logger.Errorf("Error opening directory '%s': %s", config.Directory, err)
                                    continue
                                }

                                logger.Infof("Found %d files", len(dir))
                                for _, file := range dir {
                                    err = scheduler.Schedule(TaskDescriptor(file.Name()), false)
                                    if err != nil {
                                        logger.Errorf("Error scheduling task '%s': %s", file.Name(), err)
                                        continue
                                    }
                                }
                            }
                        }
                    }
                }
                return func(ctx context.Context, ingester ingestion.Ingester, resolver task.StateResolver) error {
                    file, err := os.Open(filepath.Join(config.Directory, string(descriptor)))
                    if err != nil {
                        return err
                    }

                    type JsonPayment struct {
                        payments.Data
                        Reference string `json:"reference"`
                        Type      string `json:"type"`
                    }

                    jsonPayment := &JsonPayment{}
                    err = json.NewDecoder(file).Decode(jsonPayment)
                    if err != nil {
                        return err
                    }

                    return ingester.Ingest(ctx, ingestion.Batch{
                        {
                            Referenced: payments.Referenced{
                                Reference: jsonPayment.Reference,
                                Type:      jsonPayment.Type,
                            },
                            Payment: &jsonPayment.Data,
                            Forward: true,
                        },
                    }, struct{}{})
                }
            }).
            Build()
    }).
    Build()