Ethereum/EVM Chains Integration
Prerequisites
- Smart contract address
- ABI or interface definition
- RPC endpoint access
- Starbloom API key
Implementation Steps
1. Define Protocol Buffers
Create a protobuf definition for your contract events:
contract.proto
syntax = "proto3";
package mycontract;
import "google/protobuf/timestamp.proto";
// Transfer is the message produced for a Transfer contract event.
//
// NOTE(review): the (starbloom.bytes) and (starbloom.numeric) field options
// used below must be declared by an imported .proto file; the only import
// visible in this snippet is google/protobuf/timestamp.proto — confirm the
// options file is imported.
message Transfer {
// Time associated with the event (presumably the block time — confirm).
google.protobuf.Timestamp timestamp = 1;
// Sender, carried as raw bytes and tagged with the hex format option.
bytes from = 2 [(starbloom.bytes).format = FORMAT_TYPE_HEX];
// Recipient, carried as raw bytes and tagged with the hex format option.
bytes to = 3 [(starbloom.bytes).format = FORMAT_TYPE_HEX];
// Amount, carried as a string and tagged as numeric; presumably a string so
// that values wider than 64 bits are not truncated — confirm.
string amount = 4 [(starbloom.numeric) = {}];
}
// Approval is the message produced for an Approval contract event.
//
// NOTE(review): the (starbloom.bytes) and (starbloom.numeric) field options
// used below must be declared by an imported .proto file; the only import
// visible in this snippet is google/protobuf/timestamp.proto — confirm the
// options file is imported.
message Approval {
// Time associated with the event (presumably the block time — confirm).
google.protobuf.Timestamp timestamp = 1;
// Owner, carried as raw bytes and tagged with the hex format option.
bytes owner = 2 [(starbloom.bytes).format = FORMAT_TYPE_HEX];
// Spender, carried as raw bytes and tagged with the hex format option.
bytes spender = 3 [(starbloom.bytes).format = FORMAT_TYPE_HEX];
// Amount, carried as a string and tagged as numeric; presumably a string so
// that values wider than 64 bits are not truncated — confirm.
string amount = 4 [(starbloom.numeric) = {}];
}
2. Implement Connector
connector.go
package main
import (
"context"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/starbloom-ai/connector"
)
// ContractConnector subscribes to the logs of a single contract and forwards
// each decoded event through the embedded connector (see Start and
// processLog).
//
// NOTE(review): the ethclient package (and the types package used by Start
// and processLog) do not appear in the import list shown for this file —
// confirm they are imported.
type ContractConnector struct {
// Embedded base connector; ProduceMessage, called in processLog, is
// presumably promoted from here — confirm.
*connector.Connector
// contractAddress is the address Start uses to filter logs.
contractAddress common.Address
// client is the Ethereum client Start uses to open the log subscription.
client *ethclient.Client
}
// Start subscribes to the logs emitted by the connector's contract address
// and hands each one to processLog. It blocks until the subscription reports
// an error or a log fails to process.
//
// It returns the error from creating the subscription, the value received
// from the subscription's error channel, or the first error returned by
// processLog. The subscription is released before Start returns.
func (c *ContractConnector) Start() error {
	// Restrict the filter to this connector's contract.
	query := ethereum.FilterQuery{
		Addresses: []common.Address{c.contractAddress},
	}

	logs := make(chan types.Log)
	sub, err := c.client.SubscribeFilterLogs(context.Background(), query, logs)
	if err != nil {
		return err
	}
	// Returning on a processLog error would otherwise leave the subscription
	// open with nothing draining the logs channel.
	defer sub.Unsubscribe()

	for {
		select {
		case err := <-sub.Err():
			return err
		case log := <-logs:
			if err := c.processLog(log); err != nil {
				return err
			}
		}
	}
}
// processLog decodes a single contract log and publishes the decoded event
// via ProduceMessage under "mycontract" / "events". A decode failure is
// returned as-is and nothing is published in that case.
func (c *ContractConnector) processLog(log types.Log) error {
	decoded, decodeErr := c.decodeEvent(log)
	if decodeErr != nil {
		return decodeErr
	}

	publishErr := c.ProduceMessage("mycontract", "events", decoded)
	return publishErr
}
3. Configure Manifest
manifest.yaml
# Connector manifest: names the package and chain, and identifies the
# contract to index.
package: mycontract
version: 1.0.0
chain: ethereum
contract:
  # Placeholder — replace with the contract's full 0x-prefixed address.
  address: "0x1234..."
  # Presumably the first block the connector indexes from — confirm.
  startBlock: 15000000
