Search
K
Comment on page

Custom Sink Using JavaScript

If none of the previous sinks work for you, don't worry! You can create your own custom sink by using the Substreams JavaScript library. This library enables you to run a Substreams, just like you would through the CLI, but programatically.
The library works both on the client-side and the server-side.

Installing the Library

In your JavaScript project, use your preferred JavaScript package manager to install the required dependencies:
  1. 1.
    The Substreams Core library:
npm install @substreams/core
  1. 2.
    The Substreams Manifest library:
npm install @substreams/manifest
  1. 3.
    The Protobuf library, which will be used to decode the Substreams response:
npm install @bufbuild/connect-web

Using the Library

In order to use the library, you will need:
  • A Substreams endpoint.
  • An authentication token (visit https://app.streamingfast.io to get one).
  • A Substreams package (spkg).
Consider that you want to consume the map_block_meta module of the Ethereum Explorer package, which is hosted on Google Cloud (https://storage.googleapis.com/substreams-registry/spkg/ethereum-explorer-v0.1.1.spkg).
  1. 1.
    First, let's define a few helper variables:
const TOKEN = "YOUR_TOKEN" // Your authentication token
const SPKG = "https://storage.googleapis.com/substreams-registry/spkg/ethereum-explorer-v0.1.1.spkg" // URL of the SPKG
const MODULE = "map_block_meta" // Name of the Substreams Module to run
  1. 2.
    Use the fetchSubstream method from the library to download the Substreams. Then, the createRegistry function creates the Protobuf definitions from the package:
const fetchPackage = async () => {
return await fetchSubstream(SPKG)
}
const main = async () => {
// Fetch Substreams
const pkg = await fetchPackage()
// Create Protobuf registry
const registry = createRegistry(pkg);
}
  1. 3.
    Use the createConnectTransport to define the networking details of the connection (Substreams endpoint and authentication token):
const main = async () => {
const pkg = await fetchPackage()
const registry = createRegistry(pkg);
const transport = createConnectTransport({
// Substreams endpoint
baseUrl: "https://api.streamingfast.io",
// Authentication token
interceptors: [createAuthInterceptor(TOKEN)],
useBinaryFormat: true,
jsonOptions: {
// Protobuf Registry
typeRegistry: registry,
},
});
}
  1. 4.
    The createRequest function encapsulates the information of the execution (package, module, start block and stop block):
const main = async () => {
const pkg = await fetchPackage()
const registry = createRegistry(pkg);
const transport = createConnectTransport({
baseUrl: "https://api.streamingfast.io",
interceptors: [createAuthInterceptor(TOKEN)],
useBinaryFormat: true,
jsonOptions: {
typeRegistry: registry,
},
});
// Execution details
const request = createRequest({
substreamPackage: pkg,
outputModule: MODULE,
productionMode: true,
startBlockNum: 100000,
stopBlockNum: '+10',
});
}
  1. 5.
    Finally, you can use the streamBlocks function to iterate over the stream of blocks returned by the Substreams endpoint:
const main = async () => {
const pkg = await fetchPackage()
const registry = createRegistry(pkg);
const transport = createConnectTransport({
baseUrl: "https://api.streamingfast.io",
interceptors: [createAuthInterceptor(TOKEN)],
useBinaryFormat: true,
jsonOptions: {
typeRegistry: registry,
},
});
const request = createRequest({
substreamPackage: pkg,
outputModule: MODULE,
productionMode: true,
startBlockNum: 100000,
stopBlockNum: '+10',
});
// Iterate over blocks
for await (const response of streamBlocks(transport, request)) {
const output = unpackMapOutput(response.response, registry);
if (output !== undefined && !isEmptyMessage(output)) {
const outputAsJson = output.toJson({typeRegistry: registry});
console.log(outputAsJson)
}
}
}
Now, you can send the data anywhere and create your own custom sink! If you have created a sink and you think it can be reused by other developers, let us know on Discord!
The previous code is availalble on GitHub.