Skip to content

Commit 2d6114c

Browse files
authored
fix: Apply Pallas' breaking changes to new reducers (#100)
1 parent a99dec4 commit 2d6114c

3 files changed

Lines changed: 82 additions & 61 deletions

File tree

src/reducers/asset_holders_by_asset_id.rs

Lines changed: 59 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use pallas::ledger::traverse::MultiEraOutput;
2-
use pallas::ledger::traverse::{MultiEraBlock, OutputRef, Subject};
1+
use pallas::ledger::traverse::{Asset, MultiEraOutput};
2+
use pallas::ledger::traverse::{MultiEraBlock, OutputRef};
33
use serde::Deserialize;
44

5-
use pallas::crypto::hash::Hash;
65
use crate::{crosscut, model, prelude::*};
6+
use pallas::crypto::hash::Hash;
77

88
use crate::crosscut::epochs::block_epoch;
99
use std::str::FromStr;
@@ -18,33 +18,38 @@ pub struct Config {
1818
pub key_prefix: Option<String>,
1919
pub filter: Option<crosscut::filters::Predicate>,
2020
pub aggr_by: Option<AggrType>,
21-
pub policy_ids_hex: Option<Vec<String>>, // if specified only those policy ids as hex will be taken into account, if not all policy ids will be indexed
21+
22+
/// Policies to match
23+
///
24+
/// If specified only those policy ids as hex will be taken into account, if
25+
/// not all policy ids will be indexed.
26+
pub policy_ids_hex: Option<Vec<String>>,
2227
}
2328

2429
pub struct Reducer {
2530
config: Config,
2631
policy: crosscut::policies::RuntimePolicy,
2732
chain: crosscut::ChainWellKnownInfo,
28-
policy_ids: Option<Vec<Hash<28>>>
33+
policy_ids: Option<Vec<Hash<28>>>,
2934
}
3035

3136
impl Reducer {
32-
fn config_key(&self, asset_id: String, epoch_no: u64) -> String {
37+
fn config_key(&self, subject: String, epoch_no: u64) -> String {
3338
let def_key_prefix = "asset_holders_by_asset_id";
3439

3540
match &self.config.aggr_by {
3641
Some(aggr_type) if matches!(aggr_type, AggrType::Epoch) => {
3742
return match &self.config.key_prefix {
38-
Some(prefix) => format!("{}.{}.{}", prefix, asset_id, epoch_no),
39-
None => format!("{}.{}", def_key_prefix.to_string(), asset_id),
43+
Some(prefix) => format!("{}.{}.{}", prefix, subject, epoch_no),
44+
None => format!("{}.{}", def_key_prefix.to_string(), subject),
4045
};
41-
},
46+
}
4247
_ => {
4348
return match &self.config.key_prefix {
44-
Some(prefix) => format!("{}.{}", prefix, asset_id),
45-
None => format!("{}.{}", def_key_prefix.to_string(), asset_id),
49+
Some(prefix) => format!("{}.{}", prefix, subject),
50+
None => format!("{}.{}", def_key_prefix.to_string(), subject),
4651
};
47-
},
52+
}
4853
};
4954
}
5055

