Source code for pythereum.gas_managers

import statistics
from contextlib import asynccontextmanager

from pythereum.exceptions import ERPCManagerException, ERPCInvalidReturnException
from pythereum.common import EthDenomination, GasStrategy, BlockTag
from pythereum.rpc import EthRPC
from pythereum.dclasses import TransactionFull, Transaction


[docs]class NaiveGasManager: """ This NaiveGasManager can fill transactions by calling: await nm.fill_transaction(tx, strategy=GasStrategy object) strategy can be replaced with dict of form: {"gas": GasStrategy object, "maxFeePerGas": GasStrategy object, "maxPriorityFeePerGas": GasStrategy object} """ def __init__( self, rpc: EthRPC = None, max_gas_price: float | EthDenomination | None = None, max_fee_price: float | EthDenomination | None = None, max_priority_price: float | EthDenomination | None = None, ): self.rpc = rpc self.latest_transactions = None self.max_gas_price = int( max_gas_price if max_gas_price is not None else EthDenomination.milli ) self.max_fee_price = int( max_fee_price if max_fee_price is not None else EthDenomination.milli ) self.max_priority_price = int( max_priority_price if max_priority_price is not None else EthDenomination.milli ) async def _get_latest_receipts( self, use_stored_results: bool = False ) -> list[TransactionFull]: """ Returns a tuple of the latest transaction receipts. These are gotten by getting the latest block info and requesting transaction receipts for each transaction. To avoid doing this for every call, there is the option to use stored results from the most recent request. """ if use_stored_results: transactions = self.latest_transactions else: latest_block = await self.rpc.get_block_by_number(BlockTag.latest, True) transactions = latest_block.transactions self.latest_transactions = transactions if len(transactions) == 0: raise ERPCInvalidReturnException( f"Invalid vlue: {transactions} returned from _get_latest_receipts" ) return transactions
[docs] async def suggest( self, strategy: GasStrategy, attribute: str, use_stored_results: bool = False ) -> float: transactions = await self._get_latest_receipts(use_stored_results) prices = [ x.__getattribute__(attribute) for x in transactions if x.__getattribute__(attribute) is not None ] match strategy: case GasStrategy.min_price: res = min(prices) case GasStrategy.max_price: res = max(prices) case GasStrategy.median_price: res = statistics.median(prices) case GasStrategy.mean_price: res = statistics.mean(prices) case GasStrategy.mode_price: res = statistics.mode(prices) case GasStrategy.upper_quartile_price: # Quantiles require enough data points or they will crash try: res = statistics.quantiles(prices, n=4)[2] except statistics.StatisticsError: res = statistics.mean(prices) case GasStrategy.lower_quartile_price: # Quantiles require enough data points or they will crash try: res = statistics.quantiles(prices, n=4)[0] except statistics.StatisticsError: res = statistics.mean(prices) case GasStrategy.custom: res = self.custom_pricing(prices) case _: raise ERPCManagerException(f"Invalid strategy of type {strategy} used") return round(res)
[docs] async def fill_transaction( self, tx: dict | Transaction | list[dict] | list[Transaction], strategy: GasStrategy | dict[str, GasStrategy] = GasStrategy.mean_price, use_stored: bool = False, ) -> None: if isinstance( strategy, GasStrategy ): # Allows for separation of strategy types for each type strategy = { "gas": strategy, "maxFeePerGas": strategy, "maxPriorityFeePerGas": strategy, } if isinstance(tx, list): for sub_tx in tx: sub_tx["gas"] = min( await self.suggest(strategy["gas"], "gas", use_stored), self.max_gas_price, ) sub_tx["maxFeePerGas"] = min( await self.suggest( strategy["maxFeePerGas"], "max_fee_per_gas", True ), self.max_fee_price, ) sub_tx["maxPriorityFeePerGas"] = min( await self.suggest( strategy["maxPriorityFeePerGas"], "max_priority_fee_per_gas", True, ), self.max_priority_price, ) elif tx is not None: tx["gas"] = min( await self.suggest(strategy["gas"], "gas", use_stored), self.max_gas_price, ) tx["maxFeePerGas"] = min( await self.suggest(strategy["maxFeePerGas"], "max_fee_per_gas", True), self.max_fee_price, ) tx["maxPriorityFeePerGas"] = min( await self.suggest( strategy["maxPriorityFeePerGas"], "max_priority_fee_per_gas", True ), self.max_priority_price, )
[docs] def custom_pricing(self, prices): # Override this function when subclassing for custom pricing implementation raise ERPCManagerException("Custom pricing strategy not defined for this class")
[docs]class InformedGasManager: """ This InformedGasManager can fill transactions by calling im.fill_transaction(tx) Note that this is not asynchronous like other transaction filling methods, as it relies on no external info To tell the gas manager the status of a transaction call one of the following functions: im.gas_fail() # For when a transaction has failed due to gas too low im.execution_fail() # For when a transaction has failed due to an execution reversion im.execution_success() # For when a transaction has succeeded in execution """ def __init__( self, rpc: EthRPC = None, max_gas_price: float | EthDenomination | None = None, max_fee_price: float | EthDenomination | None = None, max_priority_price: float | EthDenomination | None = None, fail_multiplier: float = 1.25, success_multiplier: float = 0.95, ): self.rpc = rpc self.latest_transactions = None self.prices = {"gas": 0, "maxFeePerGas": 0, "maxPriorityFeePerGas": 0} self.max_prices = { "gas": int( max_gas_price if max_gas_price is not None else EthDenomination.milli ), "maxFeePerGas": int( max_fee_price if max_fee_price is not None else EthDenomination.milli ), "maxPriorityFeePerGas": int( max_priority_price if max_priority_price is not None else EthDenomination.milli ), } self.fail_multiplier = fail_multiplier self.success_multiplier = success_multiplier async def _set_initial_price(self): latest_block = await self.rpc.get_block_by_number(BlockTag.latest, True) transactions = latest_block.transactions self.latest_transactions = transactions if len(transactions) == 0: raise ERPCInvalidReturnException( f"Invalid vlue: {transactions} returned from _get_latest_receipts" ) for key, attribute in zip( self.prices.keys(), ("gas", "max_fee_per_gas", "max_priority_fee_per_gas") ): self.prices[key] = round( statistics.mean( [ x.__getattribute__(attribute) for x in transactions if x.__getattribute__(attribute) is not None ] ) )
[docs] def gas_fail(self): self.prices["gas"] = int(self.fail_multiplier * self.prices["gas"])
[docs] def execution_fail(self): self.prices["maxPriorityFeePerGas"] = int( self.fail_multiplier * self.prices["maxPriorityFeePerGas"] ) self.prices["maxFeePerGas"] = max( self.prices["maxFeePerGas"], self.prices["maxPriorityFeePerGas"] )
[docs] def execution_success(self): self.prices["maxPriorityFeePerGas"] = int( self.success_multiplier * self.prices["maxPriorityFeePerGas"] ) self.prices["maxFeePerGas"] = max( self.prices["maxFeePerGas"], self.prices["maxPriorityFeePerGas"] )
[docs] def fill_transaction(self, tx: dict | Transaction | list[dict] | list[Transaction]): if isinstance(tx, list): for sub_tx in tx: sub_tx["gas"] = min(self.prices["gas"], self.max_prices["gas"]) sub_tx["maxFeePerGas"] = min( self.prices["maxFeePerGas"], self.max_prices["maxFeePerGas"] ) sub_tx["maxPriorityFeePerGas"] = min( self.prices["maxPriorityFeePerGas"], self.max_prices["maxPriorityFeePerGas"], ) else: tx["gas"] = min(self.prices["gas"], self.max_prices["gas"]) tx["maxFeePerGas"] = min( self.prices["maxFeePerGas"], self.max_prices["maxFeePerGas"] ) tx["maxPriorityFeePerGas"] = min( self.prices["maxPriorityFeePerGas"], self.max_prices["maxPriorityFeePerGas"], )
[docs]class GasManager: """ Class which allows access to different kinds of gas management strategies and stores their data. Accepts an EthRPC instance or URL to be used for its gas management strategies It is recommended to start the pool for a given EthRPC instance before using a gas management strategy, otherwise the program will slow down as the pool will be opened and then closed """ def __init__( self, rpc: "EthRPC | str | None" = None, max_gas_price: float | EthDenomination | None = None, max_fee_price: float | EthDenomination | None = None, max_priority_price: float | EthDenomination | None = None, ): if isinstance(rpc, str): rpc = EthRPC(rpc, 1) self.rpc = rpc self.max_gas_price = int( max_gas_price if max_gas_price is not None else EthDenomination.milli ) self.max_fee_price = int( max_fee_price if max_fee_price is not None else EthDenomination.milli ) self.max_priority_price = int( max_priority_price if max_priority_price is not None else EthDenomination.milli ) self.naive_latest_transactions = None self.informed_tx_prices = { "gas": 0, "maxFeePerGas": 0, "maxPriorityFeePerGas": 0, } def __str__(self): return f"GasManager(rpc={self.rpc.__str__()})" def __repr__(self): return f"GasManager(rpc={self.rpc.__repr__()})"
[docs] def clear_informed_info(self): """ Clears stored info about informed_manager from the GasManager object """ self.informed_tx_prices["gas"] = 0 self.informed_tx_prices["maxFeePerGas"] = 0 self.informed_tx_prices["maxPriorityFeePerGas"] = 0
[docs] def clear_naive_info(self): """ Clears stored info about naive_manager from GasManager object """ self.naive_latest_transactions = None
[docs] def clear_info(self): """ Clears all stored information in the GasManager object """ self.clear_naive_info() self.clear_informed_info()
[docs] @asynccontextmanager async def naive_manager(self) -> NaiveGasManager: """ Creates, yields and manages a NaiveGasManager object. This NaiveGasManager can fill transactions by calling await nm.fill_transaction(tx, strategy=GasStrategy object) strategy can be replaced with dict of form: {"gas": GasStrategy object, "maxFeePerGas": GasStrategy object, "maxPriorityFeePerGas": GasStrategy object} """ naive = NaiveGasManager(self.rpc) connected = self.rpc.pool_connected() if self.naive_latest_transactions is not None: naive.latest_transactions = self.naive_latest_transactions try: if not connected: await naive.rpc.start_pool() yield naive finally: self.naive_latest_transactions = naive.latest_transactions if not connected: await naive.rpc.close_pool()
[docs] @asynccontextmanager async def informed_manager( self, success_multiplier: float = 0.95, fail_multiplier: float = 1.25, initial_gas_price: int = None, initial_fee_price: int = None, initial_priority_fee_price: int = None, ) -> InformedGasManager: """ Creates, yields and manages an InformedGasManager object. This InformedGasManager can fill transactions by calling im.fill_transaction(tx) Note that this is not asynchronous like other transaction filling methods, as it relies on no external info To tell the gas manager the status of a transaction call one of the following functions: im.gas_fail() # For when a transaction has failed due to gas too low im.execution_fail() # For when a transaction has failed due to an execution reversion im.execution_success() # For when a transaction has succeeded in execution """ informed = InformedGasManager( self.rpc, success_multiplier=success_multiplier, fail_multiplier=fail_multiplier, ) await informed._set_initial_price() if initial_gas_price is not None: informed.prices["gas"] = initial_gas_price elif self.informed_tx_prices["gas"] != 0: informed.prices["gas"] = self.informed_tx_prices["gas"] if initial_fee_price is not None: informed.prices["maxFeePerGas"] = initial_fee_price elif self.informed_tx_prices["maxFeePerGas"] != 0: informed.prices["maxFeePerGas"] = self.informed_tx_prices["maxFeePerGas"] if initial_priority_fee_price is not None: informed.prices["maxPriorityFeePerGas"] = initial_priority_fee_price elif self.informed_tx_prices["maxPriorityFeePerGas"] != 0: informed.prices["maxPriorityFeePerGas"] = self.informed_tx_prices[ "maxPriorityFeePerGas" ] connected = informed.rpc.pool_connected() try: if not connected: await informed.rpc.start_pool() yield informed finally: self.informed_tx_prices["gas"] = informed.prices["gas"] self.informed_tx_prices["maxFeePerGas"] = informed.prices["maxFeePerGas"] self.informed_tx_prices["maxPriorityFeePerGas"] = informed.prices[ "maxPriorityFeePerGas" ] if not connected: await informed.rpc.close_pool()