Tutorial
The Basics
At its core, Pythereum provides functionality to interact efficiently with Ethereum nodes via the Ethereum JSON RPC API. This is primarily done with the EthRPC class, which is the focus of this section of the tutorial.
To get started we need an Ethereum endpoint to connect to, for which you will need a connection URL. This URL can use the following prefixes for fast websocket functionality:
wss://
ws://
For HTTP connections you can also use http:// or https:// URLs, but be aware that HTTP connections will not be able to make use of websocket pooling.
For this tutorial I will be using a personal node I connect to via websockets which will be referred to as “ENDPOINT_URL”.
Much of Pythereum’s functionality is asynchronous, which means we will need the library asyncio to run our code.
For our first block of code let’s try and get the number of the most recent block in the chain.
import asyncio
from pythereum import EthRPC

# Note we use "async def" here as we use asynchronous code in this function
async def my_first_rpc_connection():
    # Similarly we use "async with" here to declare our rpc connection
    # If we are using an http or https url, we should include the following argument to EthRPC:
    # use_socket_pool=False
    async with EthRPC(ENDPOINT_URL) as rpc:
        # We must "await" the result of our function call, as we are waiting for our endpoint to respond
        block_num = await rpc.get_block_number()

        # Verify that the return is indeed an integer, e.g. 18455132
        print(block_num)

if __name__ == "__main__":
    asyncio.run(my_first_rpc_connection())
In this example we do a couple things:
We declare a connection to our RPC endpoint using:
“async with EthRPC(ENDPOINT_URL) as rpc:”
This line starts up a pool of websockets to connect to the given endpoint. When we exit the scope of “async with”, the connection pool is automatically closed.
Any function we call using the “rpc” object we create will use the pool of websockets here to communicate with our endpoint.
We use our connection to call “get_block_number”
Pythereum manages the parameters passed to the function and the types of the values it returns.
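As a rough illustration of the return-type handling mentioned above: the underlying eth_blockNumber JSON-RPC method returns the block number as a hex string, which a client library has to convert to an integer. The helper name below is hypothetical and is not part of Pythereum; it only sketches the kind of conversion that is done for us.

```python
import json

def parse_block_number(raw_response: str) -> int:
    """Decode the hex 'result' field of an eth_blockNumber response."""
    response = json.loads(raw_response)
    return int(response["result"], 16)

# A response of the shape an Ethereum node returns for eth_blockNumber
raw = '{"jsonrpc": "2.0", "id": 1, "result": "0x1199a5c"}'
print(parse_block_number(raw))  # 18455132
```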
Now what if we wanted to do something a little more advanced than getting the latest block number?
Maybe instead of the block number we wanted to get the full block information on the latest block.
import asyncio

# We add BlockTag to our imports, which is an Enumeration for block number special cases such as the "latest" block
# In this case we use BlockTag.latest to specify the most recent block
from pythereum import EthRPC, BlockTag

# pprint helps us print objects in an easy to read manner
from pprint import pprint

async def my_first_rpc_connection():
    # If we are using an http or https url, we should include the following argument to EthRPC:
    # use_socket_pool=False
    async with EthRPC(ENDPOINT_URL) as rpc:
        # We set full_object=True here, as this means transactions in the gotten block will be
        # full transactions, as opposed to transaction hashes
        # Additionally we do not need to use BlockTag objects here,
        # we can instead specify an integer block num if we want.
        block_result = await rpc.get_block_by_number(BlockTag.latest, True)

        # Note the result in this case is of type Block
        # Block objects have all information on a given block, its transactions, etc, conveniently accessible in one object.
        pprint(block_result)

        # We can look at the block's transactions by simply doing:
        pprint(block_result.transactions)

if __name__ == "__main__":
    asyncio.run(my_first_rpc_connection())
In this case we should get a Block object returned from our get_block_by_number call, which contains block information in a convenient container.
So far we have been doing things that Web3.py can also do relatively simply. Let’s move on to something that Pythereum introduces:
Batch RPC calls
Instead of getting block information on just the most recent block, let’s get info on 10 blocks, using only one call!
import asyncio
from pythereum import EthRPC, BlockTag
from pprint import pprint

async def my_first_rpc_connection():
    # If we are using an http or https url, we should include the following argument to EthRPC:
    # use_socket_pool=False
    async with EthRPC(ENDPOINT_URL) as rpc:
        # The syntax for batch calls is lists of each parameter of length k in place of normal individual params.
        # list of block numbers to get
        blocks_to_get = [i + 1_000_000 for i in range(10)]
        # For each block number, specify whether full transactions should be gotten
        full_obj_array = [True for i in range(10)]
        block_result = await rpc.get_block_by_number(blocks_to_get, full_obj_array)

        for block in block_result:
            pprint(block)

if __name__ == "__main__":
    asyncio.run(my_first_rpc_connection())
With only one RPC call, we have successfully gotten 10 times the data. This helps reduce transmission wait times and lowers demand on your RPC endpoint, which potentially saves you money if you are on a paid endpoint!
The batch calling syntax we have used here is valid for any EthRPC function that takes parameters. It does not apply to functions such as get_block_number, as there is no reason to batch functions of this type: the same data would be returned for each call.
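For intuition, a batch at the JSON-RPC level is simply a list of request objects sent in one message. The sketch below builds such a payload for eth_getBlockByNumber using only the standard library. The function name is made up for illustration, and the exact payload Pythereum sends may differ.

```python
import json

def build_batch_payload(block_numbers: list[int], full_objects: list[bool]) -> list[dict]:
    """Build one JSON-RPC request object per (block number, full_object) pair."""
    return [
        {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": "eth_getBlockByNumber",
            # Block numbers are sent as hex strings in the JSON-RPC API
            "params": [hex(number), full],
        }
        for request_id, (number, full) in enumerate(zip(block_numbers, full_objects), start=1)
    ]

blocks_to_get = [i + 1_000_000 for i in range(10)]
payload = build_batch_payload(blocks_to_get, [True] * 10)

# The whole list is serialised and sent as a single message
print(len(payload), "requests in", len(json.dumps(payload)), "bytes")
```

Each response carries the id of the request it answers, which is how the results can be matched back to the block numbers that were asked for.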
Taking advantage of Websocket Pooling
So far we have not been taking advantage of Pythereum’s websocket pool. This pool is here to provide a massive speedup over Web3.py and other Ethereum RPC libraries.
The pool is stored as an asynchronous queue of connections to your endpoint, each of which can communicate with it at the same time.
Pythereum uses this to essentially parallelize your calls for you by taking advantage of asyncio.
Let’s see how we would take advantage of multiple sockets to send multiple remote procedure calls at once.
import asyncio

# We add BlockTag to our imports, which is an Enumeration specifying special inputs for get_block_by_number
# In this case we use BlockTag.latest to specify the most recent block
from pythereum import EthRPC, BlockTag

# pprint helps us print objects in an easy to read manner
from pprint import pprint

async def my_first_rpc_connection():
    # Since we have used pool_size=3, we can send up to 3 concurrent messages at a given time
    # Higher pool sizes will mean more concurrent data can be sent at the cost of more instability in connections
    # (Connections which have not been interacted with in a long time may automatically close)
    async with EthRPC(ENDPOINT_URL, pool_size=3) as rpc:
        # We use an asyncio task group to run all of these tasks and collect their results concurrently
        async with asyncio.TaskGroup() as tg:
            # Managed by socket 1
            block_result = tg.create_task(rpc.get_block_by_number(BlockTag.latest, True))
            # Managed by socket 2
            tx_count = tg.create_task(rpc.get_transaction_count_by_number(BlockTag.latest, True))
            # Managed by socket 3
            current_gas_price = tg.create_task(rpc.get_gas_price())

        # Getting all this takes as long as the longest of the operations in your TaskGroup
        # Once the TaskGroup block exits every task has finished, so we read each value with .result()
        pprint(block_result.result())
        print(tx_count.result())
        print(current_gas_price.result())

if __name__ == "__main__":
    asyncio.run(my_first_rpc_connection())
This provides a speedup of up to n times, where n is:
min(number of functions you are calling, number of sockets in the pool)
Running multiple remote procedure calls at once can also be done using asyncio.gather as follows:
res = await asyncio.gather(
    rpc.get_block_by_number(BlockTag.latest, True),
    rpc.get_transaction_count_by_number(BlockTag.latest, True),
    rpc.get_gas_price()
)
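The pooling behaviour described in this section can be illustrated without a node. The sketch below is a toy model, not Pythereum's implementation: the class FakePool and the function run_calls are hypothetical names, connections are plain integers held in an asyncio.Queue, and the network round trip is simulated with asyncio.sleep. It records how many calls were in flight at once, which should match min(number of calls, pool size).

```python
import asyncio

class FakePool:
    """Toy stand-in for a connection pool, backed by an asyncio.Queue."""

    def __init__(self, pool_size: int):
        self.connections: asyncio.Queue = asyncio.Queue()
        for socket_id in range(pool_size):
            self.connections.put_nowait(socket_id)
        self.in_flight = 0
        self.max_in_flight = 0

    async def call(self, delay: float = 0.01) -> int:
        # Wait until a connection is free, then borrow it
        socket_id = await self.connections.get()
        self.in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self.in_flight)
        try:
            await asyncio.sleep(delay)  # Simulated network round trip
            return socket_id
        finally:
            # Hand the connection back so a waiting call can use it
            self.in_flight -= 1
            self.connections.put_nowait(socket_id)

async def run_calls(n_calls: int, pool_size: int) -> int:
    pool = FakePool(pool_size)
    await asyncio.gather(*(pool.call() for _ in range(n_calls)))
    return pool.max_in_flight

print(asyncio.run(run_calls(n_calls=5, pool_size=3)))  # 3
```

With five calls and three connections, the fourth and fifth calls wait in the queue until an earlier call returns its connection, which is why the speedup is capped by the pool size.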
Let’s take a look at another useful thing Pythereum introduces.
Subscriptions
Most modern Ethereum nodes support connections via websockets along which a “subscription” can be made.
These subscriptions will continuously output data as it becomes available, such as the headers of all new blocks that are created.
Here is a brief example of how to create a continuous subscription:
import asyncio
from pythereum import EthRPC, SubscriptionType
from pprint import pprint

async def my_first_subscription():
    async with EthRPC(ENDPOINT_URL, pool_size=1) as rpc:
        # Declare a subscription to constantly receive data about new block headers
        async with rpc.subscribe(SubscriptionType.new_heads) as sc:
            # Whenever new headers are created this loop will receive the result
            async for header in sc.recv():
                pprint(header)

if __name__ == "__main__":
    asyncio.run(my_first_subscription())
This is useful for getting live data on the goings-on of transactions on the chain. This has particular applications in automated traders paying attention to market data, or for getting the right prices to pay for gas.
A brief example of a subscription in use is available in the demo folder.
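For readers curious about what travels over the websocket, here is a minimal sketch of the JSON-RPC messages involved in a new-heads subscription: an eth_subscribe request, followed by eth_subscription notifications pushed by the node. The two helper names are invented for this example, and the header in the sample notification is a made-up, heavily shortened one.

```python
import json

def build_subscribe_request(subscription_type: str, request_id: int = 1) -> str:
    """Serialise an eth_subscribe request, e.g. for "newHeads"."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "eth_subscribe",
        "params": [subscription_type],
    })

def extract_notification(raw_message: str):
    """Return the payload of an eth_subscription notification, or None for other messages."""
    message = json.loads(raw_message)
    if message.get("method") != "eth_subscription":
        return None
    return message["params"]["result"]

# Example notification with a made-up, shortened block header
example = json.dumps({
    "jsonrpc": "2.0",
    "method": "eth_subscription",
    "params": {"subscription": "0x1", "result": {"number": "0x1199a5c"}},
})
print(build_subscribe_request("newHeads"))
print(extract_notification(example))
```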
Transactions
One primary use of the blockchain is sending transactions between accounts, with data and amounts of eth attached.
With Pythereum this is made as simple as possible, especially when combined with the eth_account library. We use transactions as defined in EIP-1559, for the greatest level of efficiency.
import asyncio
from eth_account import Account
from pythereum import EthRPC, Transaction

async def my_first_transaction():
    # Create an arbitrary account wallet
    acct = Account.create()
    # Create an arbitrary transaction (most likely will not work, simply an example of fields a transaction has)
    tx = Transaction(
        from_address=acct.address,
        to_address="0x5fC2E691E520bbd3499f409bb9602DBA94184672",
        value=1,
        chain_id=1,
        nonce=1,
        max_fee_per_gas=1,
        gas=1,
        max_priority_fee_per_gas=1,
    )

    signed_tx = acct.sign_transaction(tx).rawTransaction

    async with EthRPC(ENDPOINT_URL, pool_size=1) as rpc:
        await rpc.send_raw_transaction(signed_tx)

if __name__ == "__main__":
    asyncio.run(my_first_transaction())
Creating transactions like this is all well and good, but it would be great if we could automate some of it.
With Pythereum’s NonceManager and GasManager classes that can be done very simply, and with a high degree of control!
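Before automating these values, it may help to see what the two fee fields in the transaction above mean. The sketch below shows how EIP-1559 combines a block's base fee with max_fee_per_gas and max_priority_fee_per_gas to give the price actually paid per unit of gas. The function name is made up for illustration and is not part of Pythereum.

```python
def effective_gas_price(base_fee: int, max_fee_per_gas: int, max_priority_fee_per_gas: int) -> int:
    """Price per unit of gas actually paid under EIP-1559, in wei.

    Raises ValueError if the fee cap cannot cover the block's base fee,
    in which case the transaction is not eligible for inclusion in that block.
    """
    if max_fee_per_gas < base_fee:
        raise ValueError("max_fee_per_gas is below the block base fee")
    # The tip is limited by whatever room is left under the fee cap
    priority_fee = min(max_priority_fee_per_gas, max_fee_per_gas - base_fee)
    return base_fee + priority_fee

GWEI = 10**9
print(effective_gas_price(30 * GWEI, 50 * GWEI, 2 * GWEI) // GWEI)  # 32
print(effective_gas_price(30 * GWEI, 31 * GWEI, 2 * GWEI) // GWEI)  # 31
```

In the second call the fee cap leaves room for only 1 gwei of tip, so the full 2 gwei priority fee is not paid.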
Smart Contract / ABI Interaction
Some transactions require data to be sent with them. For example, if we are interacting with a smart contract, we need to specify which methods we would like to work with. The methods a smart contract has available are listed in its ABI, with information on the data types of each parameter and return value. In order to call these methods we must encode the data we send such that it abides by the ABI’s rules.
For example if we are interacting with a method “buyShares(uint256 number, uint256 price)”, we need to do a couple things:
Prepare a number of shares we would like to buy n at price p
Extract a four byte function selector (this is a four byte value which represents the function to call)
This is done by taking the function name we want to call without variable names or spaces like “buyShares(uint256,uint256)”
We then call “eth_utils.function_signature_to_4byte_selector” on it, giving us the first four bytes of our data.
Encode the data we would like to pass as parameters to the function (n and p)
We use “eth_abi.encode([“uint256”, “uint256”], [n, p])” here, to encode our data to the correct data types
We then concatenate the selector with the encoded data and voila, we have a complete smart contract function call to pass alongside our transaction
While the end goal is for Pythereum to be able to encode and decode ABI data by itself, in its current state we can use eth_utils and eth_abi to help us out in development.
from eth_utils import function_signature_to_4byte_selector
from eth_abi import encode
from pythereum import Transaction, NonceManager

... # Generic RPC setup code inside a class

async def send_buy_transaction(self, quantity: int, price: int):
    # Get signature and convert it to hexadecimal
    signature = function_signature_to_4byte_selector("buyShares(uint256,uint256)").hex()
    # Get encoded function parameters and convert it to hexadecimal
    encoded_params = encode(["uint256", "uint256"], [quantity, price]).hex()
    # Combine signature and params for final data to send
    final_data = f"0x{signature}{encoded_params}"

    tx = Transaction(
        from_address=self.acct.address,  # My address
        to_address="0x5fC2E691E520bbd3499f409bb9602DBA94184672",  # Smart contract address
        value=1,  # Value to send alongside data (maybe to pay for shares)
        chain_id=1,
        data=final_data  # Smart contract call data
    )

    # These managers are covered in the next section of the tutorial!
    async with NonceManager(self.rpc) as nm:
        await nm.fill_transaction(tx)
    async with self.gm.informed_manager() as im:
        im.fill_transaction(tx)

    signed_tx = self.acct.sign_transaction(tx).rawTransaction.hex()

    await self.rpc.send_raw_transaction(signed_tx)
Gas and Nonce Management
Let’s improve our previous transaction by automatically managing values!
import asyncio
from eth_account import Account
from pythereum import EthRPC, Transaction, GasManager, NonceManager

async def my_first_transaction():
    # Create an arbitrary account wallet
    acct = Account.create()
    # Create an arbitrary transaction (most likely will not work, simply an example of fields a transaction has)
    tx = Transaction(
        from_address=acct.address,
        to_address="0x5fC2E691E520bbd3499f409bb9602DBA94184672",
        value=1,
        chain_id=1,
    )

    manager_rpc = EthRPC(ENDPOINT_URL, pool_size=2)

    # Manually start the pool of our manager RPC for use in our gas and nonce managers
    await manager_rpc.start_pool()

    # Gas and Nonce managers need an RPC connection to get block context information
    gm = GasManager(manager_rpc)

    # Gas managers have multiple possible strategies, we will simply be using naive managers here
    # Fills gas, max_fee_per_gas, max_priority_fee_per_gas values in the transaction
    # This is done by taking the average of those values from the previous block (though different strategies can be specified)
    async with gm.naive_manager() as nvm:
        await nvm.fill_transaction(tx)

    # Gets the nonce of the given account, for new accounts this will be 0
    async with NonceManager(manager_rpc) as nm:
        await nm.fill_transaction(tx)

    signed_tx = acct.sign_transaction(tx).rawTransaction

    await manager_rpc.send_raw_transaction(signed_tx)

    # The pool should be closed at the end when EthRPC is not used in an "async with" statement
    await manager_rpc.close_pool()

if __name__ == "__main__":
    asyncio.run(my_first_transaction())
Now we can automatically fill our transactions without having to calculate these values ourselves!
Gas Managers
Currently, there are two types of GasManager, the informed_manager and the naive_manager.
The naive gas manager fills transactions using strategies from a given GasStrategy object or dictionary of GasStrategy objects.
The available strategies are as follows:
mean_price - Price is filled with mean of previous block prices
median_price - Price filled with median of previous block prices
mode_price - Price filled with modal value of previous block prices
max_price - Price filled with max price of previous block
min_price - Price filled with min price of previous block
upper_quartile_price - Price filled with upper quartile value of previous block
lower_quartile_price - Price filled with lower quartile value of previous block
We can use either one global strategy for all 3 gas price values (gas, maxFeePerGas, maxPriorityFeePerGas) or we can use separate strategies.
The syntax is as follows:
# With single strategy
async with gm.naive_manager() as nvm:
    await nvm.fill_transaction(tx, strategy=GasStrategy.lower_quartile_price)

# With separate strategies
strategy_dict = {
    "gas": GasStrategy.max_price,
    "maxFeePerGas": GasStrategy.lower_quartile_price,
    "maxPriorityFeePerGas": GasStrategy.mean_price,
}
async with gm.naive_manager() as nvm:
    await nvm.fill_transaction(tx, strategy=strategy_dict)
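To make the strategy names listed above concrete, the sketch below computes each statistic over a made-up list of prices taken from one block, using Python's statistics module. This is only an illustration: summarise_prices is a hypothetical helper, and the exact way Pythereum computes these values (for example which quartile method it uses) is an assumption here.

```python
import statistics

def summarise_prices(prices: list[int]) -> dict[str, int]:
    """Compute one candidate price per strategy name from a list of prices in wei."""
    lower, _, upper = statistics.quantiles(prices, n=4, method="inclusive")
    return {
        "mean_price": round(statistics.mean(prices)),
        "median_price": round(statistics.median(prices)),
        "mode_price": statistics.mode(prices),
        "max_price": max(prices),
        "min_price": min(prices),
        "upper_quartile_price": round(upper),
        "lower_quartile_price": round(lower),
    }

# Made-up priority fees (in wei) from the transactions of one block
summary = summarise_prices([10, 20, 20, 30, 50])
for name, price in summary.items():
    print(name, price)
```

Lower statistics such as min_price or lower_quartile_price are cheaper but less likely to be accepted quickly, while max_price or upper_quartile_price trade cost for a better chance of inclusion.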
As is evident from the name, this gas management strategy is naive, not learning from previous transactions.
If we want to price our gas based on previous transaction results, we should instead use informed_manager.
Pricing gas in this way is important in long term operation where many transactions are made. When a transaction succeeds we can lower our gas prices to save eth, and when a transaction fails we can raise gas prices to help ensure execution.
Below is an example of how an informed manager is used.
# Generic setup code similar to before
...

# Fill our transaction with initial values in similar way to naive manager
async with gm.informed_manager() as im:
    im.fill_transaction(tx)

# Sign and submit our transaction
signed_tx = acct.sign_transaction(tx).rawTransaction
tx_hash = await manager_rpc.send_raw_transaction(signed_tx)

# Find out whether transaction succeeded by getting transaction receipt
tx_result = await manager_rpc.get_transaction_receipt(tx_hash, retries=3)

# Open up our informed manager again, this time with a success multiplier and failure multiplier
async with gm.informed_manager(
    success_multiplier=0.9,
    fail_multiplier=1.3
) as im:
    if tx_result.status == 0:
        # Tell the gas manager that the execution failed
        # This way next time the price of maxPriorityFeePerGas will be multiplied by the fail multiplier
        # This should help guarantee success of execution
        # If this does not work the tx may have failed because gas was too low, in which case use im.gas_fail()
        im.execution_fail()
    else:
        # Lowers priority fee by success multiplier such that next time transactions may be cheaper
        im.execution_success()

# The next time we need to fill a transaction using this same gas manager instance
# The informed manager will have adjusted prices accordingly
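The feedback idea behind the multipliers can be sketched in a few lines of plain Python. The class below is a toy model with invented names, not Pythereum's implementation: it simply scales a stored price down after a success and up after a failure, using the same multiplier values as the example above.

```python
class FeedbackPricer:
    """Toy price tracker that scales a price after each success or failure."""

    def __init__(self, initial_price: int, success_multiplier: float = 0.9, fail_multiplier: float = 1.3):
        self.price = initial_price
        self.success_multiplier = success_multiplier
        self.fail_multiplier = fail_multiplier

    def execution_success(self) -> None:
        # A success suggests we may be overpaying, so lower the price
        self.price = round(self.price * self.success_multiplier)

    def execution_fail(self) -> None:
        # A failure suggests we underpaid, so raise the price
        self.price = round(self.price * self.fail_multiplier)

pricer = FeedbackPricer(initial_price=100)
pricer.execution_fail()     # 100 -> 130
pricer.execution_success()  # 130 -> 117
print(pricer.price)
```

Because the fail multiplier moves the price further than the success multiplier does, a single failure is corrected quickly while savings accumulate gradually over several successes.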
These gas management types both have their own use cases, and it is up to the user which one suits them best.
The Nonce Manager
The nonce manager works somewhat differently to GasManager objects in that there are no NonceManager sub strategies.
The nonce manager gets the number of transactions an account has sent, and uses that to determine the current nonce to apply to a transaction to send.
The syntax for this is visible in earlier examples.
A brief but more complete example of nonce and gas management in use is available in the demo folder.
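As a rough sketch of the idea, a nonce can be obtained by asking the node for the account's transaction count once and then counting locally for each further transaction. The class and function names below are hypothetical, the node is replaced by a fake coroutine, and whether Pythereum's NonceManager tracks nonces locally in this way is an assumption rather than something this tutorial states.

```python
import asyncio

class LocalNonceTracker:
    """Toy nonce tracker: fetch the starting nonce once, then count locally."""

    def __init__(self, fetch_transaction_count):
        # fetch_transaction_count is any coroutine function returning an int
        self._fetch = fetch_transaction_count
        self._next_nonce = None

    async def next_nonce(self) -> int:
        if self._next_nonce is None:
            self._next_nonce = await self._fetch()
        nonce = self._next_nonce
        self._next_nonce += 1
        return nonce

async def fake_transaction_count() -> int:
    # Stand-in for asking the node; a brand new account has sent 0 transactions
    return 0

async def demo() -> list[int]:
    tracker = LocalNonceTracker(fake_transaction_count)
    return [await tracker.next_nonce() for _ in range(3)]

print(asyncio.run(demo()))  # [0, 1, 2]
```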
Block Builder Submission
This area of the tutorial is for more advanced users who know what they are doing with block builders and submissions. The library is also still under-tested in some areas related to block submission, so any feedback here is always welcome.
If this is not you, do not worry, this is simply a section to cater to more advanced users, simply continue to the next section.
When transactions are sent out for execution, they are stored in a public memory pool (mempool).
Other users can read this mempool and act on the information in it. For example, you may be trying to buy a friendtech share when someone sees your bid in the mempool and outbids you, causing you to lose out.
As such, submitting your transactions to a Block Builder directly without them floating around in the mempool may be desirable.
Pythereum introduces the BuilderRPC class to facilitate this.
Since most block builders do not support websocket connection, this BuilderRPC class communicates over HTTP using aiohttp.
There are many block builders, each of which competes to create the next block. To give your transaction the best likelihood of being included in the final minted block, it is advisable to submit to many of them at once.
Here is an example of block builder submission to the TitanBuilder and Builder0x69. A snapshot of the most popular block builders can be found at mevboost.pics.
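The "submit to many at once" idea is an ordinary concurrent fan-out. The sketch below illustrates it with asyncio.gather and fake builders: submit_to_builder is a hypothetical stand-in that sleeps instead of making an HTTP request, and the builder names and transaction bytes are made up. It is not how BuilderRPC is implemented.

```python
import asyncio

async def submit_to_builder(name: str, signed_tx: str, should_fail: bool = False) -> str:
    """Hypothetical stand-in for one builder submission over HTTP."""
    await asyncio.sleep(0.01)  # Simulated network latency
    if should_fail:
        raise ConnectionError(f"{name} is unreachable")
    return f"{name} accepted {signed_tx[:10]}"

async def submit_to_all(signed_tx: str) -> list:
    # return_exceptions=True keeps one failing builder from cancelling the rest
    return await asyncio.gather(
        submit_to_builder("builder_a", signed_tx),
        submit_to_builder("builder_b", signed_tx, should_fail=True),
        return_exceptions=True,
    )

results = asyncio.run(submit_to_all("0x02f86c0180"))
for result in results:
    print("failed:" if isinstance(result, Exception) else "ok:", result)
```

Since the submissions run concurrently, the total wait is roughly that of the slowest builder rather than the sum of all of them.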
import asyncio
from pythereum import (
    Transaction, EthRPC, GasManager, NonceManager,
    BuilderRPC, TitanBuilder, Builder0x69, HexStr,
)
from eth_account import Account

async def my_first_builder_submission():
    acct = Account.create()
    # Create an arbitrary transaction (minus gas and nonce values)
    tx = Transaction(
        from_address=acct.address,
        to_address="0x5fC2E691E520bbd3499f409bb9602DBA94184672",
        value=1,
        chain_id=1,
    )

    manager_rpc = EthRPC(ENDPOINT_URL, pool_size=2)
    gm = GasManager(manager_rpc)
    await manager_rpc.start_pool()

    async with NonceManager(manager_rpc) as nm:
        await nm.fill_transaction(tx)

    async with gm.naive_manager() as nvm:
        await nvm.fill_transaction(tx)

    signed_tx = acct.sign_transaction(tx).rawTransaction

    # A list of block builders to submit to is used, in this case TitanBuilder and Builder0x69
    # Pythereum comes with inbuilt support for many of the most popular block builders
    # Support for more can be added by inheriting from Pythereum's Builder base class
    # We pass in the private key of our account to the builder RPC to sign payloads with flashbots headers (not always relevant but sometimes necessary)
    async with BuilderRPC(
        [TitanBuilder(), Builder0x69()], private_key=acct.key
    ) as brpc:
        msg = await brpc.send_private_transaction(HexStr(signed_tx))

    print(msg)
    await manager_rpc.close_pool()

if __name__ == "__main__":
    asyncio.run(my_first_builder_submission())
This is very similar to an example in the demo folder.
In addition to single transactions, there exists support for Bundle submissions (which are the private transaction equivalent to batch sending transactions), and mevboost bundles (a new transaction protocol popularised by the flashbots builder).
Examples below:
# Bundle submission

# Bundles have a large amount of optional parameters
# Check your chosen block builder's documentation for more info (e.g. https://docs.titanbuilder.xyz/api/eth_sendbundle)
bd = Bundle(
    txs=[tx for tx in signed_transactions],
)

async with BuilderRPC(
    [TitanBuilder()], private_key=acct.key
) as brpc:
    msg = await brpc.send_bundle(bd)

print(msg)

# MEVBundle submission

# MEVBundles have a large amount of optional parameters, similar to normal bundles,
# Check Pythereum's implementation and flashbots documentation for info
bd = MEVBundle(
    block=(await rpc.get_block_number()) + 1,  # The bundle is valid for the next block
    refund_addresses=["0x5fC2E691E520bbd3499f409bb9602DBA94184672"],
    refund_percentages=[100],
)

async with BuilderRPC(
    [FlashbotsBuilder()], private_key=acct.key
) as brpc:
    msg = await brpc.send_mev_bundle(bd)

print(msg)
Mempool Access
The memory pool (mempool) holds proposed transactions in a public place before they are either added to a block or discarded.
Accessing the possible transactions that others may make in the future is an important way to gain an advantage when interacting with the Ethereum blockchain.
The way to access this mempool differs depending on what type of node you are connected to. For geth or OpenEthereum (parity) nodes there are inbuilt methods for this.
await rpc.get_mempool_geth()
await rpc.get_mempool_parity()
However, if you are connected to another type of Ethereum endpoint there may not be a simple method to do this. Instead, it is possible to use subscriptions to build up your own picture of the mempool using the new pending transactions subscription type.
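A minimal sketch of what building your own picture of the mempool could look like is given below. MempoolView is a hypothetical class that is fed by two event sources: pending transactions and the hashes included in each new block. It assumes the pending-transaction subscription delivers transaction objects with a "hash" field; some endpoints deliver only hashes, in which case the stored value would just be the hash.

```python
class MempoolView:
    """Toy local picture of the mempool, keyed by transaction hash."""

    def __init__(self):
        self.pending: dict[str, dict] = {}

    def on_pending_transaction(self, tx: dict) -> None:
        # Called for each item received from a pending-transactions subscription
        self.pending[tx["hash"]] = tx

    def on_new_block(self, included_hashes: list[str]) -> None:
        # Transactions included in a block are no longer pending
        for tx_hash in included_hashes:
            self.pending.pop(tx_hash, None)

view = MempoolView()
view.on_pending_transaction({"hash": "0xaa", "value": 1})
view.on_pending_transaction({"hash": "0xbb", "value": 2})
view.on_new_block(["0xaa"])
print(list(view.pending))  # ['0xbb']
```

A real tracker would also need to evict transactions that are dropped or replaced without ever being included in a block, which this sketch does not attempt.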
ABI Contract Calls
When calling smart contracts, one needs to know the signature of the functions they desire to call.
Contracts, when compiled, produce ABIs which can be publicly viewed on etherscan. These ABIs have enough information to allow us to call functions with our EthRPC.
A smart contract function call looks as follows:
0x{4 byte function selector}{encoded parameters}
The function selector is determined from the function name and the parameter types it takes in, as follows:
“exampleFunctionName(address,uint256,bool)” -> Keccak-256 hash -> first 4 bytes -> 4 byte selector
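To show the layout of such a call, the sketch below assembles the calldata for the widely used ERC-20 function transfer(address,uint256) by hand. Its selector, a9059cbb, is hardcoded because Keccak-256 is not available in Python's standard library; in practice a helper such as eth_utils.function_signature_to_4byte_selector computes it. The function name encode_transfer_call is made up for this example.

```python
TRANSFER_SELECTOR = bytes.fromhex("a9059cbb")  # selector of transfer(address,uint256)

def encode_transfer_call(recipient: str, amount: int) -> str:
    """Build calldata for transfer(address,uint256) by hand."""
    address_bytes = bytes.fromhex(recipient.removeprefix("0x"))
    if len(address_bytes) != 20:
        raise ValueError("an address must be exactly 20 bytes")
    # Static types each occupy one 32-byte word, left-padded with zeros
    encoded_params = address_bytes.rjust(32, b"\x00") + amount.to_bytes(32, "big")
    return "0x" + (TRANSFER_SELECTOR + encoded_params).hex()

calldata = encode_transfer_call("0x5fC2E691E520bbd3499f409bb9602DBA94184672", 1)
print(calldata[:10])  # 0xa9059cbb
print(len(calldata))  # 138
```

The 138 characters are the "0x" prefix, 8 hex characters of selector, and two 64-character words holding the padded address and the amount.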
Pythereum provides functionality to automatically unpack ABIs into objects. These objects generate functions based on those contained in the ABI so users can intuitively call them as follows:
abi = ContractABI(ABI_DATA)  # Contains a function swapOwners which takes in two accounts
tx = Transaction(
    to_address=contract_address,
    # ... other transaction fields ...
    # Automatically encodes the swapOwners function call as per the ABI
    data=abi.swap_owners(address_one, address_two),  # Can be called using snake_case for extra pythonic-ness
)
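The snake_case naming used above can be produced from an ABI's camelCase function names with a small conversion. The helper below is one possible way to do this and is an assumption for illustration; it is not necessarily how Pythereum derives its method names.

```python
import re

def camel_to_snake(name: str) -> str:
    """Convert a camelCase ABI function name to snake_case."""
    # Insert an underscore before each capital that follows a lowercase letter or digit
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name).lower()

print(camel_to_snake("swapOwners"))  # swap_owners
```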
It is not currently possible to automatically get ABI or un-compiled contract data without external block explorers.
Deploying a Smart Contract
Deploying a contract is much the same as sending any regular transaction, with some small changes.
import asyncio
import solcx  # Solidity compiler
from eth_account import Account
from pythereum import EthRPC, Transaction

async def my_first_contract(contract_bytecode: str):
    # Create an arbitrary account wallet
    acct = Account.create()

    tx = Transaction(
        from_address=acct.address,
        # The to address is 0x80, which is equivalent to None
        # Contracts must be sent to this address to publish them
        to_address=None,
        value=0,
        chain_id=1,
        nonce=0,
        max_fee_per_gas=0,
        max_priority_fee_per_gas=0,
        data=contract_bytecode,
    )

    async with EthRPC(ENDPOINT_URL, pool_size=1) as rpc:
        # estimate_gas returns the amount of gas needed, which we use as the gas limit
        gas_estimate = await rpc.estimate_gas(tx)
        tx["gas"] = gas_estimate
        signed_tx = acct.sign_transaction(tx).rawTransaction

        # Prints the transaction hash; the address of the deployed contract is found in the transaction receipt
        print(await rpc.send_raw_transaction(signed_tx))


if __name__ == "__main__":
    bytecode = solcx.compile_files(  # Compile some solidity code with solcx
        ["foo.sol"],
        output_values=["abi", "bin"],  # "bin" is the creation bytecode needed for deployment
        solc_version="0.7.0"
    )["<stdin>:foo"]["bin"]
    asyncio.run(my_first_contract(bytecode))