import asyncio
import json
import aiohttp
from abc import ABC
from typing import Any
from eth_account import Account, messages
from eth_utils import keccak
from pythereum.common import HexStr
from pythereum.dclasses import Bundle, MEVBundle
from pythereum.exceptions import ERPCBuilderException, ERPCRequestException
from pythereum.rpc import parse_results
[docs]class Builder(ABC):
def __init__(
self,
url: str,
private_transaction_method: str = "eth_sendPrivateTransaction",
bundle_method: str = "eth_sendBundle",
cancel_bundle_method: str = "eth_cancelBundle",
mev_bundle_method: str = "mev_sendBundle",
bundle_params: set = None,
private_key: str = None,
):
if bundle_params is None:
bundle_params = {
"txs",
"blockNumber",
"minTimestamp",
"maxTimestamp",
"revertingTxHashes",
"replacementUuid",
"refundPercent",
"refundRecipient",
"refundTxHashes",
}
self.url = url
self.private_transaction_method = private_transaction_method
self.bundle_method = bundle_method
self.cancel_bundle_method = cancel_bundle_method
self.bundle_params = bundle_params
self.mev_bundle_method = mev_bundle_method
self.private_key = HexStr(private_key) if private_key is not None else None
super().__init__()
[docs]class TitanBuilder(Builder):
def __init__(self, private_key: str | HexStr | None = None):
super().__init__(
"https://rpc.titanbuilder.xyz",
"eth_sendPrivateTransaction",
"eth_sendBundle",
"eth_cancelBundle",
"mev_sendBundle",
{
"txs",
"blockNumber",
"minTimestamp",
"maxTimestamp",
"revertingTxHashes",
"replacementUuid",
"refundPercent",
"refundIndex",
"refundRecipient",
},
private_key,
)
[docs]class BeaverBuilder(Builder):
def __init__(self, private_key: str | HexStr | None = None):
super().__init__(
"https://rpc.beaverbuild.org/",
"eth_sendPrivateRawTransaction",
"eth_sendBundle",
"eth_cancelBundle",
"mev_sendBundle",
{
"txs",
"blockNumber",
"minTimestamp",
"maxTimestamp",
"revertingTxHashes",
"replacementUuid",
"refundPercent",
"refundRecipient",
"refundTxHashes",
},
private_key,
)
[docs]class RsyncBuilder(Builder):
def __init__(self, private_key: str | HexStr | None = None):
super().__init__(
"https://rsync-builder.xyz/",
"eth_sendPrivateRawTransaction",
"eth_sendBundle",
"eth_cancelBundle",
"mev_sendBundle",
{
"txs",
"blockNumber",
"minTimestamp",
"maxTimestamp",
"revertingTxHashes",
"replacementUuid",
"refundPercent",
"refundRecipient",
"refundTxHashes",
},
private_key,
)
[docs]class Builder0x69(Builder):
def __init__(self, private_key: str | HexStr | None = None):
super().__init__(
"https://builder0x69.io/",
"eth_sendRawTransaction",
"eth_sendBundle",
"eth_cancelBundle",
"mev_sendBundle",
{
"txs",
"blockNumber",
"minTimestamp",
"maxTimestamp",
"revertingTxHashes",
"uuid" "replacementUuid",
"refundPercent",
"refundRecipient",
},
private_key,
)
[docs]class FlashbotsBuilder(Builder):
def __init__(self, private_key: str | HexStr | None = None):
super().__init__(
"https://relay.flashbots.net",
"eth_sendPrivateRawTransaction",
"eth_sendBundle",
"eth_cancelBundle",
"mev_sendBundle",
{
"txs",
"blockNumber",
"minTimestamp",
"maxTimestep",
"revertingTxHashes",
"replacementUuid",
},
private_key,
)
[docs]class BuilderRPC:
"""
An RPC class designed for sending raw transactions and bundles to specific block builders
"""
def __init__(
self,
builders: Builder | list[Builder],
private_key: str
| bytes
| HexStr
| list[str]
| list[bytes]
| list[HexStr] = None,
):
if not isinstance(builders, list):
builders = [builders]
self.builders = builders
if isinstance(private_key, list):
for key, builder in zip(private_key, self.builders):
builder.private_key = HexStr(key)
elif private_key is not None:
for builder in self.builders:
builder.private_key = HexStr(private_key)
self.session = None
self._id = 0
def _next_id(self) -> None:
self._id += 1
def _build_json(
self, method: str, params: list[Any], increment: bool = True
) -> dict:
"""
:param method: ethereum RPC method
:param params: list of parameters to use in the function call, cast to string so Hex data may be used
:param increment: Boolean determining whether self._id will be advanced after the json is built
:return: json dictionary
"""
res = {"jsonrpc": "2.0", "method": method, "params": params, "id": self._id}
if increment:
self._next_id()
return res
async def _send_message(
self,
builder: Builder,
method: str | list[str],
params: list[Any],
use_flashbots_signature: bool = False,
):
if self.session is not None:
constructed_json = self._build_json(method, params)
header_data = (
builder.get_flashbots_header(json.dumps(constructed_json))
if use_flashbots_signature
else builder.get_header(json.dumps(constructed_json))
)
async with self.session.post(
builder.url, json=constructed_json, headers=header_data
) as resp:
if resp.status != 200:
raise ERPCRequestException(
resp.status,
f"Invalid BuilderRPC request for url {builder.url} of form "
f"(method={method}, params={params})",
)
msg = await resp.json()
else:
raise ERPCBuilderException(
"BuilderRPC session not started. Either context manage this class or call BuilderRPC.start_session()"
)
return parse_results(msg, builder=builder.url)
[docs] async def start_session(self):
self.session = aiohttp.ClientSession()
[docs] async def close_session(self):
await self.session.close()
[docs] async def send_private_transaction(
self,
tx: str | HexStr,
extra_info: Any = None,
) -> Any:
tx_methods = [builder.private_transaction_method for builder in self.builders]
tx = [
builder.format_private_transaction(tx, extra_info)
for builder in self.builders
]
return await asyncio.gather(
*(
self._send_message(builder, method, transaction)
for builder, method, transaction in zip(self.builders, tx_methods, tx)
)
)
[docs] async def send_bundle(
self,
bundle: Bundle,
) -> Any:
tx_methods = [builder.bundle_method for builder in self.builders]
tx = [builder.format_bundle(bundle) for builder in self.builders]
return await asyncio.gather(
*(
self._send_message(builder, method, transaction)
for builder, method, transaction in zip(self.builders, tx_methods, tx)
)
)
[docs] async def cancel_bundle(
self,
replacement_uuids: str | HexStr,
):
cancel_methods = [builder.cancel_bundle_method for builder in self.builders]
replacement_uuids = [replacement_uuids for _ in self.builders]
return await asyncio.gather(
*(
self._send_message(builder, method, uuid)
for builder, method, uuid in zip(
self.builders, cancel_methods, replacement_uuids
)
)
)
[docs] async def send_mev_bundle(self, bundle: MEVBundle) -> Any:
mev_methods = [builder.mev_bundle_method for builder in self.builders]
bundles = [builder.format_mev_bundle(bundle) for builder in self.builders]
return await asyncio.gather(
*(
self._send_message(builder, method, bundle, True)
for builder, method, bundle in zip(self.builders, mev_methods, bundles)
)
)
async def __aenter__(self):
await self.start_session()
return self
async def __aexit__(self, *args):
await self.close_session()
# A list containing all the current supported builders. Can be passed in to a BuilderRPC to send to all
ALL_BUILDERS = [
TitanBuilder(),
Builder0x69(),
RsyncBuilder(),
BeaverBuilder(),
FlashbotsBuilder(),
]