@@ -71,26 +76,21 @@ impl Reducer {
7176

7277
let address = utxo.address().map(|addr| addr.to_string()).or_panic()?;
7378

74-
for asset in utxo.assets().iter() {
75-
let sub = &asset.subject;
76-
let quantity = &asset.quantity;
77-
78-
let delta = *quantity as i64 * (-1);
79+
for asset in utxo.assets() {
80+
match asset {
81+
Asset::NativeAsset(policy_id, _, quantity) => {
82+
if self.is_policy_id_accepted(&policy_id) {
83+
let subject = asset.subject();
84+
let key = self.config_key(subject, epoch_no);
85+
let delta = quantity as i64 * (-1);
7986

80-
match sub {
81-
Subject::NativeAsset(policy_id, asset_name) => {
82-
if self.is_policy_id_accepted(policy_id) {
83-
let asset_id = format!("{}{}", policy_id, asset_name);
87+
let crdt =
88+
model::CRDTCommand::SortedSetRemove(key, address.to_string(), delta);
8489

85-
let key = self.config_key(asset_id, epoch_no);
86-
87-
let crdt = model::CRDTCommand::SortedSetRemove(key, address.to_string(), delta);
88-
8990
output.send(gasket::messaging::Message::from(crdt))?;
9091
}
91-
9292
}
93-
_ => {},
93+
_ => (),
9494
};
9595
}
9696

@@ -103,28 +103,26 @@ impl Reducer {
103103
epoch_no: u64,
104104
output: &mut super::OutputPort,
105105
) -> Result<(), gasket::error::Error> {
106-
let address = tx_output.address().map(|addr| addr.to_string()).or_panic()?;
106+
let address = tx_output
107+
.address()
108+
.map(|addr| addr.to_string())
109+
.or_panic()?;
110+
111+
for asset in tx_output.assets() {
112+
match asset {
113+
Asset::NativeAsset(policy_id, _, quantity) => {
114+
if self.is_policy_id_accepted(&policy_id) {
115+
let subject = asset.subject();
116+
let key = self.config_key(subject, epoch_no);
117+
let delta = quantity as i64;
118+
119+
let crdt =
120+
model::CRDTCommand::SortedSetAdd(key, address.to_string(), delta);
107121

108-
for asset in tx_output.assets().iter() {
109-
let sub = &asset.subject;
110-
let quantity = &asset.quantity;
111-
112-
let delta = *quantity as i64;
113-
114-
match sub {
115-
Subject::NativeAsset(policy_id, asset_name) => {
116-
if self.is_policy_id_accepted(policy_id) {
117-
118-
let asset_id = format!("{}{}", policy_id, asset_name);
119-
120-
let key = self.config_key(asset_id, epoch_no);
121-
122-
let crdt = model::CRDTCommand::SortedSetAdd(key, address.to_string(), delta);
123-
124122
output.send(gasket::messaging::Message::from(crdt))?;
125123
}
126124
}
127-
_ => {},
125+
_ => {}
128126
};
129127
}
130128

@@ -156,23 +154,22 @@ impl Reducer {
156154
}
157155

158156
impl Config {
159-
pub fn plugin(self,
160-
chain: &crosscut::ChainWellKnownInfo,
161-
policy: &crosscut::policies::RuntimePolicy,
162-
) -> super::Reducer {
163-
164-
let policy_ids: Option<Vec<Hash<28>>> = match &self.policy_ids_hex {
165-
Some(pids) => {
166-
let ps = pids
157+
pub fn plugin(
158+
self,
159+
chain: &crosscut::ChainWellKnownInfo,
160+
policy: &crosscut::policies::RuntimePolicy,
161+
) -> super::Reducer {
162+
let policy_ids: Option<Vec<Hash<28>>> = match &self.policy_ids_hex {
163+
Some(pids) => {
164+
let ps = pids
167165
.iter()
168-
.map(|pid| Hash::<28>::from_str(pid)
169-
.expect("invalid policy_id"))
166+
.map(|pid| Hash::<28>::from_str(pid).expect("invalid policy_id"))
170167
.collect();
171168

172-
Some(ps)
173-
},
174-
None => None,
175-
};
169+
Some(ps)
170+
}
171+
None => None,
172+
};
176173

177174
let reducer = Reducer {
178175
config: self,
@@ -186,5 +183,7 @@ impl Config {
186183
}
187184

188185
// How to query
189-
// 127.0.0.1:6379> ZRANGEBYSCORE "asset_holders_by_asset_id.5d9d887de76a2c9d057b3e5d34d5411f7f8dc4d54f0c06e8ed2eb4a9494e4459" 1 +inf
186+
// 127.0.0.1:6379> ZRANGEBYSCORE
187+
// "asset_holders_by_asset_id.
188+
// 5d9d887de76a2c9d057b3e5d34d5411f7f8dc4d54f0c06e8ed2eb4a9494e4459" 1 +inf
190189
// 1) "addr1q8lmu79hgm3sppz8dta3aftf0cwh2v2eja56wqvzqy4jj0zjt7qgvj7saxdxve35c4ehuxuam4czlz9fw6ls7zr4as9s609d7u"

testdrive/custom/adahandle.toml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[source]
2+
type = "N2N"
3+
address = "preprod-node.world.dev.cardano.org:30000"
4+
5+
[[reducers]]
6+
type = "AddressByAdaHandle"
7+
key_prefix = "AddressByAdaHandle"
8+
policy_id_hex = "f0ff48bbb7bbe9d59a40f1ce90e9e9d0ff5002ec48f232b49ca0fb9a"
9+
10+
[storage]
11+
type = "Redis"
12+
connection_params = "redis://127.0.0.1:6379"
13+
14+
[chain]
15+
type = "PreProd"
16+
17+
[intersect]
18+
type = "Point"
19+
value = [
20+
8261225,
21+
"103ff3cfec6e388db803fc10dbdecb3026b4a51382008d01ff3f9121be6fa6e4",
22+
]

testdrive/custom/start.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
RUST_LOG=info cargo run --all-features --bin scrolls -- daemon --console tui --config ./daemon.toml
1+
RUST_LOG=info cargo run --all-features --bin scrolls -- daemon --console plain`` --config ./adahandle.toml

0 commit comments

Comments
 (0)