Ethereum/EVM Chains Integration

Prerequisites​

  • Smart contract address
  • ABI or interface definition
  • RPC endpoint access
  • Starbloom API key

Implementation Steps​

1. Define Protocol Buffers​

Create a protobuf definition for your contract events:

contract.proto
syntax = "proto3";

package mycontract;

import "google/protobuf/timestamp.proto";
// The starbloom.* field options below also require importing the proto file
// that defines them (see your Starbloom SDK for the import path).

message Transfer {
  google.protobuf.Timestamp timestamp = 1;
  bytes from = 2 [(starbloom.bytes).format = FORMAT_TYPE_HEX];
  bytes to = 3 [(starbloom.bytes).format = FORMAT_TYPE_HEX];
  string amount = 4 [(starbloom.numeric) = {}];
}

message Approval {
  google.protobuf.Timestamp timestamp = 1;
  bytes owner = 2 [(starbloom.bytes).format = FORMAT_TYPE_HEX];
  bytes spender = 3 [(starbloom.bytes).format = FORMAT_TYPE_HEX];
  string amount = 4 [(starbloom.numeric) = {}];
}

2. Implement Connector​

connector.go
package main

import (
    "context"

    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethclient"
    "github.com/starbloom-ai/connector"
)

type ContractConnector struct {
    *connector.Connector
    contractAddress common.Address
    client          *ethclient.Client
}

func (c *ContractConnector) Start() error {
    // Filter only logs emitted by the target contract
    query := ethereum.FilterQuery{
        Addresses: []common.Address{c.contractAddress},
    }

    // Subscribe to new logs (requires a websocket or IPC RPC endpoint)
    logs := make(chan types.Log)
    sub, err := c.client.SubscribeFilterLogs(context.Background(), query, logs)
    if err != nil {
        return err
    }

    // Process events as they arrive
    for {
        select {
        case err := <-sub.Err():
            return err
        case log := <-logs:
            if err := c.processLog(log); err != nil {
                return err
            }
        }
    }
}

func (c *ContractConnector) processLog(log types.Log) error {
    // Decode the raw log into a protobuf message
    event, err := c.decodeEvent(log)
    if err != nil {
        return err
    }

    // Produce to Kafka
    return c.ProduceMessage("mycontract", "events", event)
}
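
The processLog handler relies on a decodeEvent helper that the snippet above does not define. A minimal sketch for the Transfer event is shown below; it assumes the contract ABI JSON is available as contractABIJSON and that the generated protobuf package is imported as pb — both are placeholder names to adapt to your project.

// decodeEvent (sketch) matches a raw log against the ABI's Transfer event
// and unpacks it into the generated protobuf message.
// Additional imports assumed: fmt, math/big, strings,
// github.com/ethereum/go-ethereum/accounts/abi,
// google.golang.org/protobuf/types/known/timestamppb,
// and the generated protobuf package (aliased pb).
func (c *ContractConnector) decodeEvent(log types.Log) (*pb.Transfer, error) {
    // In production, parse the ABI once at startup rather than per log
    parsedABI, err := abi.JSON(strings.NewReader(contractABIJSON))
    if err != nil {
        return nil, err
    }

    if len(log.Topics) < 3 || log.Topics[0] != parsedABI.Events["Transfer"].ID {
        return nil, fmt.Errorf("unexpected event signature or topics: %v", log.Topics)
    }

    // Indexed parameters (from, to) live in the topics; the amount is in Data
    values, err := parsedABI.Unpack("Transfer", log.Data)
    if err != nil {
        return nil, err
    }
    if len(values) != 1 {
        return nil, fmt.Errorf("expected 1 non-indexed value, got %d", len(values))
    }
    amount, ok := values[0].(*big.Int)
    if !ok {
        return nil, fmt.Errorf("unexpected amount type %T", values[0])
    }

    return &pb.Transfer{
        Timestamp: timestamppb.Now(), // ideally the block timestamp instead
        From:      common.BytesToAddress(log.Topics[1].Bytes()).Bytes(),
        To:        common.BytesToAddress(log.Topics[2].Bytes()).Bytes(),
        Amount:    amount.String(),
    }, nil
}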

3. Configure Manifest​

manifest.yaml
package: mycontract
version: 1.0.0
chain: ethereum
contract:
  address: "0x1234..."
  startBlock: 15000000

4. Deploy and Monitor​

# Build connector
go build -o myconnector

# Run locally
./myconnector --config config.yaml

# Monitor logs
tail -f connector.log

Chain-Specific Considerations​

Gas and Transaction Fees​

// Configure gas settings for a transaction signer
func configureGas(client *ethclient.Client, privateKey *ecdsa.PrivateKey, chainID *big.Int) (*bind.TransactOpts, error) {
    // NewKeyedTransactor is deprecated; the chain-ID-aware variant prevents cross-chain replay
    auth, err := bind.NewKeyedTransactorWithChainID(privateKey, chainID)
    if err != nil {
        return nil, err
    }

    gasPrice, err := client.SuggestGasPrice(context.Background())
    if err != nil {
        return nil, err
    }

    auth.GasPrice = gasPrice
    auth.GasLimit = uint64(300000) // Adjust based on contract

    return auth, nil
}

Block Confirmation Requirements​

// Wait until the mined transaction has the required number of confirmations
func waitConfirmations(ctx context.Context, client *ethclient.Client, tx *types.Transaction, confirmations uint64) error {
    receipt, err := bind.WaitMined(ctx, client, tx)
    if err != nil {
        return err
    }

    for {
        head, err := client.BlockNumber(ctx)
        if err != nil {
            return err
        }
        if head >= receipt.BlockNumber.Uint64()+confirmations {
            return nil
        }

        // Poll roughly once per mainnet block
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(12 * time.Second):
        }
    }
}

RPC Rate Limiting​

// Implement rate limiting by wrapping the client with a token-bucket limiter
type rateLimitedClient struct {
    *ethclient.Client
    limiter *rate.Limiter
}

func newRateLimitedClient(rawurl string, rps int) (*rateLimitedClient, error) {
    client, err := ethclient.Dial(rawurl)
    if err != nil {
        return nil, err
    }

    return &rateLimitedClient{
        Client:  client,
        limiter: rate.NewLimiter(rate.Limit(rps), 1), // rps requests per second, burst of 1
    }, nil
}
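
Calls made through the wrapper should wait for a limiter token before hitting the node. A sketch of one wrapped method is shown below; wrap every ethclient method your connector calls on a hot path the same way, since embedded methods that are not overridden bypass the limiter.

// FilterLogs waits for a rate-limiter token before delegating to the
// underlying client, so backfills cannot exceed the configured RPS
func (c *rateLimitedClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
    if err := c.limiter.Wait(ctx); err != nil {
        return nil, err
    }
    return c.Client.FilterLogs(ctx, q)
}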

Testing​

connector_test.go
func TestContractConnector(t *testing.T) {
    // Set up test environment
    testClient := newTestClient()
    testContract := deployTestContract(t, testClient)

    // Create connector
    connector := &ContractConnector{
        contractAddress: testContract.Address(),
        client:          testClient,
    }

    // Process a synthetic Transfer log
    testLog, want := generateTestTransferLog()
    err := connector.processLog(testLog)
    assert.NoError(t, err)

    // Verify the decoded message that was produced
    messages := getTestMessages()
    assert.Equal(t, 1, len(messages))
    assert.Equal(t, want.Amount, messages[0].Amount)
}

Troubleshooting​

Common issues and solutions:

  1. Missing Events

    • Check block range configuration
    • Verify event signatures
    • Monitor RPC node stability
  2. Performance Issues

    • Optimize batch sizes
    • Implement proper caching
    • Use multiple RPC nodes
  3. Data Consistency

    • Handle reorgs properly
    • Implement retry logic (see the sketch after this list)
    • Validate event data
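
For the retry logic mentioned above, a minimal exponential-backoff helper might look like the sketch below; the helper name and defaults are illustrative rather than part of the Starbloom SDK.

// retryWithBackoff retries fn with exponential backoff, covering most
// transient RPC failures such as timeouts or temporary node unavailability
func retryWithBackoff(ctx context.Context, attempts int, fn func() error) error {
    delay := time.Second
    var err error
    for i := 0; i < attempts; i++ {
        if err = fn(); err == nil {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(delay):
            delay *= 2 // back off: 1s, 2s, 4s, ...
        }
    }
    return err
}

For example, a flaky head-number fetch can be wrapped as retryWithBackoff(ctx, 5, func() error { _, e := client.BlockNumber(ctx); return e }).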

Next Steps​