4. Deploy and Monitor
# Build the connector; -o names the output binary "myconnector".
go build -o myconnector
# Run the binary from the current directory, passing config.yaml via --config.
# NOTE(review): step 3 names its file manifest.yaml — confirm config.yaml is
# a separate file and not the manifest.
./myconnector --config config.yaml
# Follow connector.log as new lines are appended.
tail -f connector.log
Chain-Specific Considerations
Gas and Transaction Fees
// configureGas builds transaction options that sign with privateKey for the
// chain the client is connected to, priced at the node's suggested gas price
// and capped at a fixed gas limit.
//
// The chain ID is fetched from the node so that signatures are bound to that
// chain; the plain bind.NewKeyedTransactor is deprecated because it signs
// without one.
//
// It returns an error if the chain ID or gas price cannot be fetched, or if
// the transactor cannot be created.
func configureGas(client *ethclient.Client) (*bind.TransactOpts, error) {
	ctx := context.Background()

	chainID, err := client.ChainID(ctx)
	if err != nil {
		return nil, err
	}
	auth, err := bind.NewKeyedTransactorWithChainID(privateKey, chainID)
	if err != nil {
		return nil, err
	}

	gasPrice, err := client.SuggestGasPrice(ctx)
	if err != nil {
		return nil, err
	}
	auth.GasPrice = gasPrice
	auth.GasLimit = uint64(300000) // Adjust based on contract
	return auth, nil
}
Block Confirmation Requirements
// waitConfirmations waits for tx to be mined and then checks, once, whether
// the chain head is at least `confirmations` blocks past the block that
// included it.
//
// It returns nil when the transaction has enough confirmations, the
// "waiting for confirmations" error when it does not yet (callers are
// expected to retry), or the underlying error if the receipt or head block
// number cannot be fetched.
func waitConfirmations(client *ethclient.Client, tx *types.Transaction, confirmations uint64) error {
	ctx := context.Background()

	receipt, err := bind.WaitMined(ctx, client, tx)
	if err != nil {
		return err
	}

	// Only the head number is needed, so avoid downloading the full block.
	head, err := client.BlockNumber(ctx)
	if err != nil {
		return err
	}

	// The reported head can be behind the receipt's block (a lagging node or
	// a reorg). Check the ordering first so the unsigned subtraction cannot
	// wrap around and pass as a huge confirmation count.
	mined := receipt.BlockNumber.Uint64()
	if head < mined || head-mined < confirmations {
		return fmt.Errorf("waiting for confirmations")
	}
	return nil
}
RPC Rate Limiting
// Implement rate limiting
func newRateLimitedClient(rawurl string, rps int) (*ethclient.Client, error) {
client, err := ethclient.Dial(rawurl)
if err != nil {
return nil, err
}
return &rateLimitedClient{
Client: client,
limiter: rate.NewLimiter(rate.Limit(rps), 1),
}, nil
}
Testing
connector_test.go
// TestContractConnector deploys a test contract, feeds one generated event
// through a ContractConnector, and checks that exactly one message was
// produced carrying the event's amount.
func TestContractConnector(t *testing.T) {
	// Set up test environment
	testClient := newTestClient()
	testContract := deployTestContract(t, testClient)

	// Named conn rather than connector so the local does not shadow the
	// connector package name.
	conn := &ContractConnector{
		contractAddress: testContract.Address(),
		client:          testClient,
	}

	// Test event processing
	event := generateTestEvent()
	// NOTE(review): the connector shown in this guide defines processLog, not
	// processEvent — confirm processEvent exists or switch to processLog.
	err := conn.processEvent(event)
	assert.NoError(t, err)

	// Verify output. Index messages only once the length check has passed;
	// otherwise an empty result would panic instead of failing the test.
	messages := getTestMessages()
	if assert.Equal(t, 1, len(messages)) {
		assert.Equal(t, event.Amount, messages[0].Amount)
	}
}
Troubleshooting
Common issues and solutions:
- Missing Events
  - Check block range configuration
  - Verify event signatures
  - Monitor RPC node stability
- Performance Issues
  - Optimize batch sizes
  - Implement proper caching
  - Use multiple RPC nodes
- Data Consistency
  - Handle reorgs properly
  - Implement retry logic
  - Validate event data
Next Steps
- Review Best Practices
- Join our Discord for support